Usage:

 

sbt "runMain com.spotify.scio.examples.cookbook.JoinExamples --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --output=gs://[BUCKET]/[PATH]/join_examples"

package com.spotify.scio.examples.cookbook import com.spotify.scio.bigquery._ import com.spotify.scio._ import com.spotify.scio.examples.common.ExampleData  

Utilities used in all examples

object JoinUtil {

  /**
   * Function to extract event information from BigQuery TableRows.
   *
   * Reads the `ActionGeo_CountryCode`, `SQLDATE`, `Actor1Name` and `SOURCEURL` fields of the row
   * and pairs the country code with a formatted description of the event.
   *
   * @param row the event row to read
   * @return a one-element sequence holding `(countryCode, eventInfo)`, or an empty sequence when
   *         the country code read from the row is `null`
   */
  def extractEventInfo(row: TableRow): Seq[(String, String)] = {
    val countryCode = row.getString("ActionGeo_CountryCode")
    val sqlDate = row.getString("SQLDATE")
    val actor1Name = row.getString("Actor1Name")
    val sourceUrl = row.getString("SOURCEURL")
    // An interpolated string is never null, so the country code is the only value to guard.
    val eventInfo = s"Date: $sqlDate, Actor1: $actor1Name, url: $sourceUrl"

    // Option(null) is None, which yields the empty list for rows without a country code.
    Option(countryCode).map(_ -> eventInfo).toList
  }

  /**
   * Function to extract country information from BigQuery TableRows.
   *
   * @param row the country row to read
   * @return the pair `(countryCode, countryName)` taken from the `FIPSCC` and `HumanName` fields,
   *         passed through exactly as read (no null check is applied here)
   */
  def extractCountryInfo(row: TableRow): (String, String) =
    (row.getString("FIPSCC"), row.getString("HumanName"))

  /**
   * Function to format output string.
   *
   * @param countryCode the country code to report
   * @param countryName the country name to report
   * @param eventInfo the event description to report
   * @return a single line combining the three values
   */
  def formatOutput(countryCode: String, countryName: String, eventInfo: String): String =
    s"Country code: $countryCode, Country name: $countryName, Event info: $eventInfo"
}

Regular shuffle-based join

object JoinExamples {

  /**
   * Joins events with country names using a left outer join and writes one formatted line per
   * event to the location given by the `output` argument.
   *
   * @param cmdlineArgs command line arguments used to build the context and the argument map
   */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    import JoinUtil._

    // Extract both sides as `SCollection[(String, String)]`s
    val events = sc
      .bigQueryTable(Table.Spec(ExampleData.EVENT_TABLE))
      .flatMap(extractEventInfo)
    val countries = sc
      .bigQueryTable(Table.Spec(ExampleData.COUNTRY_TABLE))
      .map(extractCountryInfo)

    // Left outer join to produce `SCollection[(String, (String, Option[String]))]`
    val joined = events.leftOuterJoin(countries)

    joined
      .map { case (code, (info, nameOpt)) =>
        // Events whose country code has no match on the right side get the name "none".
        formatOutput(code, nameOpt.getOrElse("none"), info)
      }
      .saveAsTextFile(args("output"))

    sc.run()
    ()
  }
}

Join with map side input

object SideInputJoinExamples {

  /**
   * Joins events with country names by looking each country code up in a map side input, then
   * writes one formatted line per event to the location given by the `output` argument.
   *
   * @param cmdlineArgs command line arguments used to build the context and the argument map
   */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    import JoinUtil._

    // Extract both sides as `SCollection[(String, String)]`s, and then convert right hand side as
    // a `SideInput` of `Map[String, String]`
    val eventsInfo = sc
      .bigQueryTable(Table.Spec(ExampleData.EVENT_TABLE))
      .flatMap(extractEventInfo)
    val countryInfo = sc
      .bigQueryTable(Table.Spec(ExampleData.COUNTRY_TABLE))
      .map(extractCountryInfo)
      .asMapSideInput

    eventsInfo
      // Replicate right hand side to all workers as a side input
      .withSideInputs(countryInfo)
      // Specialized version of `map` with access to side inputs via `SideInputContext`
      .map { (kv, side) =>
        val (countryCode, eventInfo) = kv
        // Retrieve side input value (`Map[String, String]`)
        val m = side(countryInfo)
        // Country codes missing from the map get the name "none".
        val countryName = m.getOrElse(countryCode, "none")
        formatOutput(countryCode, countryName, eventInfo)
      }
      // End of side input operation, convert back to regular `SCollection`
      .toSCollection
      .saveAsTextFile(args("output"))

    sc.run()
    ()
  }
}

Hash join

object HashJoinExamples {

  /**
   * Joins events with country names using a hash left outer join and writes one formatted line
   * per event to the location given by the `output` argument.
   *
   * @param cmdlineArgs command line arguments used to build the context and the argument map
   */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    import JoinUtil._

    // Extract both sides as `SCollection[(String, String)]`s
    val events = sc
      .bigQueryTable(Table.Spec(ExampleData.EVENT_TABLE))
      .flatMap(extractEventInfo)
    val countries = sc
      .bigQueryTable(Table.Spec(ExampleData.COUNTRY_TABLE))
      .map(extractCountryInfo)

    // Hash join uses side input under the hood and is a drop-in replacement for regular join
    val joined = events.hashLeftOuterJoin(countries)

    joined
      .map { case (code, (info, nameOpt)) =>
        // Events whose country code has no match on the right side get the name "none".
        formatOutput(code, nameOpt.getOrElse("none"), info)
      }
      .saveAsTextFile(args("output"))

    sc.run()
    ()
  }
}