# Scio v0.12.0

## `com.spotify.scio.extra.bigquery` removal

For usages of `saveAvroAsBigQuery`, use `saveAsBigQueryTable` from `com.spotify.scio.bigquery` instead:

```diff
- scoll.saveAvroAsBigQuery(tableRef)
+ scoll.saveAsBigQueryTable(table)
```

Note: you can run the following sbt command to apply the relevant scalafix rule and update your BQ API usages automatically:

```
sbt "scalafixEnable; scalafix https://raw.githubusercontent.com/spotify/scio/main/scalafix/rules/src/main/scala/fix/v0_12_0/FixBqSaveAsTable.scala"
```
## Removal of `com.spotify.scio.pubsub` specializations

Specialized methods in `com.spotify.scio.pubsub` have been removed in favor of the generic `.read` and `.write` methods.

The `PubsubIO.apply` method has been removed entirely; use `PubsubIO.string`, `PubsubIO.avro`, `PubsubIO.proto`, `PubsubIO.pubsub`, or `PubsubIO.coder` instead.
Additionally:

- `PubsubIO.readString` is replaced by `PubsubIO.string`
- `PubsubIO.readAvro` is replaced by `PubsubIO.avro`
- `PubsubIO.readProto` is replaced by `PubsubIO.proto`
- `PubsubIO.readPubsub` is replaced by `PubsubIO.pubsub`
- `PubsubIO.readCoder` is replaced by `PubsubIO.coder`
The `pubsubSubscription` and `pubsubTopic` methods are replaced by one of the preceding IOs in conjunction with a `PubsubIO.ReadParam`. For example:

```scala
sc.read(PubsubIO.string(subscription, idAttribute, timestampAttribute))(PubsubIO.ReadParam(PubsubIO.Subscription))
sc.read(PubsubIO.string(topic, idAttribute, timestampAttribute))(PubsubIO.ReadParam(PubsubIO.Topic))
```
The `pubsubSubscriptionWithAttributes` and `pubsubTopicWithAttributes` methods are replaced by `PubsubIO.withAttributes`. For example:

```scala
sc.read(PubsubIO.withAttributes[String](subscription, idAttribute, timestampAttribute))(PubsubIO.ReadParam(PubsubIO.Subscription))
sc.read(PubsubIO.withAttributes[String](topic, idAttribute, timestampAttribute))(PubsubIO.ReadParam(PubsubIO.Topic))
```
The `saveAsPubsub` and `saveAsPubsubWithAttributes` methods are similarly replaced, in conjunction with a `PubsubIO.WriteParam`:

```scala
scoll.write(PubsubIO.string(topic, idAttribute, timestampAttribute))(PubsubIO.WriteParam())
scoll.write(PubsubIO.withAttributes[String](topic, idAttribute, timestampAttribute))(PubsubIO.WriteParam())
```
Note: you can run the following sbt command to apply the relevant scalafix rule and automatically update deprecated Pub/Sub API usages:

```
sbt "scalafixEnable; scalafix https://raw.githubusercontent.com/spotify/scio/main/scalafix/rules/src/main/scala/fix/v0_12_0/FixPubsubSpecializations.scala"
```
## Changed type signatures of SMB methods

There are substantial changes to the Java SMB API to accommodate secondary-keyed SMB. For example, `AvroSortedBucketIO.Write` changes signature from `Write<K, T extends GenericRecord>` to `Write<K1, K2, T extends GenericRecord>`. Most users will only interact with the Scala API, as sketched below.
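To illustrate, a secondary key class and field can be supplied alongside the primary key when writing. This is a minimal sketch, not a definitive reference: the `Account` class and its `name`/`age` fields are hypothetical, and the exact `write` overloads may differ from what is shown here:

```scala
import com.spotify.scio.smb._
import org.apache.beam.sdk.extensions.smb.AvroSortedBucketIO

// Bucket by the primary key "name" and additionally sort within buckets
// by the secondary key "age" (hypothetical fields on an Avro Account class).
accounts.saveAsSortedBucket(
  AvroSortedBucketIO
    .write(classOf[String], "name", classOf[Integer], "age", classOf[Account])
    .to("gs://output/smb")
    .withNumBuckets(16)
)
```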
## Removed Beam-SQL

Removed the `typedBigQueryTable` methods. Use `bigQuerySelect`, `bigQueryTable`, `bigQueryStorage`, `typedBigQuery`, or `typedBigQueryStorage` instead.
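For example, a read via the type-safe BigQuery API might look like the following sketch, assuming a hypothetical table `my-project:my_dataset.my_table`:

```scala
import com.spotify.scio.bigquery._

// Case class members are generated from the table schema at compile time
// (the table reference here is hypothetical).
@BigQueryType.fromTable("my-project:my_dataset.my_table")
class Row

// Type-safe read, replacing the removed typedBigQueryTable
val rows = sc.typedBigQuery[Row]()
```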
## File IO file naming

File-based IO methods now have a consistent file-naming interface. `saveAs*` methods now accept, in addition to `path` and `suffix`, an optional `shardNameTemplate`, `tempDirectory`, and `filenamePolicySupplier`. `shardNameTemplate` and `filenamePolicySupplier` are mutually exclusive.

`shardNameTemplate` is a string pattern for filenames, as accepted by `DefaultFilenamePolicy.constructName`.

`filenamePolicySupplier` is an instance of `FilenamePolicySupplier`, which takes the `path` and `suffix` as provided to most `saveAs*` methods and returns a `FilenamePolicy`.
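For example, a shard name template can be passed directly to a save. A minimal sketch with text IO, assuming a hypothetical output path and the standard `S` (shard number) and `N` (shard count) template placeholders:

```scala
import com.spotify.scio.values.SCollection

val scoll: SCollection[String] = ???

// Produces files like part-00000-of-00010.txt under the output path.
scoll.saveAsTextFile(
  "gs://output/path",
  suffix = ".txt",
  shardNameTemplate = "part-SSSSS-of-NNNNN"
)
```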
## BinaryIO `saveAsBinaryFile`

`saveAsBinaryFile` has been updated to use `FilenamePolicySupplier`, as described above, and drops support for `FileNaming`.
## ParquetIO `saveAsDynamicParquetAvroFile` → `saveAsParquetAvroFile`

`saveAsDynamicParquetAvroFile` had an inconsistent interface compared to other `saveAsDynamic*` methods. The pre-0.12 behavior of the `filenameFunction` parameter of `saveAsDynamicParquetAvroFile` is now supported via the `FilenamePolicySupplier` parameter of the non-dynamic `saveAsParquetAvroFile`, as described above.

A new, more consistent `saveAsDynamicParquetAvroFile` has been added:
```scala
import com.spotify.scio.values.SCollection
import com.spotify.scio.parquet.avro.dynamic._

case class MyClass(s: String, i: Int)

val scoll: SCollection[MyClass] = ???
scoll.saveAsDynamicParquetAvroFile("gs://output/") { m => s"/${m.s}/${m.i}" }
```
## Parquet Reads

Alongside the existing Parquet read implementation (“legacy Parquet”), we’re offering a new read implementation that uses Beam’s SplittableDoFn API. Legacy Parquet is still the default read format, but you can enable the new implementation in your `Configuration`:

```scala
import com.spotify.scio.parquet._

sc.typedParquetFile[T](path, conf = ParquetConfiguration.of("scio.parquet.read.useSplittableDoFn" -> true))
sc.parquetAvroFile[T](path, conf = ParquetConfiguration.of("scio.parquet.read.useSplittableDoFn" -> true))
sc.parquetExampleFile(path, conf = ParquetConfiguration.of("scio.parquet.read.useSplittableDoFn" -> true))
```
Additionally, you can enable it for all Scio jobs by adding the setting to your project’s `src/main/resources/core-site.xml` file:

```xml
<configuration>
  <property>
    <name>scio.parquet.read.useSplittableDoFn</name>
    <value>true</value>
    <description>Use SplittableDoFn implementation for Parquet reads</description>
  </property>
</configuration>
```
Note that if you’re using DataflowRunner, you’ll get the best performance (in terms of worker scaling and overall resource usage) out of a SplittableDoFn-based read by enabling Dataflow Runner V2. You can enable this by supplying the pipeline argument `--experiments=use_runner_v2` to your job.

Our plan is to support legacy Parquet for all Scio 0.12.x versions, but to fully deprecate and remove support by 0.13.x.
## Async lookup DoFn

All async lookup DoFns have been reworked and now extend `DoFnWithResource`. After upgrading, you’ll get the following error:

```
class MyLookupDoFn needs to be abstract, since method getResourceType in class DoFnWithResource of type ()com.spotify.scio.transforms.DoFnWithResource.ResourceType is not defined
```

You must now implement this method and return the appropriate resource type for your client:

- `ResourceType.PER_INSTANCE` if your client is thread safe (this was the previous behavior)
- `ResourceType.PER_CLONE` if your client is not thread safe
- `ResourceType.PER_CLASS` if your client is meant to be shared among all instances
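For example, the override might look like the following sketch with `GuavaAsyncLookupDoFn`; the `MyClient` class and its `lookup` method are hypothetical stand-ins for your own client:

```scala
import com.google.common.util.concurrent.ListenableFuture
import com.spotify.scio.transforms.DoFnWithResource.ResourceType
import com.spotify.scio.transforms.GuavaAsyncLookupDoFn

// Hypothetical thread-safe lookup client.
class MyClient {
  def lookup(key: String): ListenableFuture[String] = ???
}

class MyLookupDoFn extends GuavaAsyncLookupDoFn[String, String, MyClient] {
  // MyClient is thread safe, so sharing one instance per DoFn instance
  // restores the pre-0.12 behavior.
  override def getResourceType: ResourceType = ResourceType.PER_INSTANCE

  override def newClient(): MyClient = new MyClient()

  override def asyncLookup(client: MyClient, input: String): ListenableFuture[String] =
    client.lookup(input)
}
```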