Input is a collection of (user, item, score)

package com.spotify.bdrc.pipeline import com.spotify.bdrc.util.Records.Rating import com.spotify.scio.values.SCollection import com.twitter.scalding.TypedPipe import org.apache.spark.rdd.RDD object Count {  

Scalding

/**
 * Count the number of ratings with Scalding.
 *
 * @param input pipe of ratings to count
 * @return a pipe carrying the number of ratings in `input`
 */
def scalding(input: TypedPipe[Rating]): TypedPipe[Long] = {
  input
    // Replace every rating with 1L so that summing the ones yields the count
    .map(_ => 1L)
    // Sum with an implicit `Semigroup[Long]`
    .sum
    .toTypedPipe
}

Scalding with Algebird Aggregator

/**
 * Count the number of ratings with Scalding, using Algebird's `size` Aggregator.
 *
 * @param input pipe of ratings to count
 * @return a pipe carrying the aggregated count
 */
def scaldingWithAlgebird(input: TypedPipe[Rating]): TypedPipe[Long] = {
  import com.twitter.algebird.Aggregator

  // Aggregate first, then convert the aggregated value back into a TypedPipe
  val total = input.aggregate(Aggregator.size)
  total.toTypedPipe
}

Scio

/**
 * Count the number of ratings with Scio.
 *
 * @param input collection of ratings to count
 * @return the result of calling `count` on `input`
 */
def scio(input: SCollection[Rating]): SCollection[Long] = {
  val total = input.count
  total
}

Scio with Algebird Aggregator

/**
 * Count the number of ratings with Scio, using Algebird's `size` Aggregator.
 *
 * @param input collection of ratings to count
 * @return the result of aggregating `input` with `Aggregator.size`
 */
def scioWithAlgebird(input: SCollection[Rating]): SCollection[Long] = {
  import com.twitter.algebird.Aggregator

  val total = input.aggregate(Aggregator.size)
  total
}

Spark

/**
 * Count the number of ratings with Spark.
 *
 * @param input RDD of ratings to count
 * @return the number of ratings in `input`
 */
def spark(input: RDD[Rating]): Long = {
  // `count` is an action: the result is returned to the driver node
  val total = input.count()
  total
}

Spark with Algebird Aggregator

/**
 * Count the number of ratings with Spark, using Algebird's `size` Aggregator.
 *
 * @param input RDD of ratings to count
 * @return the result of aggregating `input` with `Aggregator.size`
 */
def sparkWithAlgebird(input: RDD[Rating]): Long = {
  import com.twitter.algebird.Aggregator
  import com.twitter.algebird.spark._

  // `.algebird` is made available on the RDD by the algebird-spark import above
  val enriched = input.algebird
  // `aggregate` is an action and collects data back to the driver node
  enriched.aggregate(Aggregator.size)
}
}