v0.12.0 Release Blog

Scio 0.12.0 contains many new features, performance improvements, and a few breaking changes. You can find the technical Migration Guide here with code samples and Scalafix instructions, and the full release notes are here.

New Features

Windowing API for file writes

We’ve improved Scio API support for windowed file writes. Most file IOs now have three new, optional parameters: shardNameTemplate, filenamePolicySupplier, and tempDirectory. Windowed writes can be applied with either a shardNameTemplate or a filenamePolicySupplier. shardNameTemplate allows users to specify a shorthand for their desired output file format, for example:

// SSS and NNN refer to current shard ID and total number of shards, respectively 
data.saveAsAvroFile("/some-path", numShards = 2, shardNameTemplate = "-SSS-of-NNN")

Files with this shard template would be written to disk like:

% ls /some-path
part-000-of-002.avro
part-001-of-002.avro

In contrast, filenamePolicySupplier gives you more fine-grained control by allowing you to specify a full FilenamePolicy based on the supplied path and suffix:

data.saveAsAvroFile(path, schema, filenamePolicySupplier =
  (path: String, suffix: String) => new FilenamePolicy {
    override def windowedFilename(shardNumber: Int, numShards: Int, window: BoundedWindow, paneInfo: PaneInfo, outputFileHints: FileBasedSink.OutputFileHints): ResourceId = ???
    override def unwindowedFilename(shardNumber: Int, numShards: Int, outputFileHints: FileBasedSink.OutputFileHints): ResourceId = ???
  }))

KV Batch API

Keyed SCollections have new built-in batching APIs: batchByKey, batchByteSizedByKey, and batchWeightedByKey, which allow you to create Iterable-backed batches within your SCollection, determined by a target batch size, byte size, or weighting scheme, respectively.

data
  .keyBy(_.1)
  .batchByKey(batchSize = 100L)

SplittableDoFn-based Parquet reads

We’ve introduced an opt-in new Parquet reads implementation, which implements Beam’s SplittableDoFn API, as an alternative to the default BoundedSource-backed implementation. We’ve observed greatly improved Dataflow metrics with this new implementation (total VCPU time, total memory time), when run with Dataflow Runner V1 (the default), workers may not scale up as much, resulting in an overall slower wall time. Therefore, we recommend trying out the new Parquet read in conjunction with Dataflow Runner V2.

You can opt in by adding an entry to your Parquet read’s Configuration:

import com.spotify.scio.parquet._

sc.typedParquetFile[T](path, conf = ParquetConfiguration.of("scio.parquet.read.useSplittableDoFn" -> true))

You can find more information, and other migration options, here.

GRPC Lookup API

Scio 0.12.0 includes a new artifact, scio-grpc, that provides a custom AsyncLookupDoFn implementation specifically for GRPC service lookups. Both unary and server-streaming lookups are supported.

import com.spotify.scio.grpc._

data
  .map { case (str1, str2) => ConcatRequest.newBuilder.setStr1(str1).setStr2(str2).build)
  .grpcLookup[ConcatResponse, ConcatServiceFutureStub](
      () => NettyChannelBuilder.forTarget(ServiceUri).usePlaintext().build(),
      ConcatServiceGrpc.newFutureStub,
      10 // Max pending requests
    )(_.concat)

scio-neo4j module

We have a second new artifact in 0.12.0, scio-neo4j, that supports Neo4J reads and writes.

import com.spotify.scio.neo4j._

case class Entity(id: String, property: Option[String])

val entities = sc
  .neo4jCypher[Entity](
    Neo4jOptions(Neo4jConnectionOptions("neo4j://neo4j.com:7687", "username", "password")),
    """MATCH (e:Entity)
      |WHERE e.property = 'value'
      |RETURN e""".stripMargin
)

entities
  .map(someFn)
  .saveAsNeo4j(
    Neo4jOptions(Neo4jConnectionOptions("neo4j://neo4j.com:7687", "username", "password")),
    """UNWIND $rows AS row
      |MERGE (e:Entity {id: row.id})
      |ON CREATE SET p.id = row.id, p.property = row.property
      |""".stripMargin
  )

Secondary sort key for SMB writes and reads

SMB now supports secondary sort keys. We’ve extended most SMB APIs in the style of (k: K) to also support (k1: K1, k2: K2). For example, reads and transforms can now specify a secondary key:

sc
  .sortMergeJoin(
    classOf[String],
    classOf[Int], // Secondary sort key
    AvroSortedBucketIO.read(new TupleTag[A]("a"), aSchema).from(aPath),
    AvroSortedBucketIO.read(new TupleTag[B]("b"), bClass).from(bPath)
  ).map { case ((k1, k2), (aData, bData)) =>
    // ...
  }

Writes, too, have an expanded API:

data
  .saveAsSortedBucket(
    AvroSortedBucketIO.write[String, Integer, Account](
      classOf[String], // Key class primary
      "name", // Key field primary
      classOf[Integer], // Key class secondary
      "age", // Key field secondary
      classOf[Account])

Magnolify upgrade

Scio 0.12.0 uses Magnolify 0.6.2, which contains a few new features: neo4j support, AvroType performance improvements, and the the capability to annotate Parquet case classes when used in AvroCompat mode:

import magnolify.parquet.ParquetArray.AvroCompat._
import magnolify.shared.doc

@doc("Record-level doc")
case class SomeCaseClass(@doc("field-level doc 1") s: String, @doc("field-level doc 2") i: Int)

data
  .map { case (s, i) => SomeCaseClass(s, i) }
  .saveAsTypedParquetFile("some-path")

The annotations will appear in the converted Avro schema in the file’s metadata:

% parquet-tools meta some-path/part-00001-of-00005.parquet 
file:        file:/some-path/part-00001-of-00005.parquet 
creator:     parquet-mr version 1.12.3 (build f8dced182c4c1fbdec6ccb3185537b5a01e6ed6b) 
extra:       writer.model.name = magnolify 
extra:       parquet.avro.schema = {"type":"record","name":"SomeCaseClass","namespace":"com.spotify.data.parquet","doc":"Record-level doc","fields":[{"name":"s","type":"string","doc":"field-level doc 1"},{"name":"i","type":"int","doc":"field-level doc 2"}]} 

Bug fixes/Performance improvements

  • Pipelines are unblocked from running on Dataflow RunnerV2 by fixing incorrect and deprecated API usages
  • Coders now have a smaller memory footprint; you can expect savings in your total job graph size

You can see a full list on the release notes page.

Breaking changes

Scio 0.12.0 has a few breaking changes. The most impactful changes include:

A full list of breaking changes can be found on our Migration Guide.