class SortedBucketSink[K1, K2, V] extends PTransform[PCollection[V], WriteResult]
A PTransform for writing a PCollection to a file-based sink, where files represent "buckets" of elements deterministically assigned by BucketMetadata based on a key extraction function. The elements in each bucket are written in sorted order according to the same key.
This transform is intended to be used in conjunction with the SortedBucketSource transform. Any two datasets written with SortedBucketSink using the same bucketing scheme can be joined by sequentially reading and merging files, which eliminates the shuffle required by GroupByKey-based transforms. This is ideal for datasets that are written once and read many times with a predictable join key, e.g. user event data.
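To make the "sequentially reading and merging" idea concrete, here is a minimal sketch in plain Java of joining two buckets that are already sorted by the same key. It does not use Beam or SortedBucketSource; the class name SortedMergeJoin, the in-memory lists standing in for bucket files, and the String key comparison (instead of key bytes) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SortedMergeJoin {
    // Inner-joins two lists that are each sorted by key, in a single forward pass.
    // Emits "key:leftValue|rightValue" for every matching pair.
    static List<String> join(List<Map.Entry<String, String>> left,
                             List<Map.Entry<String, String>> right) {
        List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.size() && j < right.size()) {
            String key = left.get(i).getKey();
            int cmp = key.compareTo(right.get(j).getKey());
            if (cmp < 0) {
                i++; // left key has no match yet, advance left
            } else if (cmp > 0) {
                j++; // right key has no match yet, advance right
            } else {
                // Find the end of the run of equal keys on the right side.
                int jEnd = j;
                while (jEnd < right.size() && right.get(jEnd).getKey().equals(key)) {
                    jEnd++;
                }
                // Pair every left element of this key with every right element of this key.
                while (i < left.size() && left.get(i).getKey().equals(key)) {
                    for (int k = j; k < jEnd; k++) {
                        out.add(key + ":" + left.get(i).getValue() + "|" + right.get(k).getValue());
                    }
                    i++;
                }
                j = jEnd;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample data: one bucket of events and the matching bucket of users.
        List<Map.Entry<String, String>> events = List.of(
            Map.entry("u1", "click"), Map.entry("u1", "view"), Map.entry("u3", "click"));
        List<Map.Entry<String, String>> users = List.of(
            Map.entry("u1", "alice"), Map.entry("u2", "bob"), Map.entry("u3", "carol"));
        System.out.println(join(events, users));
        // prints [u1:click|alice, u1:view|alice, u3:click|carol]
    }
}
```

Because both inputs are sorted and only matching buckets need to be read together, no regrouping of records by key is needed at read time.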
Transform steps
SortedBucketSink maps over each element, extracts a byte[] representation of its sorting key using BucketMetadata#extractKeyPrimary(Object), and assigns it to an Integer bucket using BucketMetadata#getBucketId(byte[]). Next, a GroupByKey transform is applied to create a PCollection of N elements, where N is the number of buckets specified by BucketMetadata#getNumBuckets(). A SortBucketShard transform then sorts the elements within each bucket group, optionally also sorting by the secondary key bytes from BucketMetadata#getKeyBytesSecondary(Object). Finally, the write operation is performed: each bucket is first written to SortedBucketSink#tempDirectory and then copied to its final destination.
A JSON-serialized form of BucketMetadata is also written; it is required in order to join datasets written by SortedBucketSink using the SortedBucketSource transform.
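The first three steps (extract key bytes, assign a bucket, sort within each bucket) can be sketched in plain Java as follows. This is an illustration only, not the SortedBucketSink implementation: the class BucketAndSortSketch is hypothetical, the "key,value" element format is made up, and Arrays.hashCode is a stand-in for whatever hash BucketMetadata actually uses, which this page does not specify.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class BucketAndSortSketch {
    // Hypothetical key extraction: the key is the text before the first comma.
    static byte[] keyBytes(String element) {
        return element.split(",", 2)[0].getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical bucket assignment: a deterministic hash of the key bytes, reduced to [0, numBuckets).
    static int bucketId(byte[] keyBytes, int numBuckets) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numBuckets);
    }

    // Groups elements by bucket id, then sorts each bucket by its key bytes.
    static Map<Integer, List<String>> bucketAndSort(List<String> elements, int numBuckets) {
        Map<Integer, List<String>> buckets = new TreeMap<>();
        for (String element : elements) {
            int id = bucketId(keyBytes(element), numBuckets);
            buckets.computeIfAbsent(id, unused -> new ArrayList<>()).add(element);
        }
        // Unsigned lexicographic comparison of key bytes; an assumption about the sort order.
        Comparator<String> byKeyBytes =
            (x, y) -> Arrays.compareUnsigned(keyBytes(x), keyBytes(y));
        for (List<String> bucket : buckets.values()) {
            bucket.sort(byKeyBytes);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> input = List.of("u3,click", "u1,view", "u2,click", "u1,click");
        bucketAndSort(input, 4).forEach(
            (id, bucket) -> System.out.println("bucket " + id + " -> " + bucket));
    }
}
```

Since the bucket id depends only on the key bytes and the bucket count, two datasets processed this way with the same settings place equal keys in the same bucket number.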
Bucketing properties and hot keys
Bucketing properties are specified in BucketMetadata. The number of buckets, N, must be a power of two and should be chosen such that each bucket can fit in a worker node's memory. Note that the SortValues transform will try to sort in-memory and fall back to an ExternalSorter if needed.
Each bucket can be further sharded to reduce the impact of hot keys, by specifying BucketMetadata#getNumShards().
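As a rough illustration of these two properties, the plain-Java sketch below validates a power-of-two bucket count and spreads records for one hot key across several shards of the same bucket. The class BucketShardSketch is hypothetical, and the round-robin shard choice is an assumption made for the example; this page does not say how SortedBucketSink picks a shard.

```java
final class BucketShardSketch {
    // True when n is a positive power of two (exactly one bit set).
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    // Returns one {bucket, shard} pair per input key hash.
    // The bucket depends only on the key hash; the shard is picked round-robin
    // (an illustrative choice, not taken from the library).
    static int[][] assign(int[] keyHashes, int numBuckets, int numShards) {
        if (!isPowerOfTwo(numBuckets)) {
            throw new IllegalArgumentException("numBuckets must be a power of two: " + numBuckets);
        }
        if (numShards < 1) {
            throw new IllegalArgumentException("numShards must be at least 1: " + numShards);
        }
        int[][] out = new int[keyHashes.length][];
        for (int i = 0; i < keyHashes.length; i++) {
            // For a power-of-two count, masking the low bits gives a value in [0, numBuckets).
            int bucket = keyHashes[i] & (numBuckets - 1);
            int shard = i % numShards;
            out[i] = new int[] {bucket, shard};
        }
        return out;
    }

    public static void main(String[] args) {
        // Six records that all share one hot key (hypothetical hash value 42).
        int[] hotKeyHashes = {42, 42, 42, 42, 42, 42};
        for (int[] pair : assign(hotKeyHashes, 8, 3)) {
            System.out.println("bucket=" + pair[0] + " shard=" + pair[1]);
        }
    }
}
```

In this sketch every record of the hot key still lands in bucket 2, but the six records are split evenly over three shards, so no single shard has to hold all of them.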
Source: SortedBucketSink.java
Linear supertypes: PTransform, HasDisplayData, Serializable, AnyRef, Any
Instance Constructors
- new SortedBucketSink(bucketMetadata: BucketMetadata[K1, K2, V], outputDirectory: ResourceId, tempDirectory: ResourceId, filenameSuffix: String, fileOperations: FileOperations[V], sorterMemoryMb: Int, keyCacheSize: Int)
- new SortedBucketSink(bucketMetadata: BucketMetadata[K1, K2, V], outputDirectory: ResourceId, tempDirectory: ResourceId, filenameSuffix: String, fileOperations: FileOperations[V], sorterMemoryMb: Int)
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- def addAnnotation(annotationType: String, annotation: Array[Byte]): PTransform[PCollection[V], WriteResult]
- Definition Classes
- PTransform
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- final def expand(input: PCollection[V]): WriteResult
- Definition Classes
- SortedBucketSink → PTransform
- Annotations
- @Override()
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable])
- def getAdditionalInputs(): Map[TupleTag[_ <: AnyRef], PValue]
- Definition Classes
- PTransform
- def getAnnotations(): Map[String, Array[Byte]]
- Definition Classes
- PTransform
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def getKindString(): String
- Attributes
- protected[transforms]
- Definition Classes
- PTransform
- def getName(): String
- Definition Classes
- PTransform
- def getResourceHints(): ResourceHints
- Definition Classes
- PTransform
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- def populateDisplayData(builder: Builder): Unit
- Definition Classes
- PTransform → HasDisplayData
- def setDisplayData(displayData: List[ItemSpec[_ <: AnyRef]]): PTransform[PCollection[V], WriteResult]
- Definition Classes
- PTransform
- def setResourceHints(resourceHints: ResourceHints): PTransform[PCollection[V], WriteResult]
- Definition Classes
- PTransform
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- PTransform → AnyRef → Any
- Annotations
- @SideEffectFree()
- def validate(options: PipelineOptions, inputs: Map[TupleTag[_ <: AnyRef], PCollection[_ <: AnyRef]], outputs: Map[TupleTag[_ <: AnyRef], PCollection[_ <: AnyRef]]): Unit
- Definition Classes
- PTransform
- def validate(options: PipelineOptions): Unit
- Definition Classes
- PTransform
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
Deprecated Value Members
- def getDefaultOutputCoder[T <: AnyRef](input: PCollection[V], output: PCollection[T]): Coder[T]
- Definition Classes
- PTransform
- Annotations
- @throws(classOf[org.apache.beam.sdk.coders.CannotProvideCoderException]) @Deprecated
- Deprecated
- def getDefaultOutputCoder(input: PCollection[V]): Coder[_ <: AnyRef]
- Attributes
- protected[transforms]
- Definition Classes
- PTransform
- Annotations
- @throws(classOf[org.apache.beam.sdk.coders.CannotProvideCoderException]) @Deprecated
- Deprecated
- def getDefaultOutputCoder(): Coder[_ <: AnyRef]
- Attributes
- protected[transforms]
- Definition Classes
- PTransform
- Annotations
- @throws(classOf[org.apache.beam.sdk.coders.CannotProvideCoderException]) @Deprecated
- Deprecated