JDBC
Scio supports JDBC reads and writes.
Reads
Reads come in two flavors: a query-based variant backed by Beam’s JdbcIO
and a “sharded select” that performs a parallelizable bulk read on an entire table.
Read via query
Query-based reads are supported with jdbcSelect. It expects a JdbcConnectionOptions to connect to the database. The statementPreparator argument may be used to set static parameters in the query, usually passed as arguments to the pipeline. The curried rowMapper function argument maps a java.sql.ResultSet to the result type T.
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.jdbc._
import java.sql.Driver
val (sc, args): (ScioContext, Args) = ???
val sourceArg: String = args("wordCountSourceArg")
val jdbcUrl: String = ???
val driverClass: Class[Driver] = ???
val jdbcOptions = JdbcConnectionOptions("username", Some("password"), jdbcUrl, driverClass)
val query = "SELECT word, word_count FROM word_count WHERE source = ?"
val elements: SCollection[(String, Long)] = sc.jdbcSelect(jdbcOptions, query, _.setString(1, sourceArg)) { r =>
  r.getString(1) -> r.getLong(2)
}
Parallelized table read
When an entire table is to be read, the input table can be sharded based on some column value and each shard read in parallel with jdbcShardedSelect.
JdbcShardedReadOptions requires:
- A rowMapper with the same function as in jdbcSelect
- The tableName of the table to be read
- A shardColumn, the column on which the read will be sharded. This column must be indexed, and should have an index in which shardColumn is not part of a composite index.
- A shard (Shard) implementation for the type of shardColumn. Provided implementations are Int, Long, BigDecimal, Double, Float, ShardString.HexUpperString, ShardString.HexLowerString, ShardString.UuidUpperString, ShardString.UuidLowerString, and ShardString.Base64String.
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.jdbc._
import java.sql.Driver
import com.spotify.scio.jdbc.sharded._
val sc: ScioContext = ???
val jdbcUrl: String = ???
val driverClass: Class[Driver] = ???
val connOpts = JdbcConnectionOptions("username", Some("password"), jdbcUrl, driverClass)
val shardedReadOptions = JdbcShardedReadOptions[(String, Long), Long](
  connectionOptions = connOpts,
  tableName = "tableName",
  shardColumn = "word_count",
  shard = Shard.range[Long],
  rowMapper = r => (r.getString("word"), r.getLong("word_count"))
)
val elements: SCollection[(String, Long)] = sc.jdbcShardedSelect(shardedReadOptions)
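To make the idea of sharding on a numeric column more concrete, the sketch below splits a column's value range into contiguous sub-ranges and renders one bounded query per shard. It illustrates the general technique only and is not Scio's implementation: the names RangeShard, ShardingSketch, splitRange, and shardQuery, the caller-supplied shard count, and the shape of the generated SQL are all assumptions made for this example.

```scala
// Hypothetical helper types for illustration only; none of these are Scio APIs.
final case class RangeShard(lowerInclusive: Long, upperInclusive: Long)

object ShardingSketch {
  // Split the inclusive range [min, max] into at most `numShards` contiguous,
  // non-overlapping sub-ranges that together cover every value in the range.
  def splitRange(min: Long, max: Long, numShards: Int): Seq[RangeShard] = {
    require(min <= max, "min must not exceed max")
    require(numShards > 0, "numShards must be positive")
    // BigInt avoids overflow when the range spans most of the Long domain.
    val total = BigInt(max) - BigInt(min) + 1
    // Never create more shards than there are distinct values.
    val shards = if (total < BigInt(numShards)) total.toInt else numShards
    (0 until shards).map { i =>
      val lo = BigInt(min) + (total * BigInt(i)) / BigInt(shards)
      val hi = BigInt(min) + (total * BigInt(i + 1)) / BigInt(shards) - 1
      RangeShard(lo.toLong, hi.toLong)
    }
  }

  // Render the bounded query for one shard (assumed SQL shape, not Scio's).
  def shardQuery(table: String, column: String, shard: RangeShard): String =
    s"SELECT * FROM $table WHERE $column >= ${shard.lowerInclusive} " +
      s"AND $column <= ${shard.upperInclusive}"
}
```

For instance, ShardingSketch.splitRange(1L, 100L, 4) yields the ranges 1 to 25, 26 to 50, 51 to 75, and 76 to 100, each of which could be queried independently. Under this model every shard issues a range predicate on the shard column, which suggests why that column needs a usable index.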
Writes
Write to JDBC with saveAsJdbc. It expects a JdbcConnectionOptions to connect to the database. The curried preparedStatementSetter function argument receives an instance of the type-to-be-written and a PreparedStatement and appropriately sets the statement fields.
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.jdbc._
import java.sql.Driver
val jdbcUrl: String = ???
val driverClass: Class[Driver] = ???
val jdbcOptions = JdbcConnectionOptions("username", Some("password"), jdbcUrl, driverClass)
val statement = "INSERT INTO word_count (word, count) values (?, ?)"
val elements: SCollection[(String, Long)] = ???
elements.saveAsJdbc(jdbcOptions, statement) { case ((word, count), statement) =>
  statement.setString(1, word)
  statement.setLong(2, count)
}