sbt "runMain com.spotify.scio.examples.extra.WordCountScioIO --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --input=gs://apache-beam-samples/shakespeare/kinglear.txt --output=gs://[BUCKET]/[PATH]/wordcount"

package com.spotify.scio.examples.extra

import com.spotify.scio._
import com.spotify.scio.examples.common.ExampleData
import com.spotify.scio.io.TextIO
import org.slf4j.LoggerFactory

object WordCountScioIO {

The logger is a member of the enclosing object, i.e. it is statically initialized, so it can be used safely inside an anonymous function without serialization issues

  private val logger = LoggerFactory.getLogger(this.getClass)

  def main(cmdlineArgs: Array[String]): Unit = {

Create ScioContext and Args

    val (sc, args) = ContextAndArgs(cmdlineArgs)

Parse input and output path from command line arguments

    val input = args.getOrElse("input", ExampleData.KING_LEAR)
    val output = args("output")

Create a distribution and two counter metrics. A Distribution tracks min, max, count, sum, mean, etc., and a Counter tracks a running count.

    val lineDist = ScioMetrics.distribution("lineLength")
    val sumNonEmpty = ScioMetrics.counter("nonEmptyLines")
    val sumEmpty = ScioMetrics.counter("emptyLines")
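
To make the difference between the two metric kinds concrete, here is a minimal plain-Scala sketch of the bookkeeping each one implies. `DistSketch` and `CounterSketch` are hypothetical names used for illustration only; they are not Scio or Beam classes, and real metrics are aggregated across workers by the runner rather than folded locally like this.

```scala
// Hypothetical stand-ins for illustration; not part of Scio or Beam.
final case class DistSketch(min: Long, max: Long, sum: Long, count: Long) {
  // Fold one observed value into the running summary.
  def update(v: Long): DistSketch =
    DistSketch(math.min(min, v), math.max(max, v), sum + v, count + 1)

  // Mean is derived from sum and count; undefined until a value has been seen.
  def mean: Option[Double] = if (count == 0) None else Some(sum.toDouble / count)
}

object DistSketch {
  // Empty summary: min/max start at the extremes so the first update wins.
  val empty: DistSketch = DistSketch(Long.MaxValue, Long.MinValue, 0L, 0L)
}

final case class CounterSketch(value: Long) {
  // A counter only accumulates a running total.
  def inc(n: Long = 1L): CounterSketch = CounterSketch(value + n)
}

// Trimmed line lengths for three sample lines, one of them empty.
val lengths = Seq("KING LEAR", "", "Attend the lords").map(_.trim.length.toLong)

// One distribution update per line, one counter increment per non-empty line.
val dist = lengths.foldLeft(DistSketch.empty)((d, v) => d.update(v))
val nonEmpty = lengths.filter(_ > 0).foldLeft(CounterSketch(0L))((c, _) => c.inc())
```

The distribution keeps a summary of every observed line length, while the counter only knows how many times it was incremented, which is why the example uses one distribution for line lengths and separate counters for empty and non-empty lines.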

Create IO classes to read and write

    val inputTextIO = TextIO(input)
    val outputTextIO = TextIO(output)

Open text files as an SCollection[String], passing IO read params

    sc.read(inputTextIO)(TextIO.ReadParam())
      .map { w =>

Trim input lines, update distribution metric

        val trimmed = w.trim
        lineDist.update(trimmed.length.toLong)
        trimmed
      }
      .filter { w =>

Filter out empty lines, update counter metrics

        val r = w.nonEmpty
        if (r) sumNonEmpty.inc() else sumEmpty.inc()
        r
      }

Split input lines, filter out empty tokens and expand into a collection of tokens

      .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))


Count occurrences of each unique String to get (String, Long)

      .countByValue


Map (String, Long) tuples into final "word: count" strings

      .map { case (word, count) => word + ": " + count }

Save result as text files under the output path by passing write params

      .write(outputTextIO)(TextIO.WriteParam())


Execute the pipeline and block until it finishes

    val result = sc.run().waitUntilFinish()

Retrieve metric values

    logger.info("Max: " + result.distribution(lineDist).committed.map(_.getMax))
    logger.info("Min: " + result.distribution(lineDist).committed.map(_.getMin))
    logger.info("Sum non-empty: " + result.counter(sumNonEmpty).committed)
    logger.info("Sum empty: " + result.counter(sumEmpty).committed)
  }
}
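
The transformation steps of the pipeline can also be tried locally on an in-memory `Seq`, without Scio or a runner. The sketch below mirrors the trim, filter, split, count and format steps using only the Scala standard library. The token regex `[^a-zA-Z']+` and the helper name `wordCount` are assumptions made for illustration, and `groupBy` merely stands in for the distributed counting step.

```scala
// Local, single-machine sketch of the same steps; `wordCount` is a hypothetical helper.
def wordCount(lines: Seq[String]): Map[String, Long] =
  lines
    .map(_.trim)                                         // trim input lines
    .filter(_.nonEmpty)                                  // drop empty lines
    .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))  // split into non-empty tokens
    .groupBy(identity)                                   // group equal words together
    .map { case (word, occurrences) => word -> occurrences.size.toLong }

val counts = wordCount(Seq("  to be or not to be ", "", "that is the question"))

// Format as "word: count" strings; sorting is only there to make the order deterministic.
val formatted = counts.toSeq.sortBy(_._1).map { case (word, count) => word + ": " + count }
```

A local version like this is handy for checking the tokenization rule on a few sample lines before submitting the job to a runner, where output ordering is not guaranteed.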