# BigQuery

## Background
NOTE: there are currently two BigQuery dialects, the legacy query syntax and the new SQL 2011 standard. Standard SQL is highly recommended since it generates dry-run schemas consistent with actual results and eliminates a lot of edge cases when working with records in a type-safe manner. To use standard SQL, prefix your query with `#standardsql`.
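For example, a dialect-prefixed query used with `@BigQueryType.fromQuery` might look like this (a minimal sketch; note that standard SQL uses backtick-quoted, dot-separated table names):

```scala
import com.spotify.scio.bigquery.types.BigQueryType

@BigQueryType.fromQuery(
  """
    |#standardsql
    |SELECT tornado, month
    |FROM `bigquery-public-data.samples.gsod`
  """.stripMargin)
class Row
```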
### TableRow

BigQuery rows are represented as `TableRow` in the BigQuery Java API, which is essentially a `Map<String, Object>`. Fields are accessed by name strings, and values must be cast or converted to the desired type, both of which are error-prone.
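A minimal sketch of what raw `TableRow` access looks like (the field names follow the `samples.gsod` examples below):

```scala
import com.google.api.services.bigquery.model.TableRow

// Values come back as Object and must be converted by hand;
// a typo in a field name or a wrong cast only fails at runtime.
def parse(row: TableRow): (Boolean, Long) = {
  val tornado = Option(row.get("tornado")).exists(_.toString.toBoolean)
  val month   = row.get("month").toString.toLong
  (tornado, month)
}
```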
### Type safe BigQuery

The type safe BigQuery API in Scio represents rows as case classes and generates `TableSchema` converters automatically at compile time, with the following mapping logic:
- Nullable fields are mapped to `Option[T]`s
- Repeated fields are mapped to `List[T]`s
- Records are mapped to nested case classes
- Timestamps are mapped to Joda Time `Instant`
See the documentation for `BigQueryType` for the complete list of supported types.
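A hypothetical case class illustrating these mappings:

```scala
import org.joda.time.Instant

// Hypothetical nested RECORD
case class Location(latitude: Double, longitude: Double)

case class Event(
  id: String,                 // REQUIRED STRING
  comment: Option[String],    // NULLABLE field -> Option[T]
  tags: List[String],         // REPEATED field -> List[T]
  location: Option[Location], // RECORD -> nested case class
  timestamp: Instant          // TIMESTAMP -> Joda Time Instant
)
```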
## Type annotations
There are 5 annotations for type safe code generation.
### BigQueryType.fromStorage

This expands a class with output fields from a BigQuery Storage API read. Note that class `Row` has no body definition and is expanded by the annotation at compile time based on the actual table schema.

The Storage API provides fast access to BigQuery-managed storage using an RPC-based protocol. It is preferred over `@BigQueryType.fromTable` and `@BigQueryType.fromQuery`. For comparison:
- `fromTable` exports the entire table to Avro files on GCS and reads from them. This incurs export cost and export quota. It can also be wasteful if only a fraction of the columns/rows are needed.
- `fromQuery` executes the query and saves the result as a temporary table before reading it like `fromTable`. This incurs both query and export cost, plus export quota.
- `fromStorage` accesses the underlying BigQuery storage directly, reading only the columns and rows selected by `selectedFields` and `rowRestriction`. No query or export cost, and no export quota hit.
```scala
import com.spotify.scio.bigquery.types.BigQueryType

@BigQueryType.fromStorage(
  "bigquery-public-data:samples.gsod",
  selectedFields = List("tornado", "month"),
  rowRestriction = "tornado = true"
)
class Row
```
### BigQueryType.fromTable

This expands a class with fields that map to a BigQuery table.
```scala
import com.spotify.scio.bigquery.types.BigQueryType

@BigQueryType.fromTable("bigquery-public-data:samples.gsod")
class Row
```
### BigQueryType.fromQuery

This expands a class with output fields from a SELECT query. A dry run is executed at compile time to get the output schema; it does not incur additional cost.
```scala
import com.spotify.scio.bigquery.types.BigQueryType

@BigQueryType.fromQuery("SELECT tornado, month FROM [bigquery-public-data:samples.gsod]")
class Row
```
The query string may also contain `%s`s and additional arguments for a parameterized query. This can be handy for log-type data.
```scala
import com.spotify.scio.bigquery.types.BigQueryType

// generate schema at compile time from a specific date
@BigQueryType.fromQuery("SELECT user, url FROM [my-project:logs.log_%s]", "20160101")
class Row

// generate new query strings at runtime
val newQuery = Row.query(args(0))
```
There's also a `$LATEST` placeholder for table partitions. The latest partition common to all tables containing the placeholder will be used.
```scala
import com.spotify.scio.bigquery.types.BigQueryType

// generate schema at compile time from the latest date available in both
// my-project:log1.log_* and my-project:log2.log_*
@BigQueryType.fromQuery(
  "SELECT user, url, action FROM [my-project:log1.log_%s] JOIN [my-project:log2.log_%s] USING user",
  "$LATEST", "$LATEST")
class Row

// generate new query strings at runtime
val newQuery = Row.query(args(0), args(0))
```
### BigQueryType.fromSchema

This annotation gets the schema from a string parameter and is useful in tests.
```scala
import com.spotify.scio.bigquery.types.BigQueryType

@BigQueryType.fromSchema(
  """
    |{
    |  "fields": [
    |    {"mode": "REQUIRED", "name": "f1", "type": "INTEGER"},
    |    {"mode": "REQUIRED", "name": "f2", "type": "FLOAT"},
    |    {"mode": "REQUIRED", "name": "f3", "type": "BOOLEAN"},
    |    {"mode": "REQUIRED", "name": "f4", "type": "STRING"},
    |    {"mode": "REQUIRED", "name": "f5", "type": "TIMESTAMP"}
    |  ]
    |}
  """.stripMargin)
class Row
```
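In a test, the generated case class and its converters (see the Companion objects section below) can be exercised directly; a minimal sketch:

```scala
import org.joda.time.Instant

// Round-trip a value through the generated TableRow converters.
val row      = Row(1L, 2.0, true, "s", Instant.now())
val tableRow = Row.toTableRow(row)
assert(Row.fromTableRow(tableRow) == row)
```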
### BigQueryType.toTable

This annotation works the other way around: instead of generating a class definition from a BigQuery schema, it generates a BigQuery schema from a case class definition.
```scala
import com.spotify.scio.bigquery.types.BigQueryType

@BigQueryType.toTable
case class Result(user: String, url: String, time: Long)
```
Fields in the case class, and the class itself, can also be annotated with `@description`, which propagates to the BigQuery schema.
```scala
import com.spotify.scio.bigquery.types.BigQueryType
import com.spotify.scio.bigquery.description

@BigQueryType.toTable
@description("A list of users mapped to the urls they visited")
case class Result(user: String,
                  url: String,
                  @description("Milliseconds since Unix epoch") time: Long)
```
### Annotation parameters

Note that due to the nature of Scala macros, only string literals and multi-line strings with an optional `.stripMargin` are allowed as parameters to `BigQueryType.fromTable`, `BigQueryType.fromQuery`, and `BigQueryType.fromSchema`.
These are OK:
```scala
import com.spotify.scio.bigquery.types.BigQueryType

@BigQueryType.fromTable("project-id:dataset-id.table-id")
class Row1

@BigQueryType.fromQuery(
  """
    |SELECT user, url
    |FROM [project-id:dataset-id.table-id]
  """.stripMargin)
class Row2
```
And these are not:
```scala
val args: Array[String] = Array("", "*", "[project-id:dataset-id.table-id]")

import com.spotify.scio.bigquery.types.BigQueryType

@BigQueryType.fromQuery("SELECT " + args(1) + " FROM [" + args(2) + "]")
class Row1

val sql = "SELECT " + args(1) + " FROM [" + args(2) + "]"
@BigQueryType.fromQuery(sql)
class Row2
```
## Companion objects

Classes annotated with the type safe BigQuery API have a few more convenience methods on their companion objects:
- `schema: TableSchema` - BigQuery schema
- `fromTableRow: (TableRow => T)` - `TableRow` to case class converter
- `toTableRow: (T => TableRow)` - case class to `TableRow` converter
- `toPrettyString(indent: Int = 0)` - pretty string representation of the schema
```scala
import com.spotify.scio.bigquery.types.BigQueryType

@BigQueryType.fromTable("bigquery-public-data:samples.gsod")
class Row

Row.toPrettyString(2)
```
In addition, `BigQueryType.fromTable` and `BigQueryType.fromQuery` generate `table: String` and `query: String` respectively, which refer to the parameters in the original annotation.

User defined companion objects may interfere with macro code generation, so for now do not provide one (i.e. `object Row`) for a case class annotated with `@BigQueryType.toTable`.
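For example, a minimal sketch of the generated `table` field (reusing the annotated class pattern from above):

```scala
import com.spotify.scio.bigquery.types.BigQueryType

@BigQueryType.fromTable("bigquery-public-data:samples.gsod")
class Row

// echoes the annotation parameter
Row.table // "bigquery-public-data:samples.gsod"
```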
## Using type safe BigQuery

### Type safe BigQuery with Scio

To enable type safe BigQuery for `ScioContext`:
```scala
import com.spotify.scio._
import com.spotify.scio.bigquery._
import com.spotify.scio.bigquery.types.BigQueryType

@BigQueryType.fromQuery("SELECT tornado, month FROM [bigquery-public-data:samples.gsod]")
class Row

@BigQueryType.toTable
case class Result(month: Long, tornado_count: Long)

def main(cmdlineArgs: Array[String]): Unit = {
  val (sc, args) = ContextAndArgs(cmdlineArgs)
  sc.typedBigQuery[Row]() // query string from Row.query
    .flatMap(r => if (r.tornado.getOrElse(false)) Seq(r.month) else Nil)
    .countByValue
    .map(kv => Result(kv._1, kv._2))
    .saveAsTypedBigQueryTable(Table.Spec(args("output"))) // schema from Row.schema
  sc.run()
  ()
}
```
### Type safe BigQueryClient

Annotated classes can be used with the `BigQueryClient` directly too.
```scala
import com.spotify.scio.bigquery.types.BigQueryType
import com.spotify.scio.bigquery.client.BigQuery

@BigQueryType.fromQuery("SELECT tornado, month FROM [bigquery-public-data:samples.gsod]")
class Row

def bq = BigQuery.defaultInstance()
def rows = bq.getTypedRows[Row]()
def result = bq.writeTypedRows("project-id:dataset-id.table-id", rows.toList)
```
### Using type safe BigQuery directly with Beam's IO library

If there are any BigQuery I/O operations supported in the Apache Beam client but not exposed in Scio, you may choose to use the Beam transform directly with Scio's `.saveAsCustomOutput()` option:
```scala
import com.spotify.scio.values.SCollection
import com.spotify.scio.bigquery.types.BigQueryType
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO

@BigQueryType.fromQuery("SELECT tornado, month FROM [bigquery-public-data:samples.gsod]")
class Foo

def bigQueryType = BigQueryType[Foo]
def tableRows: SCollection[Foo] = ???

def result =
  tableRows
    .map(bigQueryType.toTableRow)
    .saveAsCustomOutput(
      "custom bigquery IO",
      BigQueryIO
        .writeTableRows()
        .to("my-project:my-dataset.my-table")
        .withSchema(bigQueryType.schema)
        .withCreateDisposition(???)
        .withWriteDisposition(???)
        .withFailedInsertRetryPolicy(???)
    )
```
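The `???` placeholders above must be filled in before the pipeline runs. One plausible configuration (an illustrative sketch, not the only sensible choice) creates the table if missing, appends rows, and retries transient insert failures:

```scala
import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, InsertRetryPolicy}
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition}

def write = BigQueryIO
  .writeTableRows()
  .to("my-project:my-dataset.my-table")
  .withSchema(bigQueryType.schema)
  .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
  .withWriteDisposition(WriteDisposition.WRITE_APPEND)
  .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
```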
## BigQueryType and IntelliJ IDEA

See the FAQ for making IntelliJ happy with type safe BigQuery.
## Custom types and validation

See `OverrideTypeProvider` for details about the custom types and validation mechanism.
## BigQuery authentication

BigQuery authentication works a bit differently than other IO types and can be hard to reason about. File-based IOs, for example, are read directly on each remote worker node. In contrast, for BigQuery reads, Scio actually launches a BigQuery export job from the main class process before submitting the Dataflow job request. This export job extracts the requested BigQuery data to a temporary GCS location, from which the job workers can read directly. Thus, your launcher code must be credentialed with the required permissions to export data.
This credential will be picked up from the values of `bigquery.project` and `bigquery.secret`, if set. If they are not, Scio will attempt to find an active Application Default Credential and set the billing project to the value from `DefaultProjectFactory`. As of Scio 0.11.6, you can set the SBT option `bigquery.debug_auth=true`, which makes Scio log the active credential used in BigQuery queries that return a 403 FORBIDDEN status.

Service account impersonation is available via the SBT option `bigquery.act_as=service-account@my-project.iam.gserviceaccount.com`. It requires `roles/iam.serviceAccountTokenCreator` to be granted to the source account.
Note that BigQuery Storage APIs don’t require an export job as they can read from BigQuery directly.
## BigQuery configurations in SBT

Scio offers several BigQuery options that can be configured as SBT options, either in a root-level `.sbtopts` file or in your SBT invocation as `sbt -D${OPT_KEY}=${OPT_VALUE} ...`:
| Option | Description |
|---|---|
| `bigquery.project` | Specifies the billing project to use for queries. Defaults to the default project associated with the active GCP configuration (see `DefaultProjectFactory`). |
| `bigquery.secret` | Specifies a file path containing a BigQuery credential. Defaults to the Application Default Credential. |
| `bigquery.connect_timeout` | Timeout in milliseconds to establish a connection. Default: 20000 (20 seconds); 0 for an infinite timeout. |
| `bigquery.read_timeout` | Timeout in milliseconds to read data from an established connection. Default: 20000 (20 seconds); 0 for an infinite timeout. |
| `bigquery.priority` | Determines whether queries are executed in `BATCH` or `INTERACTIVE` mode. Default: `BATCH`. |
| `bigquery.debug_auth` | Enables logging active BigQuery user information on auth errors. Default: `false`. |
| `bigquery.types.debug` | Enables verbose logging of macro generation steps. Default: `false`. |
| `bigquery.cache.enabled` | Enables Scio BigQuery caching. Default: `true`. |
| `generated.class.cache.directory` | BigQuery generated class cache directory. Defaults to a directory in `java.io.tmpdir`. |
| `bigquery.cache.directory` | BigQuery local schema cache directory. Defaults to a directory in `java.io.tmpdir`. |
| `bigquery.plugin.disable.dump` | Disables the macro class dump. Default: `false`. |
| `bigquery.act_as` | A target service account principal to impersonate with the current credentials. Optional. |
| `bigquery.act_as_lifetime` | Lifetime in seconds of the impersonated service account's temporary credentials. Default: 3600. |
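For example, to run a build with an interactive query priority and auth debugging enabled (an illustrative invocation using the flags above):

```
sbt -Dbigquery.priority=INTERACTIVE -Dbigquery.debug_auth=true compile
```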