Joins
Scio provides a full suite of join functionality and a few extras that solve tricky edge-cases in large-scale data processing.
All joins operate over SCollection
s containing 2-tuples, where the first tuple item is considered the key and the second the value. The order in which SCollection
s are joined matters; larger datasets should be further to the left. For example in a.join(b)
, a
should be the larger of the two datasets and by convention a
is called the left-hand-side or LHS, while b
is the right-hand-side or RHS.
Cogroup
The Beam transform which underlies the standard joins below is the Cogroup. Scio also provides a cogroup
operation, which returns iterables from each SCollection
containing all the values which match each key:
import com.spotify.scio.values.SCollection
val a: SCollection[(String, String)] = ???
val b: SCollection[(String, Int)] = ???
val elements: SCollection[(String, (Iterable[String], Iterable[Int]))] = a.cogroup(b)
Standard joins
Scio’s standard joins have SQL-like names with SQL-like semantics. In the examples below, the contents of the LHS are of type (K, V)
, while the RHS are of type (K, W)
.
join
produces elements of (K, (V, W))
, where the key K
must be in both the LHS and RHS:
import com.spotify.scio.values.SCollection
val a: SCollection[(String, String)] = ???
val b: SCollection[(String, Int)] = ???
val elements: SCollection[(String, (String, Int))] = a.join(b)
leftOuterJoin
produces elements of (K (V, Option[W]))
, where the key K
is in the LHS but may not be in the RHS:
import com.spotify.scio.values.SCollection
val a: SCollection[(String, String)] = ???
val b: SCollection[(String, Int)] = ???
val elements: SCollection[(String, (String, Option[Int]))] = a.leftOuterJoin(b)
rightOuterJoin
produces elements of (K, (Option[V], W]))
, where the key K
is in the RHS but may not be in the LHS:
import com.spotify.scio.values.SCollection
val a: SCollection[(String, String)] = ???
val b: SCollection[(String, Int)] = ???
val elements: SCollection[(String, (Option[String], Int))] = a.rightOuterJoin(b)
fullOuterJoin
produces elements of (K (Option[V], Option[W]))
, where the key K
can be in either side:
import com.spotify.scio.values.SCollection
val a: SCollection[(String, String)] = ???
val b: SCollection[(String, Int)] = ???
val elements: SCollection[(String, (Option[String], Option[Int]))] = a.fullOuterJoin(b)
When multiple joins of the same type are chained, it is more efficient to use Scio’s MultiJoin
class. Instead of a.join(b).join(c)
prefer MultiJoin
(or its variants, MultiJoin.left
, MultiJoin.outer
, MultiJoin.cogroup
):
import com.spotify.scio.values.SCollection
import com.spotify.scio.util.MultiJoin
val a: SCollection[(String, Int)] = ???
val b: SCollection[(String, Boolean)] = ???
val c: SCollection[(String, Float)] = ???
val elements: SCollection[(String, (Int, Boolean, Float))] = MultiJoin(a, b, c)
Hash joins
Scio’s hashJoin
and variants hashLeftOuterJoin
, and hashFullOuterJoin
provide a convenient syntax over the top of Beam’s SideInput class to avoid shuffle during the join. The RHS should fit in memory, as with normal SideInputs.
import com.spotify.scio.values.SCollection
val a: SCollection[(String, Int)] = ???
val b: SCollection[(String, Boolean)] = ???
val elements: SCollection[(String, (Int, Boolean))] = a.hashJoin(b)
In the less-common case where the LHS contains only keys to be looked-up, hashLookup
will join in all matching values from the RHS. Again, the RHS should fit in memory.
import com.spotify.scio.values.SCollection
val a: SCollection[String] = ???
val b: SCollection[(String, String)] = ???
val elements: SCollection[(String, Iterable[String])] = a.hashLookup(b)
In addition, Scio also provides the shuffle-free intersection and subtraction operations hashIntersectByKey
and hashSubtractByKey
.
import com.spotify.scio.values.SCollection
val a: SCollection[(String, Int)] = ???
val b: SCollection[String] = ???
val subtracted: SCollection[(String, Int)] = a.hashSubtractByKey(b)
val intersected: SCollection[(String, Int)] = a.hashIntersectByKey(b)
Large hash join
Similar to Hash Joins, Scio’s largeHashJoin
and variants largeHashLeftOuterJoin
and largeHashFullOuterJoin
provide a convenient syntax on top of Scio’s Sparkey support to avoid shuffle during a join. Use of sparkey requires only that the RHS fit on disk.
import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.sparkey._
val a: SCollection[(String, Int)] = ???
val b: SCollection[(String, Boolean)] = ???
val elements: SCollection[(String, (Int, Boolean))] = a.largeHashJoin(b)
Larger shuffle-free intersection and subtraction operations are also provided as largeHashIntersectByKey
and largeHashSubtractByKey
.
import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.sparkey._
val a: SCollection[(String, Int)] = ???
val b: SCollection[String] = ???
val subtracted: SCollection[(String, Int)] = a.largeHashSubtractByKey(b)
val intersected: SCollection[(String, Int)] = a.largeHashIntersectByKey(b)
Sparse join
Scio supports a ‘sparse join’ for cases where both the LHS and RHS of a join are large, but where the keys in the RHS cover a relatively small number of rows in the LHS.
In this case, an optimization of the join can significantly reduce the shuffle. The keys of the RHS are inserted into a Bloom Filter, a probabilistic data structure that effectively acts as a Set
but with some risk of false positives. Elements in the LHS dataset are partitioned by passing an element’s key through the filter and splitting the dataset on whether the key is found or not. All LHS keys which are found in the filter are probably in the RHS dataset, so a full join is performed on these elements. Any LHS key not found in the filter are definitely not in the RHS dataset, so these items can be handled without a join. To properly size the Bloom filter, an estimate of the number of keys in the RHS (rhsNumKeys
) must be provided to the join function.
In addition to sparseJoin
(and variants sparseLeftOuterJoin
, sparseRightOuterJoin
, and sparseFullOuterJoin
) Scio also provides a sparseIntersectByKey
implementation. Scio uses Guava’s BloomFilter
. Import magnolify.guava.auto._
to get common instances of Guava Funnel
:
import com.spotify.scio.values.SCollection
import magnolify.guava.auto._
val a: SCollection[(String, Int)] = ???
val b: SCollection[(String, Boolean)] = ???
val c: SCollection[String] = ???
val bNumKeys: Int = ???
val joined = a.sparseJoin(b, bNumKeys)
val cNumKeys: Int = ???
val intersected: SCollection[(String, Int)] = a.sparseIntersectByKey(c, cNumKeys)
Finally, Scio provides sparseLookup
, a special-case for joining all items from the RHS with matching keys into the LHS items with that key. Differently than the other sparse
variants, in this case an estimate of the number of keys in the LHS must be provided:
import com.spotify.scio.values.SCollection
import magnolify.guava.auto._
val a: SCollection[(String, Int)] = ???
val b: SCollection[(String, String)] = ???
val aNumKeys: Int = ???
val lookedUp: SCollection[(String, (Int, Iterable[String]))] = a.sparseLookup(b, aNumKeys)
Skewed Join
Similar to sparse joins, Scio supports a ‘skewed join’ for the special case in which some keys in a dataset are very frequent, or hot.
Scio uses a Count-Min sketch (or CMS), a probabilistic data structure that is internally similar to a Bloom filter, but which provides an estimated count for a given item which is explicitly an _over_estimate. The keys of the LHS are counted and those which exceed the value of the hotKeyThreshold
parameter (default: 9000
) plus an error bound are considered ‘hot’, while any remaining key is ‘cold’. Both the LHS and RHS are divided into ‘hot’ and ‘chill’ partitions. The chill sides are joined normally, while the hot side of the RHS is hashJoin
ed into the hot LHS, avoiding shuffle on this segment of the dataset.
Scio provides skewedJoin
, skewedLeftOuterJoin
, and skewedFullOuterJoin
variants. Import com.twitter.algebird.CMSHasherImplicits._
for the implicits required for count-min sketch.
import com.spotify.scio.values.SCollection
import com.twitter.algebird.CMSHasherImplicits._
val a: SCollection[(String, Int)] = ???
val b: SCollection[(String, String)] = ???
val elements: SCollection[(String, (Int, String))] = a.skewedJoin(b)
Sort Merge Bucket (SMB)
Sort-Merge Buckets allow for shuffle-free joins of large datasets. See Sort Merge Bucket
See also
- Join Optimizations at Spotify How Scio can save you time and money with clever join strategies and approximate algorithms, Apache Beam Summit 2022