package extra
Package Members
- package annoy
Main package for Annoy side input APIs. Import all.
import com.spotify.scio.extra.annoy._
Two metrics are available, Angular and Euclidean.
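For intuition about the two metrics, the following plain-Scala sketch computes both distances for a pair of vectors. It assumes Annoy's usual definitions: Euclidean is the L2 distance, and Angular is the Euclidean distance between the normalized vectors, sqrt(2 * (1 - cos(u, v))). `AnnoyMetricsSketch` and its methods are hypothetical names for illustration; they are not part of Scio or Annoy.

```scala
object AnnoyMetricsSketch {
  // Dot product accumulated in Double to limit rounding error.
  private def dot(u: Array[Float], v: Array[Float]): Double = {
    require(u.length == v.length, "vectors must have the same dimension")
    u.indices.map(i => u(i).toDouble * v(i).toDouble).sum
  }

  // Euclidean (L2) distance between u and v.
  def euclidean(u: Array[Float], v: Array[Float]): Double = {
    require(u.length == v.length, "vectors must have the same dimension")
    math.sqrt(u.indices.map { i =>
      val d = u(i).toDouble - v(i).toDouble
      d * d
    }.sum)
  }

  // Angular distance, assumed here to be sqrt(2 * (1 - cos(u, v))).
  // The result is NaN if either vector has zero length.
  def angular(u: Array[Float], v: Array[Float]): Double = {
    val cos = dot(u, v) / math.sqrt(dot(u, u) * dot(v, v))
    // Clamp at zero to guard against tiny negative values from rounding.
    math.sqrt(math.max(0.0, 2.0 * (1.0 - cos)))
  }
}
```

Under these definitions the angular distance ignores vector magnitude, so it suits embeddings where only direction matters, while Euclidean also accounts for length.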
To save an SCollection[(Int, Array[Float])] to an Annoy file:
val s = sc.parallelize(Seq(1 -> Array(1.2f, 3.4f), 2 -> Array(2.2f, 1.2f)))
Save to a temporary location:
val s1 = s.asAnnoy(Angular, 40, 10)
Save to a specific location:
val s1 = s.asAnnoy(Angular, 40, 10, "gs://<bucket>/<path>")
SCollection[AnnoyUri] can be converted into a side input:
val s = sc.parallelize(Seq(1 -> Array(1.2f, 3.4f), 2 -> Array(2.2f, 1.2f)))
val side = s.asAnnoySideInput(metric, dimension, numTrees)
There's syntactic sugar for saving an SCollection and converting it to a side input:
val s = sc
  .parallelize(Seq(1 -> Array(1.2f, 3.4f), 2 -> Array(2.2f, 1.2f)))
  .asAnnoySideInput(metric, dimension, numTrees)
An existing Annoy file can be converted to a side input directly:
sc.annoySideInput(metric, dimension, numTrees, "gs://<bucket>/<path>")
AnnoyReader provides nearest neighbor lookups by vector as well as item lookups:
val r = new scala.util.Random()
val data = (0 until 1000).map(x => (x, Array.fill(40)(r.nextFloat())))
val main = sc.parallelize(data)
val side = main.asAnnoySideInput(metric, dimension, numTrees)
main.keys.withSideInputs(side)
  .map { (i, s) =>
    val annoyReader = s(side)
    // get vector by item id, allocating a new Array[Float] each time
    val v1 = annoyReader.getItemVector(i)
    // get vector by item id, copy vector into pre-allocated Array[Float]
    val v2 = Array.fill(dimension)(-1.0f)
    annoyReader.getItemVector(i, v2)
    // get 10 nearest neighbors by vector
    val results = annoyReader.getNearest(v2, 10)
  }
- package bigquery
- package csv
Main package for CSV type-safe APIs. Import all.
import com.spotify.scio.extra.csv._
- package hll
- package json
Main package for JSON APIs. Import all.
This package uses Circe for JSON handling under the hood.
import com.spotify.scio.extra.json._
// define a type-safe JSON schema
case class Record(i: Int, d: Double, s: String)
// read JSON as case classes
sc.jsonFile[Record]("input.json")
// write case classes as JSON
sc.parallelize((1 to 10).map(x => Record(x, x.toDouble, x.toString)))
  .saveAsJsonFile("output")
- package rollup
- package sorter
- package sparkey
Main package for Sparkey side input APIs. Import all.
import com.spotify.scio.extra.sparkey._
To save an SCollection[(String, String)] to a Sparkey fileset:
val s = sc.parallelize(Seq("a" -> "one", "b" -> "two"))
s.saveAsSparkey("gs://<bucket>/<path>/<sparkey-prefix>")
// with multiple shards, sharded by MurmurHash3 of the key
s.saveAsSparkey("gs://<bucket>/<path>/<sparkey-dir>", numShards = 2)
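To illustrate what sharding by a MurmurHash3 of the key means, here is a minimal plain-Scala sketch of hash-based shard assignment. The text above does not say which MurmurHash3 variant Scio uses or how the hash is mapped to a shard index, so the use of `MurmurHash3.stringHash` and `floorMod` is an assumption, and `ShardingSketch`, `shardFor`, and `partition` are hypothetical names.

```scala
import scala.util.hashing.MurmurHash3

object ShardingSketch {
  // Assumption: hash the key string and map the hash onto [0, numShards).
  def shardFor(key: String, numShards: Int): Int = {
    require(numShards > 0, "numShards must be positive")
    // floorMod keeps the result non-negative even when the hash is negative.
    Math.floorMod(MurmurHash3.stringHash(key), numShards)
  }

  // Group key-value pairs by the shard their key hashes to.
  def partition(
    pairs: Seq[(String, String)],
    numShards: Int
  ): Map[Int, Seq[(String, String)]] =
    pairs.groupBy { case (k, _) => shardFor(k, numShards) }
}
```

Because the shard depends only on the key, a reader can apply the same function to find the single shard that may contain a key instead of probing every file.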
A previously-saved sparkey can be loaded as a side input:
sc.sparkeySideInput("gs://<bucket>/<path>/<sparkey-prefix>")
A sharded collection of Sparkey files can also be used as a side input by specifying a glob path:
sc.sparkeySideInput("gs://<bucket>/<path>/<sparkey-dir>/part-*")
When the sparkey is needed only temporarily, the save step can be elided:
val side: SideInput[SparkeyReader] = sc
  .parallelize(Seq("a" -> "one", "b" -> "two"))
  .asSparkeySideInput
SparkeyReader can be used like a lookup table in a side input operation:
val main: SCollection[String] = sc.parallelize(Seq("a", "b", "c"))
val side: SideInput[SparkeyReader] = sc
  .parallelize(Seq("a" -> "one", "b" -> "two"))
  .asSparkeySideInput
main.withSideInputs(side)
  .map { (x, s) => s(side).getOrElse(x, "unknown") }
A SparkeyMap can store keys and values of any type, but can only be used as a SideInput:
val main: SCollection[String] = sc.parallelize(Seq("a", "b", "c"))
val side: SideInput[SparkeyMap[String, Int]] = sc
  .parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3))
  .asLargeMapSideInput()
// get returns an Option, so each element is an Option[Int]
val values: SCollection[Option[Int]] = main
  .withSideInputs(side)
  .map { (x, s) => s(side).get(x) }
  .toSCollection
To read a static Sparkey collection and use it as a typed SideInput, use TypedSparkeyReader. TypedSparkeyReader can also accept a Caffeine cache to reduce IO and deserialization load:
val main: SCollection[String] = sc.parallelize(Seq("a", "b", "c"))
val cache: Cache[String, MyObject] = ...
val side: SideInput[TypedSparkeyReader[MyObject]] = sc
  .typedSparkeySideInput("gs://<bucket>/<path>/<sparkey-prefix>", MyObject.decode, cache)
// get returns an Option, so each element is an Option[MyObject]
val objects: SCollection[Option[MyObject]] = main
  .withSideInputs(side)
  .map { (x, s) => s(side).get(x) }
  .toSCollection
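The benefit of placing a cache in front of a typed reader can be sketched in plain Scala as a read-through cache that decodes each stored value at most once. This is not how TypedSparkeyReader or Caffeine are implemented; `CachedLookupSketch`, `raw`, and `decodeCalls` are hypothetical names, an in-memory Map stands in for the Sparkey files, and the cache here is unbounded and not thread-safe, whereas a real cache would normally bound its size.

```scala
import scala.collection.mutable

// Read-through cache: decoded values are kept so that repeated lookups
// of the same key skip both the raw read and the decode step.
final class CachedLookupSketch[T](
  raw: Map[String, Array[Byte]],  // stand-in for the on-disk store
  decode: Array[Byte] => T        // deserializer for stored values
) {
  private val cache = mutable.Map.empty[String, T]

  // Number of times decode has run, exposed only for demonstration.
  var decodeCalls: Int = 0

  def get(key: String): Option[T] =
    cache.get(key).orElse {
      raw.get(key).map { bytes =>
        decodeCalls += 1
        val value = decode(bytes)
        cache.update(key, value)
        value
      }
    }
}
```

With skewed key distributions, most lookups hit a small set of hot keys, which is where avoiding repeated deserialization pays off.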
- package voyager
Main package for Voyager side input APIs.
Value Members
- object Breeze
Utilities for Breeze.
Includes Semigroups for Breeze data types like DenseVector and DenseMatrix.
import com.spotify.scio.extra.Breeze._
val vectors: SCollection[DenseVector[Double]] = // ...
vectors.sum // implicit Semigroup[T]
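A semigroup is a type with an associative combine operation, which is what lets a sum over a distributed collection be computed in any grouping. As a rough illustration of the idea, the plain-Scala sketch below sums vectors element-wise; it uses `Array[Double]` as a stand-in for Breeze's DenseVector, and `VectorSumSketch`, `plus`, and `sum` are hypothetical names rather than Scio, Breeze, or Algebird APIs.

```scala
object VectorSumSketch {
  // Associative combine operation: element-wise addition.
  def plus(a: Array[Double], b: Array[Double]): Array[Double] = {
    require(a.length == b.length, "vectors must have the same dimension")
    Array.tabulate(a.length)(i => a(i) + b(i))
  }

  // A semigroup has no identity element, so an empty input yields None.
  def sum(vectors: Seq[Array[Double]]): Option[Array[Double]] =
    vectors.reduceOption(plus)
}
```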
- object Collections
Utilities for Scala collection library.
Adds a top method to Array[T] and Iterable[T], and a topByKey method to Array[(K, V)] and Iterable[(K, V)].
import com.spotify.scio.extra.Collections._
val xs: Array[(String, Int)] = // ...
xs.top(5)(Ordering.by(_._2))
xs.topByKey(5)
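The following plain-Scala sketch shows what these two operations compute, assuming that top returns the n largest elements under the given ordering and that topByKey returns the n largest values for each key. It sorts the whole input for clarity, which a library implementation would likely avoid; `TopSketch` and its method signatures are hypothetical and may not match the Scio ones.

```scala
object TopSketch {
  // The n largest elements under ord, largest first.
  def top[T](xs: Iterable[T], n: Int)(implicit ord: Ordering[T]): Seq[T] =
    xs.toSeq.sorted(ord.reverse).take(n)

  // For each key, the n largest values under ord, largest first.
  def topByKey[K, V](xs: Iterable[(K, V)], n: Int)(
    implicit ord: Ordering[V]
  ): Map[K, Seq[V]] =
    xs.groupBy(_._1).map { case (k, kvs) => k -> top(kvs.map(_._2), n) }
}
```

For example, `TopSketch.topByKey(Seq("a" -> 1, "a" -> 3, "b" -> 5), 1)` keeps only the largest value seen for each of the keys "a" and "b".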
- object Iterators
Utilities for Scala iterators.
Adds a timeSeries method to Iterator[T] so that it can be windowed with different logic.
import com.spotify.scio.extra.Iterators._
case class Event(user: String, action: String, timestamp: Long)
val i: Iterator[Event] = // ...
// 60 minutes fixed windows offset by 30 minutes
// E.g. minute [30, 90), [90, 150), [150, 210), [210, 270) ...
i.timeSeries(_.timestamp).fixed(3600000, 1800000)
// session windows with 60 minute gaps between windows
i.timeSeries(_.timestamp).session(3600000)
// 60 minutes sliding windows, one every 10 minutes, offset by 5 minutes
// E.g. minute [5, 65), [15, 75), [25, 85), [35, 95) ...
i.timeSeries(_.timestamp).sliding(3600000, 600000, 300000)
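The window boundaries in the comments above follow from simple modular arithmetic on the timestamp. The plain-Scala sketch below computes which fixed window, and which sliding windows, a timestamp falls into for a given size, period, and offset. It illustrates the arithmetic only and is not Scio's implementation; `WindowSketch` and its methods are hypothetical names.

```scala
object WindowSketch {
  // Start of the fixed window [start, start + size) that contains ts.
  def fixedWindowStart(ts: Long, size: Long, offset: Long): Long =
    ts - Math.floorMod(ts - offset, size)

  // Starts of all sliding windows [start, start + size) that contain ts,
  // latest first. Window starts are spaced `period` apart, shifted by `offset`.
  def slidingWindowStarts(
    ts: Long,
    size: Long,
    period: Long,
    offset: Long
  ): Seq[Long] = {
    // Latest window start that is not after ts.
    val last = ts - Math.floorMod(ts - offset, period)
    // Step back one period at a time while the window still covers ts.
    Iterator.iterate(last)(_ - period).takeWhile(_ > ts - size).toSeq
  }
}
```

With 60 minute windows every 10 minutes, each timestamp belongs to six overlapping sliding windows, whereas a fixed window assignment is always unique.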