Sort Merge Bucket

Sort Merge Bucket is a technique for writing data to the file system in deterministic file locations, sorted by some pre-determined key, so that it can later be read back as key groups with no shuffle required. Since each element is assigned a file destination (bucket) based on a hash of its join key, the same technique can be used to cogroup multiple sources, as long as they were written using the same key and hashing scheme.

For example, given these input records, an SMB write will first extract the key, assign each record to a bucket, sort values within the bucket, and write these values to a corresponding file.

Input                 Key  Bucket  File Assignment
{key: "b", value: 1}  "b"  0       bucket-00000-of-00002.avro
{key: "b", value: 2}  "b"  0       bucket-00000-of-00002.avro
{key: "a", value: 3}  "a"  1       bucket-00001-of-00002.avro

Two sources can be joined by opening file readers on corresponding buckets of each source and merging key groups as we go.
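To make the bucketing step concrete, the sketch below shows how a record could be routed to a bucket. This is illustrative only, not Scio's internal implementation; it assumes Guava's Murmur3 hashing is on the classpath, while the actual bucketing is driven by the HashType configured on the write.

import java.nio.charset.StandardCharsets
import com.google.common.hash.Hashing

// Hash the join key and map the hash onto a fixed number of buckets.
// Records that share a key always land in the same bucket file.
def assignBucket(key: String, numBuckets: Int): Int = {
  val hash = Hashing.murmur3_32().hashString(key, StandardCharsets.UTF_8).asInt()
  Math.floorMod(hash, numBuckets) // non-negative bucket id in [0, numBuckets)
}

// With numBuckets = 2, every record keyed "b" resolves to the same bucket file
// (e.g. bucket-00000-of-00002.avro), and likewise for "a".
val bucketForB = assignBucket("b", numBuckets = 2)
val bucketForA = assignBucket("a", numBuckets = 2)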

What are SMB transforms?

scio-smb provides three PTransforms, as well as corresponding Scala API bindings, for SMB operations:

  • SortedBucketSink writes data to the file system in SMB format. Scala APIs (see: SortedBucketSCollection):

    • SCollection[T: Coder]#saveAsSortedBucket

    sc.parallelize(250 until 750)
      .map { i =>
        Account
          .newBuilder()
          .setId(i)
          .setName(s"user$i")
          .setType(s"type${i % 5}")
          .setAmount(Random.nextDouble() * 1000)
          .build()
      }
      .saveAsSortedBucket(
        AvroSortedBucketIO
          .write[Integer, Account](classOf[Integer], "id", classOf[Account])
          .to(args("accounts"))
          .withSorterMemoryMb(128)
          .withTempDirectory(sc.options.getTempLocation)
          .withCodec(CodecFactory.snappyCodec())
          .withHashType(HashType.MURMUR3_32)
          .withFilenamePrefix("part") // Default is "bucket"
          .withNumBuckets(1)
          .withNumShards(1)
      )

    Note the use of Integer as the key type parameter instead of a Scala Int. The key class must have a Coder available in the default Beam (Java) coder registry.

    Also note that the specified number of buckets must be a power of 2. This allows sources with different bucket counts to still be joinable.
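    The power-of-2 requirement is what keeps differing bucket counts compatible: a bucket in the smaller source maps onto a fixed, predictable set of buckets in the larger one. A conceptual sketch (illustrative only, not Scio internals):

    val hash = 1234567                        // hypothetical hash of some key
    val smallBucket = Math.floorMod(hash, 4)  // bucket id in a 4-bucket source
    val largeBucket = Math.floorMod(hash, 16) // bucket id in a 16-bucket source
    // Because 4 divides 16, largeBucket % 4 == smallBucket always holds, so bucket b of
    // the 4-bucket source only ever needs to be merged with buckets b, b+4, b+8 and b+12
    // of the 16-bucket source.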

  • SortedBucketSource reads data that has been written to the file system by SortedBucketSink into a collection of CoGbkResults. Scala APIs (see: SortedBucketScioContext):

    • ScioContext#sortMergeGroupByKey (1 source)
    • ScioContext#sortMergeJoin (2 sources)
    • ScioContext#sortMergeCoGroup (1-4 sources)
    sc.sortMergeJoin(
      classOf[Integer],
      AvroSortedBucketIO
        .read(new TupleTag[GenericRecord]("lhs"), SortMergeBucketExample.UserDataSchema)
        // 1. Only 1 user per user ID
        // 2. Out of key intersection 250-499, only 100 (300-349, 400-499) with age < 50
        .withPredicate((xs, x) => xs.size() == 0 && x.get("age").asInstanceOf[Int] < 50)
        .from(args("users")),
      AvroSortedBucketIO
        .read(new TupleTag[Account]("rhs"), classOf[Account])
        .from(args("accounts")),
      TargetParallelism.max()
    ).map(mapFn) // Apply mapping function
      .saveAsTextFile(args("output"))
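    For a single source, sortMergeGroupByKey reads the key groups back directly. A minimal sketch, assuming Account records were written with saveAsSortedBucket keyed on "id" as in the sink example above:

    sc.sortMergeGroupByKey(
        classOf[Integer],
        AvroSortedBucketIO
          .read(new TupleTag[Account]("accounts"), classOf[Account])
          .from(args("accounts"))
      )
      .map { case (id, accounts) =>
        // one Iterable[Account] per account id, with no shuffle required
        s"$id\t${accounts.foldLeft(0.0)(_ + _.getAmount)}"
      }
      .saveAsTextFile(args("output"))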
  • SortedBucketTransform reads data that has been written to the file system by SortedBucketSink, transforms each CoGbkResult using a user-supplied function, and immediately rewrites the results using the same bucketing scheme. Scala APIs (see: SortedBucketScioContext):

    • ScioContext#sortMergeTransform (1-3 sources)

    val (readLhs, readRhs) = (
      AvroSortedBucketIO
        .read(new TupleTag[GenericRecord]("lhs"), SortMergeBucketExample.UserDataSchema)
        .from(args("users")),
      AvroSortedBucketIO
        .read(new TupleTag[Account]("rhs"), classOf[Account])
        .from(args("accounts"))
    )
    
    sc.sortMergeTransform(
      classOf[Integer],
      readLhs,
      readRhs,
      TargetParallelism.auto()
    ).to(
      AvroSortedBucketIO
        .transformOutput(classOf[Integer], "id", classOf[Account])
        .to(args("output"))
    ).via { case (key, (users, accounts), outputCollector) =>
      users.foreach { user =>
        outputCollector.accept(
          Account
            .newBuilder()
            .setId(key)
            .setName(user.get("userId").toString)
            .setType("combinedAmount")
            .setAmount(accounts.foldLeft(0.0)(_ + _.getAmount))
            .build()
        )
      }
    }

What kind of data can I write using SMB?

SMB writes are supported for Avro (GenericRecord and SpecificRecord), JSON, Parquet, and TensorFlow records; see the corresponding IO bindings for each format (e.g. AvroSortedBucketIO for Avro).

If the key field of one or more PCollection elements is null, those elements will be diverted into a special bucket file, bucket-null-keys.avro. This file will be ignored in SMB reads and transforms and must be manually read by a downstream user.
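If you do need those records, here is a hypothetical sketch of recovering them, assuming Avro Account records written to args("accounts") and that the null-key file sits alongside the regular bucket files in that directory:

import com.spotify.scio.avro._

// Read the null-key bucket directly as a plain Avro file; it is never part of an SMB read.
val nullKeyedAccounts = sc.avroFile[Account](args("accounts") + "/bucket-null-keys.avro")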

Avro String keys

If you’re using AvroSortedBucketIO, be aware of how Avro String fields are decoded. Configuration errors can result in the following runtime exception:

Cause: java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String
[info]   at org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:37)
[info]   at org.apache.beam.sdk.extensions.smb.BucketMetadata.encodeKeyBytes(BucketMetadata.java:222)

SpecificRecords

Scio 0.10.4 specifically has a bug in the default String decoding behavior for SpecificRecords: by default, they’re decoded at runtime into org.apache.avro.util.Utf8 objects, rather than java.lang.Strings (the generated getter/setter signatures use CharSequence as an umbrella type). This bug has been fixed in Scio 0.11+. If you cannot upgrade, you can mitigate this by ensuring your SpecificRecord schema has the property java-class: java.lang.String set in the key field. This can be done either in the avsc/avdl schema or in Java/Scala code:

val mySchema: org.apache.avro.Schema = ???
mySchema
  .getField("keyField")
  .schema()
  .addProp(
    org.apache.avro.specific.SpecificData.CLASS_PROP,
    "java.lang.String".asInstanceOf[Object]
  )

Note: If you’re using sbt-avro for schema generation, you can just set the SBT property avroStringType := "String" instead.

GenericRecords

For GenericRecords, org.apache.avro.util.Utf8 decoding has always been the default. If you’re reading Avro GenericRecords in your SMB join, set the avro.java.string: String property in the Schema of the key field.

val mySchema: org.apache.avro.Schema = ???
mySchema
  .getField("keyField")
  .schema()
  .addProp(
    org.apache.avro.generic.GenericData.STRING_PROP,
    "String".asInstanceOf[Object]
  )

Tuning parameters for SMB transforms

SMB reads should be more performant and less resource-intensive than regular joins or groupBys. However, SMB writes are more expensive than their regular counterparts, since they involve an extra group-by (bucketing) and sorting step.

Additionally, non-SMB writes (i.e. implementations of FileBasedSink) use hints from the runner to determine an optimal number of output files. With SMB, you must specify the number of buckets and shards (numBuckets * numShards = total # of files) up front.

  • A good starting point is to look at your output data as written by a non-SMB sink, pick the closest power of 2 as your initial numBuckets, and set numShards to 1.
  • If you anticipate having hot keys, try increasing numShards to randomly split data within a bucket.
  • If your job gets stuck in the sorting phase (the GroupByKey and SortValues transforms may get fused; check the counters SortedBucketSink-bucketsInitiatedSorting and SortedBucketSink-bucketsCompletedSorting to see where your job stalls), you can increase sorter memory (the default is 1024MB, or 128MB for Scio <= 0.9.0):
data.saveAsSortedBucket(
  AvroSortedBucketIO
    .write[K, V](classOf[K], "keyField", classOf[V])
    .to(...)
    .withSorterMemoryMb(256)
)

The amount of data each external sorter instance needs to handle is total output size / numBuckets / numShards; when this exceeds the sorter memory, the sorter will spill to disk. n1-standard workers have 3.75GB of RAM per CPU, so 1GB of sorter memory is a decent default, especially if the output files are kept under that size. If you do have to spill to disk, note that worker disk IO depends on the disk type, disk size, and number of worker CPUs.
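As a back-of-the-envelope check, you can estimate the per-sorter load before settling on numBuckets and numShards; the numbers below are purely hypothetical:

// Rough sizing sketch: keep each sorter's share of the output at or below sorter memory.
val totalOutputBytes = 256L * 1024 * 1024 * 1024                // ~256 GB of total output
val numBuckets = 256
val numShards = 1
val bytesPerSorter = totalOutputBytes / numBuckets / numShards  // 1 GB per sorter instance
// withSorterMemoryMb(1024) would just fit this in memory; anything larger spills to disk.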

See specifying pipeline execution parameters for more details on options such as --workerMachineType, --workerDiskType, and --diskSizeGb. Also read more about machine types and block storage performance.

Parallelism

The SortedBucketSource API accepts an optional TargetParallelism parameter to set the desired parallelism of the SMB read operation. For a given set of sources, targetParallelism can be set to any number between the least and greatest numbers of buckets among the sources. This can be configured dynamically using TargetParallelism.min() or TargetParallelism.max(), which at graph construction time will determine the least or greatest amount of parallelism based on the sources. Alternatively, TargetParallelism.of(Integer value) can be used to statically configure a custom value, or TargetParallelism.auto() can be used to let the runner decide how to split the SMB read at runtime based on the combined byte size of the inputs; this is also the default behavior if TargetParallelism is left unspecified.

If no value is specified, SMB read operations will use Auto parallelism.

When selecting a target parallelism for your SMB operation, there are tradeoffs to consider:

  • Minimal parallelism means fewer workers, each merging data from potentially many buckets. For example, if source A has 4 buckets and source B has 64, a minimally parallel SMB read would have 4 workers, each merging 1 bucket from source A and 16 buckets from source B. This read may have low throughput.
  • Maximal parallelism means that each bucket is read by at least one worker. For example, if source A has 4 buckets and source B has 64, a maximally parallel SMB read would have 64 workers, each merging 1 bucket from source B and 1 bucket from source A, replicated 16 times. This may have better throughput than the minimal example, but it is more expensive because every key group from the replicated sources must be re-hashed to avoid emitting duplicate records.
  • A custom parallelism in the middle of these bounds may be the best balance of speed and computing cost.
  • Auto parallelism is more likely to pick an ideal value for most use cases. If its performance is worse than expected, you can look up the parallelism value it has computed and try a manual adjustment. Unfortunately, since it’s determined at runtime, the computed parallelism value can’t be added to the pipeline graph through DisplayData. Instead, you’ll have to check the worker logs to find out which value was selected. When using Dataflow, you can do this in the UI by clicking on the SMB transform box, and searching the associated logs for the text Parallelism was adjusted. For example, in this case the value is 1024:
    [Screenshot: finding the computed parallelism value in the Dataflow worker logs]

    From there, you can try increasing or decreasing the parallelism by specifying a different TargetParallelism parameter to your SMB read.
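    For example, here is a sketch of pinning the parallelism to a custom value (512 is purely illustrative, and readLhs/readRhs reuse the reads defined in the sortMergeTransform example above):

    sc.sortMergeCoGroup(
      classOf[Integer],
      readLhs,
      readRhs,
      TargetParallelism.of(512) // a custom value between the min and max parallelism
    )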

Testing

Currently, mocking data for SMB transforms is not supported in the com.spotify.scio.testing.JobTest framework. See SortMergeBucketExampleTest for an example of using local temp directories to test SMB reads and writes.
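A minimal sketch of that approach, assuming the Account records from the examples above (the imports, temp-directory handling, and assertions are illustrative, not the actual SortMergeBucketExampleTest):

import java.nio.file.Files
import com.spotify.scio.ScioContext
import com.spotify.scio.avro.Account
import com.spotify.scio.smb._
import org.apache.beam.sdk.extensions.smb.AvroSortedBucketIO
import org.apache.beam.sdk.values.TupleTag

val tmpDir = Files.createTempDirectory("smb-test").toString

// First pipeline: write SMB-formatted data to the local temp directory.
val writeSc = ScioContext()
writeSc
  .parallelize(1 to 10)
  .map { i =>
    Account.newBuilder().setId(i).setName(s"user$i").setType("test").setAmount(i.toDouble).build()
  }
  .saveAsSortedBucket(
    AvroSortedBucketIO
      .write[Integer, Account](classOf[Integer], "id", classOf[Account])
      .to(tmpDir)
      .withTempDirectory(tmpDir)
      .withNumBuckets(1)
      .withNumShards(1)
  )
writeSc.run().waitUntilFinish()

// Second pipeline: read the same directory back with an SMB read and assert on the result.
val readSc = ScioContext()
val grouped = readSc.sortMergeGroupByKey(
  classOf[Integer],
  AvroSortedBucketIO.read(new TupleTag[Account]("accounts"), classOf[Account]).from(tmpDir)
)
// e.g. materialize `grouped` or write it to another temp path, then assert with your test framework
readSc.run().waitUntilFinish()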