
  • package sparkey

    Main package for Sparkey side input APIs. Import all.

    import com.spotify.scio.extra.sparkey._

    To save an SCollection[(String, String)] to a Sparkey fileset:

    val s = sc.parallelize(Seq("a" -> "one", "b" -> "two"))
    // with multiple shards, sharded by MurmurHash3 of the key
    s.saveAsSparkey("gs://<bucket>/<path>/<sparkey-dir>", numShards=2)
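    The comment above says keys are sharded by MurmurHash3 of the key. As a rough illustration of hash-mod sharding, the sketch below assigns keys to shards using only the Scala standard library. The names ShardingSketch, shardFor and partition are hypothetical, and the hash input and seed are assumptions; Scio's own implementation may hash the key differently, so shard indices produced here need not match real Sparkey output.

```scala
import scala.util.hashing.MurmurHash3

object ShardingSketch {
  // Hypothetical helper: maps a key to a shard index in [0, numShards).
  // Assumption: the key string is hashed with the standard-library MurmurHash3;
  // Scio's actual hash input and seed may differ.
  def shardFor(key: String, numShards: Short): Int = {
    require(numShards > 0, "numShards must be positive")
    // floorMod keeps the result non-negative even when the hash is negative.
    java.lang.Math.floorMod(MurmurHash3.stringHash(key), numShards.toInt)
  }

  // Groups key-value pairs by the shard each key would be assigned to.
  def partition(pairs: Seq[(String, String)], numShards: Short): Map[Int, Seq[(String, String)]] =
    pairs.groupBy { case (k, _) => shardFor(k, numShards) }
}
```

    Because the shard depends only on the key, a given key always lands in the same shard, which is what allows a reader to open a single shard for a lookup.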

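    A previously saved Sparkey can be loaded as a side input:

    sc.sparkeySideInput("gs://<bucket>/<path>/<sparkey-dir>")

    A sharded collection of Sparkey files can also be used as a side input by specifying a glob path:

    sc.sparkeySideInput("gs://<bucket>/<path>/<sparkey-dir>/part-*")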

    When the Sparkey is needed only temporarily, the save step can be skipped:

    val side: SideInput[SparkeyReader] = sc
      .parallelize(Seq("a" -> "one", "b" -> "two"))
      .asSparkeySideInput

    SparkeyReader can be used like a lookup table in a side input operation:

    val main: SCollection[String] = sc.parallelize(Seq("a", "b", "c"))
    val side: SideInput[SparkeyReader] = sc
      .parallelize(Seq("a" -> "one", "b" -> "two"))
      .asSparkeySideInput

    main.withSideInputs(side)
      .map { (x, s) =>
        s(side).getOrElse(x, "unknown")
      }

    A SparkeyMap can store any types of keys and values, but can only be used as a SideInput:

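    val main: SCollection[String] = sc.parallelize(Seq("a", "b", "c"))
    val side: SideInput[SparkeyMap[String, Int]] = sc
      .parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3))
      .asLargeMapSideInput
    // get returns an Option, so the element type is Option[Int]
    val objects: SCollection[Option[Int]] = main
      .withSideInputs(side)
      .map { (x, s) => s(side).get(x) }
      .toSCollection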

    To read a static Sparkey collection and use it as a typed SideInput, use TypedSparkeyReader. TypedSparkeyReader can also accept a Caffeine cache to reduce IO and deserialization load:

    val main: SCollection[String] = sc.parallelize(Seq("a", "b", "c"))
    val cache: Cache[String, MyObject] = ...
    val side: SideInput[TypedSparkeyReader[MyObject]] = sc
      .typedSparkeySideInput("gs://<bucket>/<path>/<sparkey-prefix>", MyObject.decode, cache)
    // get returns an Option, so the element type is Option[MyObject]
    val objects: SCollection[Option[MyObject]] = main
      .withSideInputs(side)
      .map { (x, s) => s(side).get(x) }
      .toSCollection
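
    The example above passes MyObject.decode as the decoder, a function from Array[Byte] to MyObject. MyObject itself is not defined in this documentation, so the following is only a minimal sketch of what such a type and its decoder might look like. The fields name and count and the tab-separated UTF-8 encoding are assumptions made for illustration.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical value type; the real MyObject is not shown in this documentation.
final case class MyObject(name: String, count: Int)

object MyObject {
  // Encodes as "name<TAB>count" in UTF-8. Assumes name contains no tab character.
  def encode(obj: MyObject): Array[Byte] =
    s"${obj.name}\t${obj.count}".getBytes(UTF_8)

  // Has the shape Array[Byte] => MyObject expected of a decoder argument.
  def decode(bytes: Array[Byte]): MyObject =
    new String(bytes, UTF_8).split('\t') match {
      case Array(name, count) => MyObject(name, count.toInt)
      case other =>
        throw new IllegalArgumentException(s"Expected 2 fields, got ${other.length}")
    }
}
```

    With a definition of this shape, MyObject.decode can be passed wherever a decoder of type (Array[Byte]) => T is required, with T inferred as MyObject.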



implicit class SparkeyPairSCollection[K, V] extends Serializable

Enhanced version of SCollection with Sparkey methods.

Linear Supertypes
  1. SparkeyPairSCollection
  2. Serializable
  3. AnyRef
  4. Any

Instance Constructors

  1. new SparkeyPairSCollection(self: SCollection[(K, V)])

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##: Int
    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  4. def asCachedStringSparkeySideInput(cache: Cache[String, String], numShards: Short = SparkeyIO.DefaultSideInputNumShards, compressionType: CompressionType = SparkeyIO.DefaultCompressionType, compressionBlockSize: Int = SparkeyIO.DefaultCompressionBlockSize)(implicit w: SparkeyWritable[K, V]): SideInput[CachedStringSparkeyReader]

    Convert this SCollection to a SideInput, mapping key-value pairs of each window to a CachedStringSparkeyReader, to be used with SCollection.withSideInputs.

  5. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  6. def asLargeMapSideInput(numShards: Short = SparkeyIO.DefaultSideInputNumShards, compressionType: CompressionType = SparkeyIO.DefaultCompressionType, compressionBlockSize: Int = SparkeyIO.DefaultCompressionBlockSize): SideInput[SparkeyMap[K, V]]

    Convert this SCollection to a SideInput, mapping key-value pairs of each window to a SparkeyMap, to be used with SCollection.withSideInputs. It is required that each key of the input be associated with a single value. The resulting SideInput must fit on disk on each worker that reads it. This is strongly recommended over a regular MapSideInput if the data in the side input exceeds 100MB.

  7. def asLargeMapSideInput: SideInput[SparkeyMap[K, V]]

    Convert this SCollection to a SideInput, mapping key-value pairs of each window to a SparkeyMap, to be used with SCollection.withSideInputs. It is required that each key of the input be associated with a single value. The resulting SideInput must fit on disk on each worker that reads it. This is strongly recommended over a regular MapSideInput if the data in the side input exceeds 100MB.

  8. def asLargeMultiMapSideInput(numShards: Short = SparkeyIO.DefaultSideInputNumShards, compressionType: CompressionType = SparkeyIO.DefaultCompressionType, compressionBlockSize: Int = SparkeyIO.DefaultCompressionBlockSize): SideInput[SparkeyMap[K, Iterable[V]]]

    Convert this SCollection to a SideInput, mapping key-value pairs of each window to a Map[key, Iterable[value]], to be used with SCollection.withSideInputs. In contrast to asLargeMapSideInput, it is not required that the keys in the input collection be unique. The resulting map is required to fit on disk on each worker. This is strongly recommended over a regular MultiMapSideInput if the data in the side input exceeds 100MB.

  9. def asLargeMultiMapSideInput: SideInput[SparkeyMap[K, Iterable[V]]]

    Convert this SCollection to a SideInput, mapping key-value pairs of each window to a Map[key, Iterable[value]], to be used with SCollection.withSideInputs. In contrast to asLargeMapSideInput, it is not required that the keys in the input collection be unique. The resulting map is required to fit on disk on each worker. This is strongly recommended over a regular MultiMapSideInput if the data in the side input exceeds 100MB.

  10. def asSparkeySideInput(implicit w: SparkeyWritable[K, V]): SideInput[SparkeyReader]

    Convert this SCollection to a SideInput, mapping key-value pairs of each window to a SparkeyReader, to be used with SCollection.withSideInputs. It is required that each key of the input be associated with a single value.

  11. def asSparkeySideInput(numShards: Short = SparkeyIO.DefaultSideInputNumShards, compressionType: CompressionType = SparkeyIO.DefaultCompressionType, compressionBlockSize: Int = SparkeyIO.DefaultCompressionBlockSize)(implicit w: SparkeyWritable[K, V]): SideInput[SparkeyReader]

    Convert this SCollection to a SideInput, mapping key-value pairs of each window to a SparkeyReader, to be used with SCollection.withSideInputs. It is required that each key of the input be associated with a single value.


    numShards: the number of shards to use when writing the Sparkey file(s).

  12. def asTypedSparkeySideInput[T](cache: Cache[String, T], numShards: Short = SparkeyIO.DefaultSideInputNumShards, compressionType: CompressionType = SparkeyIO.DefaultCompressionType, compressionBlockSize: Int = SparkeyIO.DefaultCompressionBlockSize)(decoder: (Array[Byte]) => T)(implicit w: SparkeyWritable[K, V]): SideInput[TypedSparkeyReader[T]]

    Convert this SCollection to a SideInput, mapping key-value pairs of each window to a TypedSparkeyReader, to be used with SCollection.withSideInputs. It is required that each key of the input be associated with a single value. The provided Cache will be used to cache reads from the resulting TypedSparkeyReader.

  13. def clone(): AnyRef
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.CloneNotSupportedException]) @native()
  14. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  15. def equals(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef → Any
  16. def finalize(): Unit
    Definition Classes
    AnyRef
  17. final def getClass(): Class[_ <: AnyRef]
    Definition Classes
    AnyRef → Any
  18. def hashCode(): Int
    Definition Classes
    AnyRef → Any
  19. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  20. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  21. final def notify(): Unit
    Definition Classes
    AnyRef
  22. final def notifyAll(): Unit
    Definition Classes
    AnyRef
  23. def saveAsSparkey(path: String, maxMemoryUsage: Long = -1, numShards: Short = SparkeyIO.DefaultNumShards, compressionType: CompressionType = SparkeyIO.DefaultCompressionType, compressionBlockSize: Int = SparkeyIO.DefaultCompressionBlockSize)(implicit writable: SparkeyWritable[K, V]): ClosedTap[Nothing]

    Write the key-value pairs of this SCollection as a Sparkey file to a specific location.


    path: where to write the Sparkey files

    maxMemoryUsage: (optional) how much memory (in bytes) is allowed for writing the index file

    numShards: (optional) the number of shards to split this dataset into before writing. One pair of Sparkey files will be written for each shard, sharded by MurmurHash3 of the key mod the number of shards.

  24. final def synchronized[T0](arg0: => T0): T0
    Definition Classes
    AnyRef
  25. def toString(): String
    Definition Classes
    AnyRef → Any
  26. final def wait(): Unit
    Definition Classes
    AnyRef
  27. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
  28. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException]) @native()
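
The descriptions of asLargeMapSideInput and asLargeMultiMapSideInput above differ in one requirement: the former needs a single value per key, while the latter accepts repeated keys and exposes a Map[key, Iterable[value]]. The sketch below illustrates that difference in shape with ordinary in-memory collections. It does not use Sparkey or Scio, and the names MapShapes, pairs, multi and duplicated are hypothetical.

```scala
object MapShapes {
  // Input with a repeated key "a".
  val pairs: Seq[(String, Int)] = Seq("a" -> 1, "a" -> 2, "b" -> 3)

  // Multimap shape: every value for a key is kept,
  // analogous to Map[key, Iterable[value]].
  val multi: Map[String, Iterable[Int]] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }

  // Keys that occur more than once. Input containing such keys does not
  // satisfy the single-value-per-key requirement of the map variant.
  val duplicated: Set[String] =
    multi.collect { case (k, vs) if vs.size > 1 => k }.toSet
}
```

A check of this kind on a sample of the data is one way to decide which of the two side input variants fits a given collection.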

Deprecated Value Members

  1. def asSparkey(implicit w: SparkeyWritable[K, V]): SCollection[SparkeyUri]

    Write the key-value pairs of this SCollection as a Sparkey file to a temporary location.


    Returns: a singleton SCollection containing the SparkeyUri of the saved files.

    Deprecated (since version 0.14.0): use saveAsSparkey instead.

  2. def asSparkey(path: String = null, maxMemoryUsage: Long = -1, numShards: Short = SparkeyIO.DefaultNumShards, compressionType: CompressionType = SparkeyIO.DefaultCompressionType, compressionBlockSize: Int = SparkeyIO.DefaultCompressionBlockSize)(implicit w: SparkeyWritable[K, V]): SCollection[SparkeyUri]

    Write the key-value pairs of this SCollection as a Sparkey file to a specific location.


    path: where to write the Sparkey files. Defaults to a temporary location.

    maxMemoryUsage: (optional) how much memory (in bytes) is allowed for writing the index file

    numShards: (optional) the number of shards to split this dataset into before writing. One pair of Sparkey files will be written for each shard, sharded by MurmurHash3 of the key mod the number of shards.

    Returns: a singleton SCollection containing the SparkeyUri of the saved files.

    Deprecated (since version 0.14.0): use saveAsSparkey instead.

