Built-in Functionality
Scio is a thin wrapper on top of Beam offering idiomatic Scala APIs. Check out the Beam Programming Guide first for a detailed explanation of the Beam programming model and concepts.
Basics
ScioContext wraps Beam’s Pipeline
SCollection wraps Beam’s PCollection
ScioResult wraps Beam’s PipelineResult
See dedicated sections on:
Core functionality
A ScioContext represents the pipeline. It is the starting point for performing reads and the means by which the pipeline is executed. Execute a pipeline by invoking run and await completion by chaining waitUntilDone:
import com.spotify.scio._
val sc: ScioContext = ???
sc.run().waitUntilDone()
An SCollection is the representation of the data in a pipeline at a particular point in the execution graph, preceding or following a transform. SCollections have many of the methods you would expect on a standard Scala collection: map, filter, flatten, flatMap, reduce, collect, fold, and take.
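As a rough illustration of how these collection-style methods chain, the sketch below builds a small in-memory pipeline. The object name, the helper longWordsUpper, and the sample input are made up for illustration. sc.parallelize turns a local collection into an SCollection, and running the code assumes a Beam runner (for example the direct runner) is on the classpath.

```scala
import com.spotify.scio._
import com.spotify.scio.values.SCollection

object CollectionOpsSketch {
  // Split lines into words, keep words longer than three characters, upper-case them
  def longWordsUpper(lines: SCollection[String]): SCollection[String] =
    lines
      .flatMap(_.split("\\s+").toSeq) // one output element per word
      .filter(_.length > 3)
      .map(_.toUpperCase)

  def main(cmdlineArgs: Array[String]): Unit = {
    val sc = ScioContext() // context with default pipeline options
    val lines = sc.parallelize(Seq("a quick brown fox", "jumps over"))
    longWordsUpper(lines).debug(prefix = "word: ")
    sc.run().waitUntilDone()
  }
}
```

Nothing is computed until run is invoked; the method calls only describe the graph.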
Any SCollection of 2-tuples is considered a keyed SCollection, and the various joins and *ByKey variants of other methods become available. The first item in the tuple is considered the key and the second item the value. The keyBy method creates a keyed SCollection, where the user-defined function extracts the key from the existing values:
import com.spotify.scio.values.SCollection
val elements: SCollection[String] = ???
val result: SCollection[(String, String)] = elements.keyBy(_.head.toString)
Once keyed, elements with the same key can be grouped so that they can be processed together:
import com.spotify.scio.values.SCollection
val elements: SCollection[(String, String)] = ???
val result: SCollection[(String, Iterable[String])] = elements.groupByKey
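The two steps above can be chained. The following is a minimal sketch with hypothetical names (KeyedSketch, firstLetter, countsByFirstLetter). It also swaps `_.head` for `headOption`, because calling `head` on an empty String throws at runtime.

```scala
import com.spotify.scio.values.SCollection

object KeyedSketch {
  // Key by first character; empty strings get the empty key instead of throwing
  def firstLetter(s: String): String = s.headOption.map(_.toString).getOrElse("")

  // Number of elements sharing each first letter
  def countsByFirstLetter(elements: SCollection[String]): SCollection[(String, Int)] =
    elements
      .keyBy(firstLetter) // SCollection[(String, String)]
      .groupByKey         // SCollection[(String, Iterable[String])]
      .mapValues(_.size)
}
```

When only a count per key is needed, countByKey (listed under Aggregations) is likely the better fit, since it does not need to build the grouped Iterable first.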
Distinct elements can be found with distinct (or the distinctBy and distinctByKey variants):
import com.spotify.scio.values.SCollection
val elements: SCollection[String] = ???
val distinct: SCollection[String] = elements.distinct
Elements can be split into different SCollections with partition, which can be useful for error handling. Note that the number of partitions should be small.
import com.spotify.scio.values.SCollection
val elements: SCollection[Int] = ???
// partition returns (elements matching the predicate, elements not matching it)
val (greaterThanFive, atMostFive): (SCollection[Int], SCollection[Int]) = elements.partition(_ > 5)
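To make the error-handling use case concrete, here is a sketch that separates inputs that parse as integers from those that do not. The names PartitionSketch, parse, and splitParsed are hypothetical, and it assumes Scio can supply a coder for Either[String, Int].

```scala
import com.spotify.scio.values.SCollection
import scala.util.Try

object PartitionSketch {
  // Right(parsed value) on success, Left(original input) on failure
  def parse(s: String): Either[String, Int] =
    Try(s.trim.toInt).toEither.left.map(_ => s)

  // First result: parsed integers; second result: inputs that failed to parse
  def splitParsed(raw: SCollection[String]): (SCollection[Int], SCollection[String]) = {
    val (ok, failed) = raw.map(parse).partition(_.isRight)
    (ok.collect { case Right(i) => i }, failed.collect { case Left(s) => s })
  }
}
```

The failed side can then be written to a separate location for inspection while the main flow continues with the parsed values.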
SCollections of the same type can be combined with a union (or unionAll) operation.
import com.spotify.scio.values.SCollection
val a: SCollection[Int] = ???
val b: SCollection[Int] = ???
val elements: SCollection[Int] = a.union(b)
Elements can be printed to the console for inspection at any point of the graph by using debug:
import com.spotify.scio.values.SCollection
val elements: SCollection[String] = ???
elements.debug(prefix = "myLabel: ")
ContextAndArgs
Scio’s ContextAndArgs provides a convenient way to both parse command-line options and acquire a ScioContext:
import com.spotify.scio._
val cmdlineArgs: Array[String] = ???
val (sc, args) = ContextAndArgs(cmdlineArgs)
If you need custom pipeline options, subclass Beam’s PipelineOptions and use ContextAndArgs.typed:
import com.spotify.scio._
import org.apache.beam.sdk.options.PipelineOptions
trait Arguments extends PipelineOptions {
  def getMyArg: String
  def setMyArg(input: String): Unit
}
val cmdlineArgs: Array[String] = ???
val (sc, args) = ContextAndArgs.typed[Arguments](cmdlineArgs)
val myArg: String = args.getMyArg
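For the untyped variant, the returned args object can be queried by key. The sketch below is one way a job entry point might look. The argument names (input, output, minLength) and the object name are invented for illustration.

```scala
import com.spotify.scio._

object ArgsSketch {
  def main(cmdlineArgs: Array[String]): Unit = {
    // Example invocation: --input=in.txt --output=out --minLength=3
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    val input: String = args("input")                    // required; fails if missing
    val output: String = args.getOrElse("output", "out") // optional with a default
    val minLength: Int = args.int("minLength", 1)        // typed accessor with a default

    sc.textFile(input)
      .filter(_.length >= minLength)
      .saveAsTextFile(output)
    sc.run().waitUntilDone()
  }
}
```

Arguments that Beam recognizes as pipeline options are applied to the context; the rest are available through args.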
Aggregations
Scio provides a suite of built-in aggregations. All *ByKey variants do the same as the normal function, but per-key for keyed SCollections.
Counting
count (or countByKey) counts the number of elements
countByValue counts the number of elements for each value in an SCollection[T]
countApproxDistinct (or countApproxDistinctByKey) estimates a distinct count, with Beam’s ApproximateUnique or Scio’s HyperLogLog-based ApproxDistinctCounter
import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.hll.zetasketch.ZetaSketchHllPlusPlus
val elements: SCollection[String] = ???
val sketch = ZetaSketchHllPlusPlus[String]()
val result: SCollection[Long] = elements.countApproxDistinct(sketch)
Statistics
max (or maxByKey) finds the maximum element given some Ordering
min (or minByKey) finds the minimum element given some Ordering
mean finds the mean given some Numeric
quantilesApprox (or approxQuantilesByKey) finds the distribution using Beam’s ApproximateQuantiles
For SCollections containing Double, Scio additionally provides a stats method that computes the count, mean, min, max, variance, standard deviation, sample variance, and sample standard deviation over the SCollection. Convenience methods are available directly on the SCollection if only a single value is required:
import com.spotify.scio.values.SCollection
import com.spotify.scio.util.StatCounter
val elements: SCollection[Double] = ???
val stats: SCollection[StatCounter] = elements.stats
val variance: SCollection[Double] = stats.map { s => s.variance }
val stdev: SCollection[Double] = elements.stdev
Sums & combinations
combine (or combineByKey) combines elements with a set of user-defined functions:
import com.spotify.scio.values.SCollection
case class A(count: Long, total: Long)
object A {
  def apply(i: Int): A = A(1L, i.toLong)
  def mergeValue(a: A, i: Int): A = A(a.count + 1L, a.total + i)
  def mergeCombiners(a: A, b: A) = A(a.count + b.count, a.total + b.total)
}
val elements: SCollection[Int] = ???
elements.combine(A.apply)(A.mergeValue)(A.mergeCombiners)
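To see how the three functions fit together, the following plain-Scala sketch simulates the combine on two local "bundles" of input. It does not use Scio at all, and the way elements are actually split and merged is up to the runner. The names CombineSketch, Acc, and combineBundle are illustrative only.

```scala
object CombineSketch {
  final case class Acc(count: Long, total: Long)

  def create(i: Int): Acc = Acc(1L, i.toLong)
  def mergeValue(a: Acc, i: Int): Acc = Acc(a.count + 1L, a.total + i)
  def mergeCombiners(a: Acc, b: Acc): Acc = Acc(a.count + b.count, a.total + b.total)

  // Fold one non-empty bundle into a single accumulator:
  // the first element creates it, the remaining elements are merged in
  def combineBundle(bundle: Seq[Int]): Acc =
    bundle.tail.foldLeft(create(bundle.head))(mergeValue)

  def main(args: Array[String]): Unit = {
    val bundles = Seq(Seq(1, 2, 3), Seq(4, 5))
    // Partial accumulators from each bundle are merged pairwise
    val result = bundles.map(combineBundle).reduce(mergeCombiners)
    println(result)                               // Acc(5,15)
    println(result.total.toDouble / result.count) // 3.0
  }
}
```

Because partial results may be merged in any grouping, mergeCombiners should be associative and commutative.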
sum (or sumByKey) sums elements given a Semigroup, while aggregate (or aggregateByKey) aggregates elements either with a set of user-defined functions, via an Aggregator, or via a MonoidAggregator.
Both Semigroup and Monoid instances can be derived with magnolify, assuming the behavior for the primitive types is what you expect.
Note that for String the default Semigroup[String] behavior is to append, which is usually not what you want.
Fully-automatic derivation can be very concise but relies on some implicit magic:
import com.spotify.scio.values.SCollection
import com.twitter.algebird._
import magnolify.cats.auto._
case class A(count: Long, total: Long)
val elements: SCollection[A] = ???
val summed: SCollection[A] = elements.sum
val aggregated: SCollection[A] = elements.aggregate(Aggregator.fromMonoid[A])
Semi-automatic derivation in a companion object may be more intelligible:
case class A(count: Long, total: Long)
object A {
  import magnolify.cats.semiauto._
  import cats._
  implicit val aMonoid: Monoid[A] = MonoidDerivation[A]
}
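When the derived behavior is not what you want, an instance can also be written by hand. The sketch below assumes that sum resolves a com.twitter.algebird.Semigroup, in line with the Algebird import used in the fully-automatic example. WordStats and the object name are hypothetical.

```scala
import com.spotify.scio.values.SCollection
import com.twitter.algebird.Semigroup

object HandWrittenSemigroup {
  final case class WordStats(count: Long, total: Long)

  // Field-wise addition; the function must be associative
  implicit val wordStatsSemigroup: Semigroup[WordStats] =
    Semigroup.from[WordStats]((a, b) => WordStats(a.count + b.count, a.total + b.total))

  // Picks up the implicit instance defined above
  def total(elements: SCollection[WordStats]): SCollection[WordStats] = elements.sum
}
```

Writing the instance explicitly makes the per-field behavior visible, which helps for fields such as String where appending is rarely intended.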
See also Algebird
Metrics
Scio supports Beam’s Counter, Distribution, and Gauge.
See MetricsExample.
ScioResult
ScioResult can be used to access metric values, individually or as a group:
import com.spotify.scio._
import org.apache.beam.sdk.metrics.{MetricName, Counter}
val sc: ScioContext = ???
val counter: Counter = ???
val sr: ScioResult = sc.run().waitUntilDone()
val counterValue: metrics.MetricValue[Long] = sr.counter(counter)
val counterMap: Map[MetricName, metrics.MetricValue[Long]] = sr.allCounters
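The snippet above leaves the counter undefined. One possible end-to-end sketch is shown below: it creates a counter with ScioMetrics, increments it inside a transform, and reads it back from the ScioResult. The counter name, the object name, and the run helper are made up for illustration, and the example assumes a local runner is available.

```scala
import com.spotify.scio._
import org.apache.beam.sdk.metrics.Counter

object MetricsSketch {
  // Counter name is illustrative
  val emptyLines: Counter = ScioMetrics.counter("emptyLines")

  // Runs a small pipeline and returns the attempted value of the counter
  def run(lines: Seq[String]): Long = {
    val sc = ScioContext()
    sc.parallelize(lines)
      .filter { line =>
        if (line.isEmpty) emptyLines.inc() // count dropped elements
        line.nonEmpty
      }
    val result: ScioResult = sc.run().waitUntilDone()
    result.counter(emptyLines).attempted
  }
}
```

A MetricValue also exposes a committed value as an Option, since not every runner reports committed metrics.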
Taps & Materialization
Writes return a ClosedTap, which provides an interface to access the written results or pass them to a subsequent Scio job.
import com.spotify.scio._
import com.spotify.scio.io.{Tap, ClosedTap}
import com.spotify.scio.values.SCollection
val sc: ScioContext = ???
val elements: SCollection[String] = ???
val writeTap: ClosedTap[String] = elements.saveAsTextFile("gs://output-path")
val sr: ScioResult = sc.run().waitUntilDone()
val textTap: Tap[String] = sr.tap(writeTap)
val textContents: Iterator[String] = textTap.value
// open the tap in a new context, since sc has already been run
val sc2: ScioContext = ???
val results: SCollection[String] = textTap.open(sc2)
The same mechanism underlies Scio’s materialize method, which saves the contents of an SCollection at the point of the materialize call to a temporary location and makes them available after the pipeline completes:
import com.spotify.scio._
import com.spotify.scio.io.{Tap, ClosedTap}
import com.spotify.scio.values.SCollection
val sc: ScioContext = ???
val elements: SCollection[String] = ???
val materializeTap: ClosedTap[String] = elements.materialize
val sr: ScioResult = sc.run().waitUntilDone()
val textTap: Tap[String] = sr.tap(materializeTap)
See also: WordCountOrchestration example.
Use native Beam functionality
If there is a need to use a Beam IO or transform for which Scio does not have an API, you can easily use the native Beam API for single steps in a pipeline otherwise written in Scio.
customInput supports reading from a Beam source, that is, any transform of type PTransform[PBegin, PCollection[T]]:
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.{PBegin, PCollection}
import org.apache.beam.sdk.io.TextIO
val sc: ScioContext = ???
val filePattern: String = ???
val textRead: PTransform[PBegin, PCollection[String]] = TextIO.read().from(filePattern)
val elements: SCollection[String] = sc.customInput("ReadText", textRead)
saveAsCustomOutput supports writing to a Beam sink, that is, any transform of type PTransform[PCollection[T], PDone]:
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.{PDone, PCollection}
import org.apache.beam.sdk.io.TextIO
val outputLocation: String = ???
val elements: SCollection[String] = ???
val textWrite: PTransform[PCollection[String], PDone] = TextIO.write().to(outputLocation)
elements.saveAsCustomOutput("WriteText", textWrite)
Finally, applyTransform supports using any Beam transform of type PTransform[PCollection[T], PCollection[U]]:
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.{PTransform, Sum}
import org.apache.beam.sdk.values.PCollection
import java.lang
val elements: SCollection[Double] = ???
val transform: PTransform[PCollection[lang.Double], PCollection[lang.Double]] = Sum.doublesGlobally
val result: SCollection[lang.Double] = elements
  .map(Double.box)
  .applyTransform(transform)
See also: BeamExample
Windowing
timestampBy allows for changing an element’s timestamp:
import com.spotify.scio.values.SCollection
import org.joda.time.Instant
case class A(timestamp: Instant, value: String)
val elements: SCollection[A] = ???
val timestamped: SCollection[A] = elements.timestampBy(_.timestamp)
The withTimestamp, withWindow, and withPaneInfo functions flatten window metadata into the SCollection:
import com.spotify.scio.values.SCollection
import org.joda.time.Instant
val elements: SCollection[String] = ???
val timestamped: SCollection[(String, Instant)] = elements.withTimestamp
toWindowed converts the SCollection to a WindowedSCollection whose elements are all instances of WindowedValue, which gives full access to the windowing metadata:
import com.spotify.scio.values._
import org.joda.time.Instant
val elements: SCollection[String] = ???
val windowed: WindowedSCollection[String] = elements.toWindowed
windowed.map { v: WindowedValue[String] =>
  v.withTimestamp(Instant.now())
}
Scio provides convenience functions for the common types of windowing (withFixedWindows, withSlidingWindows, withSessionWindows, withGlobalWindow) but also provides full control over the windowing with withWindowFn.
import com.spotify.scio.values.SCollection
import org.joda.time.Duration
val elements: SCollection[String] = ???
val windowedElements: SCollection[String] = elements.withFixedWindows(Duration.standardHours(1))
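Windowing mostly matters once an aggregation follows, because aggregations are then computed per window. As a sketch, the code below assigns event-time timestamps, applies one-hour fixed windows, and counts occurrences per user within each window. Click, WindowSketch, and clicksPerUserPerHour are hypothetical names.

```scala
import com.spotify.scio.values.SCollection
import org.joda.time.{Duration, Instant}

object WindowSketch {
  final case class Click(user: String, time: Instant)

  // Clicks per user within each one-hour event-time window
  def clicksPerUserPerHour(clicks: SCollection[Click]): SCollection[(String, Long)] =
    clicks
      .timestampBy(_.time)                         // use the event's own time
      .withFixedWindows(Duration.standardHours(1)) // non-overlapping hourly windows
      .map(_.user)
      .countByValue                                // counted separately per window
}
```

A user who clicks in two different hours therefore appears in two output elements, one per window.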
Batching
In cases where some transform performs better on a group of items, elements can be batched by number of elements with batch, by the size of the elements with batchByteSized, or by some user-defined weight with batchWeighted. There are also keyed variants of each of these: batchByKey, batchByteSizedByKey, and batchWeightedByKey.
import com.spotify.scio.values.SCollection
val elements: SCollection[String] = ???
val batchedElements: SCollection[Iterable[String]] = elements.batch(10)
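A typical reason to batch is to amortize the cost of a bulk call over many elements. The sketch below uses a made-up bulkLookup function as a stand-in for such a call; a real one might talk to an external service. BatchSketch and lookupAll are likewise hypothetical names.

```scala
import com.spotify.scio.values.SCollection

object BatchSketch {
  // Hypothetical bulk operation: here it just pairs each id with its length
  def bulkLookup(ids: Iterable[String]): Seq[(String, Int)] =
    ids.map(id => id -> id.length).toSeq

  // Group up to 100 ids per call, then flatten the per-batch results
  def lookupAll(ids: SCollection[String]): SCollection[(String, Int)] =
    ids.batch(100).flatMap(bulkLookup)
}
```

The batch size is an upper bound, so the bulk function should cope with smaller batches as well.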
Misc
Some elements of an SCollection can be randomly sampled using sample:
import com.spotify.scio.values.SCollection
val elements: SCollection[String] = ???
val result: SCollection[String] = elements.sample(withReplacement = true, fraction = 0.01)
The SCollection can be randomly split into new SCollections given a weighting of what fraction of the input should be in each split:
import com.spotify.scio.values.SCollection
val elements: SCollection[Int] = ???
val weights: Array[Double] = Array(0.2, 0.6, 0.2)
val splits: Array[SCollection[Int]] = elements.randomSplit(weights)
The “top” n elements of an SCollection given some Ordering can be found with top:
import com.spotify.scio.values.SCollection
val elements: SCollection[Int] = ???
val top10: SCollection[Iterable[Int]] = elements.top(10)
The common elements of two SCollections can be found with intersection:
import com.spotify.scio.values.SCollection
val a: SCollection[String] = ???
val b: SCollection[String] = ???
val common: SCollection[String] = a.intersection(b)
For a keyed SCollection, intersectByKey will give the elements in the LHS whose keys are in the RHS:
import com.spotify.scio.values.SCollection
val a: SCollection[(String, Int)] = ???
val b: SCollection[String] = ???
val common: SCollection[(String, Int)] = a.intersectByKey(b)
Similarly, subtract (or subtractByKey) will give the elements in the LHS that are not present in the RHS:
import com.spotify.scio.values.SCollection
val a: SCollection[String] = ???
val b: SCollection[String] = ???
val notInB: SCollection[String] = a.subtract(b)