Scio v0.12.0

com.spotify.scio.extra.bigquery removal

For usages of saveAvroAsBigQuery, use saveAsBigQueryTable from com.spotify.scio.bigquery instead.

- scoll.saveAvroAsBigQuery(tableRef)
+ scoll.saveAsBigQueryTable(table)
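
As a minimal sketch of the change above, assuming the table argument is a com.spotify.scio.bigquery.Table built from the existing TableReference with Table.Ref. The object and helper names (BigQueryMigration, saveRecords) are hypothetical and only serve the illustration:

```scala
import com.google.api.services.bigquery.model.TableReference
import com.spotify.scio.bigquery._
import com.spotify.scio.values.SCollection
import org.apache.avro.generic.GenericRecord

object BigQueryMigration {
  // Hypothetical helper: writes Avro records to the table identified by tableRef.
  def saveRecords(records: SCollection[GenericRecord], tableRef: TableReference): Unit = {
    // Before 0.12.0: records.saveAvroAsBigQuery(tableRef)
    // Assumption: wrapping the reference in Table.Ref yields the `table` argument.
    records.saveAsBigQueryTable(Table.Ref(tableRef))
    ()
  }
}
```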

Note: the following sbt command runs the relevant scalafix rule to update your BigQuery API usages automatically:

sbt "scalafixEnable; scalafix https://raw.githubusercontent.com/spotify/scio/main/scalafix/rules/src/main/scala/fix/v0_12_0/FixBqSaveAsTable.scala"

Removal of com.spotify.scio.pubsub specializations

Specialized methods in com.spotify.scio.pubsub have been removed in favor of generic .read and .write methods.

The PubsubIO.apply method has been completely removed. Use PubsubIO.string, PubsubIO.avro, PubsubIO.proto, PubsubIO.pubsub or PubsubIO.coder instead.

Additionally:

  • PubsubIO.readString is replaced by PubsubIO.string
  • PubsubIO.readAvro is replaced by PubsubIO.avro
  • PubsubIO.readProto is replaced by PubsubIO.proto
  • PubsubIO.readPubsub is replaced by PubsubIO.pubsub
  • PubsubIO.readCoder is replaced by PubsubIO.coder

The pubsubSubscription and pubsubTopic methods are replaced by one of the preceding IOs in conjunction with a PubsubIO.ReadParam. For example:

sc.read(PubsubIO.string(subscription, idAttribute, timestampAttribute))(PubsubIO.ReadParam(PubsubIO.Subscription))
sc.read(PubsubIO.string(topic, idAttribute, timestampAttribute))(PubsubIO.ReadParam(PubsubIO.Topic))

The pubsubSubscriptionWithAttributes and pubsubTopicWithAttributes methods are replaced by PubsubIO.withAttributes. For example:

sc.read(PubsubIO.withAttributes[String](subscription, idAttribute, timestampAttribute))(PubsubIO.ReadParam(PubsubIO.Subscription))
sc.read(PubsubIO.withAttributes[String](topic, idAttribute, timestampAttribute))(PubsubIO.ReadParam(PubsubIO.Topic))

The saveAsPubsub and saveAsPubsubWithAttributes methods are similarly replaced by .write with one of the preceding IOs, in conjunction with a PubsubIO.WriteParam:

scoll.write(PubsubIO.string(topic, idAttribute, timestampAttribute))(PubsubIO.WriteParam())
scoll.write(PubsubIO.withAttributes[String](topic, idAttribute, timestampAttribute))(PubsubIO.WriteParam())

Note: the following sbt command runs the relevant scalafix rule to update deprecated Pub/Sub API usages automatically:

sbt "scalafixEnable; scalafix https://raw.githubusercontent.com/spotify/scio/main/scalafix/rules/src/main/scala/fix/v0_12_0/FixPubsubSpecializations.scala"

Changed type signatures of SMB methods

There are substantial changes to the Java SMB API to accommodate secondary-keyed SMB. For example, AvroSortedBucketIO.Write changes signature from Write<K, T extends GenericRecord> to Write<K1, K2, T extends GenericRecord>. Most users will only interact with the Scala API.

Removed Beam-SQL

The typedBigQueryTable methods have been removed. Use bigQuerySelect, bigQueryTable, bigQueryStorage, typedBigQuery, or typedBigQueryStorage instead.

File IO file naming

File-based IO methods now have a consistent file-naming interface. saveAs* methods now accept, in addition to path and suffix, an optional shardNameTemplate, tempDirectory, and filenamePolicySupplier. shardNameTemplate and filenamePolicySupplier are mutually exclusive.

shardNameTemplate is a string pattern for filenames as accepted by DefaultFilenamePolicy.constructName.

filenamePolicySupplier is an instance of FilenamePolicySupplier, which takes the path and suffix as provided to most saveAs* methods and returns a FilenamePolicy.
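
A minimal sketch of the shardNameTemplate option, assuming saveAsTextFile accepts it as a named parameter like the other saveAs* methods described above. The object name, helper name, template value, and suffix are illustrative choices, not defaults taken from the library:

```scala
import com.spotify.scio.values.SCollection

object FileNamingExample {
  // Beam shard template syntax: a run of 'S' is replaced by the zero-padded
  // shard index, a run of 'N' by the zero-padded total number of shards.
  val ShardTemplate: String = "-SSS-of-NNN"

  // Hypothetical helper: outputDir and the ".txt" suffix are placeholders.
  def writeLines(lines: SCollection[String], outputDir: String): Unit = {
    // Do not also pass filenamePolicySupplier: the two options are mutually exclusive.
    lines.saveAsTextFile(outputDir, suffix = ".txt", shardNameTemplate = ShardTemplate)
    ()
  }
}
```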

BinaryIO saveAsBinaryFile

saveAsBinaryFile has been updated to use FilenamePolicySupplier per above and drops support for FileNaming.

ParquetIO saveAsDynamicParquetAvroFile → saveAsParquetAvroFile

saveAsDynamicParquetAvroFile had an inconsistent interface compared to other saveAsDynamic* methods.

The pre-0.12 behavior of the filenameFunction parameter of saveAsDynamicParquetAvroFile is now supported via the FilenamePolicySupplier parameter of the non-dynamic saveAsParquetAvroFile, per above.

A new and more consistent saveAsDynamicParquetAvroFile is added:

import com.spotify.scio.values.SCollection
import com.spotify.scio.parquet.avro.dynamic._
case class MyClass(s: String, i: Int)
val scoll: SCollection[MyClass] = ???
scoll.saveAsDynamicParquetAvroFile("gs://output/") { m => s"/${m.s}/${m.i}"}

Parquet Reads

Alongside the existing Parquet read implementation (“legacy Parquet”), we’re offering a new Parquet read implementation that uses Beam’s new SplittableDoFn API. Legacy Parquet is still the default read format, but you can enable the new implementation in your Configuration:

import com.spotify.scio.parquet._

sc.typedParquetFile[T](path, conf = ParquetConfiguration.of("scio.parquet.read.useSplittableDoFn" -> true))
sc.parquetAvroFile[T](path, conf = ParquetConfiguration.of("scio.parquet.read.useSplittableDoFn" -> true))
sc.parquetExampleFile(path, conf = ParquetConfiguration.of("scio.parquet.read.useSplittableDoFn" -> true))

Additionally, you can enable it for all Scio jobs in your project by adding it to your project’s src/main/resources/core-site.xml file:

<configuration>
  <property>
    <name>scio.parquet.read.useSplittableDoFn</name>
    <value>true</value>
    <description>Use SplittableDoFn implementation for Parquet reads</description>
  </property>
</configuration>

Note that if you’re using DataflowRunner, you’ll get the best performance (in terms of worker scaling and overall resource usage) out of a SplittableDoFn-based read by enabling Dataflow Runner V2. You can enable this in your Dataflow pipeline by supplying the pipeline argument --experiments=use_runner_v2 to your job.

Our plan is to support Legacy Parquet for all Scio 0.12.x versions, but fully deprecate and remove support by 0.13.x.

Async lookup DoFn

All async lookup DoFns have been reworked and now extend DoFnWithResource. After upgrading, you’ll get the following compilation error:

class MyLookupDoFn needs to be abstract, since method getResourceType in class DoFnWithResource of type ()com.spotify.scio.transforms.DoFnWithResource.ResourceType is not defined

You must now implement the method and return the appropriate resource type for your client:

  • ResourceType.PER_INSTANCE if your client is thread safe (this was the previous behavior)
  • ResourceType.PER_CLONE if your client is not thread safe
  • ResourceType.PER_CLASS if your client is meant to be shared among all instances
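
A minimal sketch of a lookup DoFn that compiles again after the upgrade, assuming it extends GuavaAsyncLookupDoFn. MyClient is a hypothetical client, assumed to be thread safe, that stands in for whatever service client your DoFn wraps:

```scala
import com.google.common.util.concurrent.{Futures, ListenableFuture}
import com.spotify.scio.transforms.DoFnWithResource.ResourceType
import com.spotify.scio.transforms.GuavaAsyncLookupDoFn

// Hypothetical client, assumed thread safe for the purpose of this sketch.
class MyClient {
  def lookup(key: String): ListenableFuture[String] =
    Futures.immediateFuture(key.toUpperCase)
}

class MyLookupDoFn extends GuavaAsyncLookupDoFn[String, String, MyClient] {
  // Required since 0.12.0: declare how the client resource is shared.
  // PER_INSTANCE matches the pre-0.12 behavior and suits thread-safe clients.
  override def getResourceType(): ResourceType = ResourceType.PER_INSTANCE

  // Creates the client on the worker.
  override def newClient(): MyClient = new MyClient

  // Issues the asynchronous lookup for a single input element.
  override def asyncLookup(client: MyClient, input: String): ListenableFuture[String] =
    client.lookup(input)
}
```

If the client is not thread safe, returning ResourceType.PER_CLONE instead is the only change needed in this sketch.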