
  • package root
  • package com
  • package spotify
  • package scio
  • package extra
  • package annoy

    Main package for Annoy side input APIs. Import all.

    import com.spotify.scio.extra.annoy._

    Two metrics are available, Angular and Euclidean.
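To make the difference between the two metrics concrete, here is a minimal plain-Scala sketch of the distances they correspond to. It does not use Scio or Annoy. The object and method names are hypothetical, and the angular formula follows the definition given in the Annoy README, sqrt(2 * (1 - cos(u, v))), which is assumed to be what the Angular metric computes here.

```scala
object AnnoyMetricsSketch {
  // Euclidean (L2) distance between two vectors of the same dimension.
  def euclidean(u: Array[Float], v: Array[Float]): Double = {
    require(u.length == v.length, "vectors must have the same dimension")
    math.sqrt(u.zip(v).map { case (a, b) =>
      val d = a.toDouble - b.toDouble
      d * d
    }.sum)
  }

  // Angular distance, assumed to be sqrt(2 * (1 - cosine similarity)).
  // Zero vectors are rejected because their cosine similarity is undefined.
  def angular(u: Array[Float], v: Array[Float]): Double = {
    require(u.length == v.length, "vectors must have the same dimension")
    val dot = u.zip(v).map { case (a, b) => a.toDouble * b.toDouble }.sum
    val normU = math.sqrt(u.map(a => a.toDouble * a.toDouble).sum)
    val normV = math.sqrt(v.map(b => b.toDouble * b.toDouble).sum)
    require(normU > 0 && normV > 0, "vectors must be non-zero")
    // Clamp at 0 to guard against tiny negative values from rounding.
    math.sqrt(math.max(0.0, 2.0 * (1.0 - dot / (normU * normV))))
  }
}
```

Under this definition, angular distance ignores vector length and depends only on direction, while Euclidean distance is sensitive to both.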

    To save an SCollection[(Int, Array[Float])] to an Annoy file:

    val s = sc.parallelize(Seq(1 -> Array(1.2f, 3.4f), 2 -> Array(2.2f, 1.2f)))

    Save to a temporary location:

    val s1 = s.asAnnoy(Angular, 2, 10) // metric, vector dimension, number of trees

    Save to a specific location:

    val s2 = s.asAnnoy(Angular, 2, 10, "gs://<bucket>/<path>")

    SCollection[AnnoyUri] can be converted into a side input:

    val s = sc.parallelize(Seq(1 -> Array(1.2f, 3.4f), 2 -> Array(2.2f, 1.2f)))
    val side = s.asAnnoySideInput(metric, dimension, numTrees)

    There's syntactic sugar for saving an SCollection and converting it to a side input:

    val s = sc
      .parallelize(Seq(1 -> Array(1.2f, 3.4f), 2 -> Array(2.2f, 1.2f)))
      .asAnnoySideInput(metric, dimension, numTrees)

    An existing Annoy file can be converted to a side input directly:

    sc.annoySideInput(metric, dimension, numTrees, "gs://<bucket>/<path>")

    AnnoyReader provides nearest neighbor lookups by vector as well as item lookups:

    val r = new scala.util.Random()
    val data = (0 until 1000).map(x => (x, Array.fill(dimension)(r.nextFloat())))
    val main = sc.parallelize(data)
    val side = main.asAnnoySideInput(metric, dimension, numTrees)
    main.keys.withSideInputs(side)
      .map { (i, s) =>
        val annoyReader = s(side)
        // get vector by item id, allocating a new Array[Float] each time
        val v1 = annoyReader.getItemVector(i)
        // get vector by item id, copy vector into pre-allocated Array[Float]
        val v2 = Array.fill(dimension)(-1.0f)
        annoyReader.getItemVector(i, v2)
        // get 10 nearest neighbors by vector
        val results = annoyReader.getNearest(v2, 10)
        results
      }
  • package bigquery
  • package csv

    Main package for CSV type-safe APIs. Import all.

    import com.spotify.scio.extra.csv._
  • package hll
  • package json

    Main package for JSON APIs. Import all.

    This package uses Circe for JSON handling under the hood.

    import com.spotify.scio.extra.json._
    // define a type-safe JSON schema
    case class Record(i: Int, d: Double, s: String)
    // read JSON as case classes
    sc.jsonFile[Record]("input.json")
    // write case classes as JSON
    sc.parallelize((1 to 10).map(x => Record(x, x.toDouble, x.toString)))
      .saveAsJsonFile("output")
  • package rollup
  • package sorter
  • package syntax
  • SortingKey
  • package sparkey

    Main package for Sparkey side input APIs. Import all.

    import com.spotify.scio.extra.sparkey._

    To save an SCollection[(String, String)] to a Sparkey fileset:

    val s = sc.parallelize(Seq("a" -> "one", "b" -> "two"))
    // as a single, unsharded fileset
    s.saveAsSparkey("gs://<bucket>/<path>/<sparkey-prefix>")
    // with multiple shards, sharded by MurmurHash3 of the key
    s.saveAsSparkey("gs://<bucket>/<path>/<sparkey-dir>", numShards = 2)
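As a rough illustration of what "sharded by MurmurHash3 of the key" can mean, the sketch below assigns each key to a shard index using the Scala standard library's MurmurHash3. This is an assumption made for illustration only: the helper name `shardOf` is hypothetical, and the hash variant, seed, and modulo scheme that Scio actually uses may differ.

```scala
import scala.util.hashing.MurmurHash3

object ShardSketch {
  // Hypothetical helper: map a key to a shard index in [0, numShards).
  // floorMod keeps the result non-negative even when the hash is negative.
  def shardOf(key: String, numShards: Int): Int = {
    require(numShards > 0, "numShards must be positive")
    java.lang.Math.floorMod(MurmurHash3.stringHash(key), numShards)
  }
}
```

The useful property is that the mapping is deterministic, so a reader that applies the same function to a lookup key only needs to open the one shard that could contain it.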

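    A previously-saved sparkey can be loaded as a side input:

    sc.sparkeySideInput("gs://<bucket>/<path>/<sparkey-prefix>")

    A sharded collection of Sparkey files can also be used as a side input by specifying a glob path:

    sc.sparkeySideInput("gs://<bucket>/<path>/<sparkey-dir>/part-*")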

    When the sparkey is needed only temporarily, the save step can be elided:

    val side: SideInput[SparkeyReader] = sc
      .parallelize(Seq("a" -> "one", "b" -> "two"))
      .asSparkeySideInput

    SparkeyReader can be used like a lookup table in a side input operation:

    val main: SCollection[String] = sc.parallelize(Seq("a", "b", "c"))
    val side: SideInput[SparkeyReader] = sc
      .parallelize(Seq("a" -> "one", "b" -> "two"))
      .asSparkeySideInput
    main.withSideInputs(side)
      .map { (x, s) =>
        s(side).getOrElse(x, "unknown")
      }

    A SparkeyMap can store any types of keys and values, but can only be used as a SideInput:

    val main: SCollection[String] = sc.parallelize(Seq("a", "b", "c"))
    val side: SideInput[SparkeyMap[String, Int]] = sc
      .parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3))
      .asLargeMapSideInput()
    val values: SCollection[Option[Int]] = main
      .withSideInputs(side)
      .map { (x, s) => s(side).get(x) }
      .toSCollection

    To read a static Sparkey collection and use it as a typed SideInput, use TypedSparkeyReader. TypedSparkeyReader can also accept a Caffeine cache to reduce IO and deserialization load:

    val main: SCollection[String] = sc.parallelize(Seq("a", "b", "c"))
    val cache: Cache[String, MyObject] = ...
    val side: SideInput[TypedSparkeyReader[MyObject]] = sc
      .typedSparkeySideInput("gs://<bucket>/<path>/<sparkey-prefix>", MyObject.decode, cache)
    val objects: SCollection[Option[MyObject]] = main
      .withSideInputs(side)
      .map { (x, s) => s(side).get(x) }
      .toSCollection
  • package voyager

    Main package for Voyager side input APIs.

package sorter

Linear Supertypes
  1. SCollectionSyntax
  2. AnyRef
  3. Any
Package Members

  1. package syntax

Type Members

  1. sealed abstract class SortingKey[A] extends AnyRef

Value Members

  1. implicit def sorterOps[K1, K2, V](coll: SCollection[(K1, Iterable[(K2, V)])])(implicit arg0: SortingKey[K2]): SorterOps[K1, K2, V]
  2. object SortingKey
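
The `sorterOps` signature applies to a collection already grouped by a primary key `K1`, where each value carries a secondary key `K2`. As a hedged illustration of that data shape, the plain-Scala sketch below sorts each group's values by their secondary key in memory. It does not use Scio: the name `sortValuesLocally` is hypothetical, a standard `Ordering[K2]` stands in for `SortingKey[K2]`, and the ordering Scio applies to secondary keys may differ from the one shown.

```scala
object SecondarySortSketch {
  // In-memory stand-in for SCollection[(K1, Iterable[(K2, V)])]:
  // for every primary key, order its values by the secondary key.
  def sortValuesLocally[K1, K2: Ordering, V](
    grouped: Seq[(K1, Iterable[(K2, V)])]
  ): Seq[(K1, Iterable[(K2, V)])] =
    grouped.map { case (k1, kvs) => (k1, kvs.toSeq.sortBy(_._1)) }
}
```

For example, a group `"user1" -> Seq("b" -> 2, "a" -> 1)` comes back as `"user1" -> Seq("a" -> 1, "b" -> 2)`, with the primary keys left in their original order.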

