Scio v0.9.0
TL;DR
For all the details, refer to the release notes on GitHub.
Bloom Filter
In 0.9.0 we switched from our custom Bloom Filter implementation to the Guava Bloom Filter for sparse transforms, e.g. sparseJoin, sparseLookup. As a result, we also switched from the Algebird Hash128[K] type class to the Guava Funnel[K] type class for hashing items into the Bloom Filter. Implicit Funnel[K] instances are available through magnolify-guava and need to be imported like this:
import magnolify.guava.auto._
Add this import if you see error messages like this:
could not find implicit value for parameter hash: com.spotify.scio.hash.BloomFilter.Hash[T]
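For instance, with the import in scope a sparse join only needs a derivable key type. A minimal sketch, assuming the 0.9.0 sparseJoin signature where the second argument is an estimate of the number of distinct keys on the right-hand side:
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import magnolify.guava.auto._ // derives the implicit Funnel[String] for the key

val sc = ScioContext()
val lhs: SCollection[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 2)))
val rhs: SCollection[(String, Double)] = sc.parallelize(Seq(("a", 0.5)))

// The key estimate sizes the underlying Guava Bloom filter
val joined: SCollection[(String, (Int, Double))] = lhs.sparseJoin(rhs, 1000000)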
The switch also adds the following benefits:
Previously Hash128[K] only provided instances for Int, Long, String, Array[Byte], Array[Int] and Array[Long], while magnolify-guava can derive Funnel[K] for most common types including tuples, case classes, etc.
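For example, magnolify-guava can derive a Funnel for a case class; a small sketch with a hypothetical User type:
import com.google.common.hash.Funnel
import magnolify.guava.auto._

// Hypothetical case class; a Funnel[User] is derived automatically
case class User(id: Long, name: String)

val userFunnel: Funnel[User] = implicitly[Funnel[User]]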
We also added an ApproxFilter abstraction to allow extensible approximate filter implementations. BloomFilter extends ApproxFilter and allows us to create filters & side inputs from Iterable[T] & SCollection[T]. The resulting filter instances are serializable. For example:
import com.spotify.scio._
import com.spotify.scio.coders.Coder
import com.spotify.scio.hash._
import com.spotify.scio.values._
import magnolify.guava.auto._
val bf: BloomFilter[String] = Seq("a", "b", "c").asApproxFilter(BloomFilter)
val sc = ScioContext()
val data = sc.parallelize(Seq("a", "b", "c"))
val bfs: SCollection[BloomFilter[String]] = data.asApproxFilter(BloomFilter)
val bfsi: SideInput[BloomFilter[String]] = data.asApproxFilterSideInput(BloomFilter)
val bfCoder: Coder[BloomFilter[String]] = BloomFilter.filterCoder
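Continuing the snippet above, the side input can be used to filter another collection, assuming ApproxFilter's mightContain method:
// Keep only elements that might be in the filter (false positives are possible)
val filtered: SCollection[String] = data
  .withSideInputs(bfsi)
  .filter((x, ctx) => ctx(bfsi).mightContain(x))
  .toSCollection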
IOs
BigQuery
In Scio 0.8.0 we introduced some deprecations, and with this version we are enforcing them. This means that all BigQuery operations should expect a Table type that can be created either from a table reference or a spec:
def tableSpecString: String = ???
def table: Table = Table.Spec(tableSpecString)
or
def tableReference: TableReference = ???
def table: Table = Table.Ref(tableReference)
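For example, a read with the new type might look like this sketch, where the project, dataset and table names are placeholders:
import com.spotify.scio._
import com.spotify.scio.bigquery._
import com.spotify.scio.values.SCollection

val sc = ScioContext()
// Table.Spec wraps a "project:dataset.table" string, Table.Ref wraps a TableReference
val rows: SCollection[TableRow] = sc.bigQueryTable(Table.Spec("my-project:my_dataset.my_table"))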
Below are some of the affected methods and suggestions on how to migrate:
- typedBigQuery(table: TableReference, ...)
+ typedBigQuery(table: Table.Ref(tableReference), ...)
- typedBigQuery(tableSpec: String, ...)
+ typedBigQuery(tableSpec: Table.Spec(tableSpec), ...)
- saveAsBigQuery(table: TableReference, ...)
+ saveAsBigQueryTable(table: Table.Ref(tableReference), ...)
- saveAsBigQuery(tableSpec: String, ...)
+ saveAsBigQueryTable(tableSpec: Table.Spec(tableSpec), ...)
- saveAsTypedBigQuery(table: TableReference, ...)
+ saveAsTypedBigQueryTable(table: Table.Ref(tableReference), ...)
- saveAsTypedBigQuery(tableSpec: String, ...)
+ saveAsTypedBigQueryTable(tableSpec: Table.Spec(tableSpec), ...)
Methods where only the argument type changed:
- bigQuerySelect(query: String, ...)
+ bigQuerySelect(query: Query(sql), ...)
- saveAsTypedBigQuery(table: TableReference, ...)
+ saveAsTypedBigQuery(table: Table.Ref(tableReference), ...)
- saveAsTypedBigQuery(tableSpec: String, ...)
+ saveAsTypedBigQuery(tableSpec: Table.Spec(tableSpec), ...)
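For instance, bigQuerySelect now takes the SQL string wrapped in Query; a sketch with a placeholder query:
import com.spotify.scio._
import com.spotify.scio.bigquery._

val sc = ScioContext()
val sql = "SELECT word FROM `bigquery-public-data.samples.shakespeare` LIMIT 10"
// The SQL string is now passed as Query(sql) instead of a bare String
val rows = sc.bigQuerySelect(Query(sql))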
BigQuery deprecations
With 0.9.0 we introduced a new method, queryRaw, to BigQueryType.fromQuery and deprecated the existing query method. The latter is scheduled for removal in the next release.
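Concretely, for a class annotated with BigQueryType.fromQuery, the generated companion now exposes queryRaw next to the deprecated query. A sketch, assuming queryRaw is a String on the companion and using a public table as a placeholder:
import com.spotify.scio.bigquery._

@BigQueryType.fromQuery(
  "SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare`")
class WordCount

// query is deprecated in 0.9.0; queryRaw is the replacement
val sql: String = WordCount.queryRaw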
Avro
ReflectiveRecordIO was removed in this release, which means we no longer need to pass a type parameter when reading GenericRecord, making things a little bit cleaner. Unfortunately, this means you will need to update your code by removing the type parameter from avroFile:
val sc: ScioContext = ???
- sc.avroFile[GenericRecord](path, schema)
+ sc.avroFile(path, schema)
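A complete read then looks like this sketch, with a placeholder path and reader schema:
import com.spotify.scio._
import com.spotify.scio.avro._
import com.spotify.scio.values.SCollection
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord

val sc = ScioContext()
val schema: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "Record", "fields": [{"name": "s", "type": "string"}]}""")
// No type parameter needed anymore; the schema argument implies GenericRecord
val records: SCollection[GenericRecord] = sc.avroFile("gs://bucket/path/*.avro", schema)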
Tensorflow
Removed saveAsTfExampleFile in favor of saveAsTfRecordFile, as the new name better expresses the underlying format in which Examples are written.
val coll: SCollection[Example] = ???
- coll.saveAsTfExampleFile(...)
+ coll.saveAsTfRecordFile(...)
End-of-Life
scio-cassandra2 and scio-elasticsearch2 reached end-of-life and were removed.
ScioContext
All the deprecated behavior around execution and pipeline results in 0.8.x was removed! This means that to start your pipeline you need to:
val sc: ScioContext = ???
- val result: ScioResult = sc.close()
+ val execution: ScioExecutionContext = sc.run()
and to get a ScioResult you need to:
val sc: ScioContext = ???
- val result: ScioResult = sc.close()
+ val result: ScioResult = sc.run().waitUntilDone(Duration.Inf)
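Putting it together, a minimal sketch of the new API (the pipeline body is illustrative):
import com.spotify.scio._
import scala.concurrent.duration.Duration

val sc = ScioContext()
sc.parallelize(1 to 10).map(_ * 2).debug()

// run() starts the pipeline; waitUntilDone() blocks until it finishes
val execution: ScioExecutionContext = sc.run()
val result: ScioResult = execution.waitUntilDone(Duration.Inf)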
Scala 2.11 drop
2.11 served us well! The ecosystem is moving on and so are we! From this version forward we will only support 2.12 and 2.13!
Migrating from 2.11 to 2.12 should not require any code changes; it should be as easy as updating your build.sbt:
- scalaVersion := "2.11.12"
+ scalaVersion := "2.12.13"
However, migrating to 2.13 might require some changes, especially around collections! We advise you to look at the Scala migration guide for an in-depth overview of the most important changes.
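To give one concrete example of the kind of collections change you might hit on 2.13:
val xs = List(1, 2, 3)

// Scala 2.12: conversions took a type parameter
// val v = xs.to[Vector]

// Scala 2.13: conversions take the target collection's companion object
val v: Vector[Int] = xs.to(Vector)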