package com.spotify.scio.examples.extra

import com.spotify.scio._

Tap Output example

Save collections as text files, then open their Taps in a new ScioContext

 

Usage:

 

sbt "runMain com.spotify.scio.examples.extra.TapOutputExample --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --output=gs://[OUTPUT] --method=[METHOD]"

object TapOutputExample {
  def main(cmdlineArgs: Array[String]): Unit = {

Each ScioContext instance maps to a unique pipeline

 

First job and its associated ScioContext

    val (sc1, args) = ContextAndArgs(cmdlineArgs)
    val closedTap1 = sc1
      .parallelize(1 to 10)
      .sum

Save data to a temporary location for use later as a ClosedTap[T]

      .materialize
    val closedTap2 = sc1
      .parallelize(1 to 100)
      .sum
      .map(_.toString)

Save data for use later as a ClosedTap[T]

      .saveAsTextFile(args("output"))
    val scioResult = sc1.run().waitUntilDone()

Get taps from the ScioResult, which is available after the ScioContext has finished

    val t1 = scioResult.tap(closedTap1)
    val t2 = scioResult.tap(closedTap2)

Fetch Tap values directly

    println(t1.value.mkString(", "))
    println(t2.value.mkString(", "))

Second job and its associated ScioContext

    val (sc2, _) = ContextAndArgs(cmdlineArgs)

Re-open taps in new ScioContext

    (t1.open(sc2) ++ t2.open(sc2).map(_.toInt)).sum

Execute the pipeline and block until it completes

    val result = sc2.run().waitUntilDone()
    println(result.state)
  }
}
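
The example relies on a two-step lifecycle: a write returns a ClosedTap[T] while the pipeline is still being built, and that handle only becomes a readable Tap[T] through the ScioResult of the finished job. The plain-Scala sketch below is a toy model of that ordering and nothing more. `ToyContext`, `ToyClosedTap`, `ToyResult` and `ToyTap` are hypothetical names invented for illustration; they are not Scio classes and do not reflect how Scio is implemented.

```scala
// Toy model only: every type here is a hypothetical stand-in, not a Scio class.
import scala.collection.mutable

// A readable handle over data that has already been produced.
final case class ToyTap[T](data: Vector[T]) {
  def value: Iterator[T] = data.iterator
}

// A handle returned when a write is declared; it holds no data itself.
final case class ToyClosedTap[T](id: Int)

// The outcome of a finished run; the only way to turn a closed tap into a tap.
final class ToyResult(store: Map[Int, Vector[Any]]) {
  def tap[T](closed: ToyClosedTap[T]): ToyTap[T] =
    ToyTap(store(closed.id).asInstanceOf[Vector[T]])
}

// Records deferred writes and evaluates them only when run() is called.
final class ToyContext {
  private val pending = mutable.Map.empty[Int, () => Vector[Any]]

  def materialize[T](compute: () => Vector[T]): ToyClosedTap[T] = {
    val id = pending.size
    pending(id) = compute // nothing is computed yet
    ToyClosedTap[T](id)
  }

  def run(): ToyResult =
    new ToyResult(pending.map { case (id, f) => id -> f() }.toMap)
}

object ToyTapDemo {
  def main(args: Array[String]): Unit = {
    val ctx = new ToyContext
    val closed = ctx.materialize(() => Vector((1 to 10).sum))
    val result = ctx.run()           // the "job" executes here
    val tap = result.tap(closed)     // only now can the data be read
    println(tap.value.mkString(", ")) // prints 55
  }
}
```

Because `ToyClosedTap` carries only an identifier, the toy makes it impossible to read data before `run()` has produced a result, which mirrors why the example above calls `scioResult.tap(...)` after `waitUntilDone()`.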