Scio, Beam and Dataflow

Check out the Beam Programming Guide first for a detailed explanation of the Beam programming model and concepts. Also see this comparison between Scio, Scalding and Spark APIs.

Scio aims to be a thin wrapper on top of Beam while offering an idiomatic Scala-style API.

Basics

ScioContext, PipelineOptions, Args and ScioResult

  • Beam/Dataflow uses PipelineOptions and its subclasses to parse command line arguments. Users have to extend the interface for their application-level arguments.
  • Scalding uses Args to parse application arguments in a more generic and boilerplate-free style.
  • ScioContext has a parseArguments method that takes an Array[String] of command line arguments, parses Beam/Dataflow-specific ones into a PipelineOptions and application-specific ones into an Args, and returns them as a (PipelineOptions, Args).
  • ContextAndArgs is a shortcut to create a (ScioContext, Args); see the sketch after this list.
  • ScioResult can be used to access accumulator values and job state.
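For example, a minimal sketch of how these pieces fit together, assuming a recent Scio version (the WordCountBasics object and the --input/--output arguments are hypothetical):

```scala
import com.spotify.scio._

object WordCountBasics {
  def main(cmdlineArgs: Array[String]): Unit = {
    // ContextAndArgs splits the command line into Beam/Dataflow PipelineOptions
    // (e.g. --runner, --project) and application-specific Args (e.g. --input).
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    val input = args("input")                          // required argument
    val output = args.getOrElse("output", "wc-output") // optional argument with a default

    sc.textFile(input)
      .flatMap(_.split("\\s+"))
      .countByValue
      .map { case (word, count) => s"$word: $count" }
      .saveAsTextFile(output)

    // Run the pipeline; ScioResult exposes job state and metric values.
    val result: ScioResult = sc.run().waitUntilFinish()
    println(result.state)
  }
}
```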

IO

  • Most IO Read transforms are implemented as methods on ScioContext, e.g. avroFile, textFile, bigQueryTable.
  • Most IO Write transforms are implemented as methods on SCollection, e.g. saveAsAvroFile, saveAsTextFile, saveAsBigQueryTable.
  • These IO operations also detect when the ScioContext is running in a JobTest and manage test IO in memory.
  • Write operations also return a ClosedTap. Once the job completes you can open the Tap, which abstracts away the logic of reading the dataset directly as an Iterator[T] or re-opening it in another ScioContext. This can be used to do lightweight pipeline orchestration, e.g. WordCountOrchestration.scala; see the sketch after this list.
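A minimal sketch of that workflow, assuming a recent Scio version where writes return a ClosedTap and the Tap is materialized from the ScioResult (the TapExample object and the --input/--output arguments are hypothetical):

```scala
import com.spotify.scio._
import com.spotify.scio.io.{ClosedTap, Tap}

object TapExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // saveAsTextFile returns a ClosedTap[String] handle for the written dataset.
    val closedTap: ClosedTap[String] = sc
      .textFile(args("input"))
      .saveAsTextFile(args("output"))

    val result = sc.run().waitUntilFinish()

    // Once the job has finished, materialize the Tap from the ScioResult ...
    val tap: Tap[String] = result.tap(closedTap)

    // ... then either read the dataset directly as an Iterator[String] ...
    tap.value.take(10).foreach(println)

    // ... or re-open it as an SCollection in another ScioContext.
    val (sc2, _) = ContextAndArgs(cmdlineArgs)
    val reopened = tap.open(sc2)
    // downstream transforms on `reopened` would go here
    sc2.run()
  }
}
```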

ByKey operations

  • Beam/Dataflow ByKey transforms require PCollection[KV[K, V]] inputs while Scio uses SCollection[(K, V)].
  • Hence every ByKey transform in PairSCollectionFunctions converts Scala (K, V) to KV[K, V] before the underlying Beam transform and back afterwards. These are lightweight wrappers, however, and the JVM should be able to optimize them.
  • PairSCollectionFunctions also converts java.lang.Iterable[V] and java.util.List[V] to scala.Iterable[V] in some cases; see the sketch after this list.
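A minimal sketch of the tuple-based API, using a hypothetical in-memory input:

```scala
import com.spotify.scio._

object ByKeyExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cmdlineArgs)

    val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3))

    // Internally (K, V) is wrapped as KV[K, V], Beam's GroupByKey runs, and the
    // resulting java.lang.Iterable[V] is converted back to scala.Iterable[V].
    val grouped = pairs.groupByKey        // SCollection[(String, Iterable[Int])]

    // Other ByKey transforms follow the same pattern, e.g. a per-key reduction.
    val summed = pairs.reduceByKey(_ + _) // SCollection[(String, Int)]

    sc.run()
  }
}
```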

Coders

  • Beam/Dataflow uses Coder for (de)serializing elements in a PCollection during shuffle. There are built-in coders for Java primitive types, collections, and common types in GCP like Avro, ProtoBuf, BigQuery TableRow, Datastore Entity.
  • PCollection uses TypeToken from Guava reflection to work around Java type erasure and retrieve type information of elements. This may not always work, but there is a PCollection#setCoder method to override the inferred coder.
  • Twitter’s chill library uses Kryo to (de)serialize data. Chill includes serializers for common Scala types and can also automatically derive serializers for arbitrary objects. Scio falls back to KryoAtomicCoder when a built-in coder isn’t available.
  • A coder is non-deterministic if Coder#verifyDeterministic throws an exception. Any data type with such a coder cannot be used as a key in ByKey operations. However, KryoAtomicCoder assumes all types are deterministic for simplicity, so it is up to the user to avoid non-deterministic types, e.g. tuples or case classes with doubles, as keys; see the sketch after this list.
  • Avro GenericRecord requires a schema during deserialization (for serialization it is available via GenericRecord#getSchema), and AvroCoder also requires it at initialization. This is not possible in KryoAtomicCoder, i.e. when a GenericRecord is nested inside a Scala type, so KryoAtomicCoder instead serializes the schema before every record so that records can round-trip safely. This is not optimal, but it is the only way without requiring the user to handcraft a custom coder.
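As an illustration of the determinism point above, a minimal sketch that calls the Beam coder API directly (Beam’s DoubleCoder is non-deterministic, which is why doubles make poor keys):

```scala
import org.apache.beam.sdk.coders.{Coder, DoubleCoder, StringUtf8Coder}

object CoderDeterminism {
  def main(args: Array[String]): Unit = {
    // StringUtf8Coder is deterministic: verifyDeterministic returns normally.
    StringUtf8Coder.of().verifyDeterministic()

    // DoubleCoder is not: verifyDeterministic throws NonDeterministicException,
    // which is why doubles, or tuples/case classes containing them, make bad keys.
    try {
      DoubleCoder.of().verifyDeterministic()
    } catch {
      case e: Coder.NonDeterministicException =>
        println(s"Non-deterministic coder: ${e.getMessage}")
    }
  }
}
```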