Packages

  • package root
  • package com
  • package spotify
  • package scio
  • package extra
  • package annoy

    Main package for Annoy side input APIs. Import all.

    import com.spotify.scio.extra.annoy._

    Two metrics are available, Angular and Euclidean.
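    The metric, dimension, and numTrees values used by the snippets below are assumed to be defined roughly as follows (the numbers are illustrative):

    val metric: AnnoyMetric = Angular // or Euclidean
    val dimension = 40                // length of each vector
    val numTrees = 10                 // more trees: better recall, larger file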

    To save an SCollection[(Int, Array[Float])] to an Annoy file:

    val s = sc.parallelize(Seq(1 -> Array(1.2f, 3.4f), 2 -> Array(2.2f, 1.2f)))

    Save to a temporary location:

    val s1 = s.asAnnoy(Angular, 40, 10)

    Save to a specific location:

    val s2 = s.asAnnoy(Angular, 40, 10, "gs://<bucket>/<path>")

    An SCollection[AnnoyUri], as returned by asAnnoy, can be converted into a side input:

    val uris = s.asAnnoy(Angular, 40, 10)
    val side = uris.asAnnoySideInput(metric, dimension, numTrees)

    There's syntactic sugar for saving an SCollection and converting it to a side input:

    val s = sc
      .parallelize(Seq(1 -> Array(1.2f, 3.4f), 2 -> Array(2.2f, 1.2f)))
      .asAnnoySideInput(metric, dimension, numTrees)

    An existing Annoy file can be converted to a side input directly:

    sc.annoySideInput(metric, dimension, numTrees, "gs://<bucket>/<path>")

    AnnoyReader provides nearest neighbor lookups by vector as well as item lookups:

    val r = new scala.util.Random
    val data = (0 until 1000).map(x => (x, Array.fill(40)(r.nextFloat())))
    val main = sc.parallelize(data)
    val side = main.asAnnoySideInput(metric, dimension, numTrees)
    
    main.keys.withSideInputs(side)
      .map { (i, s) =>
        val annoyReader = s(side)
    
        // get vector by item id, allocating a new Array[Float] each time
        val v1 = annoyReader.getItemVector(i)
    
        // get vector by item id, copy vector into pre-allocated Array[Float]
        val v2 = Array.fill(dimension)(-1.0f)
        annoyReader.getItemVector(i, v2)
    
        // get 10 nearest neighbors by vector
        val results = annoyReader.getNearest(v2, 10)
      }
    Definition Classes
    extra
  • package bigquery
    Definition Classes
    extra
  • package csv

    Main package for CSV type-safe APIs. Import all.

    import com.spotify.scio.extra.csv._
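
    A minimal read/write sketch, assuming scio's CSV support builds on kantan.csv and that csvFile / saveAsCsvFile pick up implicit header codecs (the User type and header names are hypothetical):

    import kantan.csv.{HeaderDecoder, HeaderEncoder}

    case class User(name: String, age: Int)

    // decode and encode rows by CSV header name
    implicit val userDecoder: HeaderDecoder[User] =
      HeaderDecoder.decoder("name", "age")(User.apply)
    implicit val userEncoder: HeaderEncoder[User] =
      HeaderEncoder.caseEncoder("name", "age")(User.unapply)

    val users: SCollection[User] = sc.csvFile[User]("gs://<bucket>/<path>/input.csv")
    users.saveAsCsvFile("gs://<bucket>/<path>/output")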
    Definition Classes
    extra
  • package hll
    Definition Classes
    extra
  • package json

    Main package for JSON APIs. Import all.

    This package uses Circe for JSON handling under the hood.

    import com.spotify.scio.extra.json._
    
    // define a type-safe JSON schema
    case class Record(i: Int, d: Double, s: String)
    
    // read JSON as case classes
    sc.jsonFile[Record]("input.json")
    
    // write case classes as JSON
    sc.parallelize((1 to 10).map(x => Record(x, x.toDouble, x.toString)))
      .saveAsJsonFile("output")
    Definition Classes
    extra
  • package rollup
    Definition Classes
    extra
  • package sorter
    Definition Classes
    extra
  • package sparkey

    Main package for Sparkey side input APIs. Import all. (Full documentation below.)

    Definition Classes
    extra
  • package voyager

    Main package for Voyager side input APIs.

    Definition Classes
    extra

package sparkey

Main package for Sparkey side input APIs. Import all.

import com.spotify.scio.extra.sparkey._

To save an SCollection[(String, String)] to a Sparkey fileset:

val s = sc.parallelize(Seq("a" -> "one", "b" -> "two"))
s.saveAsSparkey("gs://<bucket>/<path>/<sparkey-prefix>")

// with multiple shards, sharded by MurmurHash3 of the key
s.saveAsSparkey("gs://<bucket>/<path>/<sparkey-dir>", numShards=2)

A previously-saved sparkey can be loaded as a side input:

sc.sparkeySideInput("gs://<bucket>/<path>/<sparkey-prefix>")

A sharded collection of Sparkey files can also be used as a side input by specifying a glob path:

sc.sparkeySideInput("gs://<bucket>/<path>/<sparkey-dir>/part-*")

When the sparkey is needed only temporarily, the save step can be elided:

val side: SideInput[SparkeyReader] = sc
  .parallelize(Seq("a" -> "one", "b" -> "two"))
  .asSparkeySideInput

SparkeyReader can be used like a lookup table in a side input operation:

val main: SCollection[String] = sc.parallelize(Seq("a", "b", "c"))
val side: SideInput[SparkeyReader] = sc
  .parallelize(Seq("a" -> "one", "b" -> "two"))
  .asSparkeySideInput

main.withSideInputs(side)
  .map { (x, s) =>
    s(side).getOrElse(x, "unknown")
  }

A SparkeyMap can store keys and values of any type, but can only be used as a SideInput:

val main: SCollection[String] = sc.parallelize(Seq("a", "b", "c"))
val side: SideInput[SparkeyMap[String, Int]] = sc
  .parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3))
  .asLargeMapSideInput()

val objects: SCollection[MyObject] = main
  .withSideInputs(side)
  .map { (x, s) => s(side).get(x) }
  .toSCollection

To read a static Sparkey collection and use it as a typed SideInput, use TypedSparkeyReader. TypedSparkeyReader can also accept a Caffeine cache to reduce IO and deserialization load:

val main: SCollection[String] = sc.parallelize(Seq("a", "b", "c"))
val cache: Cache[String, MyObject] = ...
val side: SideInput[TypedSparkeyReader[MyObject]] = sc
  .typedSparkeySideInput("gs://<bucket>/<path>/<sparkey-prefix>", MyObject.decode, cache)

val objects: SCollection[MyObject] = main
  .withSideInputs(side)
  .map { (x, s) => s(side).get(x) }
  .toSCollection
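
A minimal sketch of building such a cache, assuming the Cache above is Caffeine's com.github.benmanes.caffeine.cache.Cache and MyObject is the decoded value type from the example:

import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

// bounded in-process cache of decoded values
val cache: Cache[String, MyObject] = Caffeine.newBuilder()
  .maximumSize(10000L)
  .build[String, MyObject]()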
Source
package.scala
Linear Supertypes
SparkeyCoders, CoderGrammar, SparkeyReaderInstances, AnyRef, Any

Package Members

  1. package instances

Type Members

  1. case class InvalidNumShardsException(str: String) extends RuntimeException with Product with Serializable
  2. class LargeHashSCollectionFunctions[T] extends AnyRef

    Extra functions available on SCollections for Sparkey hash-based filtering.

  3. class PairLargeHashSCollectionFunctions[K, V] extends AnyRef

    Extra functions available on SCollections of (key, value) pairs for hash-based joins through an implicit conversion, using the Sparkey-backed LargeMapSideInput for dramatic speed increases over the in-memory versions for datasets over 100 MB. As long as the RHS fits on disk, these functions are usually much faster than regular joins and save on shuffling. (A usage sketch follows this list.)

    Note that these are nearly identical to the functions in PairHashSCollectionFunctions.scala, but we can't reuse the implementations there because SideInput[T] is not covariant over T.

  4. implicit class SparkeyPairSCollection[K, V] extends Serializable

    Enhanced version of SCollection with Sparkey methods.

  5. implicit final class SparkeySCollection extends AnyVal

    Enhanced version of SCollection with Sparkey methods.

  6. implicit final class SparkeyScioContext extends AnyVal

    Enhanced version of ScioContext with Sparkey methods.

  7. implicit class SparkeySetSCollection[T] extends AnyRef

    Enhanced version of SCollection with Sparkey methods.

  8. case class SparkeyUri(path: String) extends Product with Serializable

    Represents the base URI for a Sparkey index and log file, either on the local or a remote file system. For remote file systems, basePath should be in the form 'scheme://<bucket>/<path>/<sparkey-prefix>'. For local files, it should be in the form '/<path>/<sparkey-prefix>'. Note that basePath must not be a folder or GCS bucket, as it is a base path representing two files: <sparkey-prefix>.spi and <sparkey-prefix>.spl.

  9. sealed trait SparkeyWritable[K, V] extends Serializable
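
A sketch of the hash-join functions described in item 3, assuming largeHashJoin, one of the join variants provided through the implicit conversion:

val lhs: SCollection[(String, Int)] = sc.parallelize(Seq("a" -> 1, "b" -> 2))
val rhs: SCollection[(String, String)] = sc.parallelize(Seq("a" -> "one", "b" -> "two"))

// the RHS is staged in a Sparkey-backed side input on disk rather than in memory
val joined: SCollection[(String, (Int, String))] = lhs.largeHashJoin(rhs)

And, for item 8, base paths name a file prefix rather than a directory:

val remote = SparkeyUri("gs://<bucket>/<path>/mysparkey") // mysparkey.spi + mysparkey.spl
val local = SparkeyUri("/<path>/mysparkey")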

Value Members

  1. implicit val ByteArraySparkeyWritable: SparkeyWritable[Array[Byte], Array[Byte]]
  2. def aggregate[T](implicit arg0: Coder[T]): Coder[Iterable[T]]
    Definition Classes
    CoderGrammar
  3. def beam[T](beam: Coder[T]): Coder[T]
    Definition Classes
    CoderGrammar
  4. def disjunction[T, Id](typeName: String, coder: Map[Id, Coder[T]])(id: (T) => Id)(implicit arg0: Coder[Id]): Coder[T]
    Definition Classes
    CoderGrammar
  5. def kryo[T](implicit ct: ClassTag[T]): Coder[T]

    Create an instance of a Kryo Coder for a given type.

    E.g., a Kryo Coder for org.joda.time.Interval would look like:

    implicit def jiKryo: Coder[Interval] = Coder.kryo[Interval]
    Definition Classes
    CoderGrammar
  6. def kv[K, V](koder: Coder[K], voder: Coder[V]): Coder[KV[K, V]]
    Definition Classes
    CoderGrammar
  7. implicit def makeLargeHashSCollectionFunctions[T](s: SCollection[T]): LargeHashSCollectionFunctions[T]
  8. implicit def makePairLargeHashSCollectionFunctions[K, V](s: SCollection[(K, V)]): PairLargeHashSCollectionFunctions[K, V]
  9. def raw[T](beam: Coder[T]): Coder[T]

    Create a ScioCoder from a Beam Coder.

    Definition Classes
    CoderGrammar
  10. implicit def sparkeyReaderCoder[T <: SparkeyReader](implicit arg0: ClassTag[T]): Coder[T]
    Definition Classes
    SparkeyCoders
  11. implicit def sparkeyWriterCoder[T <: SparkeyWriter](implicit arg0: ClassTag[T]): Coder[T]
    Definition Classes
    SparkeyCoders
  12. implicit def stringSparkeyReader(reader: SparkeyReader): StringSparkeyReader
    Definition Classes
    SparkeyReaderInstances
  13. implicit val stringSparkeyWritable: SparkeyWritable[String, String]
  14. def transform[U, T](c: Coder[U])(f: (Coder[U]) => Coder[T])(implicit ct: ClassTag[T]): Coder[T]
    Definition Classes
    CoderGrammar
  15. def xmap[U, T](c: Coder[U])(f: (U) => T, t: (T) => U)(implicit ct: ClassTag[T]): Coder[T]

    Given a Coder[A], create a Coder[B] by defining two functions A => B and B => A. The Coder[A] can be resolved implicitly by calling Coder[A].

    E.g., a Coder for org.joda.time.Interval can be defined by having the following implicit in scope. Without this implicit in scope, Coder derivation falls back to Kryo:

    implicit def jiCoder: Coder[Interval] =
      Coder.xmap(Coder[(Long, Long)])(t => new Interval(t._1, t._2),
          i => (i.getStartMillis, i.getEndMillis))

    In the above example we implicitly derive Coder[(Long, Long)] and define two functions: one to convert a tuple (Long, Long) to an Interval, and a second to convert an Interval to a tuple of (Long, Long).

    Definition Classes
    CoderGrammar
  16. object SparkeyIO
  17. object SparkeyUri extends Serializable
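
The SparkeyWritable instances above (items 1 and 13) are what allow saveAsSparkey on String- and byte-array-keyed collections; a minimal sketch:

import java.nio.charset.StandardCharsets

val strings: SCollection[(String, String)] = sc.parallelize(Seq("a" -> "one"))
strings.saveAsSparkey("gs://<bucket>/<path>/strings")

val bytes: SCollection[(Array[Byte], Array[Byte])] = sc.parallelize(
  Seq("k".getBytes(StandardCharsets.UTF_8) -> "v".getBytes(StandardCharsets.UTF_8))
)
bytes.saveAsSparkey("gs://<bucket>/<path>/bytes")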
