Scio v0.7.0

Scio 0.7.0 comes with major improvements over previous versions. The overall goal is to make Scio safer, faster, more consistent and easier to extend by leveraging Scala’s type system more and refactoring its internals.

The new milestone has been profiled more than ever and will improve the performances of your pipeline. In some cases, the improvement can be very significant.

Scio 0.7.0 also includes, like every release, a number of bugfixes.

What’s new ?

New IOs api

In this version, we’ve refactored the implementation of IOs. Scio now provides a new class ScioIO that you can extend to support new types of IOs. ScioContext now has a new method called read and SCollection now has a new method write. Both take an instance of a class extending ScioIO as a parameter and may read from any source, or write to any target.

All existing IOs (GCS, BigQuery, BigTable, etc.) have been rewritten to use the new IO API.

Read more: ScioIO

New “static” coders

Scio 0.7.0 also ship with a new Coder implementation that statically resolve the correct Coder for a given type at compile time. In previous versions, Scio would infer the correct coder implementation at runtime, which could lead to poor performances and occasionally, exceptions at runtime.

Read more: Coders.

Performances improvements & benchmarks

Thanks to the new static coders implementation, and because of the time we spend on profiling, Scio 0.7.0 should overall be more efficient than previous versions.

Automated migration

Scio 0.7 comes with a set of scalafix rules that will do most of the heavy lifting automatically. Before you go through any manual step, we recommend you start by applying those rules.

Start by adding the scalafix sbt plugin to your project/plugins.sbt file

addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.0")

launch sbt and run scalafixEnable

> scalafixEnable
[info] Set current project to my-amazing-scio-pipeline (in build file:/Users/julient/Documents/my-amazing-scio-pipeline/)

Prepare your tests

Warning

RUN THIS BEFORE UPGRADING SCIO

You’ll need to prepare your tests code for migration. For this to run properly, you code needs to compile.

Run the following command in the sbt shell:

> test:scalafix https://raw.githubusercontent.com/spotify/scio/main/scalafix/rules/src/main/scala/fix/v0_7_0/FixAvroIO.scala
[info] Running scalafix on 78 Scala sources
[success] Total time: 7 s, completed Oct 17, 2018 12:49:31 PM

Once FixAvroIO has been applied, you can go ahead and upgrade Scio to 0.7.x in your build file. After you have set Scio’s version in your build.sbt, make sure to either restart or reload sbt.

You can now run the automated migration rules. At the moment, we support 4 rules:

Name Description
AddMissingImports Add the required imports to access sources / sinks on ScioContext and SCollection
RewriteSysProp Replace sys.call(...) by the new syntax
FixAvroIO Fix uses of AvroIO in tests
BQClientRefactoring Automatically migrate from BigQueryClient to the new BigQuery client

You can see all the rules here.

In your sbt shell, you can now apply the 3 other rules:

> scalafix https://raw.githubusercontent.com/spotify/scio/main/scalafix/rules/src/main/scala/fix/v0_7_0/AddMissingImports.scala
[info] Running scalafix on 173 Scala sources
[success] Total time: 16 s, completed Oct 17, 2018 12:01:31 PM

> scalafix https://raw.githubusercontent.com/spotify/scio/main/scalafix/rules/src/main/scala/fix/v0_7_0/RewriteSysProp.scala
[info] Running scalafix on 173 Scala sources
[success] Total time: 6 s, completed Oct 17, 2018 12:34:00 PM

> scalafix https://raw.githubusercontent.com/spotify/scio/main/scalafix/rules/src/main/scala/fix/v0_7_0/BQClientRefactoring.scala
[info] Running scalafix on 173 Scala sources
[success] Total time: 3 s, completed Oct 17, 2018 12:34:20 PM

At that point you can try to compile your code and fix the few compilation errors left. The next sections of this guide should contain all the information you need to fix everything.

Migration guide

The following section will detail errors you may encounter while migrating from scio 0.6.x to Scio 0.7.x, and help you fix them. If you’ve run the automated migration fixes, you can jump directly to the Add missing context bounds section.

method xxx is not a member of com.spotify.scio.ScioContext

When using read methods from ScioContext the compiler may issue an error of type method xxx is not a member of com.spotify.scio.ScioContext.

IOs have been refactored in Scio 0.7.0. Each IO type now lives in the appropriate project and package. It means 2 things:

You may need to explicitly add a dependency one of Scio’s subprojects in your build file

For example, Scio used to pull dependencies on BigQuery IOs even if your pipeline did not use BigQuery at all. With the new IOs, Scio will limit its dependencies to packages you actually use.

If your pipeline is using BigQuery, you now need to add scio-bigquery as a dependency of your project:

libraryDependencies += "com.spotify" %% "scio-bigquery" % scioVersion

You’ll need to import the appropriate package to gain access to the IO methods.

For example while migrating a job that reads data from an Avro file, you may see the following compiler error:

[error] value avroFile is not a member of com.spotify.scio.ScioContext
[error]     val coll = sc.avroFile[SomeType](uri)
[error]                            ^

All you have to do to fix it is to import IOs from the correct package:

import com.spotify.scio.avro._

AvroIO (or other type of IO) not found

IOs have been moved out of the com.spotify.scio.testing package. To use them in unit tests (or elsewhere), you’ll need to change the import:

  • com.spotify.scio.testing.BigQueryIO -> com.spotify.scio.bigquery.BigQueryIO
  • com.spotify.scio.testing.{AvroIO, ProtobufIO} -> com.spotify.scio.avro.{AvroIO, ProtobufIO}
  • com.spotify.scio.testing.TextIO -> com.spotify.scio.io.TextIO

A complete list of IO packages can be found here.

Additionally, some IOs are now parameterized. For example, AvroIO must now be parameterized with the Avro record type (either GenericRecord or an extension of SpecificRecordBase). In previous versions of Scio, it was possible in some cases to omit that type. For example:

import org.apache.avro.generic.GenericRecord

object MyScioJob {}

val inputAUri = ""
val inputBUri = ""
val output = "output"

object Schemas {
  def record1: GenericRecord = ???
  def record2: GenericRecord = ???
}

implicit def bo: com.spotify.scio.testing.JobTest.BeamOptions = ??? // XXX: why do I need this ?
import com.spotify.scio.io._
import com.spotify.scio.avro._
import com.spotify.scio.testing._

class MyScioJobTest extends PipelineSpec {

  "MyScioJob" should "work" in {
    JobTest[MyScioJob.type]
      .args(s"--inputAUri=${inputAUri}")
      .args(s"--inputBUri=${inputBUri}")
      .input(AvroIO[GenericRecord](inputAUri), Seq(Schemas.record1))
      .input(AvroIO[GenericRecord](inputBUri), Seq(Schemas.record2))
      .output(TextIO(output)){ coll =>
        coll should haveSize (1)
        ()
      }
      .run()
  }

  // more tests
}

Avro type inference issue: “diverging implicit expansion”

If you use AvroIO you may see the compilation of your tests failing with an error looking like

[error] <path>/SomeTest.scala:42:20: diverging implicit expansion for type com.spotify.scio.coders.Coder[K]
[error] starting with macro method gen in trait LowPriorityCoderDerivation
[error]       .input(AvroIO(inputUri), inputs)
[error]

The problem is that line does not explicitly set the type of the IO:

.input(AvroIO(inputUri), inputs)

In Scio <= 0.6.x this works, but in Scio 0.7.x, you’ll need to be explicit about the types. For example in that case:

.input(AvroIO[GenericRecord](inputUri), inputs)

BigQuery client

Client was renamed from BigQueryClient to BigQueryand relocated! Now you need to:

import com.spotify.scio.bigquery.client.BigQuery

The new client offers now methods namespaced according to their resposabilities.

Methods typically fall into 4 categories of operations query, table, export and load. i.e:

-client.extractLocation
+client.query.extracLocation

-client.getQuerySchema(...)
+client.query.schema(...)

-client.getTableRows(...)
+client.table.rows(...)

// the same thing applies for the other formats
-client.loadTableFromCsv(...)
+client.load.csv(...)

// the same thing applies for the other formats
-client.exportTableAsCsv(...)
+client.export.asCsv(...)

Not enough arguments for method (top|topByKey|approxQuantilesByKey): (implicit ord: Ordering[…])

Explicit Ordering functions for SCollection reducers are no longer curried. Methods that used to look like:

.top(10)(Ordering.by(...)

should be changed to:

.top(10, Ordering.by(...))

Add missing context bounds

In the process of upgrading Scio, you may encounter the following error:

Cannot find a Coder instance for type T

If you’ve defined a generic function that uses an SCollection, this function is likely to need a Coder[T]. Scio will require you to provide an implicit Coder[T]. You can read about Scala implicit parameters here

Let’s see a simple example. Say I created the following method doSomething:

def doSomething[T](coll: SCollection[T]): SCollection[T] =
  coll.map { t =>
    // do something that returns a T
    val result: T = ???
    result
  }

If I try to compile this method the compiler will return the following error:

Cannot find a Coder instance for type:

  >> T

  This can happen for a few reasons, but the most common case is that a data
  member somewhere within this type doesn't have a Coder instance in scope. Here are
  some debugging hints:
    - For Option types, ensure that a Coder instance is in scope for the non-Option version.
    - For List and Seq types, ensure that a Coder instance is in scope for a single element.
    - You can check that an instance exists for Coder in the REPL or in your code:
        scala> Coder[Foo]
    And find the missing instance and construct it as needed.

  coll.map { t =>
           ^

What this message says is that calling .map { ... } requires a Coder[T]. You can fix this very easily by adding a new implicit parameter to your method:

import com.spotify.scio.coders.Coder
import com.spotify.scio.values.SCollection

def doSomething[T](coll: SCollection[T])(implicit coder: Coder[T]): SCollection[T] =
  coll.map { t =>
    // do something that returns a T
    val result: T = ???
    result
  }

Alternatively, the same result can be achieved using Scala’s context bound syntax:

import com.spotify.scio.coders.Coder
import com.spotify.scio.values.SCollection

def doSomething[T: Coder](coll: SCollection[T]): SCollection[T] =
  coll.map { t =>
    // do something that returns a T
    val result: T = ???
    result
  }

Replacing Kryo fallbacks with your own coders.

Most of the time, the compiler will be able to find or derive an appropriate Coder automatically. Sometimes, it may not be able to find one automatically. This will typically happen for:

  • Classes defined in Java
  • Scala classes that are not case classes
  • Classes with a private constructor

In those cases, Scio will fallback to using Kryo.

Showing all fallback at compile time

The compiler can show a message each time a fallback is used. To activate that feature, just the the following scalac option: -Xmacro-settings:show-coder-fallback=true.

You can fix this warning in two ways:

  • Implement a proper Coder for this type
  • Make it explicit that the Kryo coder is in fact the one you want to use.

In both cases you want to define a Coder in your own code. The only difference is how you’ll implement it.

Let’s say you are using an SCollection[java.util.Locale]:

import com.spotify.scio.values.SCollection

def doSomething(coll: SCollection[String]): SCollection[java.util.Locale] =
  coll.map { t =>
    // do something that returns a Locale
    val result: java.util.Locale = ???
    result
  }

Using Kryo explicitly

If you want to explicitly use Kryo (which will probably be the case) you can do the following:

import java.util.Locale
import com.spotify.scio.coders.Coder

object Coders {
  implicit val coderLocale: Coder[Locale] = Coder.kryo[Locale]
}

Now all you have to do is make that available at call site:

import com.spotify.scio.values.SCollection
import Coders._

def doSomething(coll: SCollection[String]): SCollection[java.util.Locale] =
  coll.map { t =>
    // do something that returns a Locale
    val result: java.util.Locale = ???
    result
  }

Defining a custom Coder

If you want to implement custom coders, see Scio’s source code for examples.

Warning

Before implementing custom coders, we recommend that you test your pipeline with the default coders. Implementing custom coders can be tricky, so make sure there’s a clear benefit in doing it. If you implement custom Coders, you need to make sure they are Serializable.

WartRemover compatibility.

Automatically derived coders are generated by a macro. Unfortunately, if you use WartRemover in your project, the macro will trigger warnings. There’s not much we can do in the macro to fix the issue right now, so you’ll have to disable a few warts. Here are the warts you’ll need to disable in your project:

- Any
- IsInstanceOf
- Throw

If you use sbt-wartremover, you can disable them in your build like this:

wartremoverErrors in (Compile, compile) := {
  val disableWarts = Set(
    Wart.Any,
    Wart.IsInstanceOf,
    Wart.Throw
  )
  Warts.unsafe.filterNot(disableWarts.contains)
},

wartremoverErrors in (Test, compile) := {
  val disableWarts = Set(
    Wart.Any,
    Wart.IsInstanceOf,
    Wart.Throw
  )
  Warts.unsafe.filterNot(disableWarts.contains)
}