The input is a collection of (user, item, score) ratings.

package com.spotify.bdrc.pipeline

import com.spotify.bdrc.util.Records.Rating
import com.spotify.scio.coders.Coder
import com.spotify.scio.values.SCollection
import com.twitter.scalding.TypedPipe
import org.apache.spark.rdd.RDD

object Statistics {

  case class Stats(max: Double, min: Double, sum: Double, count: Long, mean: Double, stddev: Double)

  import com.twitter.algebird._
  implicit val momentsCoder: Coder[Moments] = Coder.kryo[Moments]

Algebird Aggregator

  def aggregator = {

Create 4 Aggregators with different logic

 

The first three are of type Aggregator[Rating, _, Double]: each takes a Rating as input and produces a Double as output. The last one is of type Aggregator[Rating, _, Moments], where Moments includes count, mean, standard deviation, etc. In every case the input Rating is first mapped to a Double by the prepare function _.score, which has type Rating => Double.

    val maxOp = Aggregator.max[Double].composePrepare[Rating](_.score)
    val minOp = Aggregator.min[Double].composePrepare[Rating](_.score)
    val sumOp = Aggregator.prepareMonoid[Rating, Double](_.score)
    val momentsOp = Moments.aggregator.composePrepare[Rating](_.score)
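The prepare / reduce / present idea behind these definitions can be sketched in plain Scala. This is a simplified illustration, not Algebird's actual trait: MiniAggregator, MiniAggregatorExample, and the local Rating case class are hypothetical stand-ins introduced only for this example.

```scala
// Hypothetical stand-in for the Rating record used in this file.
final case class Rating(user: String, item: String, score: Double)

// Hypothetical, simplified aggregator. This is NOT Algebird's trait,
// only a sketch of the same three-step idea.
trait MiniAggregator[A, B, C] { self =>
  def prepare(a: A): B      // map each input to an intermediate value
  def reduce(l: B, r: B): B // combine two intermediate values
  def present(b: B): C      // turn the final intermediate value into the output

  // Run the aggregator over a non-empty collection.
  def apply(xs: Iterable[A]): C =
    present(xs.iterator.map(a => prepare(a)).reduce((l, r) => self.reduce(l, r)))

  // Adapt the aggregator to a different input type by pre-processing each element.
  def composePrepare[A1](f: A1 => A): MiniAggregator[A1, B, C] =
    new MiniAggregator[A1, B, C] {
      def prepare(a: A1): B = self.prepare(f(a))
      def reduce(l: B, r: B): B = self.reduce(l, r)
      def present(b: B): C = self.present(b)
    }
}

object MiniAggregatorExample {
  // Maximum over plain Doubles.
  val maxDouble: MiniAggregator[Double, Double, Double] =
    new MiniAggregator[Double, Double, Double] {
      def prepare(a: Double): Double = a
      def reduce(l: Double, r: Double): Double = math.max(l, r)
      def present(b: Double): Double = b
    }

  // The same aggregator, fed with Rating values via a Rating => Double function.
  val maxScore: MiniAggregator[Rating, Double, Double] =
    maxDouble.composePrepare[Rating](_.score)

  val ratings: Seq[Rating] =
    Seq(Rating("u1", "i1", 2.0), Rating("u1", "i2", 5.0), Rating("u2", "i1", 3.5))
}
```

Calling MiniAggregatorExample.maxScore(MiniAggregatorExample.ratings) applies _.score to each rating and then reduces the resulting scores with math.max.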

Apply all 4 Aggregators to the same input and present the resulting 4-tuple of (Double, Double, Double, Moments) as Stats

    MultiAggregator(maxOp, minOp, sumOp, momentsOp)
      .andThenPresent {
        case (max, min, sum, moments) =>
          Stats(max, min, sum, moments.count, moments.mean, moments.stddev)
      }
  }
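As a rough illustration of what the combined aggregator achieves, the following plain-Scala sketch computes all six Stats fields in one pass over an in-memory collection. It does not use Algebird: SinglePassStats, Acc, prepare, present, and compute are hypothetical names, and Rating and Stats are local stand-ins. For brevity it tracks a sum of squares, which is less numerically stable than a moments-based accumulator, and it assumes the population (divide by n) standard deviation.

```scala
// Hypothetical stand-ins for the records used in this file.
final case class Rating(user: String, item: String, score: Double)
final case class Stats(max: Double, min: Double, sum: Double, count: Long, mean: Double, stddev: Double)

object SinglePassStats {
  // Intermediate state holding everything needed to derive all outputs.
  final case class Acc(max: Double, min: Double, sum: Double, sumSq: Double, count: Long) {
    // Combining two states is associative, so partial results can be merged in any grouping.
    def merge(that: Acc): Acc =
      Acc(
        math.max(max, that.max),
        math.min(min, that.min),
        sum + that.sum,
        sumSq + that.sumSq,
        count + that.count
      )
  }

  // Turn one rating into a single-element state.
  def prepare(r: Rating): Acc =
    Acc(r.score, r.score, r.score, r.score * r.score, 1L)

  // Derive the final statistics from the merged state.
  def present(a: Acc): Stats = {
    val mean = a.sum / a.count
    // Clamp at zero to guard against tiny negative values from rounding.
    val variance = math.max(0.0, a.sumSq / a.count - mean * mean)
    Stats(a.max, a.min, a.sum, a.count, mean, math.sqrt(variance))
  }

  // One pass over a non-empty collection.
  def compute(ratings: Iterable[Rating]): Stats =
    present(ratings.iterator.map(prepare).reduce(_ merge _))
}
```

The design choice mirrors the tuple of aggregators above: one intermediate value per input, one associative merge, and a final presentation step that shapes the result into Stats.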

Scalding

  def scalding(input: TypedPipe[Rating]): TypedPipe[Stats] =
    input.aggregate(aggregator)

Scio

  def scio(input: SCollection[Rating]): SCollection[Stats] = {
    input
      .map(_.score)
      .stats
      .map(s => Stats(s.max, s.min, s.sum, s.count, s.mean, s.stdev))
  }

Scio with Algebird Aggregator

  def scioAlgebird(input: SCollection[Rating]): SCollection[Stats] =
    input.aggregate(aggregator)

Spark

  def spark(input: RDD[Rating]): Stats = {
    val s = input.map(_.score).stats()
    Stats(s.max, s.min, s.sum, s.count, s.mean, s.stdev)
  }
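Statistics such as mean and standard deviation can be computed on distributed data because per-partition summaries can be merged. The sketch below shows one well-known way to do that, the pairwise update of count, mean, and sum of squared deviations. It is an assumption that the library counters work along these lines. MergeableMoments and M are hypothetical names, and the standard deviation here is the population (divide by n) variant.

```scala
object MergeableMoments {
  // count, mean, and m2 = sum of squared deviations from the mean
  final case class M(count: Long, mean: Double, m2: Double) {
    // Merge two partial summaries without revisiting the underlying data.
    def merge(that: M): M =
      if (count == 0L) that
      else if (that.count == 0L) this
      else {
        val n = count + that.count
        val delta = that.mean - mean
        val newMean = mean + delta * that.count / n
        val newM2 = m2 + that.m2 + delta * delta * count * that.count / n
        M(n, newMean, newM2)
      }

    // Population standard deviation; NaN for an empty summary.
    def stdev: Double = if (count == 0L) Double.NaN else math.sqrt(m2 / count)
  }

  val empty: M = M(0L, 0.0, 0.0)

  // Summary of a single value.
  def of(x: Double): M = M(1L, x, 0.0)

  // Summary of a whole sequence, built by repeated merging.
  def fromSeq(xs: Seq[Double]): M = xs.map(of).foldLeft(empty)(_ merge _)
}
```

Summarizing Seq(1.0, 2.0) and Seq(3.0, 4.0) separately and merging the two summaries gives the same count, mean, and standard deviation as summarizing all four values at once, which is the property a distributed stats computation relies on.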

Spark with Algebird Aggregator

  def sparkAlgebird(input: RDD[Rating]): Stats = {
    import com.twitter.algebird.spark._
    input.algebird.aggregate(aggregator)
  }

}