Joins

Scio provides a full suite of join functionality and a few extras that solve tricky edge-cases in large-scale data processing.

All joins operate over SCollections containing 2-tuples, where the first tuple item is treated as the key and the second as the value. The order in which SCollections are joined matters: larger datasets should be further to the left. For example, in a.join(b), a should be the larger of the two datasets. By convention, a is called the left-hand side (LHS) and b the right-hand side (RHS).

Cogroup

The Beam transform that underlies the standard joins below is CoGroupByKey. Scio also provides a cogroup operation, which returns, for each key, one iterable per SCollection containing all the values that match that key:

import com.spotify.scio.values.SCollection

val a: SCollection[(String, String)] = ???
val b: SCollection[(String, Int)] = ???
val elements: SCollection[(String, (Iterable[String], Iterable[Int]))] = a.cogroup(b)
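
To make the shape of the cogroup result concrete, the following is a minimal sketch of the same grouping on plain in-memory Scala collections. It is not Scio code: the CogroupSketch object and its cogroup method are hypothetical names used only for illustration, and it assumes that a key present on only one side is paired with an empty collection on the other.

```scala
object CogroupSketch {
  // Groups the values of two keyed collections by key. Every key that appears
  // on either side is kept; a side without that key contributes an empty Seq.
  def cogroup[K, V, W](lhs: Seq[(K, V)], rhs: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    val left: Map[K, Seq[V]] = lhs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
    val right: Map[K, Seq[W]] = rhs.groupBy(_._1).map { case (k, kws) => (k, kws.map(_._2)) }
    (left.keySet ++ right.keySet).iterator.map { k =>
      (k, (left.getOrElse(k, Seq.empty[V]), right.getOrElse(k, Seq.empty[W])))
    }.toMap
  }
}
```

For example, CogroupSketch.cogroup(Seq("x" -> "a1", "x" -> "a2"), Seq("x" -> 1, "z" -> 2)) groups both "a1" and "a2" with 1 under the key "x", and pairs 2 with an empty left side under the key "z".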

Standard joins

Scio’s standard joins have SQL-like names with SQL-like semantics. In the examples below, the contents of the LHS are of type (K, V), while the RHS are of type (K, W).

join produces elements of (K, (V, W)), where the key K must be in both the LHS and RHS:

import com.spotify.scio.values.SCollection

val a: SCollection[(String, String)] = ???
val b: SCollection[(String, Int)] = ???
val elements: SCollection[(String, (String, Int))] = a.join(b)

leftOuterJoin produces elements of (K, (V, Option[W])), where the key K is in the LHS but may not be in the RHS:

import com.spotify.scio.values.SCollection

val a: SCollection[(String, String)] = ???
val b: SCollection[(String, Int)] = ???
val elements: SCollection[(String, (String, Option[Int]))] = a.leftOuterJoin(b)

rightOuterJoin produces elements of (K, (Option[V], W)), where the key K is in the RHS but may not be in the LHS:

import com.spotify.scio.values.SCollection

val a: SCollection[(String, String)] = ???
val b: SCollection[(String, Int)] = ???
val elements: SCollection[(String, (Option[String], Int))] = a.rightOuterJoin(b)

fullOuterJoin produces elements of (K, (Option[V], Option[W])), where the key K can be in either side:

import com.spotify.scio.values.SCollection

val a: SCollection[(String, String)] = ???
val b: SCollection[(String, Int)] = ???
val elements: SCollection[(String, (Option[String], Option[Int]))] = a.fullOuterJoin(b)
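
The difference between the four standard joins is easiest to see on a small dataset. The sketch below reproduces their SQL-like semantics on plain Scala collections by grouping both sides per key and then expanding each group. The JoinSketch object and its method names are hypothetical and are not part of Scio; this illustrates the semantics, not how Scio or Beam execute the join.

```scala
object JoinSketch {
  // Groups both sides by key; keys keep their first-seen order.
  private def cogroup[K, V, W](lhs: Seq[(K, V)], rhs: Seq[(K, W)]): Seq[(K, (Seq[V], Seq[W]))] = {
    val l = lhs.groupBy(_._1)
    val r = rhs.groupBy(_._1)
    (lhs.map(_._1) ++ rhs.map(_._1)).distinct.map { k =>
      val vs = l.getOrElse(k, Seq.empty[(K, V)]).map(_._2)
      val ws = r.getOrElse(k, Seq.empty[(K, W)]).map(_._2)
      (k, (vs, ws))
    }
  }

  // Pads an empty side with a single None so that the key still emits one row.
  private def orNone[A](xs: Seq[A]): Seq[Option[A]] =
    if (xs.isEmpty) Seq(None) else xs.map(Some(_))

  // Key must be present on both sides.
  def inner[K, V, W](lhs: Seq[(K, V)], rhs: Seq[(K, W)]): Seq[(K, (V, W))] =
    cogroup(lhs, rhs).flatMap { case (k, (vs, ws)) => for (v <- vs; w <- ws) yield (k, (v, w)) }

  // Key must be present in the LHS; a missing RHS value becomes None.
  def leftOuter[K, V, W](lhs: Seq[(K, V)], rhs: Seq[(K, W)]): Seq[(K, (V, Option[W]))] =
    cogroup(lhs, rhs).flatMap { case (k, (vs, ws)) => for (v <- vs; w <- orNone(ws)) yield (k, (v, w)) }

  // Key must be present in the RHS; a missing LHS value becomes None.
  def rightOuter[K, V, W](lhs: Seq[(K, V)], rhs: Seq[(K, W)]): Seq[(K, (Option[V], W))] =
    cogroup(lhs, rhs).flatMap { case (k, (vs, ws)) => for (v <- orNone(vs); w <- ws) yield (k, (v, w)) }

  // Key may be present on either side.
  def fullOuter[K, V, W](lhs: Seq[(K, V)], rhs: Seq[(K, W)]): Seq[(K, (Option[V], Option[W]))] =
    cogroup(lhs, rhs).flatMap { case (k, (vs, ws)) => for (v <- orNone(vs); w <- orNone(ws)) yield (k, (v, w)) }
}
```

With a = Seq("x" -> "a1", "y" -> "a2") and b = Seq("x" -> 1, "z" -> 2), only the key "x" survives inner, "y" additionally appears in leftOuter with None on the right, "z" additionally appears in rightOuter with None on the left, and fullOuter contains all three keys.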

When multiple joins of the same type are chained, it is more efficient to use Scio’s MultiJoin class. Instead of a.join(b).join(c) prefer MultiJoin (or its variants, MultiJoin.left, MultiJoin.outer, MultiJoin.cogroup):

import com.spotify.scio.values.SCollection
import com.spotify.scio.util.MultiJoin

val a: SCollection[(String, Int)] = ???
val b: SCollection[(String, Boolean)] = ???
val c: SCollection[(String, Float)] = ???
val elements: SCollection[(String, (Int, Boolean, Float))] = MultiJoin(a, b, c)

Hash joins

Scio’s hashJoin and its variants hashLeftOuterJoin and hashFullOuterJoin provide a convenient syntax on top of Beam’s SideInput class to avoid a shuffle during the join. The RHS should fit in memory, as with normal SideInputs.

import com.spotify.scio.values.SCollection

val a: SCollection[(String, Int)] = ???
val b: SCollection[(String, Boolean)] = ???
val elements: SCollection[(String, (Int, Boolean))] = a.hashJoin(b)
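
The idea behind a hash join can be sketched on plain Scala collections: the small RHS is loaded into an in-memory lookup table, and the LHS is streamed past it one element at a time, so the LHS never has to be grouped by key. The HashJoinSketch object below is a hypothetical illustration of that idea and does not reflect Scio's actual implementation; the in-memory Map stands in for the side input.

```scala
object HashJoinSketch {
  // Inner join that looks each LHS key up in an in-memory multimap of the RHS.
  def hashJoin[K, V, W](lhs: Iterator[(K, V)], rhs: Seq[(K, W)]): Iterator[(K, (V, W))] = {
    // Built once; stands in for the side input that must fit in memory.
    val table: Map[K, Seq[W]] = rhs.groupBy(_._1).map { case (k, kws) => (k, kws.map(_._2)) }
    // The LHS is consumed element by element and is never grouped or sorted.
    lhs.flatMap { case (k, v) =>
      table.getOrElse(k, Seq.empty[W]).iterator.map(w => (k, (v, w)))
    }
  }
}
```

Because only the RHS is materialized, the memory requirement depends on the size of the RHS and not on the size of the LHS, which is why the larger dataset belongs on the left.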

In the less common case where the LHS contains only keys to be looked up, hashLookup will join in all matching values from the RHS. Again, the RHS should fit in memory.

import com.spotify.scio.values.SCollection

val a: SCollection[String] = ???
val b: SCollection[(String, String)] = ???
val elements: SCollection[(String, Iterable[String])] = a.hashLookup(b)

Scio also provides the shuffle-free intersection and subtraction operations hashIntersectByKey and hashSubtractByKey.

import com.spotify.scio.values.SCollection

val a: SCollection[(String, Int)] = ???
val b: SCollection[String] = ???

val subtracted: SCollection[(String, Int)] = a.hashSubtractByKey(b)
val intersected: SCollection[(String, Int)] = a.hashIntersectByKey(b)
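
As a rough illustration of what these two operations compute, the sketch below filters the LHS pairs against an in-memory set of RHS keys. The HashSetOpsSketch object is hypothetical and works on plain Scala collections; it assumes that every LHS pair with a matching (or non-matching) key is kept, duplicates included.

```scala
object HashSetOpsSketch {
  // Keeps the LHS pairs whose key appears in rhsKeys.
  def intersectByKey[K, V](lhs: Seq[(K, V)], rhsKeys: Seq[K]): Seq[(K, V)] = {
    val keys = rhsKeys.toSet // stands in for the in-memory side input
    lhs.filter { case (k, _) => keys.contains(k) }
  }

  // Keeps the LHS pairs whose key does not appear in rhsKeys.
  def subtractByKey[K, V](lhs: Seq[(K, V)], rhsKeys: Seq[K]): Seq[(K, V)] = {
    val keys = rhsKeys.toSet
    lhs.filterNot { case (k, _) => keys.contains(k) }
  }
}
```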

Large hash join

Similar to hash joins, Scio’s largeHashJoin and its variants largeHashLeftOuterJoin and largeHashFullOuterJoin provide a convenient syntax on top of Scio’s Sparkey support to avoid a shuffle during the join. Use of Sparkey requires only that the RHS fit on disk.

import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.sparkey._

val a: SCollection[(String, Int)] = ???
val b: SCollection[(String, Boolean)] = ???
val elements: SCollection[(String, (Int, Boolean))] = a.largeHashJoin(b)

Larger shuffle-free intersection and subtraction operations are also provided as largeHashIntersectByKey and largeHashSubtractByKey.

import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.sparkey._

val a: SCollection[(String, Int)] = ???
val b: SCollection[String] = ???

val subtracted: SCollection[(String, Int)] = a.largeHashSubtractByKey(b)
val intersected: SCollection[(String, Int)] = a.largeHashIntersectByKey(b)

Sparse join

Scio supports a ‘sparse join’ for cases where both the LHS and RHS of a join are large, but where the keys in the RHS cover a relatively small number of rows in the LHS.

In this case, an optimization of the join can significantly reduce the shuffle. The keys of the RHS are inserted into a Bloom filter, a probabilistic data structure that effectively acts as a Set but with some risk of false positives. Elements in the LHS dataset are partitioned by passing each element’s key through the filter and splitting the dataset on whether the key is found or not. All LHS keys that are found in the filter are probably in the RHS dataset, so a full join is performed on these elements. Any LHS keys not found in the filter are definitely not in the RHS dataset, so these items can be handled without a join. To properly size the Bloom filter, an estimate of the number of keys in the RHS (rhsNumKeys) must be provided to the join function.

In addition to sparseJoin (and variants sparseLeftOuterJoin, sparseRightOuterJoin, and sparseFullOuterJoin) Scio also provides a sparseIntersectByKey implementation. Scio uses Guava’s BloomFilter. Import magnolify.guava.auto._ to get common instances of Guava Funnel:

import com.spotify.scio.values.SCollection
import magnolify.guava.auto._

val a: SCollection[(String, Int)] = ???
val b: SCollection[(String, Boolean)] = ???
val c: SCollection[String] = ???

val bNumKeys: Int = ???
val joined = a.sparseJoin(b, bNumKeys)

val cNumKeys: Int = ???
val intersected: SCollection[(String, Int)] = a.sparseIntersectByKey(c, cNumKeys)
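
The Bloom filter partitioning described above can be sketched on plain Scala collections. The example below is a simplified illustration, not Scio's implementation: it swaps Guava's BloomFilter for a hand-rolled toy filter (ToyBloomFilter, a hypothetical class with arbitrarily chosen default sizes), restricts keys to String, and performs a left outer join so that the rows skipping the join are visible in the output.

```scala
import scala.collection.mutable
import scala.util.hashing.MurmurHash3

object SparseJoinSketch {
  // Toy Bloom filter: numHashes seeded hashes each set one bit per key.
  final class ToyBloomFilter(numBits: Int, numHashes: Int) {
    private val bits = mutable.BitSet.empty
    private def positions(key: String): Seq[Int] =
      (0 until numHashes).map(seed => Math.floorMod(MurmurHash3.stringHash(key, seed), numBits))
    def add(key: String): Unit = positions(key).foreach(p => bits += p)
    // May return true for a key that was never added, but never false for one that was.
    def mightContain(key: String): Boolean = positions(key).forall(bits.contains)
  }

  // Splits the LHS into rows whose key may be in the RHS and rows whose key is certainly not.
  def partitionByFilter[V](lhs: Seq[(String, V)], rhsKeys: Iterable[String],
                           numBits: Int = 1024, numHashes: Int = 3)
      : (Seq[(String, V)], Seq[(String, V)]) = {
    val filter = new ToyBloomFilter(numBits, numHashes)
    rhsKeys.foreach(filter.add)
    lhs.partition { case (k, _) => filter.mightContain(k) }
  }

  def sparseLeftOuterJoin[V, W](lhs: Seq[(String, V)], rhs: Seq[(String, W)])
      : Seq[(String, (V, Option[W]))] = {
    def row(k: String, v: V, w: Option[W]): (String, (V, Option[W])) = (k, (v, w))
    val (candidates, misses) = partitionByFilter(lhs, rhs.map(_._1))
    // Only the candidates take part in the real join; false positives are resolved here.
    val table: Map[String, Seq[W]] = rhs.groupBy(_._1).map { case (k, kws) => (k, kws.map(_._2)) }
    val joined = candidates.flatMap { case (k, v) =>
      table.get(k) match {
        case Some(ws) => ws.map(w => row(k, v, Some(w)))
        case None     => Seq(row(k, v, None))
      }
    }
    // Keys that missed the filter cannot match, so they are emitted without joining.
    joined ++ misses.map { case (k, v) => row(k, v, None) }
  }
}
```

A false positive only costs extra work, because a key that wrongly passes the filter is still checked by the real join; the result stays exact either way.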

Finally, Scio provides sparseLookup, a special case for joining all items from the RHS with matching keys into the LHS items with that key. Unlike the other sparse variants, in this case an estimate of the number of keys in the LHS must be provided:

import com.spotify.scio.values.SCollection
import magnolify.guava.auto._

val a: SCollection[(String, Int)] = ???
val b: SCollection[(String, String)] = ???

val aNumKeys: Int = ???
val lookedUp: SCollection[(String, (Int, Iterable[String]))] = a.sparseLookup(b, aNumKeys)

Skewed join

Similar to sparse joins, Scio supports a ‘skewed join’ for the special case in which some keys in a dataset are very frequent, or hot.

Scio uses a Count-Min sketch (or CMS), a probabilistic data structure that is internally similar to a Bloom filter, but which provides an estimated count for a given item. The estimate may be higher than the true count but is never lower. The keys of the LHS are counted and those which exceed the value of the hotKeyThreshold parameter (default: 9000) plus an error bound are considered ‘hot’, while any remaining key is ‘cold’. Both the LHS and RHS are divided into ‘hot’ and ‘cold’ partitions. The cold sides are joined normally, while the hot side of the RHS is hashJoined into the hot LHS, avoiding shuffle on this segment of the dataset.

Scio provides skewedJoin, skewedLeftOuterJoin, and skewedFullOuterJoin variants. Import com.twitter.algebird.CMSHasherImplicits._ for the implicits required for count-min sketch.

import com.spotify.scio.values.SCollection
import com.twitter.algebird.CMSHasherImplicits._

val a: SCollection[(String, Int)] = ???
val b: SCollection[(String, String)] = ???
val elements: SCollection[(String, (Int, String))] = a.skewedJoin(b)
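
The hot and cold split can be sketched on plain Scala collections. The example below is a simplified illustration under several assumptions and is not Scio's implementation: it uses a hand-rolled toy Count-Min sketch (ToyCms, a hypothetical class) with arbitrary width and depth instead of Algebird's, restricts keys to String, compares the estimate directly against hotKeyThreshold without an error bound, and joins both partitions with the same in-memory lookup, since there is no shuffle to avoid in a single process.

```scala
import scala.util.hashing.MurmurHash3

object SkewedJoinSketch {
  // Toy Count-Min sketch: depth rows of width counters, one seeded hash per row.
  final class ToyCms(width: Int, depth: Int) {
    private val table = Array.ofDim[Long](depth, width)
    private def col(key: String, row: Int): Int =
      Math.floorMod(MurmurHash3.stringHash(key, row), width)
    def add(key: String): Unit =
      (0 until depth).foreach(r => table(r)(col(key, r)) += 1L)
    // Collisions can only inflate a counter, so the minimum over rows never undercounts.
    def estimate(key: String): Long =
      (0 until depth).map(r => table(r)(col(key, r))).min
  }

  // In-memory inner join used for both partitions in this sketch.
  private def lookupJoin[V, W](lhs: Seq[(String, V)], rhs: Seq[(String, W)]): Seq[(String, (V, W))] = {
    val table = rhs.groupBy(_._1)
    lhs.flatMap { case (k, v) =>
      table.getOrElse(k, Seq.empty[(String, W)]).map { case (_, w) => (k, (v, w)) }
    }
  }

  def skewedJoin[V, W](lhs: Seq[(String, V)], rhs: Seq[(String, W)],
                       hotKeyThreshold: Long): Seq[(String, (V, W))] = {
    // Count the LHS keys approximately.
    val cms = new ToyCms(width = 256, depth = 4)
    lhs.foreach { case (k, _) => cms.add(k) }
    def isHot(k: String): Boolean = cms.estimate(k) >= hotKeyThreshold

    // Both sides are split with the same predicate, so a key lands in the same partition on each side.
    val (hotLhs, coldLhs) = lhs.partition { case (k, _) => isHot(k) }
    val (hotRhs, coldRhs) = rhs.partition { case (k, _) => isHot(k) }

    // Hot partition: would be a hash join. Cold partition: would be a regular shuffle join.
    lookupJoin(hotLhs, hotRhs) ++ lookupJoin(coldLhs, coldRhs)
  }
}
```

Because an overestimate can only move a key from the cold partition to the hot one, and both sides are split with the same predicate, a misclassified key affects performance but not the correctness of the result.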

Sort Merge Bucket (SMB)

Sort-Merge Buckets allow for shuffle-free joins of large datasets. See Sort Merge Bucket.

See also

  • Join Optimizations at Spotify: How Scio can save you time and money with clever join strategies and approximate algorithms, Apache Beam Summit 2022