Usage:

 

sbt "runMain com.spotify.scio.examples.extra.WordCountOrchestration --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --output=gs://[BUCKET]/[PATH]/wordcount"

package com.spotify.scio.examples.extra

import com.spotify.scio._
import com.spotify.scio.examples.common.ExampleData
import com.spotify.scio.io.Tap
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.options.PipelineOptions

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

/**
 * Orchestrates three pipelines: two word-count jobs submitted concurrently, followed by a job
 * that merges their results and writes them as text to the `--output` location.
 */
object WordCountOrchestration {

  /**
   * Parses the command line, submits both count jobs, merges their results once both are
   * available, and prints up to the first 10 entries read back from the merged output.
   *
   * @param cmdlineArgs pipeline options plus the required `--output` argument
   */
  def main(cmdlineArgs: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global

    val (opts, args) = ScioContext.parseArguments[PipelineOptions](cmdlineArgs)
    val output = args("output")

    // Submit count job 1
    val f1 = Future(count(opts, ExampleData.KING_LEAR))

    // Submit count job 2
    // Both futures are created before the for-comprehension so the two jobs run concurrently.
    val f2 = Future(count(opts, ExampleData.OTHELLO))

    // Extract Tap[T]s from two Future[Tap[T]]s
    val f = for {
      t1 <- f1
      t2 <- f2
    } yield merge(opts, Seq(t1, t2), output)

    // Block process and wait for last future.
    // `Future#value` is only a non-blocking poll (an `Option[Try[_]]`), so the result has to be
    // awaited explicitly before the merged tap can be read.
    println("Tap:")
    val merged = Await.result(f, Duration.Inf)
    merged.value.take(10).foreach(println)
  }

  /**
   * Runs one word-count pipeline to completion.
   *
   * Reads text from `inputPath`, applies [[countWords]], materializes the counts, and waits
   * until the pipeline is done.
   *
   * @param opts      pipeline options used to create the `ScioContext`
   * @param inputPath location of the text input
   * @return a tap over the materialized `(word, count)` pairs
   */
  def count(opts: PipelineOptions, inputPath: String): Tap[(String, Long)] = {
    val sc = ScioContext(opts)
    // Reuse the shared transform instead of repeating its split/filter/count logic here.
    val f = countWords(sc.textFile(inputPath)).materialize
    sc.run().waitUntilDone().tap(f)
  }

Split out transform for unit testing

/**
 * Tokenizes each input line on runs of characters other than letters and apostrophes, drops
 * empty tokens, and counts the occurrences of every remaining token.
 *
 * @param in lines of text
 * @return `(word, count)` pairs
 */
def countWords(in: SCollection[String]): SCollection[(String, Long)] = {
  val words = in.flatMap { line =>
    line.split("[^a-zA-Z']+").filter(token => token.nonEmpty)
  }
  words.countByValue
}

/**
 * Runs a pipeline that opens every tap in `s`, combines them with `mergeCounts`, formats each
 * pair as "key value" separated by a single space, and saves the lines as text.
 *
 * @param opts       pipeline options used to create the `ScioContext`
 * @param s          taps holding `(word, count)` pairs from earlier jobs
 * @param outputPath destination passed to `saveAsTextFile`
 * @return a tap over the written text lines, available once the pipeline is done
 */
def merge(opts: PipelineOptions, s: Seq[Tap[(String, Long)]], outputPath: String): Tap[String] = {
  val context = ScioContext(opts)
  val opened = s.map(tap => tap.open(context))
  val written = mergeCounts(opened)
    .map { case (word, total) => word + " " + total }
    .saveAsTextFile(outputPath)
  context.run().waitUntilDone().tap(written)
}

Split out transform for unit testing

/**
 * Unions all input collections into one and sums the counts per key.
 *
 * @param ins collections of `(word, count)` pairs
 * @return a single collection with one summed count per word
 */
def mergeCounts(ins: Seq[SCollection[(String, Long)]]): SCollection[(String, Long)] = {
  val combined = SCollection.unionAll(ins)
  combined.sumByKey
}