HyperLogLog
HyperLogLog is an algorithm for estimating the cardinality of a large dataset. Counting distinct elements exactly requires space linear in the number of distinct elements. The HLL algorithm approximates the cardinality with a small memory footprint.
HyperLogLog++ is an improved version of the HyperLogLog algorithm presented by Google. It estimates the distinct count more accurately in both large and small data streams.
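To make the idea concrete, here is a minimal sketch of the classic HyperLogLog estimator in plain Scala. It is an illustration only: it is not the HyperLogLog++ variant (no sparse representation, no empirical bias correction) and it is not how Stream-lib or ZetaSketch are implemented. The class name `SimpleHll`, the helper `hash64` and the hash seeds are hypothetical choices made for this example.

```scala
import scala.util.hashing.MurmurHash3

// Illustrative classic HyperLogLog; names and seeds are assumptions of this example.
final class SimpleHll(p: Int) {
  require(p >= 4 && p <= 16, "precision must be in [4, 16]")

  private val m = 1 << p                     // number of registers
  private val registers = new Array[Byte](m) // each register stores a small rank

  // 64-bit hash assembled from two differently seeded 32-bit MurmurHash3 values.
  private def hash64(s: String): Long =
    (MurmurHash3.stringHash(s, 17).toLong << 32) |
      (MurmurHash3.stringHash(s, 31).toLong & 0xffffffffL)

  def add(s: String): Unit = {
    val h = hash64(s)
    val idx = (h >>> (64 - p)).toInt // the first p bits select a register
    val rest = h << p                // the remaining 64 - p bits
    // Rank = 1-based position of the leftmost 1-bit in the remaining bits.
    val rank = math.min(java.lang.Long.numberOfLeadingZeros(rest) + 1, 64 - p + 1)
    if (rank > registers(idx)) registers(idx) = rank.toByte
  }

  def estimate: Double = {
    val alpha = m match {
      case 16 => 0.673
      case 32 => 0.697
      case 64 => 0.709
      case _  => 0.7213 / (1 + 1.079 / m)
    }
    // Normalized harmonic mean of 2^register over all registers.
    val sum = registers.map(r => math.pow(2.0, -r.toDouble)).sum
    val raw = alpha * m * m / sum
    val zeros = registers.count(_ == 0)
    // Small-range correction: fall back to linear counting.
    if (raw <= 2.5 * m && zeros > 0) m * math.log(m.toDouble / zeros) else raw
  }
}
```

Memory use depends only on the precision `p` (here `2^p` bytes), not on the number of elements added, and adding the same element again never changes the registers.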
The HyperLogLog++ algorithm has been integrated with Apache Beam with the help of the ZetaSketch library, which is compatible with Google Cloud BigQuery sketches. More about the Apache Beam integration can be found here.
HyperLogLog++ Integration.
Scio's distinct count API has been extended to support HyperLogLog algorithms through the ApproxDistinctCount interface. The scio-extra module provides two implementations of this interface:
- com.spotify.scio.extra.hll.sketching.SketchHllPlusPlus
- com.spotify.scio.extra.hll.zetasketch.ZetaSketchHllPlusPlus
SketchHllPlusPlus provides a HyperLogLog++ implementation based on AddThis' Stream-lib library, while ZetaSketchHllPlusPlus provides an implementation based on ZetaSketch.
The following shows how to use the new API.
In this example, we count the distinct values of a given type using the SketchHllPlusPlus implementation.
import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.hll.sketching.SketchHllPlusPlus
def input[T]: SCollection[T] = ???
def distinctCount: SCollection[Long] =
  input
    .countApproxDistinct(new SketchHllPlusPlus(15, 20))
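As a rough guide to choosing the constructor arguments, the snippet below computes the register count and the classic HyperLogLog standard error, 1.04 / sqrt(m), for a given precision. It assumes that the first argument of SketchHllPlusPlus (15 in the example above) is the precision `p`, as in Stream-lib's `HyperLogLogPlus(p, sp)`, and that this classic bound is a reasonable approximation for HyperLogLog++. The object `HllError` and its methods are hypothetical helpers, not part of Scio.

```scala
object HllError {
  // Number of registers used by a sketch with precision p.
  def registers(p: Int): Int = 1 << p

  // Classic HyperLogLog relative standard error: 1.04 / sqrt(m).
  def standardError(p: Int): Double = 1.04 / math.sqrt(registers(p).toDouble)

  def main(args: Array[String]): Unit = {
    val p = 15
    println(s"p=$p registers=${registers(p)} stderr=${standardError(p)}")
  }
}
```

Under these assumptions, a precision of 15 corresponds to 32768 registers and a relative standard error of roughly 0.57%. Each increment of `p` doubles the number of registers and divides the error by about 1.41.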
The same count can be computed using ZetaSketchHllPlusPlus, but it supports only the Int, Long, String and Array[Byte] types, meaning you can only call it on an SCollection of one of these supported types.
import com.spotify.scio.extra.hll.zetasketch.ZetaSketchHllPlusPlus
val estimator = ZetaSketchHllPlusPlus[Int]() // estimator for Int elements
def distinctCount: SCollection[Long] = input[Int].countApproxDistinct(estimator)
Scio supports the same for key-value SCollections. In this case it outputs the distinct count for each unique key in the input data stream.
def kvInput[K, V]: SCollection[(K, V)] = ???
def distinctCount[K]: SCollection[(K, Long)] =
  kvInput
    .countApproxDistinctByKey(new SketchHllPlusPlus[Int](15, 20))
The same with ZetaSketchHllPlusPlus:
def distinctCount[K]: SCollection[(K, Long)] =
  kvInput[K, Int] // K can be any type; the value type must be one of the ZetaSketch-supported types.
    .countApproxDistinctByKey(estimator)
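To make the per-key semantics concrete, the exact computation that the per-key estimators approximate can be written on plain Scala collections as follows. This is only a reference for the shape of the output, one count per key, and not a Scio API; `ExactDistinctByKey` and `distinctCountByKey` are hypothetical names.

```scala
object ExactDistinctByKey {
  // Exact number of distinct values per key for an in-memory collection.
  def distinctCountByKey[K, V](pairs: Seq[(K, V)]): Map[K, Long] =
    pairs
      .groupBy(_._1) // group the pairs by key
      .map { case (k, kvs) => k -> kvs.map(_._2).distinct.size.toLong }
}
```

For example, `ExactDistinctByKey.distinctCountByKey(Seq("a" -> 1, "a" -> 1, "a" -> 2, "b" -> 7))` yields a count of 2 for key "a" and 1 for key "b". The exact version has to hold every distinct value per key in memory, which is what the sketch-based estimators avoid.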
Distributed HyperLogLog++
Both implementations above estimate the distinct count for the whole data stream and do not expose the underlying sketch to the user. Scio also exposes the underlying sketch and makes it possible to run the HLL++ algorithm in a distributed way using the ZetaSketch library's internal APIs. Since ZetaSketch is compatible with Google Cloud BigQuery, you can use BigQuery-generated sketches together with the sketches exposed by this API.
import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.hll.zetasketch._
// Placeholder implicit HllPlus instance so that the generic snippets below compile.
implicit def hllPlus[T]: HllPlus[T] = ???
def input[T]: SCollection[T] = ???
// First, convert each element to a ZetaSketchHll.
def zCol[T]: SCollection[ZetaSketchHll[T]] = input.asZetaSketchHll
// Then combine all ZetaSketchHll instances and count the distinct elements.
def approxDistCount: SCollection[Long] = zCol.sumHll.approxDistinctCount
Or, more simply:
def approxDistCount: SCollection[Long] = input.approxDistinctCountWithZetaHll
Note: This supports only the Int, Long, String and ByteString input types.
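Sketches can be combined in a distributed way because a sketch built per partition can be merged into the sketch of the whole dataset. The snippet below illustrates this for classic HyperLogLog registers, where merging is an element-wise maximum. It is a simplified stand-in and makes no claim about how ZetaSketch represents or merges its sketches internally; `MergeDemo`, `sketch`, `merge` and the hash seeds are hypothetical.

```scala
import scala.util.hashing.MurmurHash3

object MergeDemo {
  val p = 10     // precision
  val m = 1 << p // number of registers

  // 64-bit hash assembled from two differently seeded 32-bit MurmurHash3 values.
  def hash64(s: String): Long =
    (MurmurHash3.stringHash(s, 17).toLong << 32) |
      (MurmurHash3.stringHash(s, 31).toLong & 0xffffffffL)

  // Build the register array for one partition of the data.
  def sketch(items: Iterable[String]): Array[Int] = {
    val regs = new Array[Int](m)
    items.foreach { s =>
      val h = hash64(s)
      val idx = (h >>> (64 - p)).toInt
      val rank = math.min(java.lang.Long.numberOfLeadingZeros(h << p) + 1, 64 - p + 1)
      regs(idx) = math.max(regs(idx), rank)
    }
    regs
  }

  // Merging two sketches is an element-wise maximum of their registers.
  def merge(a: Array[Int], b: Array[Int]): Array[Int] =
    a.zip(b).map { case (x, y) => math.max(x, y) }
}
```

Merging the sketches of two partitions gives exactly the same registers as sketching the concatenated data, even when the partitions overlap, so partial sketches can be computed independently and combined in any order.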
Similarly, for key-value SCollections:
def kvInput[K, V]: SCollection[(K, V)] = ??? // Here, type V should be one of the supported types: `Int`, `Long`, `String` or `ByteString`.
def zCol[K, V]: SCollection[(K, ZetaSketchHll[V])] = kvInput.asZetaSketchHllByKey
def approxDistCount[K]: SCollection[(K, Long)] = zCol.sumHllByKey.approxDistinctCountByKey
or
def approxDistCount[K]: SCollection[(K, Long)] = kvInput.approxDistinctCountWithZetaHllByKey
Note: ZetaSketchHll[T] has Algebird Monoid and Aggregator implementations. Use them by importing com.spotify.scio.extra.hll.zetasketch.ZetaSketchHll._ into scope.