sbt "runMain com.spotify.scio.examples.WordCount --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --input=gs://apache-beam-samples/shakespeare/kinglear.txt --output=gs://[BUCKET]/[PATH]/wordcount"

package com.spotify.scio.examples

import com.spotify.scio._
import com.spotify.scio.examples.common.ExampleData
import org.slf4j.LoggerFactory

object WordCount {

The logger is a member of the WordCount object, i.e. statically initialized, so it can be used safely inside an anonymous function without serialization issues.

  private val logger = LoggerFactory.getLogger(this.getClass)

  def main(cmdlineArgs: Array[String]): Unit = {

Create ScioContext and Args

val (sc, args) = ContextAndArgs(cmdlineArgs)  

Parse input and output path from command line arguments

    val input = args.getOrElse("input", ExampleData.KING_LEAR)
    val output = args("output")
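
The two lookups above differ in one respect: input has a fallback value, while output has none and must be supplied. A minimal plain-Scala sketch of that pattern follows. ArgsSketch and its parse method are hypothetical stand-ins written for illustration, not Scio's Args implementation, and the flag values are placeholders.

```scala
// Hypothetical stand-in for illustration only; this is not Scio's Args class.
object ArgsSketch {
  // Turn Array("--input=a", "--output=b") into Map("input" -> "a", "output" -> "b").
  // Arguments that do not have the form --key=value are ignored in this sketch.
  def parse(cmdlineArgs: Array[String]): Map[String, String] =
    cmdlineArgs.iterator
      .filter(_.startsWith("--"))
      .map(_.stripPrefix("--").split("=", 2)) // split on the first '=' only
      .collect { case Array(key, value) => key -> value }
      .toMap

  // Optional argument: fall back to a default when the flag is absent.
  def optional(args: Map[String, String], key: String, default: String): String =
    args.getOrElse(key, default)

  // Required argument: a plain Map lookup throws NoSuchElementException when absent.
  def required(args: Map[String, String], key: String): String =
    args(key)
}
```

With this sketch, ArgsSketch.optional(parsed, "input", "fallback") returns the fallback when --input is not passed, whereas ArgsSketch.required(parsed, "output") fails fast when --output is missing.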

Create a distribution and two counter metrics. Distribution tracks min, max, count, sum, mean, etc. and Counter tracks count.

    val lineDist = ScioMetrics.distribution("lineLength")
    val sumNonEmpty = ScioMetrics.counter("nonEmptyLines")
    val sumEmpty = ScioMetrics.counter("emptyLines")
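
To make the difference between the two metric kinds concrete, here is a small in-memory sketch of what a distribution accumulates as values are reported to it. The Dist case class and summarize helper are hypothetical names for illustration and are not part of Scio or Beam. A counter, by contrast, would keep only a single running total.

```scala
// Hypothetical in-memory analogue of a distribution metric, for illustration only.
object MetricsSketch {
  final case class Dist(min: Long, max: Long, sum: Long, count: Long) {
    // Fold one reported value into the running statistics.
    def update(v: Long): Dist =
      Dist(math.min(min, v), math.max(max, v), sum + v, count + 1)

    // Mean is derived from sum and count; defined as 0.0 when nothing was reported.
    def mean: Double = if (count == 0) 0.0 else sum.toDouble / count
  }

  // Identity element: any reported value replaces these min/max sentinels.
  val empty: Dist = Dist(Long.MaxValue, Long.MinValue, 0L, 0L)

  // Report every value in order, as the pipeline does once per input line.
  def summarize(values: Seq[Long]): Dist =
    values.foldLeft(empty)((dist, v) => dist.update(v))
}
```

For line lengths 3, 0 and 7, MetricsSketch.summarize yields min 0, max 7, sum 10 and count 3.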

Open text files as an SCollection[String]

    sc.textFile(input)
      .transform("input cleaner") {
        _.map { w =>

Trim input lines, update distribution metric

          val trimmed = w.trim
          lineDist.update(trimmed.length.toLong)
          trimmed
        }.filter { w =>

Filter out empty lines, update counter metrics

          val r = w.nonEmpty
          if (r) sumNonEmpty.inc() else sumEmpty.inc()
          r
        }
      }
      .transform("counter") {

Split input lines, filter out empty tokens and expand into a collection of tokens

        _.flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))

Count occurrences of each unique String to get (String, Long)

          .countByValue
      }

Map (String, Long) tuples into strings

      .map(t => t._1 + ": " + t._2)

Save result as text files under the output path

      .saveAsTextFile(output)

Execute the pipeline and block until it finishes

    val result = sc.run().waitUntilFinish()

Retrieve metric values

    logger.info("Max: " + result.distribution(lineDist).committed.map(_.getMax))
    logger.info("Min: " + result.distribution(lineDist).committed.map(_.getMin))
    logger.info("Sum non-empty: " + result.counter(sumNonEmpty).committed)
    logger.info("Sum empty: " + result.counter(sumEmpty).committed)
  }
}
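
The transformation steps described above (trim, drop empty lines, tokenize, count, format) can be tried locally without a runner. The following is a plain Scala collections analogue, not Scio code: WordCountSketch is a hypothetical name, the tokenizing regex is an assumption, and the final sort exists only to make the output order deterministic.

```scala
// Plain Scala collections analogue of the pipeline steps, for illustration only.
object WordCountSketch {
  def wordCount(lines: Seq[String]): Seq[String] =
    lines
      .map(_.trim)                                         // trim input lines
      .filter(_.nonEmpty)                                  // filter out empty lines
      .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))  // split into non-empty tokens (assumed regex)
      .groupBy(identity)                                   // group equal tokens together
      .map { case (word, occurrences) => word + ": " + occurrences.size } // format as "word: count"
      .toSeq
      .sorted                                              // deterministic order for inspection
}
```

For example, WordCountSketch.wordCount(Seq("  to be or not to be ", "", "be")) returns Seq("be: 3", "not: 1", "or: 1", "to: 2").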