Sort Merge Bucket
Sort Merge Bucket is a technique for writing data to file system in deterministic file locations, sorted according to some pre-determined key, so that it can later be read in as key groups with no shuffle required. Since each element is assigned a file destination (bucket) based on a hash of its join key, we can use the same technique to cogroup multiple Sources as long as they’re written using the same key and hashing scheme.
For example, given these input records, and SMB write will first extract the key, assign the record to a bucket, sort values within the bucket, and write these values to a corresponding file.
Input | Key | Bucket | File Assignment |
---|---|---|---|
{key:“b”, value: 1} | “b” | 0 | bucket-00000-of-00002.avro |
{key:“b”, value: 2} | “b” | 0 | bucket-00000-of-00002.avro |
{key:“a”, value: 3} | “a” | 1 | bucket-00001-of-00002.avro |
Two sources can be joined by opening file readers on corresponding buckets of eachT source and merging key-groups as we go.
What are SMB transforms?
scio-smb
provides three PTransform
s, as well as corresponding Scala API bindings, for SMB operations:
-
SortedBucketSink
writes data to file system in SMB format. Scala APIs (see:SortedBucketSCollection
):-
SCollection[T: Coder]#saveAsSortedBucket
source
sc.parallelize(250 until 750) .map { i => Account .newBuilder() .setId(i) .setName(i.toString) .setType(s"type${i % 5}") .setAmount(Random.nextDouble() * 1000) .build() } .saveAsSortedBucket( AvroSortedBucketIO .write[String, Account](classOf[String], "name", classOf[Account]) .to(args("accounts")) .withSorterMemoryMb(128) .withTempDirectory(sc.options.getTempLocation) .withCodec(CodecFactory.snappyCodec()) .withHashType(HashType.MURMUR3_32) .withFilenamePrefix("part") // Default is "bucket" .withNumBuckets(1) .withNumShards(1) )
Note the use of
Integer
for parameterized key type instead of a ScalaInt
. The key class must have a Coder available in the default Beam (Java) coder registry.Also note that the number of buckets specified must be a power of 2. This allows sources of different bucket sizes to still be joinable.
-
-
SortedBucketSource
reads data that has been written to file system usingSortedBucketSink
into a collection ofCoGbkResult
s. Scala APIs (see:SortedBucketScioContext
):ScioContext#sortMergeGroupByKey
(1 source)ScioContext#sortMergeJoin
(2 sources)ScioContext#sortMergeCoGroup
(1-22 sources)
Note that each
TupleTag
used to create theSortedBucketIO.Read
s needs to have a unique Id.source
sc.sortMergeJoin( classOf[String], AvroSortedBucketIO .read(new TupleTag[GenericRecord]("lhs"), SortMergeBucketExample.UserDataSchema) // 1. Only 1 user per user ID // 2. Out of key intersection 250-499, only 100 (300-349, 400-499) with age < 50 .withPredicate((xs, x) => xs.size() == 0 && x.get("age").asInstanceOf[Int] < 50) .from(args("users")), AvroSortedBucketIO .read(new TupleTag[Account]("rhs"), classOf[Account]) .from(args("accounts")), TargetParallelism.max() ).map(mapFn) // Apply mapping function .saveAsTextFile(args("output"))
-
SortedBucketTransform
reads data that has been written to file system usingSortedBucketSink
, transforms eachCoGbkResult
using a user-supplied function, and immediately rewrites them using the same bucketing scheme. Scala APIs (see:SortedBucketScioContext
):-
ScioContext#sortMergeTransform
(1-22 sources)
Note that each
TupleTag
used to create theSortedBucketIO.Read
s needs to have a unique Id.source
val (readLhs, readRhs) = ( AvroSortedBucketIO .read(new TupleTag[GenericRecord]("lhs"), SortMergeBucketExample.UserDataSchema) .from(args("users")), AvroSortedBucketIO .read(new TupleTag[Account]("rhs"), classOf[Account]) .from(args("accounts")) ) sc.sortMergeTransform( classOf[String], readLhs, readRhs, TargetParallelism.auto() ).to( AvroSortedBucketIO .transformOutput(classOf[String], "name", classOf[Account]) .to(args("output")) ).via { case (key, (users, accounts), outputCollector) => users.foreach { _ => outputCollector.accept( Account .newBuilder() .setId(key.toInt) .setName(key) .setType("combinedAmount") .setAmount(accounts.foldLeft(0.0)(_ + _.getAmount)) .build() ) } }
-
What kind of data can I write using SMB?
SMB writes are supported for multiple formats:
- Avro (GenericRecord and SpecificRecord) when also depending on
scio-avro
. AvroSortedBucketIO
- JSON
JsonSortedBucketIO
- Parquet when also depending on
scio-parquet
ParquetAvroSortedBucketIO
ParquetTypesSortedBucketIO
- Tensorflow when also depending on
scio-tensorflow
TensorFlowBucketIO
Null keys in SMB datasets
If the key field of one or more PCollection elements is null, those elements will be diverted into a special bucket file, bucket-null-keys.avro
. This file will be ignored in SMB reads and transforms and must be manually read by a downstream user.
Avro String keys
If you’re using AvroSortedBucketIO
, be aware of how Avro String fields are decoded. Configuration errors can result in the following runtime exception:
Cause: java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String
[info] at org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:37)
[info] at org.apache.beam.sdk.extensions.smb.BucketMetadata.encodeKeyBytes(BucketMetadata.java:222)
SpecificRecords
Scio 0.10.4 specifically has a bug in the default String decoding behavior for SpecificRecords
: by default, they’re decoded at runtime into org.apache.avro.util.Utf8
objects, rather than java.lang.String
s (the generated getter/setter signatures use CharSequence
as an umbrella type). This bug has been fixed in Scio 0.11+. If you cannot upgrade, you can mitigate this by ensuring your SpecificRecord
schema has the property java-class: java.lang.String
set in the key field. This can be done either in the avsc/avdl schema or in Java/Scala code:
val mySchema: org.apache.avro.Schema = ???
mySchema
.getField("keyField")
.schema()
.addProp(
org.apache.avro.specific.SpecificData.CLASS_PROP,
"java.lang.String".asInstanceOf[Object]
)
Note: If you’re using sbt-avro for schema generation, you can just set the SBT property avroStringType := "String"
instead.
GenericRecords
For GenericRecords, org.apache.avro.util.Utf8
decoding has always been the default. If you’re reading Avro GenericRecords in your SMB join, set the avro.java.string: String
property in the Schema of the key field.
val mySchema: org.apache.avro.Schema = ???
mySchema
.getField("keyField")
.schema()
.addProp(
org.apache.avro.generic.GenericData.STRING_PROP,
"String".asInstanceOf[Object]
)
Parquet
SMB supports Parquet reads and writes in both Avro and case class formats.
If you’re using Parquet-Avro and your schema contains a logical type, you’ll have to opt in to a logical type supplier in your Parquet Configuration
parameter:
import org.apache.avro.specific.SpecificRecordBase
import org.apache.beam.sdk.extensions.smb.{AvroLogicalTypeSupplier, ParquetAvroSortedBucketIO}
import org.apache.beam.sdk.values.TupleTag
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}
import com.spotify.scio.avro.TestRecord
// Reads
val readConf = new Configuration()
readConf.setClass(AvroReadSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])
ParquetAvroSortedBucketIO
.read[TestRecord](new TupleTag[TestRecord], classOf[TestRecord])
.withConfiguration(readConf)
// Writes
val writeConf = new Configuration()
writeConf.setClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])
ParquetAvroSortedBucketIO
.write(classOf[String], "myKeyField", classOf[TestRecord])
.withConfiguration(writeConf)
// Transforms
val transformConf = new Configuration()
transformConf.setClass(AvroReadSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])
transformConf.setClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])
ParquetAvroSortedBucketIO
.transformOutput(classOf[String], "myKeyField", classOf[TestRecord])
.withConfiguration(transformConf)
Note that if you’re using a non-default Avro version (i.e. Avro 1.11), you’ll have to supply a custom logical type supplier using Avro 1.11 classes. See Logical Types in Parquet for more information.
Tuning parameters for SMB transforms
numBuckets/numShards
SMB reads should be more performant and less resource-intensive than regular joins or groupBys. However, SMB writes are more expensive than their regular counterparts, since they involve an extra group-by (bucketing) and sorting step. Additionally, non-SMB writes (i.e. implementations of FileBasedSink
) use hints from the runner to determine an optimal number of output files. Unfortunately, SMB doesn’t have access to those runtime hints; you must specify the number of buckets and shards as static values up front.
In SMB, buckets correspond to the hashed value of the SMB key % a given power of 2. A record with a given key will always be hashed into the same bucket. On the file system, buckets consist of one or more sharded files in which records are randomly assigned per-bundle. Two records with the same key may end up in different shard files within a bucket.
- A good starting point is to look at your output data as it has been written by a non-SMB sink, and pick the closest power of 2 as your initial
numBuckets
and setnumShards
to 1. - If you anticipate having hot keys, try increasing
numShards
to randomly split data within a bucket. numBuckets
*numShards
= total # of files written to disk.
sorterMemoryMb
If your job gets stuck in the sorting phase (since the GroupByKey
and SortValues
transforms may get fused–you can reference the Counter
s SortedBucketSink-bucketsInitiatedSorting
and SortedBucketSink-bucketsCompletedSorting
to get an idea of where your job fails), you can increase sorter memory (default is 1024MB, or 128MB for Scio <= 0.9.0):
data.saveAsSortedBucket(
AvroSortedBucketIO
.write[K, V](classOf[K], "keyField", classOf[V])
.to(...)
.withSorterMemoryMb(256)
)
The amount of data each external sorter instance needs to handle is total output size / numBuckets
/ numShards
, and when this exceeds sorter memory, the sorter will spill to disk. n1-standard
workers has 3.75GB RAM per CPU, so 1GB sorter memory is a decent default, especially if the output files are kept under that size. If you have to spill to disk, note that worker disk IO depends on disk type, size, and worker number of CPUs.
See specifying pipeline execution parameters for more details, e.g. --workerMachineType
, --workerDiskType
, and --diskSizeGb
. Also read more about machine types and block storage performance
Parallelism
The SortedBucketSource
API accepts an optional TargetParallelism
parameter to set the desired parallelism of the SMB read operation. For a given set of sources, targetParallelism
can be set to any number between the least and greatest numbers of buckets among sources. This can be dynamically configured using TargetParallelism.min()
or TargetParallelism.max()
, which at graph construction time will determine the least or greatest amount of parallelism based on sources.
Alternately, TargetParallelism.of(Integer value)
can be used to statically configure a custom value, or {@link TargetParallelism#auto()}
can be used to let the runner decide how to split the SMB read at runtime based on the combined byte size of the inputs–this is also the default behavior if TargetPallelism
is left unspecified.
If no value is specified, SMB read operations will use Auto parallelism.
When selecting a target parallelism for your SMB operation, there are tradeoffs to consider:
- Minimal parallelism means a fewer number of workers merging data from potentially many buckets. For example, if source A has 4 buckets and source B has 64, a minimally parallel SMB read would have 4 workers, each one merging 1 bucket from source A and 16 buckets from source B. This read may have low throughput.
- Maximal parallelism means that each bucket is read by at least one worker. For example, if source A has 4 buckets and source B has 64, a maximally parallel SMB read would have 64 workers, each one merging 1 bucket from source B and 1 bucket from source A, replicated 16 times. This may have better throughput than the minimal example, but more expensive because every key group from the replicated sources must be re-hashed to avoid emitting duplicate records.
- A custom parallelism in the middle of these bounds may be the best balance of speed and computing cost.
- Auto parallelism is more likely to pick an ideal value for most use cases. If its performance is worse than expected, you can look up the parallelism value it has computed and try a manual adjustment. Unfortunately, since it’s determined at runtime, the computed parallelism value can’t be added to the pipeline graph through
DisplayData
. Instead, you’ll have to check the worker logs to find out which value was selected. When using Dataflow, you can do this in the UI by clicking on the SMB transform box, and searching the associated logs for the textParallelism was adjusted
. For example, in this case the value is 1024:From there, you can try increasing or decreasing the parallelism by specifying a different
TargetParallelism
parameter to your SMB read. Often auto-parallelism will select a low value and usingTargetParallelism.max()
can help.
Read buffering
Performance can suffer when reading an SMB source across many partitions if the total number of files (numBuckets
* numShards
* numPartitions
) is too large (on the order of hundreds of thousands to millions of files). We’ve observed errors and timeouts as a result of too many simultaneous filesystem connections. To that end, we’ve added two PipelineOptions
to Scio 0.10.3, settable either via command-line args or using SortedBucketOptions
directly.
--sortedBucketReadBufferSize
(default: 10000): an Integer that determines the number of elements to read and buffer from each file at a time. For example, by default, each file will have 10,000 elements read and buffered into an in-memory array at worker startup. Then, the sort-merge algorithm will request them one at a time as needed. Once 10,000 elements have been requested, the file will buffer the next 10,000.Note: this can be quite memory-intensive and require bumping the worker memory. If you have a small number of files, or don’t need this optimization, you can turn it off by setting
--sortedBucketReadBufferSize=0
.--sortedBucketReadDiskBufferMb
(default: unset): an Integer that, if set, will force each worker to actually copy the specified # of megabytes from the remote filesystem into the worker’s local temp directory, rather than streaming directly from FS. This caching is done eagerly: each worker will read as much as it can of each file in the order they’re requested, and more space will be freed up once a file is fully read. Note that this is a per worker limit.
Testing
Currently, mocking data for SMB transforms is not supported in the com.spotify.scio.testing.JobTest
framework. See SortMergeBucketExampleTest for an example of using local temp directories to test SMB reads and writes.