Scio v0.8.0

TL;DR

New features

BeamSQL

Beam SQL integration is added in this release! It comes in several flavors, from a fluent API to string interpolation, both of which offer the option to typecheck the provided query at compile time.

A simple use case of this API is shown in the example below. It uses the fluent API to query the SCollection[User] and extract username and age. query returns Row, which is Beam’s underlying type that contains the values and the Schema of the extracted data.

import com.spotify.scio.sql._
import com.spotify.scio.values._
import org.apache.beam.sdk.values.Row

case class User(username: String, email: String, age: Int)

def users: SCollection[User] = ???

def result: SCollection[Row] = users.query("select username, age from SCOLLECTION")
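If you need to work with the returned Row values directly, fields can be read back by name. The following is a minimal sketch, assuming the result schema has the fields username and age produced by the query above; toPair is a hypothetical helper name, while getString and getInt32 are accessors on Beam’s Row.

```scala
import com.spotify.scio.values._
import org.apache.beam.sdk.values.Row

// Hypothetical helper: read the two selected columns out of a Row by field name.
// getInt32 returns a boxed java.lang.Integer, so it is unboxed explicitly.
def toPair(row: Row): (String, Int) =
  (row.getString("username"), row.getInt32("age").intValue)

def rows: SCollection[Row] = ???

def pairs: SCollection[(String, Int)] = rows.map(toPair)
```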

In the following example we go a little further. Using queryAs, we can express the expected return type (String, Int) instead of Row. The query is only typechecked at runtime.

This will fail at runtime if the expected return type doesn’t match the inferred schema for the given query.

import com.spotify.scio.sql._
import com.spotify.scio.values._

case class User(username: String, email: String, age: Int)

def users: SCollection[User] = ???

def result: SCollection[(String, Int)] = users.queryAs("select username, age from SCOLLECTION")

String interpolation is another way to express SQL queries. As the following example shows, it behaves exactly like any other string interpolation, with the addition of the expected return type. As in the previous example, the return type is not checked against the inferred query schema at compile time. Any mismatch will result in a runtime exception.

import com.spotify.scio.sql._
import com.spotify.scio.values._

case class User(username: String, email: String, age: Int)

def users: SCollection[User] = ???

def result: SCollection[(String, Int)] = sql"select username, age from $users".as[(String, Int)]

Compile-time typechecking is provided by tsql. Here’s the same example using it:

import com.spotify.scio.sql._
import com.spotify.scio.values._

case class User(username: String, email: String, age: Int)

def users: SCollection[User] = ???

def result: SCollection[(String, Int)] = tsql"select username, age from $users".as[(String, Int)]

Handling SQL query Errors

SQL query errors can happen! Queries might contain syntax errors, reference wrong fields, or even declare the wrong expected type. To help you out, we have some gorgeous error messages for you!

Using the users collection from the previous example, here are some error messages you might encounter.

Selecting a wrong field from users:

- tsql"select username, age from $users".as[(String, Int)]
+ tsql"select username, foo from $users".as[(String, Int)]
SqlValidatorException: Column 'foo' not found in any table

Query:
select username, foo from SCOLLECTION


schema of SCOLLECTION:
┌──────────────────────────────────────────┬──────────────────────┬──────────┐
│ NAME                                     │ TYPE                 │ NULLABLE │
├──────────────────────────────────────────┼──────────────────────┼──────────┤
│ username                                 │ STRING               │ NO       │
│ email                                    │ STRING               │ NO       │
│ age                                      │ INT32                │ NO       │
└──────────────────────────────────────────┴──────────────────────┴──────────┘

Query result schema (inferred) is unknown.
Expected schema:
┌──────────────────────────────────────────┬──────────────────────┬──────────┐
│ NAME                                     │ TYPE                 │ NULLABLE │
├──────────────────────────────────────────┼──────────────────────┼──────────┤
│ _1                                       │ STRING               │ NO       │
│ _2                                       │ INT32                │ NO       │
└──────────────────────────────────────────┴──────────────────────┴──────────┘

Providing a SQL query with a syntax error:

- tsql"select username, age from $users".as[(String, Int)]
+ tsql"select username, age fom $users".as[(String, Int)]
ParseException: Encountered "users" at line 1, column 27.
Was expecting one of:
    <EOF>
    "ORDER" ...
    "LIMIT" ...
    "OFFSET" ...
    "FETCH" ...
    "FROM" ...
    "," ...
    "UNION" ...
    "INTERSECT" ...
    "EXCEPT" ...
    "MINUS" ...


Query:
select username, age fom  users


schema of users:
┌──────────────────────────────────────────┬──────────────────────┬──────────┐
│ NAME                                     │ TYPE                 │ NULLABLE │
├──────────────────────────────────────────┼──────────────────────┼──────────┤
│ username                                 │ STRING               │ NO       │
│ email                                    │ STRING               │ NO       │
│ age                                      │ INT32                │ NO       │
└──────────────────────────────────────────┴──────────────────────┴──────────┘

Query result schema (inferred) is unknown.
Expected schema:
┌──────────────────────────────────────────┬──────────────────────┬──────────┐
│ NAME                                     │ TYPE                 │ NULLABLE │
├──────────────────────────────────────────┼──────────────────────┼──────────┤
│ _1                                       │ STRING               │ NO       │
│ _2                                       │ INT32                │ NO       │
└──────────────────────────────────────────┴──────────────────────┴──────────┘

Return type different from the inferred one:

- tsql"select username, age from $users".as[(String, Int)]
+ tsql"select username, age from $users".as[String]
Inferred schema for query is not compatible with the expected schema.

Query:
select username, age from  users


schema of users:
┌──────────────────────────────────────────┬──────────────────────┬──────────┐
│ NAME                                     │ TYPE                 │ NULLABLE │
├──────────────────────────────────────────┼──────────────────────┼──────────┤
│ username                                 │ STRING               │ NO       │
│ email                                    │ STRING               │ NO       │
│ age                                      │ INT32                │ NO       │
└──────────────────────────────────────────┴──────────────────────┴──────────┘

Query result schema (inferred):
┌──────────────────────────────────────────┬──────────────────────┬──────────┐
│ NAME                                     │ TYPE                 │ NULLABLE │
├──────────────────────────────────────────┼──────────────────────┼──────────┤
│ username                                 │ STRING               │ NO       │
│ age                                      │ INT32                │ NO       │
└──────────────────────────────────────────┴──────────────────────┴──────────┘

Expected schema:
┌──────────────────────────────────────────┬──────────────────────┬──────────┐
│ NAME                                     │ TYPE                 │ NULLABLE │
├──────────────────────────────────────────┼──────────────────────┼──────────┤
│ value                                    │ STRING               │ NO       │
└──────────────────────────────────────────┴──────────────────────┴──────────┘

BigQuery Storage API

BigQuery Storage API provides fast access to BigQuery managed storage by using an RPC-based protocol.

If you already use BigQuery, the BigQuery Storage API that we provide will look very familiar, as it offers both the standard and the type safe API. Switching to this new strategy should be very straightforward.

Using the type safe API is almost the same as with the previously provided strategies. We just need to use @BigQueryType.fromStorage. The example below retrieves all columns from a given table.

import com.spotify.scio.bigquery._

@BigQueryType.fromStorage("data-integration-test:storage.nested")
class Example

However, if you don’t want to pull everything, you can specify which fields you want and even set a row filter. Note that the field names in selectedFields must appear in the same order as the columns appear in the table.

import com.spotify.scio.bigquery._

@BigQueryType.fromStorage(
    "data-integration-test:storage.%s",
    List("nested"),
    selectedFields = List("required", "optional.int"),
    rowRestriction = "required.int < 5"
)
class Example

The annotated classes above can be read with the following methods.

import com.spotify.scio._
import com.spotify.scio.bigquery._
import com.spotify.scio.values._

def sc: ScioContext = ???

def below: SCollection[Example] = sc.typedBigQuery[Example]()

// or

def above: SCollection[Example] = sc.typedBigQueryStorage[Example](rowRestriction = "required.int > 5")

When not using the type safe API, you can read rows as TableRow:

import com.spotify.scio._
import com.spotify.scio.values._
import com.spotify.scio.bigquery._

def sc: ScioContext = ???

def result: SCollection[TableRow] = sc.bigQueryStorage(
  Table.Spec("apache-beam-testing:samples.weather_stations"),
  selectedFields = List("tornado", "month"),
  rowRestriction = "tornado = true"
)

Generic case class type conversion

With the introduction of automatic schema derivation for data types it becomes really easy to convert between “compatible” generic case classes.

import com.spotify.scio.values._
import com.spotify.scio.schemas._

case class FromExample(i: Int, s: String)

case class ToExample(s: String)

def examples: SCollection[FromExample] = ???

To convert FromExample => ToExample we can use two methods: unsafe and safe. The main difference between them is when the conversion compatibility check happens: with unsafe it happens at runtime, while with safe it happens at compile time.

def unsafe: SCollection[ToExample] = examples.to[ToExample](To.unsafe)

def safe: SCollection[ToExample] = examples.to[ToExample](To.safe)

Deprecations and Breaking changes

scala.concurrent.Future removed from ScioIOs

The removal of Future led to some semantic and behaviour changes.

ScioIOs no longer return Future

trait ScioIO[T] {
  ....
- protected def write(data: SCollection[T], params: WriteP): Future[Tap[tapT.T]]
+ protected def write(data: SCollection[T], params: WriteP): Tap[tapT.T]
  ...
}

SCollection#write returns ClosedTap[T]

sealed trait SCollection[T] extends PCollectionWrapper[T] {
    ...
- def write(io: ScioIO[T])(params: io.WriteP)(implicit coder: Coder[T]): Future[Tap[io.tapT.T]]
+ def write(io: ScioIO[T])(params: io.WriteP)(implicit coder: Coder[T]): ClosedTap[io.tapT.T]
    ...
}

ClosedTap[T] encapsulates the IO Tap[T] and it’s only possible to read from it once the pipeline execution is done. This is demonstrated in the following example:

import com.spotify.scio._
import com.spotify.scio.io._

def sc: ScioContext = ???
def closedTap: ClosedTap[String] =
  sc
    .parallelize(1 to 100)
    .sum
    .map(_.toString)
    .saveAsTextFile("...")

def scioResult: ScioResult = sc.run().waitUntilDone()

// open tap for read
def openedTap: Tap[String] = scioResult.tap(closedTap)

ScioContext

ScioContext#close changed its return type to ScioExecutionContext.

- def close(): ScioResult
+ def close(): ScioExecutionContext

ScioContext#close is also being deprecated in favor of ScioContext#run. With this change, the --blocking flag is also deprecated.

To achieve the same behaviour with the new ScioContext#run you can do the following:

import com.spotify.scio._
import scala.concurrent.duration._

def scioResult(sc: ScioContext): ScioResult = sc.run().waitUntilDone(Duration.Inf)
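In a job’s entry point, the migration mostly amounts to replacing the final sc.close() call. The following is a minimal sketch, assuming a hypothetical job named LineLengths that takes --input and --output arguments (both names are illustrative and not part of Scio).

```scala
import com.spotify.scio._

// Hypothetical job used only to illustrate the migration.
object LineLengths {
  def main(cmdlineArgs: Array[String]): Unit = {
    // Parse the command line and create the context.
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.textFile(args("input"))
      .map(line => line.length.toString)
      .saveAsTextFile(args("output"))

    // Before: sc.close(), optionally combined with the --blocking flag.
    // Now: run() submits the pipeline and waitUntilDone() blocks until it finishes.
    sc.run().waitUntilDone()
    ()
  }
}
```

Dropping waitUntilDone() leaves the call non-blocking, which corresponds to running the old code without --blocking.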

Remove tensorflow methods related to schema inference

In Scio 0.7.0, scio-tensorflow saw some of its operations being deprecated. They are no longer available in this version, and we recommend that users use TensorFlow Data Validation instead.

Removed operations:

  • saveExampleMetadata
  • saveAsTfExampleFileWithSchema

BigQuery

If you are using BigQuery you will see some @deprecated warnings:

// method bigQueryTable in class ScioContextOps is deprecated (since Scio 0.8):
// this method will be removed; use bigQueryTable(Table.Spec(table)) instead

All BigQuery IO operations now support two new data types: Table, to reference a specific table, and Query, to reference a query. Every method that takes tableSpec: String, tableReference: TableReference, or query: String has been deprecated.

- def bigQueryTable(tableSpec: String): SCollection[TableRow]
- def bigQueryTable(tableReference: TableReference): SCollection[TableRow]
- def bigQuerySelect(query: String): SCollection[TableRow]

+ def bigQueryTable(table: Table): SCollection[TableRow]
+ def bigQuerySelect(query: Query): SCollection[TableRow]

The new Table type can be created according to the following examples:

def tableSpecString: String = ???

def table: Table = Table.Spec(tableSpecString)

or:

def tableReference: TableReference = ???

def table: Table = Table.Ref(tableReference)

The advantage of this over the previous usage of String or TableReference is that now we are able to safely disambiguate between table spec and table reference.
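Put together, migrating a call site looks roughly like the sketch below. It reuses the sample table from the BigQuery Storage section above; constructing a Query by wrapping the SQL string, as in Query("..."), is an assumption here, since these notes only show how to build a Table.

```scala
import com.spotify.scio._
import com.spotify.scio.bigquery._
import com.spotify.scio.values._

def sc: ScioContext = ???

// Before (deprecated): sc.bigQueryTable("apache-beam-testing:samples.weather_stations")
def rows: SCollection[TableRow] =
  sc.bigQueryTable(Table.Spec("apache-beam-testing:samples.weather_stations"))

// Before (deprecated): sc.bigQuerySelect("SELECT ...")
// Assumption: Query wraps the SQL string directly.
def selected: SCollection[TableRow] =
  sc.bigQuerySelect(
    Query("SELECT tornado, month FROM `apache-beam-testing.samples.weather_stations`")
  )
```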

Async DoFns

Async DoFns were refactored.

AsyncLookupDoFn was renamed to BaseAsyncLookupDoFn, and we now have better support for Guava, Java 8 and Scala Future lookup DoFns through the following implementations: GuavaAsyncLookupDoFn, JavaAsyncLookupDoFn and ScalaAsyncLookupDoFn.

Remove support for lisp-case CLI arguments

In order to be consistent with Beam’s way of passing arguments into the application and constructing PipelineOptions, we decided to drop support for lisp-case arguments.

What this means is that if you were passing arguments like --foo-bar, you now need to pass them as --fooBar.
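If you have launch scripts or wrappers that still produce lisp-case flags, a small rewrite step can ease the transition. The following is a sketch of a hypothetical helper (toCamelCaseArg is not part of Scio) that converts only the flag name and leaves the value untouched.

```scala
// Hypothetical migration helper, not part of Scio:
// rewrites "--foo-bar=value" as "--fooBar=value".
def toCamelCaseArg(arg: String): String =
  if (!arg.startsWith("--")) arg // not a flag, leave it alone
  else {
    // Split the flag name from an optional "=value" suffix.
    val (name, rest) = arg.drop(2).span(_ != '=')
    val parts = name.split('-').filter(_.nonEmpty)
    // Keep the first word as is and capitalize the following ones.
    val camel = parts.headOption.getOrElse("") + parts.drop(1).map(_.capitalize).mkString
    s"--$camel$rest"
  }

// Example usage:
// toCamelCaseArg("--foo-bar")            returns "--fooBar"
// toCamelCaseArg("--foo-bar=some-value") returns "--fooBar=some-value"
```

Arguments that are already camelCase, or that are not flags at all, pass through unchanged, so the helper can be applied to an entire argument array with args.map(toCamelCaseArg).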