Fanout
Scio ships with two SCollection variants that provide fanout over aggregations, where an interim aggregation is performed before the final aggregation is computed. The interim step pairs the data to be aggregated with a synthetic key, then aggregates within this artificial keyspace before passing the partial aggregations on to the final aggregation step. The interim step costs an additional shuffle, but it can make the aggregation more parallelizable and reduce the impact of a hot key.
The aggregate, combine, fold, reduce, sum, and top transforms and their keyed variants are supported.
WithFanout
withFanout aggregates over the number of synthetic keys specified by the fanout argument:
import com.spotify.scio._
import com.spotify.scio.values.SCollection
val elements: SCollection[Int] = ???
// Sum all elements, first combining partial sums within 10 synthetic keys
val result: SCollection[Int] = elements.withFanout(fanout = 10).sum
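As a rough model of what withFanout(fanout = 10).sum computes, the plain Scala sketch below runs the interim and final steps on an in-memory Seq instead of an SCollection. The object FanoutSketch and its fanoutReduce helper are hypothetical and not part of Scio. Assigning synthetic keys round-robin by position is an assumption made for determinism; the actual assignment is left to Scio and the Beam runner. The result equals a plain reduce only when op is associative and commutative, since partial results are combined in an arbitrary grouping and order.

```scala
object FanoutSketch {
  // Hypothetical helper modeling an interim + final aggregation over `fanout`
  // synthetic keys. Not Scio API; round-robin key assignment is an assumption.
  def fanoutReduce[T](elements: Seq[T], fanout: Int)(op: (T, T) => T): Option[T] = {
    require(fanout > 0, "fanout must be positive")
    // Interim step: pair each element with a synthetic key in [0, fanout)
    val partials: Seq[T] = elements.zipWithIndex
      .groupBy { case (_, i) => i % fanout }
      .values
      // Reduce within each synthetic key; every group is non-empty by construction
      .map(group => group.map(_._1).reduce(op))
      .toSeq
    // Final step: combine the partial aggregations (empty input yields None)
    partials.reduceOption(op)
  }

  def main(args: Array[String]): Unit = {
    val elements = 1 to 100
    println(fanoutReduce(elements, fanout = 10)(_ + _))
    println(fanoutReduce(elements, fanout = 10)((a, b) => math.max(a, b)))
  }
}
```

With integer addition over 1 to 100 this yields Some(5050) for any positive fanout, matching an unfanned sum.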
WithHotKeyFanout
For hot keys, withHotKeyFanout comes in two variants: one takes a static fanout as an integer hotKeyFanout argument, and the other takes a dynamic per-key fanout as a function K => Int argument, also named hotKeyFanout:
import com.spotify.scio._
import com.spotify.scio.values.SCollection
val elements: SCollection[(String, Int)] = ???
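// Static fanout: every key is spread over 10 synthetic keys
val staticResult: SCollection[(String, Int)] = elements.withHotKeyFanout(hotKeyFanout = 10).sumByKey
// Dynamic fanout: the number of synthetic keys is computed from each key
val dynamicResult: SCollection[(String, Int)] = elements
  .withHotKeyFanout(hotKeyFanout = s => s.length % 10)
  .sumByKey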
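The keyed case can be modeled the same way. In the hypothetical sketch below (HotKeyFanoutSketch.sumByKeyWithFanout is not Scio API), each key k is extended with a synthetic sub-key in [0, hotKeyFanout(k)), values are summed per (key, sub-key) pair, and the partial sums are then combined per original key. Sub-keys are assigned round-robin by element position, and fanout values below 1 are treated as no fanout; both are assumptions for illustration, not a description of how Beam assigns sub-keys. A static fanout corresponds to passing a constant function.

```scala
object HotKeyFanoutSketch {
  // Hypothetical model of a keyed sum with a per-key fanout function.
  // Not Scio API; the round-robin sub-key assignment is an assumption.
  def sumByKeyWithFanout[K](pairs: Seq[(K, Int)], hotKeyFanout: K => Int): Map[K, Int] = {
    // Interim step: extend each key with a synthetic sub-key in [0, fanout(k))
    val interim: Map[(K, Int), Int] = pairs.zipWithIndex
      .map { case ((k, v), i) =>
        // Treat fanouts below 1 as 1, i.e. no fanout for that key (assumption)
        val n = math.max(1, hotKeyFanout(k))
        ((k, i % n), v)
      }
      .groupBy(_._1)
      .map { case (subKey, group) => subKey -> group.map(_._2).sum }
    // Final step: drop the synthetic sub-key and combine partial sums per key
    interim.toSeq
      .groupBy { case ((k, _), _) => k }
      .map { case (k, partials) => k -> partials.map(_._2).sum }
  }

  def main(args: Array[String]): Unit = {
    val pairs = Seq.fill(1000)("hot" -> 1) ++ Seq("cold" -> 5, "cold" -> 7)
    // Static fanout: every key gets 10 synthetic sub-keys
    println(sumByKeyWithFanout(pairs, (_: String) => 10))
    // Dynamic fanout, mirroring the example above: s => s.length % 10
    println(sumByKeyWithFanout(pairs, (s: String) => s.length % 10))
  }
}
```

Whatever fanout is chosen per key, the final per-key sums are the same as an ordinary sumByKey; the fanout only changes how the work is split before the final step.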