Usage:

sbt "runMain com.spotify.scio.examples.cookbook.CombinePerKeyExamples --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --output=[DATASET].combine_per_key_examples"

package com.spotify.scio.examples.cookbook

import com.google.api.services.bigquery.model.{TableFieldSchema, TableSchema}
import com.spotify.scio.bigquery._
import com.spotify.scio._
import com.spotify.scio.examples.common.ExampleData

import scala.jdk.CollectionConverters._
import scala.collection.SortedSet

object CombinePerKeyExamples {
  def main(cmdlineArgs: Array[String]): Unit = {

Create ScioContext and Args

val (sc, args) = ContextAndArgs(cmdlineArgs)

val minWordLength = 9

Schema for result BigQuery table

val schema = new TableSchema().setFields(
  List(
    new TableFieldSchema().setName("word").setType("STRING"),
    new TableFieldSchema().setName("all_plays").setType("STRING")
  ).asJava
)

Open a BigQuery table as a SCollection[TableRow]

val table = Table.Spec(args.getOrElse("input", ExampleData.SHAKESPEARE_TABLE))
sc.bigQueryTable(table)

Extract words and corresponding play names

.flatMap { row =>
  val playName = row.getString("corpus")
  val word = row.getString("word")
  if (word.length > minWordLength) Some((word, playName)) else None
}
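The extraction step can be sketched without a pipeline, using plain Scala maps in place of BigQuery `TableRow`s. This is a minimal local illustration; the object name `ExtractWordsDemo` and the sample rows are made up for the sketch.

```scala
object ExtractWordsDemo {
  def main(args: Array[String]): Unit = {
    val minWordLength = 9
    // Hypothetical rows standing in for TableRows from the Shakespeare table
    val rows = Seq(
      Map("corpus" -> "hamlet", "word" -> "remembrance"),
      Map("corpus" -> "hamlet", "word" -> "the")
    )
    // Keep only words longer than minWordLength, paired with their play name
    val pairs = rows.flatMap { row =>
      val playName = row("corpus")
      val word = row("word")
      if (word.length > minWordLength) Some((word, playName)) else None
    }
    assert(pairs == Seq(("remembrance", "hamlet")))
    println(pairs)
  }
}
```

Returning an `Option` from `flatMap` is the idiomatic way to filter and transform in one pass: `Some` keeps the pair, `None` drops the row.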

Aggregate the values (play names) of each key (word): start from an initial result (an empty set), accumulate values into the set (_ + _), then merge the intermediate sets (_ ++ _). A SortedSet keeps the values in a deterministic order, which makes the output testable.

.aggregateByKey(SortedSet[String]())(_ + _, _ ++ _) 
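The semantics of this aggregation can be sketched locally with ordinary Scala collections. This is a sketch only, not Scio's implementation; the object name `AggregateByKeyDemo` and the sample pairs are made up. The fold mirrors the seqOp (_ + _), which adds one value to an accumulator set; in the distributed case the combOp (_ ++ _) then merges partial sets produced on different workers.

```scala
import scala.collection.SortedSet

object AggregateByKeyDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical (word, playName) pairs, as produced by the flatMap step
    val pairs = Seq(
      ("remembrance", "hamlet"),
      ("remembrance", "macbeth"),
      ("countenance", "hamlet"),
      ("remembrance", "hamlet")
    )
    // Group by key, then fold each group's values into a SortedSet:
    // duplicates collapse, and iteration order is deterministic
    val aggregated: Map[String, SortedSet[String]] =
      pairs.groupBy(_._1).map { case (word, kvs) =>
        word -> kvs.foldLeft(SortedSet.empty[String])((set, kv) => set + kv._2)
      }
    assert(aggregated("remembrance") == SortedSet("hamlet", "macbeth"))
    assert(aggregated("countenance").mkString(",") == "hamlet")
    println(aggregated)
  }
}
```

Because sets are merged rather than concatenated, repeated (word, play) pairs contribute a single entry, which is exactly what the pipeline needs before joining the play names into one string.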

Map result sets into strings

.mapValues(_.mkString(",")) 

Map (String, String) tuples into result TableRows

.map(kv => TableRow("word" -> kv._1, "all_plays" -> kv._2)) 

Save result as a BigQuery table

.saveAsBigQueryTable(Table.Spec(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED)  

Execute the pipeline

sc.run()
()
  }
}