Usage:

 

sbt "runMain com.spotify.scio.examples.extra.WordCountScioIO --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --input=gs://apache-beam-samples/shakespeare/kinglear.txt --output=gs://[BUCKET]/[PATH]/wordcount"

package com.spotify.scio.examples.extra

import com.spotify.scio._
import com.spotify.scio.examples.common.ExampleData
import com.spotify.scio.io.TextIO
import org.slf4j.LoggerFactory

object WordCountScioIO {

The logger is a member of an object, i.e. statically initialized, so it can be used safely inside an anonymous function without serialization issues

private val logger = LoggerFactory.getLogger(this.getClass)

def main(cmdlineArgs: Array[String]): Unit = {

Create ScioContext and Args

val (sc, args) = ContextAndArgs(cmdlineArgs)  

Parse input and output path from command line arguments

val input = args.getOrElse("input", ExampleData.KING_LEAR)
val output = args("output")

Create a distribution and two counter metrics. A distribution tracks min, max, sum, count, etc., while a counter tracks a single running count.

val lineDist = ScioMetrics.distribution("lineLength")
val sumNonEmpty = ScioMetrics.counter("nonEmptyLines")
val sumEmpty = ScioMetrics.counter("emptyLines")
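To make the difference between the two metric types concrete, here is a minimal plain-Scala sketch of what a distribution and a counter accumulate over trimmed lines. `DistributionModel`, `CounterModel` and `MetricsSketch` are hypothetical stand-ins written for illustration; they are not Scio or Beam classes, and the real metrics are aggregated by the runner rather than folded in memory like this.

```scala
// Hypothetical in-memory models, for illustration only (not Scio/Beam API).
final case class DistributionModel(min: Long, max: Long, sum: Long, count: Long) {
  // Fold one observed value into the running statistics.
  def update(v: Long): DistributionModel =
    DistributionModel(math.min(min, v), math.max(max, v), sum + v, count + 1)
}

object DistributionModel {
  // Empty distribution: identity elements for min and max, zero sum and count.
  val empty: DistributionModel = DistributionModel(Long.MaxValue, Long.MinValue, 0L, 0L)
}

final case class CounterModel(value: Long) {
  // A counter only ever moves by a fixed step.
  def inc(): CounterModel = CounterModel(value + 1)
}

object MetricsSketch {
  // Returns (line-length distribution, non-empty line count, empty line count).
  def lineStats(lines: Seq[String]): (DistributionModel, CounterModel, CounterModel) =
    lines
      .map(_.trim)
      .foldLeft((DistributionModel.empty, CounterModel(0L), CounterModel(0L))) {
        case ((dist, nonEmpty, empty), line) =>
          val d = dist.update(line.length.toLong)
          if (line.nonEmpty) (d, nonEmpty.inc(), empty) else (d, nonEmpty, empty.inc())
      }
}
```

For the three lines `"  King Lear "`, `""` and `"Act I"`, the modelled distribution ends up with min 0, max 9, sum 14 and count 3, while the two counters hold 2 and 1.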

Create IO classes to read and write

val inputTextIO = TextIO(input)
val outputTextIO = TextIO(output)

Open text files as an SCollection[String], passing the IO read parameters

sc.read(inputTextIO)(TextIO.ReadParam())
.map { w =>

Trim input lines, update distribution metric

  val trimmed = w.trim
  lineDist.update(trimmed.length.toLong)
  trimmed
}
.filter { w =>

Filter out empty lines, update counter metrics

  val r = w.nonEmpty
  if (r) sumNonEmpty.inc() else sumEmpty.inc()
  r
}

Split input lines, filter out empty tokens and expand into a collection of tokens

.flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty)) 
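The effect of the split expression is easiest to see on a single line outside the pipeline. The sketch below applies the same regular expression with plain Scala; the `TokenizeSketch` object and its `tokens` helper are hypothetical names used only for this illustration.

```scala
object TokenizeSketch {
  // Same expression as the pipeline step, applied to one line.
  // Any run of characters other than ASCII letters and apostrophes is a separator.
  def tokens(line: String): List[String] =
    line.split("[^a-zA-Z']+").filter(_.nonEmpty).toList
}
```

A line that starts with a separator, such as `"--Howl, howl!"`, makes `split` return a leading empty string, which is why the `filter(_.nonEmpty)` step is needed. Apostrophes are kept, so `"time's"` stays a single token.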

Count occurrences of each unique String to get (String, Long)

.countByValue 

Map (String, Long) tuples into final "word: count" strings

.map { case (word, count) => word + ": " + count }

Save result as text files under the output path by passing write params

.write(outputTextIO)(TextIO.DefaultWriteParam)  
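The counting and formatting steps above can be mimicked on an in-memory collection, which is handy for checking the expected output format. This is only a sketch: `CountSketch` is a hypothetical name, `countByValue` is emulated here with `groupBy`, and the sorting is added purely to make the result deterministic, since the pipeline itself does not guarantee any output order.

```scala
object CountSketch {
  // In-memory stand-in for countByValue: word -> number of occurrences.
  def wordCounts(words: Seq[String]): Map[String, Long] =
    words.groupBy(identity).map { case (w, ws) => (w, ws.size.toLong) }

  // Same "word: count" formatting as the pipeline, sorted by word for stable output.
  def format(counts: Map[String, Long]): Seq[String] =
    counts.toSeq.sortBy(_._1).map { case (word, count) => word + ": " + count }
}
```

For example, the words `the`, `fool`, `the` produce the lines `fool: 1` and `the: 2`.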

Execute the pipeline and block until it finishes

val result = sc.run().waitUntilDone()  

Retrieve metric values

logger.info("Max: " + result.distribution(lineDist).committed.map(_.getMax))
logger.info("Min: " + result.distribution(lineDist).committed.map(_.getMin))
logger.info("Sum non-empty: " + result.counter(sumNonEmpty).committed)
logger.info("Sum empty: " + result.counter(sumEmpty).committed)
}
}
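Since the code above calls `.map` on `committed`, the value behaves like an optional one, and concatenating it directly would log the wrapper rather than the bare number. Assuming it is a standard Scala `Option` (an assumption, not confirmed by this example), a small helper can unwrap it for logging. `MetricLogSketch` and `render` are hypothetical names, not part of Scio.

```scala
object MetricLogSketch {
  // Render an optional metric value, falling back to a placeholder when absent.
  def render[A](label: String, value: Option[A]): String =
    label + ": " + value.map(_.toString).getOrElse("n/a")
}
```

With such a helper, a present value of 70 would be logged as `Max: 70`, and a missing one as `Max: n/a`.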