Input is a collection of (user, item, score)

package com.spotify.bdrc.pipeline import com.spotify.bdrc.util.Records.Rating import com.spotify.scio.values.SCollection import com.twitter.scalding.TypedPipe import org.apache.spark.rdd.RDD object MinItemPerUser {  

Scalding

def scalding(input: TypedPipe[Rating]): TypedPipe[Rating] = { input .groupBy(_.user

Reduce items per key by picking the side with lower score for each pair of input

.reduce((x, y) => if (x.score < y.score) x else y) .values }  

Scalding with Algebird Aggregator

def scaldingWithAlgebird(input: TypedPipe[Rating]): TypedPipe[Rating] = { import com.twitter.algebird.Aggregator.minBy input .groupBy(_.user

Aggregate per key into a single Rating based on Double value via _.score

.aggregate(minBy(_.score)) .values }  

Scio

def scio(input: SCollection[Rating]): SCollection[Rating] = { input .keyBy(_.user

Compute top one item per key as an Iterable[Rating] with a reverse comparator

.topByKey(1)(Ordering.by(-_.score)) 

Drop user key

.values 

Flatten result Iterable[Rating]

.flatten }  

Scio with Algebird Aggregator

def scioWithAlgebird(input: SCollection[Rating]): SCollection[Rating] = { import com.twitter.algebird.Aggregator.minBy input .keyBy(_.user

Aggregate per key into a single Rating based on Double value via _.score. Explicit type due to type inference limitation.

.aggregateByKey(minBy { x: Rating => x.score }) .values }  

Spark

def spark(input: RDD[Rating]): RDD[Rating] = { input .keyBy(_.user

Reduce items per key by picking the side with lower score for each pair of input

.reduceByKey((x, y) => if (x.score < y.score) x else y) .values }  

Spark with Algebird Aggregator

def sparkWithAlgebird(input: RDD[Rating]): RDD[Rating] = { import com.twitter.algebird.Aggregator.minBy import com.twitter.algebird.spark._ input .keyBy(_.user) .algebird 

Aggregate per key into a single Rating based on Double value via _.score. Explicit type due to type inference limitation.

.aggregateByKey(minBy { x: Rating => x.score }) .values }  

Spark with MLLib

def sparkWithMllib(input: RDD[Rating]): RDD[Rating] = { import org.apache.spark.mllib.rdd.MLPairRDDFunctions._ input .keyBy(_.user

From spark-mllib, compute top K per key with a priority queue and a reverse comparator

.topByKey(1)(Ordering.by(-_.score)) .flatMap(_._2) } }