Built-in Functionality
Scio is a thin wrapper on top of Beam offering idiomatic Scala APIs. Check out the Beam Programming Guide first for a detailed explanation of the Beam programming model and concepts.
Basics
ScioContext wraps Beam’s Pipeline.
SCollection wraps Beam’s PCollection.
ScioResult wraps Beam’s PipelineResult.
See dedicated sections on:
Core functionality
A ScioContext represents the pipeline and is the starting point for performing reads and the means by which the pipeline is executed. Execute a pipeline by invoking run and await completion by chaining waitUntilDone:
import com.spotify.scio._
val sc: ScioContext = ???
sc.run().waitUntilDone()
SCollection is the representation of the data in a pipeline at a particular point in the execution graph preceding or following a transform. SCollections have many of the methods you would expect on a standard Scala collection: map, filter, flatten, flatMap, reduce, collect, fold, and take.
Any SCollection of 2-tuples is considered a keyed SCollection and the various joins and *ByKey variants of other methods become available. The first item in the tuple is considered the key and the second item the value. The keyBy method creates a keyed SCollection, where the user-defined function extracts the key from the existing values:
import com.spotify.scio.values.SCollection
val elements: SCollection[String] = ???
val result: SCollection[(String, String)] = elements.keyBy(_.head.toString)
Once keyed, elements with the same key can be grouped so that they can be processed together:
import com.spotify.scio.values.SCollection
val elements: SCollection[(String, String)] = ???
val result: SCollection[(String, Iterable[String])] = elements.groupByKey
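Here is an in-memory analogy of the keyed semantics. It uses plain Scala collections rather than Scio, and the word list is made up. keyBy pairs each element with a derived key, and groupByKey gathers the values for each key. In a real pipeline the grouping is distributed, so don't rely on the order of values within a group.

```scala
// Plain Scala collections standing in for SCollections; illustration only, not Scio.
val words: List[String] = List("apple", "avocado", "banana", "blueberry", "cherry")

// keyBy: derive a key from each element and keep the element as the value.
val keyed: List[(String, String)] = words.map(w => (w.head.toString, w))

// groupByKey: gather all values that share a key.
val grouped: Map[String, Iterable[String]] =
  keyed.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }
```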
Distinct elements can be found with distinct (or the distinctBy and distinctByKey variants):
import com.spotify.scio.values.SCollection
val elements: SCollection[String] = ???
val distinct: SCollection[String] = elements.distinct
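distinctBy deduplicates on a derived key instead of on the whole element. The plain-Scala sketch below is not Scio code, and the case-insensitive key is an arbitrary example. It keeps one representative per key. Treat the choice of representative in a distributed pipeline as arbitrary, so the sketch only relies on how many elements survive.

```scala
// Plain Scala illustration of deduplicating by a derived key (not Scio).
val names: List[String] = List("Alice", "alice", "Bob", "BOB", "carol")

// Group by the derived key and keep one element per group,
// similar in spirit to distinctBy(_.toLowerCase) on an SCollection.
val distinctByLower: Iterable[String] =
  names.groupBy(_.toLowerCase).values.map(_.head)
```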
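Elements can be split into different SCollections with partition. This is useful for error handling, for example to route records that fail validation to a separate output. With a predicate, partition returns a pair of SCollections: first the elements that satisfy the predicate, then the rest. A variant that takes a number of partitions and an index function is also available. Keep the number of partitions small, since each partition becomes a separate output of the transform.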
import com.spotify.scio.values.SCollection
val elements: SCollection[Int] = ???
// partition returns (elements matching the predicate, all other elements)
val (greaterThanFive, atMostFive): (SCollection[Int], SCollection[Int]) = elements.partition(_ > 5)
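Here is a minimal plain-Scala sketch of the error-handling pattern. It is not Scio, and the raw records are hypothetical. Each record is parsed, and partition splits the successes from the failures. As with the SCollection method, the first half of the returned pair holds the elements that satisfy the predicate.

```scala
import scala.util.Try

// Hypothetical raw input; some records are not valid integers.
val raw: List[String] = List("1", "2", "x", "40", "")

// Attach the parse outcome to each original record.
val parsed: List[(String, Try[Int])] = raw.map(s => (s, Try(s.toInt)))

// partition returns (matching, non-matching), in that order.
val (ok, failed) = parsed.partition { case (_, t) => t.isSuccess }

val goodValues: List[Int] = ok.map(_._2.get)
val badRecords: List[String] = failed.map(_._1)
```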
SCollections of the same type can be combined with a union (or unionAll) operation.
import com.spotify.scio.values.SCollection
val a: SCollection[Int] = ???
val b: SCollection[Int] = ???
val elements: SCollection[Int] = a.union(b)
Elements can be printed to the console for inspection at any point of the graph by using debug:
import com.spotify.scio.values.SCollection
val elements: SCollection[String] = ???
elements.debug(prefix = "myLabel: ")
ContextAndArgs
Scio’s ContextAndArgs provides a convenient way to both parse command-line options and acquire a ScioContext:
import com.spotify.scio._
val cmdlineArgs: Array[String] = ???
val (sc, args) = ContextAndArgs(cmdlineArgs)
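If you need custom pipeline options, define a trait extending Beam’s PipelineOptions with a getter/setter pair for each option and use ContextAndArgs.typed. In the example below, getMyArg/setMyArg define an option passed on the command line as --myArg=value: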
import com.spotify.scio._
import org.apache.beam.sdk.options.PipelineOptions
trait Arguments extends PipelineOptions {
def getMyArg: String
def setMyArg(input: String): Unit
}
val cmdlineArgs: Array[String] = ???
val (sc, args) = ContextAndArgs.typed[Arguments](cmdlineArgs)
val myArg: String = args.getMyArg
Aggregations
Scio provides a suite of built-in aggregations. Each *ByKey variant performs the same aggregation as its un-keyed counterpart, but separately for each key of a keyed SCollection.
Counting
count (or countByKey) counts the number of elements.
countByValue counts the number of elements for each value in an SCollection[T].
countApproxDistinct (or countApproxDistinctByKey) estimates a distinct count, with Beam’s ApproximateUnique or Scio’s HyperLogLog-based ApproxDistinctCounter.
import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.hll.zetasketch.ZetaSketchHllPlusPlus
val elements: SCollection[String] = ???
val sketch = ZetaSketchHllPlusPlus[String]()
val result: SCollection[Long] = elements.countApproxDistinct(sketch)
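countByValue returns one (value, count) pair per distinct value. The shape of that result can be pictured with plain Scala collections. This is an in-memory analogy rather than Scio code, and the event names are invented.

```scala
// Plain Scala illustration of countByValue's output (not Scio).
val events: List[String] = List("click", "view", "click", "buy", "view", "click")

// One (value, count) pair per distinct value.
val countsByValue: Map[String, Long] =
  events.groupBy(identity).map { case (value, occurrences) => value -> occurrences.size.toLong }

// count: the total number of elements.
val total: Long = events.size.toLong
```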
Statistics
max (or maxByKey) finds the maximum element given some Ordering.
min (or minByKey) finds the minimum element given some Ordering.
mean finds the mean given some Numeric.
quantilesApprox (or approxQuantilesByKey) finds the distribution using Beam’s ApproximateQuantiles.
For SCollections containing Double, Scio additionally provides a stats method that computes the count, mean, min, max, variance, standard deviation, sample variance, and sample standard deviation over the SCollection. Convenience methods are available directly on the SCollection if only a single value is required:
import com.spotify.scio.values.SCollection
import com.spotify.scio.util.StatCounter
val elements: SCollection[Double] = ???
val stats: SCollection[StatCounter] = elements.stats
val variance: SCollection[Double] = stats.map { s => s.variance }
val stdev: SCollection[Double] = elements.stdev
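variance and sampleVariance differ only in the divisor. The plain-Scala computation below works this through on a small made-up dataset. It assumes StatCounter follows the usual convention, inherited from Spark's class of the same name: variance and stdev are population statistics that divide by n, while the sample variants divide by n - 1.

```scala
// Plain Scala illustration of population vs. sample statistics (not Scio).
val xs: Seq[Double] = Seq(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0)

val n: Int = xs.size
val mean: Double = xs.sum / n
// Sum of squared deviations from the mean.
val sumSq: Double = xs.map(x => (x - mean) * (x - mean)).sum

// Population variance divides by n; sample variance divides by n - 1.
val variance: Double = sumSq / n
val sampleVariance: Double = sumSq / (n - 1)
val stdev: Double = math.sqrt(variance)
```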
Sums & combinations
combine (or combineByKey) combines elements with a set of user-defined functions:
import com.spotify.scio.values.SCollection
case class A(count: Long, total: Long)
object A {
def apply(i: Int): A = A(1L, i.toLong)
def mergeValue(a: A, i: Int): A = A(a.count + 1L, a.total + i)
def mergeCombiners(a: A, b: A) = A(a.count + b.count, a.total + b.total)
}
val elements: SCollection[Int] = ???
elements.combine(A.apply)(A.mergeValue)(A.mergeCombiners)
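The following is a rough in-memory simulation of how the three functions above cooperate. It is plain Scala, not Scio. Within each hypothetical bundle of input, the first element creates a combiner and mergeValue folds in the rest. mergeCombiners then merges the partial results. The bundle split here is an assumption made for illustration. The runner decides the real split, so the result must not depend on how the input is divided.

```scala
// Same combiner type and functions as above, simulated with plain Scala (not Scio).
case class A(count: Long, total: Long)
object A {
  def apply(i: Int): A = A(1L, i.toLong)
  def mergeValue(a: A, i: Int): A = A(a.count + 1L, a.total + i)
  def mergeCombiners(a: A, b: A): A = A(a.count + b.count, a.total + b.total)
}

// Two hypothetical bundles of input, processed independently.
val bundle1: List[Int] = List(1, 2, 3)
val bundle2: List[Int] = List(10, 20)

// Within a bundle: create a combiner from the first element, then fold in the rest.
def combineBundle(xs: List[Int]): A = xs.tail.foldLeft(A(xs.head))(A.mergeValue)

// Across bundles: merge the partial combiners, then derive the mean.
val combined: A = A.mergeCombiners(combineBundle(bundle1), combineBundle(bundle2))
val mean: Double = combined.total.toDouble / combined.count
```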
sum (or sumByKey) sums elements given a Semigroup, while aggregate (or aggregateByKey) aggregates elements either with a set of user-defined functions, via an Aggregator, or via a MonoidAggregator.
Both Semigroup and Monoid instances can be derived with magnolify, assuming the behavior for the primitive types is what you expect.
Note that for String the default Semigroup[String] behavior is to append, which is usually not what you want.
Fully-automatic derivation can be very concise but relies on some implicit magic:
import com.spotify.scio.values.SCollection
import com.twitter.algebird._
import magnolify.cats.auto._
case class A(count: Long, total: Long)
val elements: SCollection[A] = ???
val summed: SCollection[A] = elements.sum
val aggregated: SCollection[A] = elements.aggregate(Aggregator.fromMonoid[A])
Semi-automatic derivation in a companion object may be more intelligible:
case class A(count: Long, total: Long)
object A {
import magnolify.cats.semiauto._
import cats._
implicit val aMonoid: Monoid[A] = MonoidDerivation[A]
}
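For a case class of numeric fields, a derived Monoid combines field by field, using zero as the empty value for each field. The sketch below spells this out by hand in plain Scala. SimpleMonoid is a hypothetical stand-in for cats.Monoid, used so the example runs without dependencies. It is not the code magnolify generates.

```scala
// Hypothetical minimal monoid interface standing in for cats.Monoid.
trait SimpleMonoid[T] {
  def empty: T
  def combine(x: T, y: T): T
}

case class A(count: Long, total: Long)

// Field-wise combination, with zero as the identity for each Long field.
val aMonoid: SimpleMonoid[A] = new SimpleMonoid[A] {
  def empty: A = A(0L, 0L)
  def combine(x: A, y: A): A = A(x.count + y.count, x.total + y.total)
}

// Summing a collection is a fold that starts from empty.
val values: List[A] = List(A(1L, 5L), A(2L, 7L), A(1L, 3L))
val summed: A = values.foldLeft(aMonoid.empty)(aMonoid.combine)
```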
See also Algebird
Metrics
Scio supports Beam’s Counter, Distribution, and Gauge metrics.
See MetricsExample.
ScioResult
ScioResult can be used to access metric values, individually or as a group:
import com.spotify.scio._
import org.apache.beam.sdk.metrics.{MetricName, Counter}
val sc: ScioContext = ???
val counter: Counter = ???
val sr: ScioResult = sc.run().waitUntilDone()
val counterValue: metrics.MetricValue[Long] = sr.counter(counter)
val counterMap: Map[MetricName, metrics.MetricValue[Long]] = sr.allCounters
Taps & Materialization
Writes return a ClosedTap, which provides an interface to access the written results or pass them to a subsequent Scio job.
import com.spotify.scio._
import com.spotify.scio.io.{Tap, ClosedTap}
import com.spotify.scio.values.SCollection
val sc: ScioContext = ???
val elements: SCollection[String] = ???
val writeTap: ClosedTap[String] = elements.saveAsTextFile("gs://output-path")
val sr: ScioResult = sc.run().waitUntilDone()
val textTap: Tap[String] = sr.tap(writeTap)
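// Read the written elements back once the job has finished
val textContents: Iterator[String] = textTap.value
// Open the same data as the input of a subsequent job
val sc2: ScioContext = ???
val results: SCollection[String] = textTap.open(sc2)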
The same mechanism underlies Scio’s materialize method, which will save the contents of an SCollection at the point of the materialize to a temporary location and make them available after the pipeline completes:
import com.spotify.scio._
import com.spotify.scio.io.{Tap, ClosedTap}
import com.spotify.scio.values.SCollection
val sc: ScioContext = ???
val elements: SCollection[String] = ???
val materializeTap: ClosedTap[String] = elements.materialize
val sr: ScioResult = sc.run().waitUntilDone()
val textTap: Tap[String] = sr.tap(materializeTap)
See also: WordCountOrchestration example.
Use native Beam functionality
If you need a Beam IO or transform for which Scio does not have an API, you can use the native Beam API for individual steps in a pipeline that is otherwise written in Scio.
customInput supports reading from a Beam source, that is, any transform of type PTransform[PBegin, PCollection[T]]:
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.{PBegin, PCollection}
import org.apache.beam.sdk.io.TextIO
val sc: ScioContext = ???
val filePattern: String = ???
val textRead: PTransform[PBegin, PCollection[String]] = TextIO.read().from(filePattern)
val elements: SCollection[String] = sc.customInput("ReadText", textRead)
saveAsCustomOutput supports writing to a Beam sink, that is, any transform of type PTransform[PCollection[T], PDone]:
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.{PDone, PCollection}
import org.apache.beam.sdk.io.TextIO
val outputLocation: String = ???
val elements: SCollection[String] = ???
val textWrite: PTransform[PCollection[String], PDone] = TextIO.write().to(outputLocation)
elements.saveAsCustomOutput("WriteText", textWrite)
Finally, applyTransform supports using any Beam transform of type PTransform[PCollection[T], PCollection[U]]:
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.{PTransform, Sum}
import org.apache.beam.sdk.values.PCollection
import java.lang
val elements: SCollection[Double] = ???
val transform: PTransform[PCollection[lang.Double], PCollection[lang.Double]] = Sum.doublesGlobally
val result: SCollection[lang.Double] = elements
.map(Double.box)
.applyTransform(transform)
See also: BeamExample
Windowing
timestampBy allows for changing an element’s timestamp:
import com.spotify.scio.values.SCollection
import org.joda.time.Instant
case class A(timestamp: Instant, value: String)
val elements: SCollection[A] = ???
val timestamped: SCollection[A] = elements.timestampBy(_.timestamp)
The withTimestamp, withWindow, and withPaneInfo functions pair each element with its timestamp, window, or pane metadata, respectively, which makes that metadata part of the SCollection's elements:
import com.spotify.scio.values.SCollection
import org.joda.time.Instant
val elements: SCollection[String] = ???
val timestamped: SCollection[(String, Instant)] = elements.withTimestamp
toWindowed converts the SCollection to a WindowedSCollection whose elements are all instances of WindowedValue, which gives full access to the windowing metadata:
import com.spotify.scio.values._
import org.joda.time.Instant
val elements: SCollection[String] = ???
val windowed: WindowedSCollection[String] = elements.toWindowed
windowed.map { v: WindowedValue[String] =>
v.withTimestamp(Instant.now())
}
Scio provides convenience functions for the common types of windowing (withFixedWindows, withSlidingWindows, withSessionWindows, withGlobalWindow) but also provides full control over the windowing with withWindowFn.
import com.spotify.scio.values.SCollection
import org.joda.time.Duration
val elements: SCollection[String] = ???
val windowedElements: SCollection[String] = elements.withFixedWindows(Duration.standardHours(1))
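Fixed-window assignment can be worked through with plain arithmetic. The sketch below uses java.time instead of the Joda types above so that it needs no dependencies. It assumes windows aligned to the epoch with no offset. Under that assumption, an element's window starts at its timestamp rounded down to a multiple of the window size and ends one window size later (exclusive). fixedWindowFor is a hypothetical helper, not a Scio API.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper: the [start, end) fixed window containing ts, aligned to the epoch.
def fixedWindowFor(ts: Instant, size: Duration): (Instant, Instant) = {
  val sizeMillis = size.toMillis
  val millis = ts.toEpochMilli
  // floorMod keeps the alignment correct for timestamps before the epoch too.
  val start = millis - Math.floorMod(millis, sizeMillis)
  (Instant.ofEpochMilli(start), Instant.ofEpochMilli(start + sizeMillis))
}

val (windowStart, windowEnd) =
  fixedWindowFor(Instant.parse("2024-01-01T10:30:00Z"), Duration.ofHours(1))
```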
Batching
In cases where some transform performs better on a group of items, elements can be batched by number of elements with `batch`, by the size of the elements with `batchByteSized`, or by some user-defined weight with `batchWeighted`. There are also keyed variants of each of these: `batchByKey`, `batchByteSizedByKey`, and `batchWeightedByKey`.
import com.spotify.scio.values.SCollection
val elements: SCollection[String] = ???
val batchedElements: SCollection[Iterable[String]] = elements.batch(10)
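The idea behind batchWeighted, grouping by a user-defined weight, can be sketched sequentially in plain Scala. batchByWeight below is a hypothetical helper, not the Scio implementation. It starts a new batch whenever adding the next element would push the running weight past the limit, so an element heavier than the limit ends up alone in its batch. In a distributed pipeline the batch boundaries will generally differ from this sequential version.

```scala
// Hypothetical greedy weight-based batching over an in-memory sequence (not Scio).
def batchByWeight[T](xs: Seq[T], maxWeight: Long)(weight: T => Long): Seq[Seq[T]] = {
  val batches = Vector.newBuilder[Seq[T]]
  var current = Vector.empty[T]
  var currentWeight = 0L
  xs.foreach { x =>
    val w = weight(x)
    // Close the current batch if this element would exceed the limit.
    if (current.nonEmpty && currentWeight + w > maxWeight) {
      batches += current
      current = Vector.empty
      currentWeight = 0L
    }
    current :+= x
    currentWeight += w
  }
  // Emit the final, possibly partial, batch.
  if (current.nonEmpty) batches += current
  batches.result()
}

// Weight each string by its length, with at most 5 characters per batch.
val words = Seq("a", "bb", "ccc", "dddd", "e")
val byLength = batchByWeight(words, maxWeight = 5)(_.length.toLong)
```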
Misc
A random subset of an SCollection's elements can be drawn with sample:
import com.spotify.scio.values.SCollection
val elements: SCollection[String] = ???
val result: SCollection[String] = elements.sample(withReplacement = true, fraction = 0.01)
The SCollection can be randomly split into new SCollections given a weighting of what fraction of the input should be in each split:
import com.spotify.scio.values.SCollection
val elements: SCollection[Int] = ???
val weights: Array[Double] = Array(0.2, 0.6, 0.2)
val splits: Array[SCollection[Int]] = elements.randomSplit(weights)
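The plain-Scala sketch below shows one way a weighted random split can work. It normalizes the weights into cumulative bounds, draws a uniform number per element, and routes the element to the first split whose bound exceeds that number. weightedSplit is a hypothetical helper. Normalizing weights that do not sum to 1 is an assumption of this sketch, not documented Scio behavior. The split sizes are random and only roughly proportional to the weights.

```scala
import scala.util.Random

// Hypothetical weighted random split over an in-memory sequence (not Scio).
def weightedSplit[T](xs: Seq[T], weights: Seq[Double], rng: Random): Seq[Seq[T]] = {
  val total = weights.sum
  // Cumulative upper bounds of each split, normalized to end at 1.0.
  val bounds = weights.scanLeft(0.0)(_ + _).tail.map(_ / total)
  val assigned: Map[Int, Seq[T]] = xs.groupBy { _ =>
    val u = rng.nextDouble()
    val i = bounds.indexWhere(u < _)
    // Guard against floating-point rounding leaving u at or above the last bound.
    if (i < 0) weights.length - 1 else i
  }
  (0 until weights.length).map(i => assigned.getOrElse(i, Seq.empty[T]))
}

val splits = weightedSplit(1 to 1000, Seq(0.2, 0.6, 0.2), new Random(42))
```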
The “top” n elements of an SCollection given some Ordering can be found with top:
import com.spotify.scio.values.SCollection
val elements: SCollection[Int] = ???
val top10: SCollection[Iterable[Int]] = elements.top(10)
The common elements of two SCollections can be found with intersection:
import com.spotify.scio.values.SCollection
val a: SCollection[String] = ???
val b: SCollection[String] = ???
val common: SCollection[String] = a.intersection(b)
For a keyed SCollection, intersectByKey will give the elements in the LHS whose keys are in the RHS:
import com.spotify.scio.values.SCollection
val a: SCollection[(String, Int)] = ???
val b: SCollection[String] = ???
val common: SCollection[(String, Int)] = a.intersectByKey(b)
Similarly, subtract (or subtractByKey) will give the elements in the LHS that are not present in the RHS:
import com.spotify.scio.values.SCollection
val a: SCollection[String] = ???
val b: SCollection[String] = ???
val notInB: SCollection[String] = a.subtract(b)
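The plain-Scala analogies below show the shape of the results of intersection, subtract, and intersectByKey on small made-up inputs. They run in memory and are not Scio code. The inputs for the first two contain no duplicates, so the sketch does not depend on how duplicates are treated.

```scala
// In-memory analogies of the set-like operations (not Scio).
val a: List[String] = List("x", "y", "z")
val b: List[String] = List("y", "z", "w")

// intersection: elements present in both inputs.
val common: Set[String] = a.toSet intersect b.toSet

// subtract: elements of a that do not appear in b.
val bSet: Set[String] = b.toSet
val notInB: List[String] = a.filterNot(bSet.contains)

// intersectByKey: keep every LHS pair whose key appears among the RHS keys.
val pairs: List[(String, Int)] = List("x" -> 1, "y" -> 2, "w" -> 3, "y" -> 4)
val keysToKeep: Set[String] = Set("y", "w")
val kept: List[(String, Int)] = pairs.filter { case (k, _) => keysToKeep.contains(k) }
```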