Side Inputs

Side inputs provide a way to broadcast small amounts of data to all workers.

Side inputs are more performant if they fit entirely into memory. We therefore recommend using the singleton variants where possible and setting the --workerCacheMb option. For a Dataflow job on a standard worker we recommend a maximum size of roughly 1GB for a side input. If you need a larger side input, see the section on Sparkey side inputs.

See also the Beam Programming Guide’s section on Side Inputs, which provides additional details.

Standard side inputs

Converting an SCollection to a side-input Seq or Iterable is supported via asListSideInput and asIterableSideInput respectively:

import com.spotify.scio.values.{SCollection, SideInput}

val stringElements: SCollection[String] = ???
val stringListSI: SideInput[Seq[String]] = stringElements.asListSideInput
val stringIterSI: SideInput[Iterable[String]] = stringElements.asIterableSideInput

For keyed SCollections, Scio provides asMapSideInput when each key maps to exactly one value and asMultiMapSideInput when a key may have multiple values:

import com.spotify.scio.values.{SCollection, SideInput}

val keyedElements: SCollection[(String, String)] = ???
val mapSingleSI: SideInput[Map[String, String]] = keyedElements.asMapSideInput
val mapMultiSI: SideInput[Map[String, Iterable[String]]] = keyedElements.asMultiMapSideInput

Singleton variants

In addition to the standard Beam SideInputs, Scio provides Singleton variants that are often more performant than the Beam defaults.

For an SCollection with a single element, asSingletonSideInput converts it to a side input:

import com.spotify.scio.values.{SCollection, SideInput}

val elements: SCollection[Int] = ???
val sumSI: SideInput[Int] = elements.sum.asSingletonSideInput

To get a SideInput of Set[T], use asSetSingletonSideInput:

import com.spotify.scio.values.{SCollection, SideInput}

val elements: SCollection[String] = ???
val setSI: SideInput[Set[String]] = elements.asSetSingletonSideInput

For keyed SCollections, use asMapSingletonSideInput when each key maps to exactly one value and asMultiMapSingletonSideInput when a key may have multiple values:

import com.spotify.scio.values.{SCollection, SideInput}

val keyedElements: SCollection[(String, String)] = ???
val mapSingleSI: SideInput[Map[String, String]] = keyedElements.asMapSingletonSideInput
val mapMultiSI: SideInput[Map[String, Iterable[String]]] = keyedElements.asMultiMapSingletonSideInput

Side input context

To ‘join’ an SCollection with a SideInput, use withSideInputs, then access the side input’s value via the SideInputContext:

import com.spotify.scio.values.{SCollection, SideInput}

val keyedElements: SCollection[(String, String)] = ???
val mapSingleSI: SideInput[Map[String, String]] = keyedElements.asMapSingletonSideInput

val elements: SCollection[String] = ???
elements
  .withSideInputs(mapSingleSI)
  .map { case (element, ctx) =>
    // look up this element in the side input map
    val mapSingle: Map[String, String] = ctx(mapSingleSI)
    val value: Option[String] = mapSingle.get(element)
    value
  }
  .toSCollection
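
withSideInputs returns an SCollectionWithSideInput rather than a plain SCollection; calling toSCollection at the end of the chain converts back to a regular SCollection once the side inputs are no longer needed.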

workerCacheMb option

By default, Dataflow workers allocate 100MB (see DataflowWorkerHarnessOptions#getWorkerCacheMb) of memory for caching side inputs and fall back to disk or network when a side input does not fit in the cache. Jobs with large side inputs may therefore be slow. To override this default, register DataflowWorkerHarnessOptions before parsing command line arguments and then pass --workerCacheMb=N when submitting the job:

import com.spotify.scio._
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions

def main(cmdlineArgs: Array[String]): Unit = {
  PipelineOptionsFactory.register(classOf[DataflowWorkerHarnessOptions])
  val (sc, args) = ContextAndArgs(cmdlineArgs)
  ???
}
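
To confirm which cache size a job is actually running with, one option is to read the value back from the typed pipeline options after the context has been created. The snippet below is a minimal sketch, not part of the example above; the logCacheSize helper is hypothetical and relies on Scio’s optionsAs and Beam’s DataflowWorkerHarnessOptions#getWorkerCacheMb:

import com.spotify.scio._
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions

// Hypothetical helper: prints the effective side-input cache size for a ScioContext
def logCacheSize(sc: ScioContext): Unit = {
  val cacheMb = sc.optionsAs[DataflowWorkerHarnessOptions].getWorkerCacheMb
  println(s"workerCacheMb = $cacheMb")
}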