Scio v0.9.0

Warning: for all the details, refer to the release notes on GitHub.

Bloom Filter

In 0.9.0 we switched from our custom Bloom Filter implementation to the Guava Bloom Filter for sparse transforms, e.g. sparseJoin and sparseLookup. As a result, we also switched from Algebird's Hash128[K] to Guava's Funnel[K] type class for hashing items into the Bloom Filter. Implicit Funnel[K] instances are available through magnolify-guava and need to be imported like this:

import magnolify.guava.auto._

If you see error messages like the following, you are likely missing this import:

could not find implicit value for parameter hash: com.spotify.scio.hash.BloomFilter.Hash[T]

The switch also brings the following benefit:

Previously, Hash128[K] provided instances only for Int, Long, String, Array[Byte], Array[Int] and Array[Long], while magnolify-guava can derive Funnel[K] for most common types, including tuples, case classes, etc.
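For example, with the auto import in scope, a Funnel for a case-class key is derived automatically. A minimal sketch (the UserDay case class is hypothetical):

import com.google.common.hash.Funnel
import magnolify.guava.auto._

// Hypothetical composite key type; no hand-written Funnel needed
case class UserDay(userId: Long, day: String)

val funnel: Funnel[UserDay] = implicitly[Funnel[UserDay]]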

We also added an ApproxFilter abstraction to allow extensible approximate-filter implementations. BloomFilter extends ApproxFilter and allows us to create filters and side inputs from Iterable[T] and SCollection[T]. The resulting filter instances are serializable. For example:

import com.spotify.scio._
import com.spotify.scio.coders.Coder
import com.spotify.scio.hash._
import com.spotify.scio.values._
import magnolify.guava.auto._

// Create a filter directly from an in-memory collection
val bf: BloomFilter[String] = Seq("a", "b", "c").asApproxFilter(BloomFilter)

// Create filters and a side input from an SCollection
val sc = ScioContext()
val data = sc.parallelize(Seq("a", "b", "c"))
val bfs: SCollection[BloomFilter[String]] = data.asApproxFilter(BloomFilter)
val bfsi: SideInput[BloomFilter[String]] = data.asApproxFilterSideInput(BloomFilter)

// A Coder for serializing filter instances
val bfCoder: Coder[BloomFilter[String]] = BloomFilter.filterCoder
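Once built, a filter answers approximate membership queries. Assuming the ApproxFilter API exposes mightContain, mirroring Guava's Bloom filter, usage on the bf instance above looks like:

bf.mightContain("a") // true
bf.mightContain("z") // false with high probability; false positives are possible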

IOs

BigQuery

In Scio 0.8.0 we introduced some deprecations, and with this version we are enforcing them. This means that all BigQuery operations now expect a Table type, which can be created either from a table reference or from a table spec string:

def tableSpecString: String = ???

def table: Table = Table.Spec(tableSpecString)

or

def tableReference: TableReference = ???

def table: Table = Table.Ref(tableReference)
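Either form can then be passed to the BigQuery read and write methods. A minimal read sketch (the table spec below is a placeholder):

import com.spotify.scio._
import com.spotify.scio.bigquery._
import com.spotify.scio.values.SCollection

val sc = ScioContext()
// Table.Spec accepts the usual "project:dataset.table" string
val rows: SCollection[TableRow] = sc.bigQueryTable(Table.Spec("my-project:my_dataset.my_table"))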

Below are some of the affected methods and suggestions on how to migrate:

- typedBigQuery(table: TableReference, ...)
+ typedBigQuery(table: Table.Ref(tableReference), ...)

- typedBigQuery(tableSpec: String, ...)
+ typedBigQuery(tableSpec: Table.Spec(tableSpec), ...)

- saveAsBigQuery(table: TableReference, ...)
+ saveAsBigQueryTable(table: Table.Ref(tableReference), ...)

- saveAsBigQuery(tableSpec: String, ...)
+ saveAsBigQueryTable(tableSpec: Table.Spec(tableSpec), ...)

- saveAsTypedBigQuery(table: TableReference, ...)
+ saveAsTypedBigQueryTable(table: Table.Ref(tableReference), ...)

- saveAsTypedBigQuery(tableSpec: String, ...)
+ saveAsTypedBigQueryTable(tableSpec: Table.Spec(tableSpec), ...)

Methods where only the argument type changed:

- bigQuerySelect(query: String, ...)
+ bigQuerySelect(query: Query(sql), ...)

- saveAsTypedBigQuery(table: TableReference, ...)
+ saveAsTypedBigQuery(table: Table.Ref(tableReference), ...)

- saveAsTypedBigQuery(tableSpec: String, ...)
+ saveAsTypedBigQuery(tableSpec: Table.Spec(tableSpec), ...)
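As a concrete write-side example, saving TableRows might now look like this. A sketch (the table spec is a placeholder):

import com.spotify.scio.bigquery._
import com.spotify.scio.values.SCollection

val rows: SCollection[TableRow] = ???
// 0.8.x: rows.saveAsBigQuery("my-project:my_dataset.my_table")
rows.saveAsBigQueryTable(Table.Spec("my-project:my_dataset.my_table"))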

BigQuery deprecations

With 0.9.0 we added a new method, queryRaw, to BigQueryType.fromQuery and deprecated the existing query method. The deprecated method is scheduled for removal in the next release.
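As a sketch of the rename, assuming a class annotated with BigQueryType.fromQuery whose generated companion exposes queryRaw alongside the deprecated query (the query below uses a BigQuery public sample table):

import com.spotify.scio.bigquery.types.BigQueryType

@BigQueryType.fromQuery(
  "SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare`")
class WordCount

// before (deprecated, scheduled for removal):
//   WordCount.query
// after:
//   WordCount.queryRaw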

Avro

ReflectiveRecordIO was removed in this release, which means we no longer need to pass a type parameter when reading GenericRecords, making things a little cleaner. Unfortunately, this means you will need to update your code by removing the type parameter from avroFile.

val sc: ScioContext = ???

- sc.avroFile[GenericRecord](path, schema)
+ sc.avroFile(path, schema)
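A fuller sketch of the 0.9.0 call (path and schema are placeholders):

import com.spotify.scio._
import com.spotify.scio.avro._
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord

val sc = ScioContext()
val schema: Schema = ??? // reader schema for the records
// No type parameter needed anymore; the result is SCollection[GenericRecord]
val records = sc.avroFile("gs://bucket/path/*.avro", schema)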

TensorFlow

We removed saveAsTfExampleFile in favor of saveAsTfRecordFile, as the new name better expresses the underlying format (TFRecord) in which Examples are written.

val coll: SCollection[Example] = ???

- coll.saveAsTfExampleFile(...)
+ coll.saveAsTfRecordFile(...)
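A minimal sketch of the new call (the output path is a placeholder):

import com.spotify.scio.tensorflow._
import com.spotify.scio.values.SCollection
import org.tensorflow.example.Example

val examples: SCollection[Example] = ???
examples.saveAsTfRecordFile("gs://bucket/examples")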

End-of-Life

scio-cassandra2 and scio-elasticsearch2 reached end-of-life and were removed.

ScioContext

All the behavior around execution and pipeline results that was deprecated in 0.8.x has been removed!

This means that to start your pipeline you need to:

val sc: ScioContext = ???

- val result: ScioResult = sc.close()
+ val execution: ScioExecutionContext = sc.run()

and to get a ScioResult you need to:

val sc: ScioContext = ???

- val result: ScioResult = sc.close()
+ val result: ScioResult = sc.run().waitUntilDone(Duration.Inf)
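Put together, a minimal 0.9.0 entry point looks like this (the job body is a placeholder):

import com.spotify.scio._
import scala.concurrent.duration.Duration

object MyJob {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.parallelize(Seq(1, 2, 3)).map(_ * 2) // placeholder job body
    val result: ScioResult = sc.run().waitUntilDone(Duration.Inf)
  }
}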

Scala 2.11 drop

2.11 served us well! The ecosystem is moving on and so are we! From this version forward we will only support 2.12 and 2.13!

Migrating from 2.11 to 2.12 should not require any code changes; it should be as easy as updating your build.sbt:

- scalaVersion := "2.11.12"
+ scalaVersion := "2.12.13"

However, migrating to 2.13 might require some changes, especially around collections! We advise you to look at the Scala migration guide for an in-depth overview of the most important changes.
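Two common examples of 2.13 collection changes you may run into (these are general Scala changes, not Scio-specific):

val xs = List(1, 2, 3)

// .to[Coll] became .to(Coll):
//   2.12: xs.to[Vector]
//   2.13: xs.to(Vector)
val v = xs.to(Vector)

// Map#mapValues returns a view in 2.13; materialize it explicitly
val m = Map("a" -> 1).view.mapValues(_ + 1).toMap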