Usage:

 

sbt "runMain com.spotify.scio.examples.extra.DistCacheExample --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --input=gs://apache-beam-samples/wikipedia_edits/wiki_data-*.json --output=gs://[BUCKET]/[PATH]/dist_cache_example"

package com.spotify.scio.examples.extra

import com.spotify.scio.bigquery._
import com.spotify.scio._
import com.spotify.scio.examples.common.ExampleData
import org.joda.time.Instant

/**
 * Example pipeline that counts input rows per month-of-year and labels each count using a
 * lookup table made available to the pipeline through a distributed cache.
 *
 * Command-line arguments read by this job:
 *  - `--months`: location of the lookup file (defaults to `ExampleData.MONTHS`)
 *  - `--input`: table-row JSON input (defaults to `ExampleData.EXPORTED_WIKI_TABLE`)
 *  - `--output`: destination for the text output (required)
 */
object DistCacheExample {

  /**
   * Builds and runs the pipeline.
   *
   * @param cmdlineArgs raw command-line arguments, parsed by `ContextAndArgs`
   */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Declare a distributed cache with two arguments: the location of the file to cache and
    // an initialization function that receives the file `f` and produces the cached value.
    val dc = sc.distCache(args.getOrElse("months", ExampleData.MONTHS)) { f =>
      // Load the file into memory as a Map[Int, String]. Each line is split on a single
      // space: the first token is parsed as the Int key, the second token is the value.
      val source = scala.io.Source.fromFile(f)
      try {
        source
          .getLines()
          .map { s =>
            val t = s.split(" ")
            (t(0).toInt, t(1))
          }
          // `toMap` consumes the whole iterator, so the source can be closed afterwards.
          .toMap
      } finally {
        // Release the file handle even when a line fails to parse.
        source.close()
      }
    }

    sc.tableRowJsonFile(args.getOrElse("input", ExampleData.EXPORTED_WIKI_TABLE))
      // `Instant(long)` takes milliseconds; presumably "timestamp" is in seconds — TODO confirm.
      .map(row => new Instant(row.getLong("timestamp") * 1000L).toDateTime.getMonthOfYear)
      .countByValue
      // Access distributed cache inside a lambda function; months missing from the lookup
      // table are labelled "unknown".
      .map(kv => dc().getOrElse(kv._1, "unknown") + " " + kv._2)
      .saveAsTextFile(args("output"))

    sc.run()
    ()
  }
}