v0.12.0 Release Blog
Scio 0.12.0 contains many new features, performance improvements, and a few breaking changes. You can find the technical Migration Guide here with code samples and Scalafix instructions, and the full release notes are here.
New Features
Windowing API for file writes
We’ve improved Scio API support for windowed file writes. Most file IOs now have three new, optional parameters: shardNameTemplate
, filenamePolicySupplier
, and tempDirectory
. Windowed writes can be applied with either a shardNameTemplate
or a filenamePolicySupplier
. shardNameTemplate
allows users to specify a shorthand for their desired output file format, for example:
// SSS and NNN refer to current shard ID and total number of shards, respectively
data.saveAsAvroFile("/some-path", numShards = 2, shardNameTemplate = "-SSS-of-NNN")
Files with this shard template would be written to disk like:
% ls /some-path
part-000-of-002.avro
part-001-of-002.avro
In contrast, filenamePolicySupplier
gives you more fine-grained control by allowing you to specify a full FilenamePolicy
based on the supplied path
and suffix
:
data.saveAsAvroFile(path, schema, filenamePolicySupplier =
(path: String, suffix: String) => new FilenamePolicy {
override def windowedFilename(shardNumber: Int, numShards: Int, window: BoundedWindow, paneInfo: PaneInfo, outputFileHints: FileBasedSink.OutputFileHints): ResourceId = ???
override def unwindowedFilename(shardNumber: Int, numShards: Int, outputFileHints: FileBasedSink.OutputFileHints): ResourceId = ???
}))
KV Batch API
Keyed SCollections have new built-in batching APIs: batchByKey, batchByteSizedByKey, and batchWeightedByKey, which allow you to create Iterable
-backed batches within your SCollection, determined by a target batch size, byte size, or weighting scheme, respectively.
data
.keyBy(_.1)
.batchByKey(batchSize = 100L)
SplittableDoFn-based Parquet reads
We’ve introduced an opt-in new Parquet reads implementation, which implements Beam’s SplittableDoFn API, as an alternative to the default BoundedSource
-backed implementation. We’ve observed greatly improved Dataflow metrics with this new implementation (total VCPU time, total memory time), when run with Dataflow Runner V1 (the default), workers may not scale up as much, resulting in an overall slower wall time. Therefore, we recommend trying out the new Parquet read in conjunction with Dataflow Runner V2.
You can opt in by adding an entry to your Parquet read’s Configuration:
import com.spotify.scio.parquet._
sc.typedParquetFile[T](path, conf = ParquetConfiguration.of("scio.parquet.read.useSplittableDoFn" -> true))
You can find more information, and other migration options, here.
GRPC Lookup API
Scio 0.12.0 includes a new artifact, scio-grpc
, that provides a custom AsyncLookupDoFn
implementation specifically for GRPC service lookups. Both unary and server-streaming lookups are supported.
import com.spotify.scio.grpc._
data
.map { case (str1, str2) => ConcatRequest.newBuilder.setStr1(str1).setStr2(str2).build }
.grpcLookup[ConcatResponse, ConcatServiceFutureStub](
() => NettyChannelBuilder.forTarget(ServiceUri).usePlaintext().build(),
ConcatServiceGrpc.newFutureStub,
10 // Max pending requests
)(_.concat)
scio-neo4j module
We have a second new artifact in 0.12.0, scio-neo4j
, that supports Neo4J reads and writes.
import com.spotify.scio.neo4j._
case class Entity(id: String, property: Option[String])
val entities = sc
.neo4jCypher[Entity](
Neo4jOptions(Neo4jConnectionOptions("neo4j://neo4j.com:7687", "username", "password")),
"""MATCH (e:Entity)
|WHERE e.property = 'value'
|RETURN e""".stripMargin
)
entities
.map(someFn)
.saveAsNeo4j(
Neo4jOptions(Neo4jConnectionOptions("neo4j://neo4j.com:7687", "username", "password")),
"""UNWIND $rows AS row
|MERGE (e:Entity {id: row.id})
|ON CREATE SET p.id = row.id, p.property = row.property
|""".stripMargin
)
Secondary sort key for SMB writes and reads
SMB now supports secondary sort keys. We’ve extended most SMB APIs in the style of (k: K)
to also support (k1: K1, k2: K2)
. For example, reads and transforms can now specify a secondary key:
sc
.sortMergeJoin(
classOf[String],
classOf[Int], // Secondary sort key
AvroSortedBucketIO.read(new TupleTag[A]("a"), aSchema).from(aPath),
AvroSortedBucketIO.read(new TupleTag[B]("b"), bClass).from(bPath)
).map { case ((k1, k2), (aData, bData)) =>
// ...
}
Writes, too, have an expanded API:
data
.saveAsSortedBucket(
AvroSortedBucketIO.write[String, Integer, Account](
classOf[String], // Key class primary
"name", // Key field primary
classOf[Integer], // Key class secondary
"age", // Key field secondary
classOf[Account])
)
Magnolify upgrade
Scio 0.12.0 uses Magnolify 0.6.2, which contains a few new features: neo4j support, AvroType
performance improvements, and the capability to annotate Parquet case classes when used in AvroCompat
mode:
import magnolify.parquet.ParquetArray.AvroCompat._
import magnolify.shared.doc
@doc("Record-level doc")
case class SomeCaseClass(@doc("field-level doc 1") s: String, @doc("field-level doc 2") i: Int)
data
.map { case (s, i) => SomeCaseClass(s, i) }
.saveAsTypedParquetFile("some-path")
The annotations will appear in the converted Avro schema in the file’s metadata:
% parquet-tools meta some-path/part-00001-of-00005.parquet
file: file:/some-path/part-00001-of-00005.parquet
creator: parquet-mr version 1.12.3 (build f8dced182c4c1fbdec6ccb3185537b5a01e6ed6b)
extra: writer.model.name = magnolify
extra: parquet.avro.schema = {"type":"record","name":"SomeCaseClass","namespace":"com.spotify.data.parquet","doc":"Record-level doc","fields":[{"name":"s","type":"string","doc":"field-level doc 1"},{"name":"i","type":"int","doc":"field-level doc 2"}]}
Bug fixes/Performance improvements
- Pipelines are unblocked from running on Dataflow RunnerV2 by fixing incorrect and deprecated API usages
- Coders now have a smaller memory footprint; you can expect savings in your total job graph size
You can see a full list on the release notes page.
Breaking changes
Scio 0.12.0 has a few breaking changes. The most impactful changes include:
- Pubsub read API changes
- scio-extra bigquery removal
- Parquet’s saveAsDynamicParquetAvroFile removed in favor of saveAsParquetAvroFile
A full list of breaking changes can be found on our Migration Guide.