# Scio, Beam and Dataflow
Check out the Beam Programming Guide first for a detailed explanation of the Beam programming model and concepts. Also see the comparison between the Scio, Scalding and Spark APIs.

Scio aims to be a thin wrapper on top of Beam while offering an idiomatic Scala-style API.
## Basics
- `ScioContext` wraps `Pipeline`
- `SCollection` wraps `PCollection`
- `ScioResult` wraps `PipelineResult`
- Most `PTransform`s are implemented as idiomatic Scala methods on `SCollection`, e.g. `map`, `flatMap`, `filter`, `reduce`.
- `PairSCollectionFunctions` and `DoubleSCollectionFunctions` are specialized versions of `SCollection` implemented via the Scala "pimp my library" pattern.
  - An `SCollection[(K, V)]` is automatically converted to a `PairSCollectionFunctions` which provides key-value operations, e.g. `groupByKey`, `reduceByKey`, `cogroup`, `join`.
  - An `SCollection[Double]` is automatically converted to a `DoubleSCollectionFunctions` which provides statistical operations, e.g. `stddev`, `variance` (see the sketch below).
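
A minimal sketch of these conversions in action, assuming an in-memory input via `parallelize`; the data and variable names are illustrative only:

```scala
import com.spotify.scio._
import com.spotify.scio.values.SCollection

object BasicsExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val sc = ScioContext()

    // PTransforms appear as ordinary Scala methods on SCollection
    val words: SCollection[String] = sc.parallelize(Seq("a", "b", "a", "c"))

    // SCollection[(K, V)] picks up key-value methods via PairSCollectionFunctions
    val counts: SCollection[(String, Long)] = words.map(w => (w, 1L)).reduceByKey(_ + _)

    // SCollection[Double] picks up statistical methods via DoubleSCollectionFunctions
    val spread: SCollection[Double] = sc.parallelize(Seq(1.0, 2.0, 3.0)).stddev

    sc.run()
  }
}
```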
## ScioContext, PipelineOptions, Args and ScioResult
- Beam/Dataflow uses `PipelineOptions` and its subclasses to parse command line arguments. Users have to extend the interface for their application-level arguments.
- Scalding uses `Args` to parse application arguments in a more generic and boilerplate-free style.
- `ScioContext` has a `parseArguments` method that takes an `Array[String]` of command line arguments, parses Beam/Dataflow specific ones into a `PipelineOptions`, and application specific ones into an `Args`, and returns the `(PipelineOptions, Args)`.
- `ContextAndArgs` is a shortcut to create a `(ScioContext, Args)`, as shown below.
- `ScioResult` can be used to access accumulator values and job state.
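
A small sketch of argument handling with `ContextAndArgs`; the `--input` and `--output` keys are hypothetical application arguments, not part of Scio:

```scala
import com.spotify.scio._

object ArgsExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    // Splits Beam/Dataflow options (e.g. --runner) from application arguments
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    val input  = args("input")                   // required argument
    val output = args.getOrElse("output", "out") // optional argument with default

    sc.textFile(input)
      .map(_.toUpperCase)
      .saveAsTextFile(output)

    // ScioResult exposes the final job state (and counters/metrics)
    val result: ScioResult = sc.run().waitUntilFinish()
    println(result.state)
  }
}
```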
## IO
- Most `IO` Read transforms are implemented as methods on `ScioContext`, e.g. `avroFile`, `textFile`, `bigQueryTable`.
- Most `IO` Write transforms are implemented as methods on `SCollection`, e.g. `saveAsAvroFile`, `saveAsTextFile`, `saveAsBigQueryTable`.
- These IO operations also detect when the `ScioContext` is running in a `JobTest` and manage test IO in memory.
- Write operations also return a `ClosedTap`. Once the job completes you can open the `Tap`. A `Tap` abstracts away the logic of reading the dataset directly as an `Iterator[T]` or re-opening it in another `ScioContext`, and it becomes available once the job finishes. This can be used to do lightweight pipeline orchestration, e.g. WordCountOrchestration.scala. See the example below.
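
A sketch of the write-then-reopen flow, assuming hypothetical `--input`/`--output` arguments; it follows the `ClosedTap`/`Tap` description above:

```scala
import com.spotify.scio._

object TapExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Read on ScioContext, write on SCollection; the write returns a ClosedTap
    val closedTap = sc
      .textFile(args("input"))
      .map(_.trim)
      .saveAsTextFile(args("output"))

    val result = sc.run().waitUntilFinish()

    // Materialize the Tap once the job has finished ...
    val tap = result.tap(closedTap)

    // ... then read the dataset back directly as an Iterator[String]
    tap.value.take(10).foreach(println)

    // ... or re-open it as an SCollection in a follow-up job
    val sc2 = ScioContext()
    val reread = tap.open(sc2) // further transforms on `reread` go here
    sc2.run()
  }
}
```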
## ByKey operations
- Beam/Dataflow `ByKey` transforms require `PCollection[KV[K, V]]` inputs while Scio uses `SCollection[(K, V)]`.
- Hence every `ByKey` transform in `PairSCollectionFunctions` converts Scala `(K, V)` to `KV[K, V]` before and vice versa afterwards. However these are lightweight wrappers and the JVM should be able to optimize them (see the example below).
- `PairSCollectionFunctions` also converts `java.lang.Iterable[V]` and `java.util.List[V]` to `scala.Iterable[V]` in some cases.
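
A small sketch of `ByKey` operations on plain Scala tuples; the sample data and names are illustrative only:

```scala
import com.spotify.scio._
import com.spotify.scio.values.SCollection

object ByKeyExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val sc = ScioContext()

    val clicks: SCollection[(String, Int)] =
      sc.parallelize(Seq(("u1", 1), ("u2", 3), ("u1", 2)))
    val names: SCollection[(String, String)] =
      sc.parallelize(Seq(("u1", "Alice"), ("u2", "Bob")))

    // Plain Scala tuples in, plain Scala tuples out; the (K, V) <-> KV[K, V]
    // conversion happens inside PairSCollectionFunctions.
    val totals: SCollection[(String, Int)]           = clicks.reduceByKey(_ + _)
    val joined: SCollection[(String, (Int, String))] = totals.join(names)

    sc.run()
  }
}
```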
## Coders
- Beam/Dataflow uses `Coder` for (de)serializing elements in a `PCollection` during shuffle. There are built-in coders for Java primitive types, collections, and common types in GCP like Avro, ProtoBuf, BigQuery `TableRow`, Datastore `Entity`.
- `PCollection` uses `TypeToken` from Guava reflection to work around Java type erasure and retrieve type information of elements. This may not always work, but there is a `PCollection#setCoder` method to override it.
- Twitter's chill library uses Kryo to (de)serialize data. Chill includes serializers for common Scala types and can also automatically derive serializers for arbitrary objects. Scio falls back to `KryoAtomicCoder` when a built-in one isn't available.
- A coder may be non-deterministic if `Coder#verifyDeterministic` throws an exception. Any data type with such a coder cannot be used as a key in `ByKey` operations. However `KryoAtomicCoder` assumes all types are deterministic for simplicity, so it's up to the user to avoid non-deterministic types, e.g. tuples or case classes with doubles, as keys (see the sketch below).
- Avro `GenericRecord` requires a schema during deserialization (which is available as `GenericRecord#getSchema` for serialization) and `AvroCoder` requires it during initialization as well. This is not possible in `KryoAtomicCoder`, i.e. when nesting a `GenericRecord` inside a Scala type. Instead `KryoAtomicCoder` serializes the schema before every record so that they can roundtrip safely. This is not optimal but the only way without requiring the user to handcraft a custom coder.
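
A sketch of choosing a deterministic key, assuming a hypothetical `Score` case class; the idea is to key by a stable `String` field rather than by a Double-bearing record:

```scala
import com.spotify.scio._

object CoderKeyExample {
  // Hypothetical record type, not part of Scio
  final case class Score(user: String, value: Double)

  def main(cmdlineArgs: Array[String]): Unit = {
    val sc = ScioContext()

    val scores = sc.parallelize(Seq(Score("a", 1.0), Score("a", 2.0), Score("b", 3.0)))

    // Key by the String field rather than by the whole Double-bearing record,
    // since a key type with a non-deterministic coder breaks ByKey operations.
    val totalsByUser = scores
      .keyBy(_.user)
      .mapValues(_.value)
      .reduceByKey(_ + _)

    sc.run()
  }
}
```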