JDBC

Scio supports JDBC reads and writes.

Reads

Reads come in two flavors: a query-based variant backed by Beam’s JdbcIO and a “sharded select” that performs a parallelizable bulk read on an entire table.

Read via query

Query-based reads are supported with jdbcSelect. It expects a JdbcConnectionOptions to connect to the database. The statementPreparator argument may be used to bind static parameters in the query, which are usually passed as arguments to the pipeline. The curried rowMapper function argument maps a java.sql.ResultSet to the result type T.

import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.jdbc._
import java.sql.Driver

val (sc, args): (ScioContext, Args) = ???
val sourceArg: String = args("wordCountSourceArg")

val jdbcUrl: String = ???
val driverClass: Class[Driver] = ???
val jdbcOptions = JdbcConnectionOptions("username", Some("password"), jdbcUrl, driverClass)
val query = "SELECT word, word_count FROM word_count WHERE source = ?"

val elements: SCollection[(String, Long)] = sc.jdbcSelect(jdbcOptions, query, _.setString(1, sourceArg)) { r =>
  r.getString(1) -> r.getLong(2)
}
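
When the same query is used in several places, the two function arguments can be pulled out into named functions. The following is a minimal sketch that relies only on java.sql types; the WordCount case class and the WordCountJdbc object are hypothetical names introduced for illustration and are not part of Scio.

```scala
import java.sql.{PreparedStatement, ResultSet}

// Hypothetical result type for the word_count query above (not part of Scio).
final case class WordCount(word: String, count: Long)

// Hypothetical helper object (not part of Scio).
object WordCountJdbc {
  // Binds the single `?` placeholder of the query; JDBC parameters are 1-indexed.
  def prepare(source: String)(ps: PreparedStatement): Unit =
    ps.setString(1, source)

  // Maps the current row of the ResultSet; column indexes are also 1-indexed
  // and follow the order of the SELECT list (word, word_count).
  def fromRow(r: ResultSet): WordCount =
    WordCount(r.getString(1), r.getLong(2))
}
```

Assuming the jdbcSelect signature used in the example above, these could be passed as sc.jdbcSelect(jdbcOptions, query, WordCountJdbc.prepare(sourceArg))(WordCountJdbc.fromRow), which would yield an SCollection[WordCount] instead of a collection of tuples.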

Parallelized table read

When an entire table is to be read, the input table can be sharded based on some column value and each shard read in parallel with jdbcShardedSelect.

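JdbcShardedReadOptions requires the connection options, the name of the table to read, the column to shard on, a Shard implementation matching that column's type, and a rowMapper from java.sql.ResultSet to the result type: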

import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.jdbc._
import java.sql.Driver
import com.spotify.scio.jdbc.sharded._

val sc: ScioContext = ???

val jdbcUrl: String = ???
val driverClass: Class[Driver] = ???
val connOpts = JdbcConnectionOptions("username", Some("password"), jdbcUrl, driverClass)

val shardedReadOptions = JdbcShardedReadOptions[(String, Long), Long](
  connectionOptions = connOpts,
  tableName = "tableName",
  shardColumn = "word_count",
  shard = Shard.range[Long],
  rowMapper = r => (r.getString("word"), r.getLong("word_count"))
)
val elements: SCollection[(String, Long)] = sc.jdbcShardedSelect(shardedReadOptions)
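
To build intuition for what sharding on a numeric column means, the following is a minimal sketch of range partitioning. It is not Scio's implementation: the RangeShardingSketch object, its split and queries functions, and the generated SQL are all assumptions made for illustration. It splits an inclusive [min, max] range of shard column values into contiguous sub-ranges, each of which could back one independent query.

```scala
// Hypothetical illustration of range sharding; not Scio's actual code.
object RangeShardingSketch {
  // Splits the inclusive range [min, max] into at most `numShards` contiguous,
  // non-overlapping inclusive sub-ranges that together cover the whole range.
  def split(min: Long, max: Long, numShards: Int): Seq[(Long, Long)] = {
    require(numShards > 0, "numShards must be positive")
    require(min <= max, "min must not exceed max")
    // BigInt avoids overflow when the range spans most of the Long domain.
    val total = BigInt(max) - BigInt(min) + 1
    // Never create more shards than there are distinct values.
    val shards = BigInt(numShards).min(total)
    (BigInt(0) until shards).map { i =>
      val lo = BigInt(min) + i * total / shards
      val hi = BigInt(min) + (i + 1) * total / shards - 1
      (lo.toLong, hi.toLong)
    }
  }

  // Renders one query per sub-range; table and column names are supplied by the caller.
  def queries(table: String, column: String, ranges: Seq[(Long, Long)]): Seq[String] =
    ranges.map { case (lo, hi) =>
      s"SELECT * FROM $table WHERE $column >= $lo AND $column <= $hi"
    }
}
```

For example, RangeShardingSketch.split(0L, 9L, 3) produces the sub-ranges (0, 2), (3, 5) and (6, 9). Because the sub-ranges do not overlap, each row is read by exactly one shard.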

Writes

Write to JDBC with saveAsJdbc. It expects a JdbcConnectionOptions to connect to the database. The curried preparedStatementSetter function argument receives an instance of the type-to-be-written and a PreparedStatement and appropriately sets the statement fields.

import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.jdbc._
import java.sql.Driver

val jdbcUrl: String = ???
val driverClass: Class[Driver] = ???
val jdbcOptions = JdbcConnectionOptions("username", Some("password"), jdbcUrl, driverClass)
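// Column names match the word_count table used in the read examples
val statement = "INSERT INTO word_count (word, word_count) VALUES (?, ?)"

val elements: SCollection[(String, Long)] = ???
// `stmt` avoids shadowing the `statement` SQL string above; JDBC parameters are 1-indexed
elements.saveAsJdbc(jdbcOptions, statement) { case ((word, count), stmt) =>
  stmt.setString(1, word)
  stmt.setLong(2, count)
}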
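
The setter can also be written as a named function, which makes it reusable and lets it be exercised without a database. This is a minimal sketch using only java.sql types; the WordCountWriter object and its bind function are hypothetical names and are not part of Scio.

```scala
import java.sql.PreparedStatement

// Hypothetical helper object (not part of Scio).
object WordCountWriter {
  // Sets the two `?` placeholders of the INSERT statement from one element.
  // JDBC parameters are 1-indexed and must follow the column order of the statement.
  def bind(row: (String, Long), ps: PreparedStatement): Unit = {
    val (word, count) = row
    ps.setString(1, word)
    ps.setLong(2, count)
  }
}
```

Assuming the saveAsJdbc signature used in the example above, the function could be passed as elements.saveAsJdbc(jdbcOptions, statement)(WordCountWriter.bind).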