JDBC
Scio supports JDBC reads and writes.
Reads
Reads come in two flavors: a query-based variant backed by Beam’s JdbcIO and a “sharded select” that performs a parallelizable bulk read on an entire table.
Read via query
Query-based reads are supported with jdbcSelect. It expects a JdbcConnectionOptions to connect to the database. The statementPreparator argument may be used to set static parameters in the query, usually passed as arguments to the pipeline. The curried rowMapper function argument maps a java.sql.ResultSet to the result type T.
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.jdbc._
import java.sql.Driver
val (sc, args): (ScioContext, Args) = ???
val sourceArg: String = args("wordCountSourceArg")
val jdbcUrl: String = ???
val driverClass: Class[Driver] = ???
val jdbcOptions = JdbcConnectionOptions("username", Some("password"), jdbcUrl, driverClass)
val query = "SELECT word, word_count FROM word_count WHERE source = ?"
val elements: SCollection[(String, Long)] = sc.jdbcSelect(jdbcOptions, query, _.setString(1, sourceArg)) { r =>
  r.getString(1) -> r.getLong(2)
}
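Because statementPreparator and rowMapper are ordinary functions, they can be defined once and tested apart from the pipeline. The following is a minimal sketch, assuming a hypothetical WordCount result type and hypothetical helper names bindSource and toWordCount; none of these are part of Scio.

```scala
import java.sql.{PreparedStatement, ResultSet}

// Hypothetical result type for the word_count query above (not part of Scio).
final case class WordCount(word: String, count: Long)

// Hypothetical helper: builds a statement preparator that binds the single
// `?` placeholder of the query. JDBC parameter indices are 1-based.
def bindSource(source: String): PreparedStatement => Unit =
  statement => statement.setString(1, source)

// Hypothetical helper: maps the current row of the ResultSet to a WordCount.
// Column indices are 1-based and follow the order of the SELECT list.
val toWordCount: ResultSet => WordCount =
  row => WordCount(row.getString(1), row.getLong(2))
```

Assuming the argument shapes used in the example above, these would be passed as `sc.jdbcSelect(jdbcOptions, query, bindSource(sourceArg))(toWordCount)`. Using a case class as the result type also assumes that a Coder can be derived for it.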
Parallelized table read
When an entire table is to be read, the input table can be sharded based on some column value and each shard read in parallel with jdbcShardedSelect.
JdbcShardedReadOptions requires:
- A rowMapper with the same function as in jdbcSelect
- The tableName of the table to be read
- A shardColumn, the column on which the read will be sharded. This column must be indexed and should have an index where shardColumn is not part of a composite index.
- A shard (Shard) implementation for the type of shardColumn. Provided implementations are Int, Long, BigDecimal, Double, Float, ShardString.HexUpperString, ShardString.HexLowerString, ShardString.UuidUpperString, ShardString.UuidLowerString, and ShardString.Base64String.
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.jdbc._
import java.sql.Driver
import com.spotify.scio.jdbc.sharded._
val sc: ScioContext = ???
val jdbcUrl: String = ???
val driverClass: Class[Driver] = ???
val connOpts = JdbcConnectionOptions("username", Some("password"), jdbcUrl, driverClass)
val shardedReadOptions = JdbcShardedReadOptions[(String, Long), Long](
  connectionOptions = connOpts,
  tableName = "tableName",
  shardColumn = "word_count",
  shard = Shard.range[Long],
  rowMapper = r => (r.getString("word"), r.getLong("word_count"))
)
val elements: SCollection[(String, Long)] = sc.jdbcShardedSelect(shardedReadOptions)
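To make the idea of range sharding concrete, the sketch below splits the value range of a numeric shard column into contiguous, non-overlapping sub-ranges and builds one bounded query per sub-range. It only illustrates the concept and is not Scio's implementation; LongRange, shardRanges, and shardQuery are hypothetical names, and the actual queries that jdbcShardedSelect issues may differ.

```scala
// Hypothetical type: an inclusive range of shard-column values.
final case class LongRange(lower: Long, upper: Long)

// Hypothetical helper: splits [min, max] into at most `numShards` contiguous,
// non-overlapping inclusive ranges. BigInt avoids overflow for wide ranges.
def shardRanges(min: Long, max: Long, numShards: Int): List[LongRange] = {
  require(numShards > 0, "numShards must be positive")
  require(min <= max, "min must not exceed max")
  val total = BigInt(max) - BigInt(min) + 1
  val step = (total + numShards - 1) / numShards // ceiling division, always >= 1
  Iterator
    .iterate(BigInt(min))(_ + step)
    .takeWhile(_ <= BigInt(max))
    .map(lo => LongRange(lo.toLong, (lo + step - 1).min(BigInt(max)).toLong))
    .toList
}

// Hypothetical helper: the bounded query one shard would run. Each query is a
// range scan on the shard column, which is why that column should be indexed.
def shardQuery(table: String, column: String, range: LongRange): String =
  s"SELECT * FROM $table WHERE $column >= ${range.lower} AND $column <= ${range.upper}"
```

Since the ranges do not overlap and together cover [min, max], each row falls into exactly one shard, so the shards can be read independently and in parallel.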
Writes
Write to JDBC with saveAsJdbc. It expects a JdbcConnectionOptions to connect to the database. The curried preparedStatementSetter function argument receives an element of the type being written and a PreparedStatement, and sets the statement's parameters from that element.
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.jdbc._
import java.sql.Driver
val jdbcUrl: String = ???
val driverClass: Class[Driver] = ???
val jdbcOptions = JdbcConnectionOptions("username", Some("password"), jdbcUrl, driverClass)
val statement = "INSERT INTO word_count (word, count) values (?, ?)"
val elements: SCollection[(String, Long)] = ???
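elements.saveAsJdbc(jdbcOptions, statement) { case ((word, count), preparedStatement) =>
  // Parameter indices are 1-based and follow the order of the ? placeholders
  preparedStatement.setString(1, word)
  preparedStatement.setLong(2, count)
}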
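As with reads, the setter can be pulled out into a named function so that the mapping from element to statement parameters is defined in one place. This is a minimal sketch; setWordCount is a hypothetical name, and the function type is assumed from the shape of the example above.

```scala
import java.sql.PreparedStatement

// Hypothetical helper: sets the two `?` placeholders of the INSERT statement
// from a (word, count) element. JDBC parameter indices are 1-based.
val setWordCount: ((String, Long), PreparedStatement) => Unit =
  (element, preparedStatement) => {
    val (word, count) = element
    preparedStatement.setString(1, word)
    preparedStatement.setLong(2, count)
  }
```

Under that assumption it would be passed as `elements.saveAsJdbc(jdbcOptions, statement)(setWordCount)`.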