HyperLogLog is an algorithm to estimate the cardinality of a large dataset. Counting distinct in a large dataset requires linear space. Hll algorithm approximate the cardinality with a small memory footprint.

HyperLogLog++ is an improved version of HyperLogLog algorithms presented by Google. It more accurately estimates distinct count in large and small data streams.

HyperLogLog++ algorithm has been integrated with Apache Beam with help of ZetaSketch library, which is comply with Google Cloud BigQuery sketches. More about Apache beam integration can find here.

HyperLogLog++ Integration.

Scio distinct count API has been extended to support HyperLogLog algorithms using ApproxDistinctCount Interface. scio-extra module provide two different implementations of this interface,

  • com.spotify.scio.extra.hll.sketching.SketchHllPlusPlus
  • com.spotify.scio.extra.hll.zetasketch.ZetaSketchHllPlusPlus

SketchHllPlusPlus provide HyperLogLog++ implementation based on Addthis’ Stream-lib library while ZetaSketchHllPlusPlus provide implementation basedon ZetaSketch.

Following is how you can use the new API.

In this example, we count the distinct value of a given type using SketchHllPlusPlus implementation.

import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.hll.sketching.SketchHllPlusPlus

def input[T]: SCollection[T] = ???

def distinctCount: SCollection[Long] =
        .countApproxDistinct(new SketchHllPlusPlus(15, 20))

Same thing can be done using ZetaSketchHllPlusPlus too, but this only support Int, Long, String and Array[Byte] types only. Meaning you can only call this on SCollection of about supported types.

import com.spotify.scio.extra.hll.zetasketch.ZetaSketchHllPlusPlus

val estimator = ZetaSketchHllPlusPlus[Int]()

def distinctCount: SCollection[Long] = input.countApproxDistinct(estimator)

Scio support the same for key-value SCollection too, in this case it will output distinct count per each unique key in the input data stream.

def kvInput[K, V]: SCollection[(K, V)] = ???

def distinctCount[K]: SCollection[(K, Long)] =
    .countApproxDistinctByKey(new SketchHllPlusPlus[Int](15, 20))

Same thing with ZetaSketchHllPlusPlus

def distinctCount[K]: SCollection[(K, Long)] =
    kvInput[K, Int] // K could be any type, value should be one of ZetaSketch supported types.

Distributed HyperLogLog++

Both above implementation estimate distinct count for the whole data stream and doesn’t expose the underline sketch to the user. Scio exposed the underline sketch to the user and make it possible to run this Hll++ algorithm in distributed way using the ZetaSketch library’s internal APIs. Since, ZetaSketch is comply with Gooogle Cloud BigQuery, you can use BigQuery generated sketches with sketches exposed by this API.

import com.spotify.scio.extra.hll.zetasketch._

implicit def hllPlus[T]: HllPlus[T] = ???
import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.hll.zetasketch._

def input[T]: SCollection[T] = ???
// first convert each element to ZetaSketchHll.
def zCol[T]: SCollection[ZetaSketchHll[T]] = input.asZetaSketchHll
// then combine all ZeatSketchHll and count the distinct.
def approxDistCount: SCollection[Long] = zCol.sumHll.approxDistinctCount

or in simpler way you can do:

def approxDistCount: SCollection[Long] = input.approxDistinctCountWithZetaHll

Note: This supports Int, Long, String and ByteString input types only.

Similarly, for key-value SCollections.

def kvInput[K, V]: SCollection[(K, V)] = ??? // Here type V should be one of supported type. `Int`, `Long`, `String` or `ByteString`

def zCol[K, V]: SCollection[(K, ZetaSketchHll[V])] = kvInput.asZetaSketchHllByKey

def approxDistCount[K]: SCollection[(K, Long)] = zCol.sumHllByKey.approxDistinctCountByKey


def approxDistCount[K]: SCollection[(K, Long)] = kvInput.approxDistinctCountWithZetaHllByKey

NOTE: ZetaSketchLL[T] has algebird’s Monoid and Aggregator implementations. Use it by importing com.spotify.scio.extra.hll.zetasketch.ZetaSketchHll._ to the scope.