https://cloud.google.com/dataflow/docs/templates/overview

 

Usage:

 

To upload the template: sbt "runMain com.spotify.scio.examples.extra.TemplateExample --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --stagingLocation=gs://[BUCKET]/staging --templateLocation=gs://[BUCKET]/TemplateExample"

 

To run the template, e.g. from gcloud: gcloud dataflow jobs run [JOB-NAME] --gcs-location gs://[BUCKET]/TemplateExample --parameters inputSubscription=projects/[PROJECT]/subscriptions/sub,outputTopic=projects/[PROJECT]/topics/[TOPIC]

 

To run the job directly: sbt "runMain com.spotify.scio.examples.extra.TemplateExample --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --inputSubscription=projects/[PROJECT]/subscriptions/sub --outputTopic=projects/[PROJECT]/topics/[TOPIC]"

package com.spotify.scio.examples.extra

import com.spotify.scio._
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.options.{
  Description,
  PipelineOptions,
  PipelineOptionsFactory,
  StreamingOptions,
  ValueProvider
}
import org.apache.beam.sdk.options.Validation.Required

/**
 * Streaming pipeline that reads strings from a Cloud Pub/Sub subscription and writes them to a
 * Cloud Pub/Sub topic. Both endpoints are supplied as `ValueProvider`s.
 */
object TemplateExample {

  /**
   * Pipeline options for this example. Both values are declared as required and are exposed as
   * `ValueProvider[String]`.
   */
  trait Options extends PipelineOptions with StreamingOptions {
    @Description("The Cloud Pub/Sub subscription to read from")
    @Required
    def getInputSubscription: ValueProvider[String]
    def setInputSubscription(value: ValueProvider[String]): Unit

    @Description("The Cloud Pub/Sub topic to write to")
    @Required
    def getOutputTopic: ValueProvider[String]
    def setOutputTopic(value: ValueProvider[String]): Unit
  }

  /**
   * Entry point: registers [[Options]], parses and validates the command line arguments, turns on
   * streaming mode and hands the resulting options to [[run]].
   *
   * @param cmdlineArgs raw command line arguments
   */
  def main(cmdlineArgs: Array[String]): Unit = {
    PipelineOptionsFactory.register(classOf[Options])

    val validatingBuilder = PipelineOptionsFactory.fromArgs(cmdlineArgs: _*).withValidation()
    val parsed = validatingBuilder.as(classOf[Options])
    parsed.setStreaming(true)

    run(parsed)
  }

  /**
   * Builds the Pub/Sub-to-Pub/Sub pipeline from the given options and runs it.
   *
   * @param options pipeline options carrying the input subscription and the output topic
   */
  def run(options: Options): Unit = {
    val sc = ScioContext(options)

    val subscriptionSource =
      PubsubIO.readStrings().fromSubscription(options.getInputSubscription)
    val topicSink = PubsubIO.writeStrings().to(options.getOutputTopic)

    // We have to use custom inputs and outputs to work with ValueProviders
    val messages = sc.customInput("input", subscriptionSource)
    messages.saveAsCustomOutput("output", topicSink)

    // Execute the pipeline
    sc.run()
    // Explicit Unit: the value returned by `sc.run()` is intentionally not used.
    ()
  }
}