Side Inputs
Side inputs provide a way to broadcast small amounts of data to all workers.
Side inputs perform significantly better if they fit entirely into memory. We therefore recommend using the singleton variants where possible and setting the --workerCacheMb option. For a Dataflow job on a standard worker we recommend a maximum side input size of roughly 1GB. If you need a larger side input, see the section on Sparkey side inputs.
See also the Side Inputs section of the Beam Programming Guide, which provides additional details.
Standard side inputs
Converting an SCollection to a side-input Seq or Iterable is supported via asListSideInput and asIterableSideInput respectively:
import com.spotify.scio.values.{SCollection, SideInput}
val stringElements: SCollection[String] = ???
val stringListSI: SideInput[Seq[String]] = stringElements.asListSideInput
val stringIterSI: SideInput[Iterable[String]] = stringElements.asIterableSideInput
For keyed SCollections, Scio provides asMapSideInput for when there is a unique key-value relationship and asMultiMapSideInput for when a key may have multiple values:
import com.spotify.scio.values.{SCollection, SideInput}
val keyedElements: SCollection[(String, String)] = ???
val mapSingleSI: SideInput[Map[String, String]] = keyedElements.asMapSideInput
val mapMultiSI: SideInput[Map[String, Iterable[String]]] = keyedElements.asMultiMapSideInput
Singleton variants
In addition to standard Beam SideInputs, Scio also provides Singleton variants that are often more performant than the Beam defaults.
For SCollections with a single element, asSingletonSideInput will convert it to a side input:
import com.spotify.scio.values.{SCollection, SideInput}
val elements: SCollection[Int] = ???
val sumSI: SideInput[Int] = elements.sum.asSingletonSideInput
To get a SideInput of Set[T], use asSetSingletonSideInput:
import com.spotify.scio.values.{SCollection, SideInput}
val elements: SCollection[String] = ???
val setSI: SideInput[Set[String]] = elements.asSetSingletonSideInput
For keyed SCollections, use asMapSingletonSideInput when there is a unique key-value relationship and asMultiMapSingletonSideInput when a key may have multiple values:
import com.spotify.scio.values.{SCollection, SideInput}
val keyedElements: SCollection[(String, String)] = ???
val mapSingleSI: SideInput[Map[String, String]] = keyedElements.asMapSingletonSideInput
val mapMultiSI: SideInput[Map[String, Iterable[String]]] = keyedElements.asMultiMapSingletonSideInput
Side input context
To ‘join’ against a SideInput, use withSideInputs on the main SCollection, then access the side input’s value via the SideInputContext:
import com.spotify.scio.values.{SCollection, SideInput}
val keyedElements: SCollection[(String, String)] = ???
val mapSingleSI: SideInput[Map[String, String]] = keyedElements.asMapSingletonSideInput
val elements: SCollection[String] = ???
elements
  .withSideInputs(mapSingleSI)
  .map { case (element, ctx) =>
    val mapSingle: Map[String, String] = ctx(mapSingleSI)
    val value: Option[String] = mapSingle.get(element)
    value
  }
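Note that the result of withSideInputs is not a plain SCollection; if downstream transforms expect one, it can be converted back with toSCollection. A minimal sketch continuing the example above (the values name is illustrative):
import com.spotify.scio.values.{SCollection, SideInput}
val keyedElements: SCollection[(String, String)] = ???
val mapSingleSI: SideInput[Map[String, String]] = keyedElements.asMapSingletonSideInput
val elements: SCollection[String] = ???
// Look up each element in the side input map, then convert back to a regular SCollection
val values: SCollection[Option[String]] = elements
  .withSideInputs(mapSingleSI)
  .map { case (element, ctx) => ctx(mapSingleSI).get(element) }
  .toSCollection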
workerCacheMb option
By default, Dataflow workers allocate 100MB of memory for caching side inputs (see DataflowWorkerHarnessOptions#getWorkerCacheMb) and fall back to disk or network access for anything larger, so jobs with large side inputs may be slow. To override this default, register DataflowWorkerHarnessOptions before parsing command-line arguments and then pass --workerCacheMb=N when submitting the job:
import com.spotify.scio._
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions
def main(cmdlineArgs: Array[String]): Unit = {
  PipelineOptionsFactory.register(classOf[DataflowWorkerHarnessOptions])
  val (sc, args) = ContextAndArgs(cmdlineArgs)
  ???
}
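Once DataflowWorkerHarnessOptions is registered, --workerCacheMb is parsed like any other pipeline option and can be read back from the ScioContext if needed. A minimal sketch, assuming the standard optionsAs accessor (the cacheMb value is read here only for illustration):
import com.spotify.scio._
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions
def main(cmdlineArgs: Array[String]): Unit = {
  PipelineOptionsFactory.register(classOf[DataflowWorkerHarnessOptions])
  val (sc, args) = ContextAndArgs(cmdlineArgs)
  // Effective side input cache size, e.g. 1024 after submitting with --workerCacheMb=1024
  val cacheMb: Int = sc.optionsAs[DataflowWorkerHarnessOptions].getWorkerCacheMb
  ???
}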