// Usage:
//
//   sbt "runMain com.spotify.scio.examples.cookbook.BigQueryTornadoes
//     --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME]
//     --input=apache-beam-testing:samples.weather_stations
//     --output=[DATASET].bigquery_tornadoes"

package com.spotify.scio.examples.cookbook import com.google.api.services.bigquery.model.{TableFieldSchema, TableSchema} import com.spotify.scio._ import com.spotify.scio.bigquery._ import com.spotify.scio.examples.common.ExampleData import scala.jdk.CollectionConverters._ object BigQueryTornadoes { def main(cmdlineArgs: Array[String]): Unit = { 

Create ScioContext and Args

val (sc, args) = ContextAndArgs(cmdlineArgs)  

Schema for result BigQuery table

val schema = new TableSchema().setFields( List( new TableFieldSchema().setName("month").setType("INTEGER"), new TableFieldSchema().setName("tornado_count").setType("INTEGER") ).asJava )  

Open a BigQuery table as a SCollection[TableRow]

val table = Table.Spec(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE)) val resultTap = sc .bigQueryTable(table

Extract months with tornadoes

.flatMap(r => if (r.getBoolean("tornado")) Some(r.getLong("month")) else None

Count occurrences of each unique month to get (Long, Long)

.countByValue 

Map (Long, Long) tuples into result TableRows

.map(kv => TableRow("month" -> kv._1, "tornado_count" -> kv._2)) 

Save result as a BigQuery table

.saveAsBigQueryTable(Table.Spec(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED)  

Access the loaded tables

resultTap .output(BigQueryIO.SuccessfulTableLoads) .map(_.getTableSpec) .debug(prefix = "Loaded table: ")  

Access the failed records

resultTap .output(BigQueryIO.FailedInserts) .count .debug(prefix = "Failed inserts: ")  

Execute the pipeline

sc.run() } }