Input is a collection of (user, item, score)

package com.spotify.bdrc.pipeline import com.spotify.bdrc.util.Records.Rating import com.spotify.scio.values.SCollection import com.twitter.scalding.TypedPipe import org.apache.spark.rdd.RDD object CountUsers {  

Scalding

/**
 * Counts the ratings whose `user` equals "Smith" using Scalding.
 *
 * @param input ratings to scan
 * @return a pipe carrying the number of matching ratings
 */
def scalding(input: TypedPipe[Rating]): TypedPipe[Long] = {
  input
    .filter(_.user == "Smith")
    // Replace every matching rating with a 1L so the matches can be added up
    .map(_ => 1L)
    // Sum with an implicit Semigroup[Long]
    .sum
    .toTypedPipe
}

Scalding with Algebird Aggregator

/**
 * Counts the ratings whose `user` equals "Smith" using Scalding and an
 * Algebird `Aggregator`.
 *
 * @param input ratings to scan
 * @return a pipe carrying the number of matching ratings
 */
def scaldingWithAlgebird(input: TypedPipe[Rating]): TypedPipe[Long] = {
  import com.twitter.algebird.Aggregator.count

  // Aggregator that counts the ratings satisfying the predicate
  val smithCounter = count[Rating](rating => rating.user == "Smith")

  // Aggregate globally into a single Long
  val total = input.aggregate(smithCounter)
  total.toTypedPipe
}

/**
 * Counts the ratings whose `user` equals "Smith" using Scio.
 *
 * @param input ratings to scan
 * @return a collection carrying the number of matching ratings
 */
def scio(input: SCollection[Rating]): SCollection[Long] = {
  val smithRatings = input.filter(rating => rating.user == "Smith")
  smithRatings.count
}

Scio with Algebird Aggregator

/**
 * Counts the ratings whose `user` equals "Smith" using Scio and an
 * Algebird `Aggregator`.
 *
 * @param input ratings to scan
 * @return a collection carrying the number of matching ratings
 */
def scioWithAlgebird(input: SCollection[Rating]): SCollection[Long] = {
  import com.twitter.algebird.Aggregator.count

  // The element type is spelled out explicitly, as the original did with
  // its typed placeholder `(_: Rating)`.
  val smithCounter = count[Rating](rating => rating.user == "Smith")

  // Aggregate globally into a single Long
  input.aggregate(smithCounter)
}

Spark

/**
 * Counts the ratings whose `user` equals "Smith" using Spark.
 *
 * @param input ratings to scan
 * @return the number of matching ratings
 */
def spark(input: RDD[Rating]): Long = {
  input
    .filter(_.user == "Smith")
    // count is an action and collects data back to the driver node
    .count()
}

Spark with Algebird Aggregator

/**
 * Counts the ratings whose `user` equals "Smith" using Spark and an
 * Algebird `Aggregator`.
 *
 * @param input ratings to scan
 * @return the number of matching ratings
 */
def sparkWithAlgebird(input: RDD[Rating]): Long = {
  import com.twitter.algebird.Aggregator.count
  import com.twitter.algebird.spark._

  // Aggregator that counts the ratings satisfying the predicate
  val smithCounter = count[Rating](rating => rating.user == "Smith")

  // aggregate is an action and collects data back to the driver node
  input.algebird.aggregate(smithCounter)
}
} // closes object CountUsers