Scio v0.8.0
TL;DR
- BeamSQL integration
- BigQuery Storage API support
- Generic case class conversion
- Remove the usage of Future
- New BigQuery method signatures
- New async DoFns
- Remove tensorflow methods related to schema inference
- Remove support for lisp-case CLI arguments
New features
BeamSQL
Beam SQL integration is added in this release! It comes in several flavors, from a fluent API to string interpolation, and both offer the possibility to typecheck the provided query at compile time.
A simple use case of this API is shown in the example below. It uses the fluent API to query the SCollection[User] and extract username and age. query returns Row, which is Beam’s underlying type that contains the values and the Schema of the extracted data.
import com.spotify.scio.sql._
import com.spotify.scio.values._
import org.apache.beam.sdk.values.Row
case class User(username: String, email: String, age: Int)
def users: SCollection[User] = ???
def result: SCollection[Row] = users.query("select username, age from SCOLLECTION")
In the following example we go a little bit further. Using queryAs we can express the expected return type (String, Int) instead of Row. The query typecheck only happens at runtime, so this will fail at runtime if the expected return type doesn’t match the inferred schema for the given query.
import com.spotify.scio.sql._
import com.spotify.scio.values._
case class User(username: String, email: String, age: Int)
def users: SCollection[User] = ???
def result: SCollection[(String, Int)] = users.queryAs("select username, age from SCOLLECTION")
String interpolation is another way to express SQL queries. As the following example shows, it behaves exactly like any other string interpolation, with the addition of the expected return type. As in the previous example, the return type is not typechecked at compile time against the inferred query schema. Any mismatch will result in a runtime exception.
import com.spotify.scio.sql._
import com.spotify.scio.values._
case class User(username: String, email: String, age: Int)
def users: SCollection[User] = ???
def result: SCollection[(String, Int)] = sql"select username, age from $users".as[(String, Int)]
Typecheck at compile time is provided by tsql. Here’s the same example but with this new method:
import com.spotify.scio.sql._
import com.spotify.scio.values._
case class User(username: String, email: String, age: Int)
def users: SCollection[User] = ???
def result: SCollection[(String, Int)] = tsql"select username, age from $users".as[(String, Int)]
Handling SQL query Errors
SQL query errors can happen! Queries might have syntax errors, wrong fields, or even wrong expected types. To help you out, we have some gorgeous error messages for you!
Using the users collection from the previous example, here are some error messages you might encounter.
Selecting a wrong field from users:
- tsql"select username, age from $users".as[(String, Int)]
+ tsql"select username, foo from $users".as[(String, Int)]
SqlValidatorException: Column 'foo' not found in any table
Query:
select username, foo from SCOLLECTION
schema of SCOLLECTION:
┌──────────────────────────────────────────┬──────────────────────┬──────────┐
│ NAME │ TYPE │ NULLABLE │
├──────────────────────────────────────────┼──────────────────────┼──────────┤
│ username │ STRING │ NO │
│ email │ STRING │ NO │
│ age │ INT32 │ NO │
└──────────────────────────────────────────┴──────────────────────┴──────────┘
Query result schema (inferred) is unknown.
Expected schema:
┌──────────────────────────────────────────┬──────────────────────┬──────────┐
│ NAME │ TYPE │ NULLABLE │
├──────────────────────────────────────────┼──────────────────────┼──────────┤
│ _1 │ STRING │ NO │
│ _2 │ INT32 │ NO │
└──────────────────────────────────────────┴──────────────────────┴──────────┘
Providing a SQL query with a syntax error:
- tsql"select username, age from $users".as[(String, Int)]
+ tsql"select username, age fom $users".as[(String, Int)]
ParseException: Encountered "users" at line 1, column 27.
Was expecting one of:
<EOF>
"ORDER" ...
"LIMIT" ...
"OFFSET" ...
"FETCH" ...
"FROM" ...
"," ...
"UNION" ...
"INTERSECT" ...
"EXCEPT" ...
"MINUS" ...
Query:
select username, age fom users
schema of users:
┌──────────────────────────────────────────┬──────────────────────┬──────────┐
│ NAME │ TYPE │ NULLABLE │
├──────────────────────────────────────────┼──────────────────────┼──────────┤
│ username │ STRING │ NO │
│ email │ STRING │ NO │
│ age │ INT32 │ NO │
└──────────────────────────────────────────┴──────────────────────┴──────────┘
Query result schema (inferred) is unknown.
Expected schema:
┌──────────────────────────────────────────┬──────────────────────┬──────────┐
│ NAME │ TYPE │ NULLABLE │
├──────────────────────────────────────────┼──────────────────────┼──────────┤
│ _1 │ STRING │ NO │
│ _2 │ INT32 │ NO │
└──────────────────────────────────────────┴──────────────────────┴──────────┘
Return type different from the inferred one:
- tsql"select username, age from $users".as[(String, Int)]
+ tsql"select username, age from $users".as[String]
Inferred schema for query is not compatible with the expected schema.
Query:
select username, age from users
schema of users:
┌──────────────────────────────────────────┬──────────────────────┬──────────┐
│ NAME │ TYPE │ NULLABLE │
├──────────────────────────────────────────┼──────────────────────┼──────────┤
│ username │ STRING │ NO │
│ email │ STRING │ NO │
│ age │ INT32 │ NO │
└──────────────────────────────────────────┴──────────────────────┴──────────┘
Query result schema (inferred):
┌──────────────────────────────────────────┬──────────────────────┬──────────┐
│ NAME │ TYPE │ NULLABLE │
├──────────────────────────────────────────┼──────────────────────┼──────────┤
│ username │ STRING │ NO │
│ age │ INT32 │ NO │
└──────────────────────────────────────────┴──────────────────────┴──────────┘
Expected schema:
┌──────────────────────────────────────────┬──────────────────────┬──────────┐
│ NAME │ TYPE │ NULLABLE │
├──────────────────────────────────────────┼──────────────────────┼──────────┤
│ value │ STRING │ NO │
└──────────────────────────────────────────┴──────────────────────┴──────────┘
BigQuery Storage API
BigQuery Storage API provides fast access to BigQuery managed storage by using an RPC-based protocol.
If you already use BigQuery, the BigQuery Storage API support will look very familiar, as it offers both the standard and the type safe API. Switching to this new strategy should be straightforward.
Using the type safe API is almost the same as with the previously provided strategies. We just need to use @BigQueryType.fromStorage. The example below retrieves all columns from a given table.
import com.spotify.scio.bigquery._
@BigQueryType.fromStorage("data-integration-test:storage.nested")
class Example
However, if you don’t want to pull everything, you can always specify which fields you want and even set some filtering. Note that the field names in selectedFields must appear in the same order as the columns appear in the table.
import com.spotify.scio.bigquery._
@BigQueryType.fromStorage(
  "data-integration-test:storage.%s",
  List("nested"),
  selectedFields = List("required", "optional.int"),
  rowRestriction = "required.int < 5"
)
class Example
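Since selectedFields has to follow the table's column order, it can help to derive the list from the column order instead of writing it by hand. The sketch below is a hypothetical helper, not part of Scio; it assumes you already have the table's field names available as a sequence in table order.

```scala
// Hypothetical helper, NOT part of Scio: returns the wanted field names
// in the same order as they appear in the table.
object SelectedFields {
  /**
   * @param tableColumns field names in the order they appear in the table (assumed known)
   * @param wanted       field names to read, in any order
   */
  def inTableOrder(tableColumns: Seq[String], wanted: Set[String]): List[String] = {
    // Fail early on names that do not exist in the table.
    val unknown = wanted -- tableColumns
    require(unknown.isEmpty, s"Unknown fields: ${unknown.mkString(", ")}")
    // Filtering the table's columns preserves the table order.
    tableColumns.filter(wanted.contains).toList
  }
}
```

The resulting list could then be passed as selectedFields when reading without the annotation, for example to sc.bigQueryStorage.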
The annotated classes above can be used through the following methods.
import com.spotify.scio._
import com.spotify.scio.bigquery._
import com.spotify.scio.values._
def sc: ScioContext = ???
def below: SCollection[Example] = sc.typedBigQuery[Example]()
// or
def above: SCollection[Example] = sc.typedBigQueryStorage[Example](rowRestriction = "required.int > 5")
When not using the type safe API, you can always read as follows:
import com.spotify.scio._
import com.spotify.scio.values._
import com.spotify.scio.bigquery._
def sc: ScioContext = ???
def result: SCollection[TableRow] = sc.bigQueryStorage(
  Table.Spec("apache-beam-testing:samples.weather_stations"),
  selectedFields = List("tornado", "month"),
  rowRestriction = "tornado = true"
)
Generic case class type conversion
With the introduction of automatic schema derivation for data types, it becomes really easy to convert between “compatible” generic case classes.
import com.spotify.scio.values._
import com.spotify.scio.schemas._
case class FromExample(i: Int, s: String)
case class ToExample(s: String)
def examples: SCollection[FromExample] = ???
To convert FromExample => ToExample we can use two methods, unsafe and safe. The main difference between them is when the conversion compatibility check happens: with unsafe it happens at runtime, while with safe it happens at compile time.
def unsafe: SCollection[ToExample] = examples.to[ToExample](To.unsafe)
def safe: SCollection[ToExample] = examples.to[ToExample](To.safe)
Deprecations and Breaking changes
scala.concurrent.Future removed from ScioIOs
The removal of Future led to some semantic and behaviour changes.
ScioIOs no longer return Future
trait ScioIO[T] {
....
- protected def write(data: SCollection[T], params: WriteP): Future[Tap[tapT.T]]
+ protected def write(data: SCollection[T], params: WriteP): Tap[tapT.T]
...
}
SCollection#write returns ClosedTap[T]
sealed trait SCollection[T] extends PCollectionWrapper[T] {
...
- def write(io: ScioIO[T])(params: io.WriteP)(implicit coder: Coder[T]): Future[Tap[io.tapT.T]]
+ def write(io: ScioIO[T])(params: io.WriteP)(implicit coder: Coder[T]): ClosedTap[io.tapT.T]
...
}
ClosedTap[T] encapsulates the IO Tap[T], and it’s only possible to read from it once the pipeline execution is done. This is demonstrated in the following example:
import com.spotify.scio._
import com.spotify.scio.io._
def sc: ScioContext = ???
def closedTap: ClosedTap[String] =
  sc
    .parallelize(1 to 100)
    .sum
    .map(_.toString)
    .saveAsTextFile("...")
def scioResult: ScioResult = sc.run().waitUntilDone()
// open tap for read
def openedTap: Tap[String] = scioResult.tap(closedTap)
ScioContext
ScioContext#close changed its return type to ScioExecutionContext.
- def close(): ScioResult
+ def close(): ScioExecutionContext
ScioContext#close is also being deprecated in favor of ScioContext#run. With this change, the --blocking flag is also deprecated.
To achieve the same behaviour with the new ScioContext#run you can do the following:
import com.spotify.scio._
import scala.concurrent.duration._
def scioResult(sc: ScioContext): ScioResult = sc.run().waitUntilDone(Duration.Inf)
Remove tensorflow methods related to schema inference
In scio 0.7.0, scio-tensorflow saw some of its operations being deprecated. They are no longer available in this version, and we recommend using TensorFlow Data Validation instead.
Removed operations:
saveExampleMetadata
saveAsTfExampleFileWithSchema
BigQuery
If you are using BigQuery you will see some @deprecated warnings:
// method bigQueryTable in class ScioContextOps is deprecated (since Scio 0.8):
// this method will be removed; use bigQueryTable(Table.Spec(table)) instead
All BigQuery IO operations now support two new data types: Table, to reference a specific table, and Query, to reference a query. Every method that uses tableSpec: String, tableReference: TableReference, or query: String has been deprecated.
- def bigQueryTable(tableSpec: String): SCollection[TableRow]
- def bigQueryTable(tableReference: TableReference): SCollection[TableRow]
- def bigQuerySelect(query: String): SCollection[TableRow]
+ def bigQueryTable(table: Table): SCollection[TableRow]
+ def bigQuerySelect(query: Query): SCollection[TableRow]
The new Table type can be created according to the following examples:
def tableSpecString: String = ???
def table: Table = Table.Spec(tableSpecString)
or:
def tableReference: TableReference = ???
def table: Table = Table.Ref(tableReference)
The advantage of this over the previous usage of String or TableReference is that now we are able to safely disambiguate between table spec and table reference.
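Putting the pieces together, migrating away from the deprecated methods might look like the sketch below. It only uses the signatures listed in this section; building a Query directly from a String with Query(...) is an assumption here, since the notes above only show Query as a parameter type.

```scala
import com.spotify.scio._
import com.spotify.scio.bigquery._
import com.spotify.scio.values._

def sc: ScioContext = ???
def tableSpecString: String = ???
def sqlQuery: String = ???

// Before (deprecated): sc.bigQueryTable(tableSpecString)
def rows: SCollection[TableRow] =
  sc.bigQueryTable(Table.Spec(tableSpecString))

// Before (deprecated): sc.bigQuerySelect(sqlQuery)
// Assumption: Query can be constructed from a plain String.
def selected: SCollection[TableRow] =
  sc.bigQuerySelect(Query(sqlQuery))
```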
Async DoFns
Async DoFns were refactored. AsyncLookupDoFn was renamed to BaseAsyncLookupDoFn, and we now have better support for Guava, Java 8 and Scala Future lookup DoFns through the following implementations: GuavaAsyncLookupDoFn, JavaAsyncLookupDoFn and ScalaAsyncLookupDoFn.
Remove support for lisp-case CLI arguments
In order to be consistent with Beam’s way of passing arguments into the application and constructing PipelineOptions, we decided to drop support for lisp-case arguments.
What this means is that if you were passing arguments like --foo-bar, you now need to pass them as --fooBar.
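If you have launch scripts or configuration that still produce lisp-case arguments, a small shim can rewrite them while you migrate. The following is a minimal sketch of a hypothetical helper (LispCaseArgs is not part of Scio); it assumes arguments have the form --name or --name=value and leaves values and positional arguments untouched.

```scala
// Hypothetical migration helper, NOT part of Scio: rewrites lisp-case
// argument names (--foo-bar=1) to camelCase (--fooBar=1).
object LispCaseArgs {

  /** Converts a lisp-case name such as "foo-bar" to camelCase "fooBar". */
  def toCamelCase(name: String): String = {
    val parts = name.split("-").filter(_.nonEmpty)
    if (parts.isEmpty) name
    else parts.head + parts.tail.map(_.capitalize).mkString
  }

  /** Rewrites only the name part of "--name" or "--name=value" arguments. */
  def migrate(args: Array[String]): Array[String] =
    args.map { arg =>
      if (arg.startsWith("--")) {
        // Split at the first '=' so dashes inside the value are preserved.
        val (name, rest) = arg.drop(2).span(_ != '=')
        "--" + toCamelCase(name) + rest
      } else arg
    }
}
```

The migrated array could be passed on to the pipeline's argument parsing in place of the original command-line arguments. Updating the callers to pass camelCase directly remains the cleaner long-term fix.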