package sparkey
Main package for Sparkey side input APIs. Import all:

    import com.spotify.scio.extra.sparkey._
To save an SCollection[(String, String)] to a Sparkey fileset:

    val s = sc.parallelize(Seq("a" -> "one", "b" -> "two"))
    s.saveAsSparkey("gs://<bucket>/<path>/<sparkey-prefix>")

    // with multiple shards, sharded by MurmurHash3 of the key
    s.saveAsSparkey("gs://<bucket>/<path>/<sparkey-dir>", numShards = 2)
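The comment in the example above says shards are chosen by MurmurHash3 of the key. As a rough mental model only, the sketch below assigns a String key to one of `numShards` shards by hashing its UTF-8 bytes with the Scala standard library's `MurmurHash3.bytesHash`. The hash input, seed, and reduction step that Scio actually uses are assumptions here, and `shardFor` is a hypothetical helper, not part of the Sparkey API.

```scala
import java.nio.charset.StandardCharsets
import scala.util.hashing.MurmurHash3

object ShardSketch {
  // Hypothetical helper: maps a key to a shard index in [0, numShards).
  // Assumption: MurmurHash3 over the key's UTF-8 bytes; Scio's exact scheme may differ.
  def shardFor(key: String, numShards: Int): Int = {
    require(numShards > 0, "numShards must be positive")
    val hash = MurmurHash3.bytesHash(key.getBytes(StandardCharsets.UTF_8))
    // floorMod keeps the index non-negative even when the hash is negative.
    Math.floorMod(hash, numShards)
  }
}
```

Because the shard index depends only on the key, repeating the same computation at read time would presumably point a lookup at a single shard.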
A previously saved Sparkey can be loaded as a side input:

    sc.sparkeySideInput("gs://<bucket>/<path>/<sparkey-prefix>")

A sharded collection of Sparkey files can also be used as a side input by specifying a glob path:

    sc.sparkeySideInput("gs://<bucket>/<path>/<sparkey-dir>/part-*")
When the Sparkey is needed only temporarily, the save step can be elided:

    val side: SideInput[SparkeyReader] = sc
      .parallelize(Seq("a" -> "one", "b" -> "two"))
      .asSparkeySideInput
A SparkeyReader can be used like a lookup table in a side input operation:

    val main: SCollection[String] = sc.parallelize(Seq("a", "b", "c"))
    val side: SideInput[SparkeyReader] = sc
      .parallelize(Seq("a" -> "one", "b" -> "two"))
      .asSparkeySideInput
    val result: SCollection[String] = main
      .withSideInputs(side)
      .map { (x, s) => s(side).getOrElse(x, "unknown") }
      .toSCollection
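Without the pipeline machinery, the lookup above behaves like `getOrElse` on an ordinary map. The sketch below uses a plain Scala `Map` as a stand-in for the side input, only to show which value each main-input element maps to. It does not touch Sparkey or Scio.

```scala
object LookupModel {
  // Stand-in for the Sparkey side input built from Seq("a" -> "one", "b" -> "two").
  val side: Map[String, String] = Map("a" -> "one", "b" -> "two")

  // Stand-in for the main input.
  val main: Seq[String] = Seq("a", "b", "c")

  // Same lookup as the pipeline: keys missing from the side input fall back to "unknown".
  val result: Seq[String] = main.map(x => side.getOrElse(x, "unknown"))
}
```

Here `result` is `Seq("one", "two", "unknown")`.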
A SparkeyMap can store keys and values of any type, but can only be used as a SideInput:

    val main: SCollection[String] = sc.parallelize(Seq("a", "b", "c"))
    val side: SideInput[SparkeyMap[String, Int]] = sc
      .parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3))
      .asLargeMapSideInput()
    // The side input holds Int values, so get returns Option[Int];
    // flatMap keeps only the keys that are present.
    val values: SCollection[Int] = main
      .withSideInputs(side)
      .flatMap { (x, s) => s(side).get(x) }
      .toSCollection
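Sparkey itself stores keys and values as byte arrays, so a typed map on top of it needs a way to turn `K` and `V` into bytes and back. The sketch below models that idea with a hypothetical `BytesBackedMap` that takes explicit encode/decode functions and keeps an in-memory `Map[Seq[Byte], Array[Byte]]`. It illustrates the concept only and is not how `SparkeyMap` is implemented.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical typed view over a byte-oriented key/value store.
final class BytesBackedMap[K, V](
    encodeKey: K => Array[Byte],
    encodeValue: V => Array[Byte],
    decodeValue: Array[Byte] => V
) {
  // Seq[Byte] gives content-based equality; raw Array[Byte] keys would compare by reference.
  private var store = Map.empty[Seq[Byte], Array[Byte]]

  def put(key: K, value: V): Unit =
    store += (encodeKey(key).toSeq -> encodeValue(value))

  def get(key: K): Option[V] =
    store.get(encodeKey(key).toSeq).map(decodeValue)
}

object BytesBackedMapExample {
  val map = new BytesBackedMap[String, Int](
    k => k.getBytes(StandardCharsets.UTF_8),
    v => ByteBuffer.allocate(4).putInt(v).array(), // 4-byte big-endian Int
    bytes => ByteBuffer.wrap(bytes).getInt
  )
  Seq("a" -> 1, "b" -> 2, "c" -> 3).foreach { case (k, v) => map.put(k, v) }
}
```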
To read a static Sparkey collection and use it as a typed SideInput, use TypedSparkeyReader. TypedSparkeyReader can also accept a Caffeine cache to reduce IO and deserialization load:

    val main: SCollection[String] = sc.parallelize(Seq("a", "b", "c"))
    val cache: Cache[String, MyObject] = ...
    val side: SideInput[TypedSparkeyReader[MyObject]] = sc
      .typedSparkeySideInput("gs://<bucket>/<path>/<sparkey-prefix>", MyObject.decode, cache)
    // get returns Option[MyObject]; flatMap drops keys with no stored value.
    val objects: SCollection[MyObject] = main
      .withSideInputs(side)
      .flatMap { (x, s) => s(side).get(x) }
      .toSCollection
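The cache pays off because a key that is looked up repeatedly is read and decoded only once while it stays cached. To keep the sketch self-contained, it swaps Caffeine for a `java.util.concurrent.ConcurrentHashMap`. `CachedReader`, `rawLookup`, and `decodeCalls` are hypothetical names. This shows the read-through caching pattern, not `TypedSparkeyReader`'s code.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical read-through cache in front of a raw byte lookup plus a decode step.
// T is bounded by AnyRef so a missing cache entry is a plain null, never an unboxed default.
final class CachedReader[T <: AnyRef](
    rawLookup: String => Option[Array[Byte]],
    decode: Array[Byte] => T
) {
  // Counts how often decode actually runs, to make the cache's effect visible.
  val decodeCalls = new AtomicInteger(0)

  // Stand-in for a Caffeine cache.
  private val cache = new ConcurrentHashMap[String, T]()

  def get(key: String): Option[T] = {
    val cached = cache.get(key)
    if (cached != null) Some(cached)
    else
      rawLookup(key).map { bytes =>
        decodeCalls.incrementAndGet()
        val value = decode(bytes)
        cache.putIfAbsent(key, value) // missing keys are not cached in this sketch
        value
      }
  }
}
```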
Type Members
- case class InvalidNumShardsException(str: String) extends RuntimeException with Product with Serializable
- class LargeHashSCollectionFunctions[T] extends AnyRef
Extra functions available on SCollections for Sparkey hash-based filtering.
- class PairLargeHashSCollectionFunctions[K, V] extends AnyRef
Extra functions available on SCollections of (key, value) pairs for hash-based joins through an implicit conversion, using the Sparkey-backed LargeMapSideInput for dramatic speed increases over the in-memory versions for datasets larger than 100 MB. As long as the RHS fits on disk, these functions are usually much faster than regular joins and save on shuffling.
Note that these are nearly identical to the functions in PairHashSCollectionFunctions.scala, but we can't reuse the implementations there as SideInput[T] is not covariant over T.
- implicit class SparkeyPairSCollection[K, V] extends Serializable
Enhanced version of SCollection with Sparkey methods.
- implicit final class SparkeySCollection extends AnyVal
Enhanced version of SCollection with Sparkey methods.
- implicit final class SparkeyScioContext extends AnyVal
Enhanced version of ScioContext with Sparkey methods.
- implicit class SparkeySetSCollection[T] extends AnyRef
Enhanced version of SCollection with Sparkey methods.
- case class SparkeyUri(path: String) extends Product with Serializable
Represents the base URI for a Sparkey index and log file, either on the local or a remote file system. For remote file systems, basePath should be in the form 'scheme://<bucket>/<path>/<sparkey-prefix>'. For local files, it should be in the form '/<path>/<sparkey-prefix>'. Note that basePath must not be a folder or GCS bucket, as it is a base path representing two files: <sparkey-prefix>.spi and <sparkey-prefix>.spl.
- sealed trait SparkeyWritable[K, V] extends Serializable
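The hash-based joins and filters described for PairLargeHashSCollectionFunctions and LargeHashSCollectionFunctions rest on one idea. The right-hand side is indexed by key once, then the left-hand side is streamed past that index, so the left side never needs to be shuffled. The sketch below models this in plain Scala, with an in-memory `Map` standing in for the Sparkey-backed side input. `hashJoin` and `hashFilter` are hypothetical local functions, not the Scio methods.

```scala
object HashJoinModel {
  // Inner hash join: index the right side by key, then probe the index for each left element.
  def hashJoin[K, V, W](lhs: Seq[(K, V)], rhs: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val index: Map[K, Seq[W]] =
      rhs.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }
    for {
      (k, v) <- lhs
      w <- index.getOrElse(k, Seq.empty)
    } yield (k, (v, w))
  }

  // Hash-based filtering: keep only left elements present in the right-side set.
  def hashFilter[T](lhs: Seq[T], rhs: Set[T]): Seq[T] = lhs.filter(rhs.contains)
}
```

Left keys with no match on the right (here `"c"`) are dropped, as in an inner join.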
Value Members
- implicit val ByteArraySparkeyWritable: SparkeyWritable[Array[Byte], Array[Byte]]
- def aggregate[T](implicit arg0: Coder[T]): Coder[Iterable[T]]
- Definition Classes
- CoderGrammar
- def beam[T](beam: Coder[T]): Coder[T]
- Definition Classes
- CoderGrammar
- def disjunction[T, Id](typeName: String, coder: Map[Id, Coder[T]])(id: (T) => Id)(implicit arg0: Coder[Id]): Coder[T]
- Definition Classes
- CoderGrammar
- def kryo[T](implicit ct: ClassTag[T]): Coder[T]
Create an instance of Kryo Coder for a given type.
E.g., a Kryo Coder for org.joda.time.Interval would look like:
    implicit def jiKryo: Coder[Interval] = Coder.kryo[Interval]
- Definition Classes
- CoderGrammar
- def kv[K, V](koder: Coder[K], voder: Coder[V]): Coder[KV[K, V]]
- Definition Classes
- CoderGrammar
- implicit def makeLargeHashSCollectionFunctions[T](s: SCollection[T]): LargeHashSCollectionFunctions[T]
- implicit def makePairLargeHashSCollectionFunctions[K, V](s: SCollection[(K, V)]): PairLargeHashSCollectionFunctions[K, V]
- def raw[T](beam: Coder[T]): Coder[T]
Create a ScioCoder from a Beam Coder
- Definition Classes
- CoderGrammar
- implicit def sparkeyReaderCoder[T <: SparkeyReader](implicit arg0: ClassTag[T]): Coder[T]
- Definition Classes
- SparkeyCoders
- implicit def sparkeyWriterCoder[T <: SparkeyWriter](implicit arg0: ClassTag[T]): Coder[T]
- Definition Classes
- SparkeyCoders
- implicit def stringSparkeyReader(reader: SparkeyReader): StringSparkeyReader
- Definition Classes
- SparkeyReaderInstances
- implicit val stringSparkeyWritable: SparkeyWritable[String, String]
- def transform[U, T](c: Coder[U])(f: (Coder[U]) => Coder[T])(implicit ct: ClassTag[T]): Coder[T]
- Definition Classes
- CoderGrammar
- def xmap[U, T](c: Coder[U])(f: (U) => T, t: (T) => U)(implicit ct: ClassTag[T]): Coder[T]
Given a Coder[A], create a Coder[B] by defining two functions A => B and B => A. The Coder[A] can be resolved implicitly by calling Coder[A].
E.g., a Coder for org.joda.time.Interval can be defined by having the following implicit in scope. Without this implicit in scope, Coder derivation falls back to Kryo.
    implicit def jiCoder: Coder[Interval] =
      Coder.xmap(Coder[(Long, Long)])(t => new Interval(t._1, t._2), i => (i.getStartMillis, i.getEndMillis))
In the above example we implicitly derive Coder[(Long, Long)] and define two functions: one to convert a tuple (Long, Long) to an Interval, and a second to convert an Interval to a tuple of (Long, Long).
- Definition Classes
- CoderGrammar
- object SparkeyIO
- object SparkeyUri extends Serializable
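The `xmap` entry above derives a coder for a new type from an existing coder plus two conversion functions. The sketch below shows why both directions are needed without pulling in Scio. It defines a hypothetical minimal `Codec` trait with its own `xmap` and uses a stand-in `Interval` case class in place of `org.joda.time.Interval`. None of these names are Scio's API.

```scala
import java.nio.ByteBuffer

// Hypothetical minimal codec: just enough to show how xmap reuses an existing encoding.
trait Codec[A] { self =>
  def encode(a: A): Array[Byte]
  def decode(bytes: Array[Byte]): A

  // Build a Codec[B] from this Codec[A] and conversions A => B and B => A.
  def xmap[B](f: A => B, g: B => A): Codec[B] = new Codec[B] {
    def encode(b: B): Array[Byte] = self.encode(g(b)) // convert to A, then encode
    def decode(bytes: Array[Byte]): B = f(self.decode(bytes)) // decode A, then convert
  }
}

object Codecs {
  // Base codec for a pair of longs: 16 bytes, first element then second.
  val longPair: Codec[(Long, Long)] = new Codec[(Long, Long)] {
    def encode(p: (Long, Long)): Array[Byte] =
      ByteBuffer.allocate(16).putLong(p._1).putLong(p._2).array()
    def decode(bytes: Array[Byte]): (Long, Long) = {
      val buf = ByteBuffer.wrap(bytes)
      (buf.getLong, buf.getLong)
    }
  }
}

// Stand-in for org.joda.time.Interval.
final case class Interval(startMillis: Long, endMillis: Long)

object IntervalCodec {
  val codec: Codec[Interval] =
    Codecs.longPair.xmap[Interval](t => Interval(t._1, t._2), i => (i.startMillis, i.endMillis))
}
```

The derived codec never touches bytes directly. It reuses the pair encoding, which mirrors how `Coder.xmap` builds on `Coder[(Long, Long)]` in the example above.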