Fanout

Scio ships with two SCollection variants that add fanout to aggregations: an interim aggregation is performed before the final aggregation is computed. The interim step pairs each element to be aggregated with a synthetic key, aggregates within this artificial keyspace, and then passes the partial aggregations on to the final aggregation step. The interim step requires an additional shuffle, but it can make the aggregation more parallelizable and reduce the impact of a hot key.

The aggregate, combine, fold, reduce, sum, and top transforms and their keyed variants are supported.
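The two-stage idea can be illustrated without Scio at all. The following is a minimal sketch in plain Scala collections (Scala 2.13 or later), not Scio's implementation: the names FanoutSketch, partialSums, and fanoutSum are hypothetical, and assigning synthetic keys round-robin by element index is an assumption made only for illustration.

```scala
object FanoutSketch {
  // Stage 1: pair each element with a synthetic key in [0, fanout) and
  // sum within each synthetic key. Round-robin key assignment is an
  // assumption of this sketch, not a statement about Scio internals.
  def partialSums(elements: Seq[Int], fanout: Int): Map[Int, Int] = {
    require(fanout > 0, "fanout must be positive")
    val keyed: Seq[(Int, Int)] =
      elements.zipWithIndex.map { case (x, i) => (i % fanout, x) }
    keyed.groupMapReduce(_._1)(_._2)(_ + _)
  }

  // Stage 2: combine the partial sums into the final result.
  def fanoutSum(elements: Seq[Int], fanout: Int): Int =
    partialSums(elements, fanout).values.sum
}
```

Because addition is associative and commutative, the result matches a direct sum regardless of how elements are spread across the synthetic keys. Only the amount of work that can proceed in parallel changes.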

WithFanout

withFanout aggregates over the number of synthetic keys specified by the fanout argument:

import com.spotify.scio._
import com.spotify.scio.values.SCollection

val elements: SCollection[Int] = ???
// Pre-aggregate over 10 synthetic keys, then compute the final sum
val result: SCollection[Int] = elements.withFanout(fanout = 10).sum

WithHotKeyFanout

For hot keys, withHotKeyFanout has two variants, both taking an argument named hotKeyFanout: an integer that sets a static fanout for every key, or a function K => Int that sets a dynamic per-key fanout:

import com.spotify.scio._
import com.spotify.scio.values.SCollection

val elements: SCollection[(String, Int)] = ???
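// Static fanout: the same fanout of 10 is applied to every key
val staticResult: SCollection[(String, Int)] = elements.withHotKeyFanout(hotKeyFanout = 10).sumByKey
// Dynamic fanout: the function computes a fanout from each key
val dynamicResult: SCollection[(String, Int)] = elements
  .withHotKeyFanout(hotKeyFanout = s => s.length % 10)
  .sumByKey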
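The keyed case can be sketched the same way in plain Scala collections (Scala 2.13 or later). This is an illustration under stated assumptions rather than Scio's implementation: HotKeyFanoutSketch and sumByKeyWithFanout are hypothetical names, sub-keys are assigned round-robin by element index, and a fanout below 1 is treated as 1.

```scala
object HotKeyFanoutSketch {
  // Sums values per key in two stages. `hotKeyFanout` returns the number
  // of synthetic sub-keys to use for a given key; values below 1 are
  // treated as 1 (an assumption of this sketch).
  def sumByKeyWithFanout[K](elements: Seq[(K, Int)])(hotKeyFanout: K => Int): Map[K, Int] = {
    // Stage 1: extend each key with a synthetic sub-key and sum per (key, sub-key).
    val subKeyed: Seq[((K, Int), Int)] =
      elements.zipWithIndex.map { case ((k, v), i) =>
        ((k, i % math.max(1, hotKeyFanout(k))), v)
      }
    val partials: Map[(K, Int), Int] =
      subKeyed.groupMapReduce(_._1)(_._2)(_ + _)

    // Stage 2: drop the sub-key and merge the partial sums per original key.
    partials.toSeq.groupMapReduce(_._1._1)(_._2)(_ + _)
  }
}
```

A caller might give a larger fanout only to keys known to be hot, for example `HotKeyFanoutSketch.sumByKeyWithFanout(data)(k => if (k == "a") 2 else 1)`, so that other keys skip the extra partitioning.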