
org.apache.beam.sdk.extensions.smb

SortedBucketSink

class SortedBucketSink[K1, K2, V] extends PTransform[PCollection[V], WriteResult]

A PTransform for writing a PCollection to a file-based sink, where each file represents a "bucket" of elements deterministically assigned by BucketMetadata based on a key extraction function. The elements in each bucket are written in sorted order according to the same key.
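The deterministic assignment described above can be sketched in plain Java. This is a minimal illustration, not the BucketMetadata implementation: the class and method names are hypothetical, and `Arrays.hashCode` stands in for whatever hash function the real metadata uses. The point is that the bucket id depends only on the key bytes and the bucket count, so two datasets keyed by the same user id place that user in the same bucket.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch; NOT the real BucketMetadata code.
final class BucketAssignmentSketch {
  // Stand-in for key extraction: the "record" is reduced to its user id.
  static byte[] keyBytes(String userId) {
    return userId.getBytes(StandardCharsets.UTF_8);
  }

  // Deterministic: the result depends only on the key bytes and numBuckets.
  // Arrays.hashCode is an assumed stand-in for the real hash function.
  static int bucketId(byte[] keyBytes, int numBuckets) {
    int hash = Arrays.hashCode(keyBytes);
    // floorMod keeps the id in [0, numBuckets) even for negative hashes.
    return Math.floorMod(hash, numBuckets);
  }
}
```

Because nothing but the key and the bucket count feeds into `bucketId`, a profiles dataset and an events dataset written independently with the same scheme agree on where every user lives.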

This transform is intended to be used in conjunction with the SortedBucketSource transform. Any two datasets written with SortedBucketSink using the same bucketing scheme can be joined by sequentially reading and merging their files, which eliminates the shuffle required by GroupByKey-based transforms. This is ideal for datasets that will be written once and read many times with a predictable join key, e.g. user event data.
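Why sorted buckets make the shuffle unnecessary shows up in the merge step itself. The sketch below is hypothetical and is not SortedBucketSource code: it inner-joins two lists that are already sorted by key, as two buckets with the same id would be, in a single sequential pass. Keys are Strings here for readability; the real sink works with encoded key bytes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sort-merge join of two key-sorted buckets (no shuffle, one pass).
final class SortMergeJoinSketch {
  static final class Entry {
    final String key;
    final String value;

    Entry(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  // Both inputs must already be sorted by key, which is what the sink guarantees.
  static List<String> innerJoin(List<Entry> left, List<Entry> right) {
    List<String> out = new ArrayList<>();
    int i = 0;
    int j = 0;
    while (i < left.size() && j < right.size()) {
      int cmp = left.get(i).key.compareTo(right.get(j).key);
      if (cmp < 0) {
        i++; // left key has no partner; advance left
      } else if (cmp > 0) {
        j++; // right key has no partner; advance right
      } else {
        String key = left.get(i).key;
        // Find the run of equal keys on each side...
        int iEnd = i;
        while (iEnd < left.size() && left.get(iEnd).key.equals(key)) iEnd++;
        int jEnd = j;
        while (jEnd < right.size() && right.get(jEnd).key.equals(key)) jEnd++;
        // ...and emit their cross product.
        for (int a = i; a < iEnd; a++) {
          for (int b = j; b < jEnd; b++) {
            out.add(key + ":" + left.get(a).value + "," + right.get(b).value);
          }
        }
        i = iEnd;
        j = jEnd;
      }
    }
    return out;
  }
}
```

Each file is read front to back exactly once, so the cost of the join is linear in the size of the two buckets.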

Transform steps

SortedBucketSink maps over each element, extracts a byte[] representation of its sorting key using BucketMetadata#extractKeyPrimary(Object), and assigns it to an Integer bucket using BucketMetadata#getBucketId(byte[]). Next, a GroupByKey transform is applied to create a PCollection of N elements, where N is the number of buckets specified by BucketMetadata#getNumBuckets(). A SortBucketShard transform then sorts the elements within each bucket group, optionally also sorting by the secondary key bytes from BucketMetadata#getKeyBytesSecondary(Object). Finally, the write operation is performed: each bucket is first written to SortedBucketSink#tempDirectory and then copied to its final destination.
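The steps above can be modeled in memory as follows. This is a hypothetical sketch, not the transform's code: `primaryKey` and `secondaryKey` play the roles of BucketMetadata#extractKeyPrimary and BucketMetadata#getKeyBytesSecondary, `bucketId` stands in for BucketMetadata#getBucketId with an assumed hash, and a TreeMap plays the role of GroupByKey. Comparing keys as unsigned bytes is an assumption about how encoded keys are ordered. The temporary-directory write is omitted, and `Arrays.compareUnsigned` requires Java 9 or later.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the sink's steps; all names are illustrative.
final class SinkStepsSketch {
  static final class Event {
    final String userId;    // primary (bucketing and sorting) key
    final String timestamp; // secondary sorting key

    Event(String userId, String timestamp) {
      this.userId = userId;
      this.timestamp = timestamp;
    }

    @Override
    public String toString() {
      return userId + "@" + timestamp;
    }
  }

  // Step 1: extract key bytes (stand-ins for the BucketMetadata key methods).
  static byte[] primaryKey(Event e) {
    return e.userId.getBytes(StandardCharsets.UTF_8);
  }

  static byte[] secondaryKey(Event e) {
    return e.timestamp.getBytes(StandardCharsets.UTF_8);
  }

  // Step 2: assign a bucket (assumed hash; the real one differs).
  static int bucketId(byte[] key, int numBuckets) {
    return Math.floorMod(Arrays.hashCode(key), numBuckets);
  }

  // Steps 3 and 4: group by bucket id, then sort each bucket by primary,
  // then secondary, key bytes.
  static Map<Integer, List<Event>> bucketAndSort(List<Event> input, int numBuckets) {
    Map<Integer, List<Event>> buckets = new TreeMap<>();
    for (Event e : input) {
      buckets.computeIfAbsent(bucketId(primaryKey(e), numBuckets), k -> new ArrayList<>()).add(e);
    }
    Comparator<Event> byKeys = (a, b) -> {
      int c = Arrays.compareUnsigned(primaryKey(a), primaryKey(b));
      return c != 0 ? c : Arrays.compareUnsigned(secondaryKey(a), secondaryKey(b));
    };
    buckets.values().forEach(list -> list.sort(byKeys));
    return buckets;
  }
}
```

With a single bucket, the four events `u2@t2`, `u1@t9`, `u2@t1`, `u1@t3` come out as `[u1@t3, u1@t9, u2@t1, u2@t2]`: grouped by user, then ordered by timestamp within each user.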

A JSON-serialized form of BucketMetadata is also written; it is required in order to join the outputs of SortedBucketSink using the SortedBucketSource transform.
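To illustrate why the metadata file matters, the sketch below models a tiny metadata object, serializes it to JSON, and checks whether two datasets share a bucketing scheme before a join. Everything here is hypothetical: the field names, the JSON layout, and the compatibility rule (same bucket count, hash, and key field, ignoring shard count) are assumptions for illustration, not the real BucketMetadata format or the check SortedBucketSource performs.

```java
import java.util.Objects;

// Hypothetical stand-in for BucketMetadata; fields and JSON layout are assumptions.
final class MetadataSketch {
  final int numBuckets;
  final int numShards;
  final String hashType;
  final String keyField;

  MetadataSketch(int numBuckets, int numShards, String hashType, String keyField) {
    this.numBuckets = numBuckets;
    this.numShards = numShards;
    this.hashType = hashType;
    this.keyField = keyField;
  }

  // Hand-rolled JSON to keep the example dependency-free (no string escaping).
  String toJson() {
    return String.format(
        "{\"numBuckets\":%d,\"numShards\":%d,\"hashType\":\"%s\",\"keyField\":\"%s\"}",
        numBuckets, numShards, hashType, keyField);
  }

  // Assumed rule: identical bucket count, hash and key field; shard count is ignored.
  static boolean sameBucketingScheme(MetadataSketch a, MetadataSketch b) {
    return a.numBuckets == b.numBuckets
        && Objects.equals(a.hashType, b.hashType)
        && Objects.equals(a.keyField, b.keyField);
  }
}
```

A reader that finds this file next to the bucket files can decide whether a merge join is valid without opening any data.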

Bucketing properties and hot keys

Bucketing properties are specified in BucketMetadata. The number of buckets, N, must be a power of two and should be chosen so that each bucket fits in a worker node's memory. Note that the SortValues transform will try to sort in memory and fall back to an ExternalSorter if needed.
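The sizing guidance above can be turned into a small helper. This sketch is hypothetical and not part of the SMB API: the total data size and the per-bucket memory budget are inputs you would estimate yourself. It also shows one practical property of a power-of-two N: `hash mod N` can be computed as a bit mask, which equals `Math.floorMod(hash, N)` even for negative hashes.

```java
// Hypothetical sizing helper; inputs are your own estimates, not SMB settings.
final class BucketSizingSketch {
  static boolean isPowerOfTwo(int n) {
    return n > 0 && (n & (n - 1)) == 0;
  }

  // Smallest power of two N such that ceil(totalBytes / N) <= maxBucketBytes.
  static int chooseNumBuckets(long totalBytes, long maxBucketBytes) {
    if (totalBytes <= 0 || maxBucketBytes <= 0) {
      throw new IllegalArgumentException("sizes must be positive");
    }
    int n = 1;
    while ((totalBytes + n - 1) / n > maxBucketBytes) { // integer ceil(totalBytes / n)
      if (n == (1 << 30)) {
        throw new IllegalArgumentException("bucket count would overflow int");
      }
      n <<= 1;
    }
    return n;
  }

  // For a power-of-two numBuckets this equals Math.floorMod(hash, numBuckets).
  static int bucketByMask(int hash, int numBuckets) {
    return hash & (numBuckets - 1);
  }
}
```

For example, 1000 bytes of data with a 100-byte budget per bucket needs 16 buckets: 8 would leave 125 bytes per bucket, while 16 leaves at most 63.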

Each bucket can be further sharded by specifying BucketMetadata#getNumShards(), which reduces the impact of hot keys.
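How sharding spreads a hot key can be sketched as follows. The round-robin shard assignment is an assumption made for illustration; the sink may assign shards differently. What the sketch shows is the general pattern: each shard of a bucket is sorted on its own, so a reader has to k-way merge the shards to see the whole bucket in key order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: one bucket split into independently sorted shards,
// then merged back into a single sorted stream on read.
final class ShardingSketch {
  // Assumed round-robin assignment: even a single hot key lands in every shard.
  static List<List<String>> shard(List<String> bucketKeys, int numShards) {
    List<List<String>> shards = new ArrayList<>();
    for (int s = 0; s < numShards; s++) shards.add(new ArrayList<>());
    for (int i = 0; i < bucketKeys.size(); i++) {
      shards.get(i % numShards).add(bucketKeys.get(i));
    }
    // Each shard is sorted on its own, bounding the per-sort memory.
    for (List<String> s : shards) s.sort(Comparator.naturalOrder());
    return shards;
  }

  // k-way merge: heap entries are {shardIndex, positionInShard}, ordered by the key they point at.
  static List<String> mergeShards(List<List<String>> shards) {
    PriorityQueue<int[]> heap =
        new PriorityQueue<>(Comparator.comparing((int[] e) -> shards.get(e[0]).get(e[1])));
    for (int s = 0; s < shards.size(); s++) {
      if (!shards.get(s).isEmpty()) heap.add(new int[] {s, 0});
    }
    List<String> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] top = heap.poll();
      merged.add(shards.get(top[0]).get(top[1]));
      if (top[1] + 1 < shards.get(top[0]).size()) {
        heap.add(new int[] {top[0], top[1] + 1});
      }
    }
    return merged;
  }
}
```

The trade-off is more, smaller files per bucket: writers sort less data each, at the cost of the reader merging several streams.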

Source
SortedBucketSink.java
Linear Supertypes

PTransform[PCollection[V], WriteResult], HasDisplayData, Serializable, AnyRef, Any

Instance Constructors

  1. new SortedBucketSink(bucketMetadata: BucketMetadata[K1, K2, V], outputDirectory: ResourceId, tempDirectory: ResourceId, filenameSuffix: String, fileOperations: FileOperations[V], sorterMemoryMb: Int, keyCacheSize: Int)
  2. new SortedBucketSink(bucketMetadata: BucketMetadata[K1, K2, V], outputDirectory: ResourceId, tempDirectory: ResourceId, filenameSuffix: String, fileOperations: FileOperations[V], sorterMemoryMb: Int)

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##: Int
    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  4. def addAnnotation(annotationType: String, annotation: Array[Byte]): PTransform[PCollection[V], WriteResult]
    Definition Classes
    PTransform
  5. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  6. def clone(): AnyRef
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.CloneNotSupportedException]) @native()
  7. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  8. def equals(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef → Any
  9. final def expand(input: PCollection[V]): WriteResult
    Definition Classes
    SortedBucketSink → PTransform
    Annotations
    @Override()
  10. def finalize(): Unit
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.Throwable])
  11. def getAdditionalInputs(): Map[TupleTag[_ <: AnyRef], PValue]
    Definition Classes
    PTransform
  12. def getAnnotations(): Map[String, Array[Byte]]
    Definition Classes
    PTransform
  13. final def getClass(): Class[_ <: AnyRef]
    Definition Classes
    AnyRef → Any
    Annotations
    @native()
  14. def getKindString(): String
    Attributes
    protected[transforms]
    Definition Classes
    PTransform
  15. def getName(): String
    Definition Classes
    PTransform
  16. def getResourceHints(): ResourceHints
    Definition Classes
    PTransform
  17. def hashCode(): Int
    Definition Classes
    AnyRef → Any
    Annotations
    @native()
  18. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  19. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  20. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  21. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  22. def populateDisplayData(builder: Builder): Unit
    Definition Classes
    PTransform → HasDisplayData
  23. def setDisplayData(displayData: List[ItemSpec[_ <: AnyRef]]): PTransform[PCollection[V], WriteResult]
    Definition Classes
    PTransform
  24. def setResourceHints(resourceHints: ResourceHints): PTransform[PCollection[V], WriteResult]
    Definition Classes
    PTransform
  25. final def synchronized[T0](arg0: => T0): T0
    Definition Classes
    AnyRef
  26. def toString(): String
    Definition Classes
    PTransform → AnyRef → Any
    Annotations
    @SideEffectFree()
  27. def validate(options: PipelineOptions, inputs: Map[TupleTag[_ <: AnyRef], PCollection[_ <: AnyRef]], outputs: Map[TupleTag[_ <: AnyRef], PCollection[_ <: AnyRef]]): Unit
    Definition Classes
    PTransform
  28. def validate(options: PipelineOptions): Unit
    Definition Classes
    PTransform
  29. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  30. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  31. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException]) @native()

Deprecated Value Members

  1. def getDefaultOutputCoder[T <: AnyRef](input: PCollection[V], output: PCollection[T]): Coder[T]
    Definition Classes
    PTransform
    Annotations
    @throws(classOf[org.apache.beam.sdk.coders.CannotProvideCoderException]) @Deprecated
    Deprecated
  2. def getDefaultOutputCoder(input: PCollection[V]): Coder[_ <: AnyRef]
    Attributes
    protected[transforms]
    Definition Classes
    PTransform
    Annotations
    @throws(classOf[org.apache.beam.sdk.coders.CannotProvideCoderException]) @Deprecated
    Deprecated
  3. def getDefaultOutputCoder(): Coder[_ <: AnyRef]
    Attributes
    protected[transforms]
    Definition Classes
    PTransform
    Annotations
    @throws(classOf[org.apache.beam.sdk.coders.CannotProvideCoderException]) @Deprecated
    Deprecated
