Built-in Functionality

Scio is a thin wrapper on top of Beam offering idiomatic Scala APIs. Check out the Beam Programming Guide first for a detailed explanation of the Beam programming model and concepts.

Basics

See dedicated sections on:

Core functionality

A ScioContext represents the pipeline and is the starting point for performing reads and the means by which the pipeline is executed. Execute a pipeline by invoking run and await completion by chaining waitUntilDone:

import com.spotify.scio._
val sc: ScioContext = ???
sc.run().waitUntilDone()

SCollection is the representation of the data in a pipeline at a particular point in the execution graph preceding or following a transform. SCollections have many of the methods you would expect on a standard Scala collection: map, filter, flatten, flatMap, reduce, collect, fold, and take.

Any SCollection of 2-tuples is considered a keyed SCollection, and the various joins and *ByKey variants of other methods become available for it. The first item in the tuple is the key and the second item the value. The keyBy method creates a keyed SCollection by applying a user-defined function that extracts a key from each existing value:

import com.spotify.scio.values.SCollection

val elements: SCollection[String] = ???
val result: SCollection[(String, String)] = elements.keyBy(_.head.toString)

Once keyed, elements with the same key can be grouped so that they can be processed together:

import com.spotify.scio.values.SCollection

val elements: SCollection[(String, String)] = ???
val result: SCollection[(String, Iterable[String])] = elements.groupByKey
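The shapes of these results can be checked on a local List with plain Scala (no Scio or Beam involved). keyBy pairs each element with the key extracted from it, and groupByKey gathers the values that share a key. The object name is hypothetical, and this models only what the results contain. In a pipeline, grouping requires a shuffle, and neither the order of keys nor the order of values within a group is guaranteed.

```scala
object KeyedAnalogue {
  val elements: List[String] = List("apple", "avocado", "banana", "blueberry", "cherry")

  // keyBy: keep each element as the value and pair it with the extracted key
  val keyed: List[(String, String)] = elements.map(s => (s.head.toString, s))

  // groupByKey: collect all values that share a key into one Iterable
  val grouped: Map[String, Iterable[String]] =
    keyed.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }
}
```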

Distinct elements can be found with distinct (or the distinctBy and distinctByKey variants):

import com.spotify.scio.values.SCollection

val elements: SCollection[String] = ???
val distinct: SCollection[String] = elements.distinct

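Elements can be split into different SCollections with partition, which can be useful for error handling. Given a predicate, partition returns a pair of SCollections: first the elements that satisfy the predicate, then the elements that do not. A variant takes a number of partitions and a function that maps each element to a partition index, and returns a sequence of SCollections. Keep the number of partitions small.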

import com.spotify.scio.values.SCollection

val elements: SCollection[Int] = ???
// The first SCollection holds the elements that satisfy the predicate, the second holds the rest
val (greaterThanFive, atMostFive): (SCollection[Int], SCollection[Int]) = elements.partition(_ > 5)
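Scala's own List.partition returns its pair in the same order, which makes the result easy to check locally: the matches come first. The indexed variant is modeled below by a hypothetical partitionBy helper. It assumes the function returns an index from 0 to n - 1. This is a plain-Scala illustration, not Scio's implementation.

```scala
object PartitionAnalogue {
  val elements: List[Int] = List(1, 7, 3, 9, 5, 6)

  // Predicate form: the first list holds the elements that satisfy the predicate
  val (greaterThanFive, atMostFive) = elements.partition(_ > 5)

  // Hypothetical model of the indexed form: f maps each element to an index in [0, n)
  def partitionBy[T](xs: List[T], n: Int)(f: T => Int): Seq[List[T]] = {
    require(xs.forall { x => val i = f(x); i >= 0 && i < n }, "partition index out of range")
    (0 until n).map(i => xs.filter(x => f(x) == i))
  }

  // e.g. route elements by their remainder modulo 3
  val byRemainder: Seq[List[Int]] = partitionBy(elements, 3)(_ % 3)
}
```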

SCollections of the same type can be combined with a union (or unionAll) operation.

import com.spotify.scio.values.SCollection

val a: SCollection[Int] = ???
val b: SCollection[Int] = ???
val elements: SCollection[Int] = a.union(b)
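union does not remove duplicates. Because it is built on Beam's Flatten, it behaves like concatenation rather than set union. The plain-Scala lines below model the result on local Lists (hypothetical object name). If you need set semantics, chain distinct. Element order in an SCollection is not defined, so only the contents are meaningful.

```scala
object UnionAnalogue {
  val a: List[Int] = List(1, 2, 3)
  val b: List[Int] = List(3, 4)

  // union: all elements of both inputs; the shared 3 appears twice
  val unioned: List[Int] = a ++ b

  // for set semantics, remove duplicates afterwards (distinct in Scio)
  val setUnion: List[Int] = (a ++ b).distinct
}
```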

Elements can be printed to the console for inspection at any point of the graph by using debug:

import com.spotify.scio.values.SCollection

val elements: SCollection[String] = ???
elements.debug(prefix = "myLabel: ")

ContextAndArgs

Scio’s ContextAndArgs provides a convenient way to both parse command-line options and acquire a ScioContext:

import com.spotify.scio._

val cmdlineArgs: Array[String] = ???
val (sc, args) = ContextAndArgs(cmdlineArgs)

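If you need custom pipeline options, define a trait that extends Beam’s PipelineOptions, with a getter and a setter for each option, and parse the arguments with ContextAndArgs.typed. Following Beam’s naming convention, getMyArg and setMyArg below define an option passed on the command line as --myArg=value: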

import com.spotify.scio._
import org.apache.beam.sdk.options.PipelineOptions

trait Arguments extends PipelineOptions {
  def getMyArg: String
  def setMyArg(input: String): Unit
}

val cmdlineArgs: Array[String] = ???
val (sc, args) = ContextAndArgs.typed[Arguments](cmdlineArgs)
val myArg: String = args.getMyArg

Aggregations

Scio provides a suite of built-in aggregations. Each *ByKey variant performs the same aggregation as its unkeyed counterpart, but separately for each key of a keyed SCollection.

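Counting

countApproxDistinct estimates the number of distinct elements with an approximate estimator such as ZetaSketchHllPlusPlus, trading some accuracy for bounded memory: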

import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.hll.zetasketch.ZetaSketchHllPlusPlus

val elements: SCollection[String] = ???
val sketch = ZetaSketchHllPlusPlus[String]()
val result: SCollection[Long] = elements.countApproxDistinct(sketch)

Statistics

For SCollections containing Double, Scio additionally provides a stats method that computes the count, mean, min, max, variance, standard deviation, sample variance, and sample standard deviation over the SCollection. Convenience methods are available directly on the SCollection if only a single value is required:

import com.spotify.scio.values.SCollection
import com.spotify.scio.util.StatCounter

val elements: SCollection[Double] = ???

val stats: SCollection[StatCounter] = elements.stats
val variance: SCollection[Double] = stats.map { s => s.variance }

val stdev: SCollection[Double] = elements.stdev
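The plain-Scala sketch below computes these statistics for a small sample, which makes the difference between the population and sample variants visible. It assumes StatCounter follows the usual convention, shared by Spark's StatCounter: variance divides the sum of squared deviations by n, and sampleVariance divides it by n - 1. The Stats type and the stats function are hypothetical. The calculation uses two passes for clarity and is not Scio's implementation.

```scala
object StatsAnalogue {
  // Hypothetical local type mirroring the statistics listed above
  final case class Stats(
      count: Long,
      mean: Double,
      min: Double,
      max: Double,
      variance: Double,      // sum of squared deviations / n
      sampleVariance: Double // sum of squared deviations / (n - 1)
  ) {
    def stdev: Double = math.sqrt(variance)
    def sampleStdev: Double = math.sqrt(sampleVariance)
  }

  def stats(xs: Seq[Double]): Stats = {
    require(xs.nonEmpty, "statistics of an empty collection are undefined")
    val n = xs.size
    val mean = xs.sum / n
    val squaredDeviations = xs.map(x => (x - mean) * (x - mean)).sum
    Stats(
      count = n.toLong,
      mean = mean,
      min = xs.min,
      max = xs.max,
      variance = squaredDeviations / n,
      // with a single element the sample variance is undefined in this sketch
      sampleVariance = if (n > 1) squaredDeviations / (n - 1) else Double.NaN
    )
  }

  val example: Stats = stats(Seq(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0))
}
```

For 2, 4, 4, 4, 5, 5, 7, 9 the mean is 5 and the squared deviations sum to 32. The variance is therefore 32 / 8 = 4 and the standard deviation is 2, while the sample variance is 32 / 7 ≈ 4.57.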

Sums & combinations

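combine (or combineByKey) aggregates elements with three user-defined functions. The first creates an accumulator from a single element, the second adds an element to an existing accumulator, and the third merges two accumulators: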

import com.spotify.scio.values.SCollection

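// Accumulator holding the number of elements seen and their running total
case class A(count: Long, total: Long)
object A {
  // Create an accumulator from a single element
  def apply(i: Int): A = A(1L, i.toLong)
  // Add one more element to an existing accumulator
  def mergeValue(a: A, i: Int): A = A(a.count + 1L, a.total + i)
  // Merge two partial accumulators, e.g. computed on different workers
  def mergeCombiners(a: A, b: A): A = A(a.count + b.count, a.total + b.total)
}

val elements: SCollection[Int] = ???
val result: SCollection[A] = elements.combine(A.apply)(A.mergeValue)(A.mergeCombiners)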
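Elements are combined in parallel, so the runner may fold different subsets of the input separately and then merge the partial accumulators in any order. mergeValue and mergeCombiners must therefore give the same result however the input is divided. The plain-Scala sketch below models this with explicit "bundles". The bundles, combineLocally, and createCombiner are hypothetical names that illustrate the contract; this is not how Beam actually schedules work.

```scala
object CombineAnalogue {
  // Same accumulator as in the example above: element count and running total
  case class A(count: Long, total: Long)

  def createCombiner(i: Int): A = A(1L, i.toLong)
  def mergeValue(a: A, i: Int): A = A(a.count + 1L, a.total + i)
  def mergeCombiners(a: A, b: A): A = A(a.count + b.count, a.total + b.total)

  // Hypothetical local model: each non-empty bundle is folded on its own,
  // then the partial accumulators are merged (at least one bundle must be non-empty)
  def combineLocally(bundles: Seq[Seq[Int]]): A =
    bundles
      .filter(_.nonEmpty)
      .map(b => b.tail.foldLeft(createCombiner(b.head))(mergeValue))
      .reduce((x, y) => mergeCombiners(x, y))

  val result: A = combineLocally(Seq(Seq(1, 2, 3), Seq(4), Seq(5, 6)))
  val mean: Double = result.total.toDouble / result.count
}
```

Splitting 1 to 6 into three bundles or keeping them in one both yield A(6, 21), from which a mean of 3.5 follows.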

sum (or sumByKey) sums elements given a Semigroup, while aggregate (or aggregateByKey) aggregates elements either with a set of user-defined functions, via an Aggregator, or via a MonoidAggregator.

Both Semigroup and Monoid instances can be derived with magnolify, assuming the behavior for the primitive types is what you expect.

Note

For String, the default Semigroup[String] concatenates values, which is usually not what you want.
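A derived instance is expected to combine a case class field by field, using the instance for each field's type. For a String field, that means concatenation. The plain-Scala sketch below hand-writes such a field-wise combine for a hypothetical Visit record, with no cats or magnolify involved, to show the effect:

```scala
object DerivedSemigroupAnalogue {
  // Hypothetical record with a numeric field and a String field
  case class Visit(count: Long, page: String)

  // What a field-wise combine does: Long fields add, String fields concatenate
  def combine(x: Visit, y: Visit): Visit = Visit(x.count + y.count, x.page + y.page)

  val summed: Visit =
    List(Visit(1L, "home"), Visit(2L, "home"), Visit(3L, "home")).reduce((x, y) => combine(x, y))
}
```

The counts add up to 6 as intended, but page becomes "homehomehome". If, for example, the first value should be kept, you need a hand-written instance for that field.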

Fully automatic derivation can be very concise but relies on some implicit magic:

import com.spotify.scio.values.SCollection
import com.twitter.algebird._
import magnolify.cats.auto._

case class A(count: Long, total: Long)

val elements: SCollection[A] = ???

val summed: SCollection[A] = elements.sum
val aggregated: SCollection[A] = elements.aggregate(Aggregator.fromMonoid[A])

Semi-automatic derivation in a companion object may be more intelligible:

case class A(count: Long, total: Long)
object A {
  import magnolify.cats.semiauto._
  import cats._
  implicit val aMonoid: Monoid[A] = MonoidDerivation[A]
}

See also Algebird

Metrics

Scio supports Beam’s Counter, Distribution, and Gauge metrics.

See MetricsExample.

ScioResult

ScioResult can be used to access metric values, individually or as a group:

import com.spotify.scio._
import org.apache.beam.sdk.metrics.{MetricName, Counter}

val sc: ScioContext = ???
val counter: Counter = ???

val sr: ScioResult = sc.run().waitUntilDone()
val counterValue: metrics.MetricValue[Long] = sr.counter(counter)
val counterMap: Map[MetricName, metrics.MetricValue[Long]] = sr.allCounters

Taps & Materialization

Writes return a ClosedTap, which provides an interface to access the written results or pass them to a subsequent Scio job.

import com.spotify.scio._
import com.spotify.scio.io.{Tap, ClosedTap}
import com.spotify.scio.values.SCollection

val sc: ScioContext = ???
val elements: SCollection[String] = ???
val writeTap: ClosedTap[String] = elements.saveAsTextFile("gs://output-path")

val sr: ScioResult = sc.run().waitUntilDone()

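// After the pipeline completes, access the written results through a Tap
val textTap: Tap[String] = sr.tap(writeTap)
val textContents: Iterator[String] = textTap.value

// Or open the same results as the input of a subsequent Scio job
val sc2: ScioContext = ???
val results: SCollection[String] = textTap.open(sc2)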

The same mechanism underlies Scio’s materialize method. It saves the contents of an SCollection at that point in the pipeline to a temporary location and makes them available after the pipeline completes:

import com.spotify.scio._
import com.spotify.scio.io.{Tap, ClosedTap}
import com.spotify.scio.values.SCollection

val sc: ScioContext = ???
val elements: SCollection[String] = ???
val materializeTap: ClosedTap[String] = elements.materialize

val sr: ScioResult = sc.run().waitUntilDone()
val textTap: Tap[String] = sr.tap(materializeTap)

See also: WordCountOrchestration example.

Use native Beam functionality

If you need a Beam IO or transform for which Scio has no API, you can use the native Beam API for individual steps in a pipeline that is otherwise written in Scio.

customInput reads from any Beam source, that is, any transform of type PTransform[PBegin, PCollection[T]]:

import com.spotify.scio._
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.{PBegin, PCollection}
import org.apache.beam.sdk.io.TextIO

val sc: ScioContext = ???
val filePattern: String = ???

val textRead: PTransform[PBegin, PCollection[String]] = TextIO.read().from(filePattern)
val elements: SCollection[String] = sc.customInput("ReadText", textRead)

saveAsCustomOutput writes to any Beam sink, that is, any transform of type PTransform[PCollection[T], PDone]:

import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.{PDone, PCollection}
import org.apache.beam.sdk.io.TextIO

val outputLocation: String = ???
val elements: SCollection[String] = ???
val textWrite: PTransform[PCollection[String], PDone] = TextIO.write().to(outputLocation)
elements.saveAsCustomOutput("WriteText", textWrite)

Finally, applyTransform supports using any Beam transform of type PTransform[PCollection[T], PCollection[U]]:

import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.{PTransform, Sum}
import org.apache.beam.sdk.values.PCollection
import java.lang

val elements: SCollection[Double] = ???
// Beam's Sum.doublesGlobally operates on java.lang.Double, so box the Scala Doubles first
val transform: PTransform[PCollection[lang.Double], PCollection[lang.Double]] = Sum.doublesGlobally
val result: SCollection[lang.Double] = elements
  .map(Double.box)
  .applyTransform(transform)

See also: BeamExample

Windowing

timestampBy assigns each element a new timestamp computed by a user-defined function:

import com.spotify.scio.values.SCollection
import org.joda.time.Instant

case class A(timestamp: Instant, value: String)
val elements: SCollection[A] = ???
val timestamped: SCollection[A] = elements.timestampBy(_.timestamp)

The withTimestamp, withWindow, and withPaneInfo functions pair each element with its timestamp, window, or pane info, respectively, turning window metadata into part of the SCollection's elements:

import com.spotify.scio.values.SCollection
import org.joda.time.Instant

val elements: SCollection[String] = ???
val timestamped: SCollection[(String, Instant)] = elements.withTimestamp

toWindowed converts the SCollection to a WindowedSCollection whose elements are all instances of WindowedValue, which gives full access to the windowing metadata:

import com.spotify.scio.values._
import org.joda.time.Instant

val elements: SCollection[String] = ???
val windowed: WindowedSCollection[String] = elements.toWindowed
windowed.map { v: WindowedValue[String] =>
  v.withTimestamp(Instant.now())
}

Scio provides convenience functions for the common types of windowing (withFixedWindows, withSlidingWindows, withSessionWindows, withGlobalWindow) but also provides full control over the windowing with withWindowFn.

import com.spotify.scio.values.SCollection
import org.joda.time.Duration

val elements: SCollection[String] = ???
val windowedElements: SCollection[String] = elements.withFixedWindows(Duration.standardHours(1))
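Window assignment is plain arithmetic on the element's timestamp. The sketch below works in plain Scala on epoch milliseconds and mirrors, as we understand it, the rule used by Beam's FixedWindows and SlidingWindows. Each window is the half-open interval [start, start + size), and window starts are aligned to multiples of the size (or period) plus an optional offset. Consult Beam's source for the authoritative version. The Window type and both functions are hypothetical.

```scala
object WindowAssignmentAnalogue {
  // Hypothetical window type: the half-open interval [start, end) in epoch milliseconds
  final case class Window(start: Long, end: Long)

  // Fixed windows: every timestamp falls into exactly one window
  def fixedWindow(ts: Long, size: Long, offset: Long = 0L): Window = {
    val start = ts - Math.floorMod(ts - offset, size) // align down to a window boundary
    Window(start, start + size)
  }

  // Sliding windows of length `size` starting every `period`: a timestamp belongs
  // to every window that contains it, so windows overlap when period < size
  def slidingWindows(ts: Long, size: Long, period: Long, offset: Long = 0L): List[Window] = {
    val lastStart = ts - Math.floorMod(ts - offset, period) // latest window start <= ts
    Iterator
      .iterate(lastStart)(_ - period)
      .takeWhile(_ > ts - size) // window [s, s + size) still contains ts
      .map(s => Window(s, s + size))
      .toList
  }
}
```

For example, with one-hour fixed windows an element at 01:30 falls in [01:00, 02:00). With one-hour windows sliding every 30 minutes, it falls in both [01:30, 02:30) and [01:00, 02:00).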

Batching

When a transform performs better on a group of items, elements can be batched by count with batch, by total byte size with batchByteSized, or by a user-defined weight with batchWeighted. Each has a keyed variant: batchByKey, batchByteSizedByKey, and batchWeightedByKey.

import com.spotify.scio.values.SCollection

val elements: SCollection[String] = ???
val batchedElements: SCollection[Iterable[String]] = elements.batch(10)
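The sketch below models two of these strategies on a local Seq. The first is count-based batching. The second is a greedy weight-based grouping that starts a new batch when the next element would push the total cost above a limit. Both functions are hypothetical illustrations of the idea, not Scio's implementation. Scio's API docs note that batches are formed per window and may be emitted before they reach the maximum size, for example when a bundle finishes, so treat the size as an upper bound.

```scala
object BatchingAnalogue {
  // Count-based batching: at most `size` elements per batch
  def batch[T](xs: Seq[T], size: Int): List[Seq[T]] = xs.grouped(size).toList

  // Hypothetical greedy weight-based batching: start a new batch when adding the
  // next element would push the batch's total cost above maxWeight.
  // An element heavier than maxWeight ends up alone in its own batch.
  def batchWeighted[T](xs: Seq[T], maxWeight: Long)(cost: T => Long): List[List[T]] = {
    val batches = List.newBuilder[List[T]]
    var current = List.empty[T] // accumulated in reverse order
    var currentWeight = 0L
    xs.foreach { x =>
      val c = cost(x)
      if (current.nonEmpty && currentWeight + c > maxWeight) {
        batches += current.reverse
        current = Nil
        currentWeight = 0L
      }
      current = x :: current
      currentWeight += c
    }
    if (current.nonEmpty) batches += current.reverse
    batches.result()
  }
}
```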

Misc

A random subset of an SCollection's elements can be selected with sample:

import com.spotify.scio.values.SCollection

val elements: SCollection[String] = ???
val result: SCollection[String] = elements.sample(withReplacement = true, fraction = 0.01)
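Here fraction is the expected proportion of elements to keep, not an exact count. Without replacement, this is typically done with Bernoulli sampling: each element is kept independently with probability fraction. The plain-Scala sketch below illustrates that technique using a seeded scala.util.Random. bernoulliSample is a hypothetical name, this is not Scio's sampler, and the with-replacement case (where an element can appear more than once) is not modeled.

```scala
import scala.util.Random

object SampleAnalogue {
  // Bernoulli sampling (without replacement): keep each element independently
  // with probability `fraction`, so the output size is only fraction * n on average
  def bernoulliSample[T](xs: Seq[T], fraction: Double, rng: Random): Seq[T] = {
    require(fraction >= 0.0 && fraction <= 1.0, "fraction must be in [0, 1]")
    xs.filter(_ => rng.nextDouble() < fraction)
  }
}
```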

randomSplit splits an SCollection randomly into several SCollections, given weights for the fraction of the input that should go into each split:

import com.spotify.scio.values.SCollection

val elements: SCollection[Int] = ???
val weights: Array[Double] = Array(0.2, 0.6, 0.2)
val splits: Array[SCollection[Int]] = elements.randomSplit(weights)
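Per Scio's API docs, the weights are normalized if they do not sum to 1. Conceptually, each element is assigned independently to one split with probability proportional to that split's weight. The split sizes are therefore only approximately proportional to the weights. The plain-Scala sketch below models this idea with cumulative weights and a seeded scala.util.Random. The randomSplit function here is a hypothetical illustration, not Scio's implementation.

```scala
import scala.util.Random

object RandomSplitAnalogue {
  def randomSplit[T](xs: Seq[T], weights: Seq[Double], rng: Random): IndexedSeq[Seq[T]] = {
    require(weights.nonEmpty && weights.forall(_ >= 0.0) && weights.sum > 0.0,
      "weights must be non-negative with a positive sum")
    // Normalize the weights into cumulative upper bounds, roughly (0.2, 0.6, 0.2) -> (0.2, 0.8, 1.0)
    val total = weights.sum
    val bounds: Seq[Double] = weights.scanLeft(0.0)(_ + _).tail.map(_ / total)
    // Pick the first split whose cumulative bound exceeds a uniform draw in [0, 1)
    def pickSplit(u: Double): Int = {
      val i = bounds.indexWhere(u < _)
      if (i < 0) weights.length - 1 else i // guard against floating-point rounding
    }
    val assigned: Seq[(Int, T)] = xs.map(x => (pickSplit(rng.nextDouble()), x))
    weights.indices.map(i => assigned.collect { case (j, x) if j == i => x })
  }
}
```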

The “top” n elements of an SCollection given some Ordering can be found with top:

import com.spotify.scio.values.SCollection

val elements: SCollection[Int] = ???
val top10: SCollection[Iterable[Int]] = elements.top(10)
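A top-n can be computed as an associative combine. Each worker keeps only its n best elements in a bounded min-heap, and partial results are merged by taking the top n of their union. The plain-Scala sketch below shows that technique on local collections. topN and mergeTop are hypothetical names, and this is not Scio's code. Because top takes its Ordering implicitly, you can find the smallest elements instead by passing a reversed Ordering, e.g. elements.top(10)(Ordering[Int].reverse).

```scala
import scala.collection.mutable

object TopAnalogue {
  // Keep the n largest elements (according to ord) using a bounded min-heap
  def topN[T](xs: Iterable[T], n: Int)(implicit ord: Ordering[T]): List[T] = {
    // PriorityQueue dequeues its maximum; with the reversed ordering, head is the smallest kept element
    val heap = mutable.PriorityQueue.empty[T](ord.reverse)
    xs.foreach { x =>
      if (heap.size < n) heap.enqueue(x)
      else if (heap.nonEmpty && ord.gt(x, heap.head)) {
        heap.dequeue() // drop the smallest kept element
        heap.enqueue(x)
      }
    }
    heap.toList.sorted(ord.reverse) // largest first
  }

  // Partial results from different chunks merge by taking the top n of their union
  def mergeTop[T](a: List[T], b: List[T], n: Int)(implicit ord: Ordering[T]): List[T] =
    topN(a ++ b, n)
}
```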

The common elements of two SCollections can be found with intersection:

import com.spotify.scio.values.SCollection

val a: SCollection[String] = ???
val b: SCollection[String] = ???
val common: SCollection[String] = a.intersection(b)
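Scio's API docs state that the output of intersection contains no duplicates, even if the inputs did. The local model below uses plain Scala and a hypothetical function name to capture those semantics. In a pipeline, the operation involves a shuffle and the output order is not guaranteed.

```scala
object IntersectionAnalogue {
  // intersection: elements present in both inputs, each reported once
  def intersection[T](a: Seq[T], b: Seq[T]): Seq[T] = {
    val inB = b.toSet
    a.distinct.filter(inB.contains)
  }
}
```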

For a keyed SCollection, intersectByKey returns the pairs on the left-hand side whose keys appear in the right-hand SCollection of keys:

import com.spotify.scio.values.SCollection

val a: SCollection[(String, Int)] = ???
val b: SCollection[String] = ???
val common: SCollection[(String, Int)] = a.intersectByKey(b)

Similarly, subtract returns the elements on the left-hand side that are not present on the right-hand side, and subtractByKey returns the pairs whose keys are not present there:

import com.spotify.scio.values.SCollection

val a: SCollection[String] = ???
val b: SCollection[String] = ???
val notInB: SCollection[String] = a.subtract(b)
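intersectByKey and subtractByKey are complements. Together, they divide the pairs on the left-hand side according to whether their key occurs in the right-hand SCollection of keys. The plain-Scala functions below are hypothetical local models of those semantics, written with a key Set. They are not Scio's implementation.

```scala
object KeyFilterAnalogue {
  // intersectByKey: pairs whose key appears in `keys`
  def intersectByKey[K, V](pairs: Seq[(K, V)], keys: Seq[K]): Seq[(K, V)] = {
    val keySet = keys.toSet
    pairs.filter { case (k, _) => keySet.contains(k) }
  }

  // subtractByKey: pairs whose key does not appear in `keys`
  def subtractByKey[K, V](pairs: Seq[(K, V)], keys: Seq[K]): Seq[(K, V)] = {
    val keySet = keys.toSet
    pairs.filterNot { case (k, _) => keySet.contains(k) }
  }
}
```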