Coder Typeclass
Coder in Apache Beam
As per Beam’s documentation:
When Beam runners execute your pipeline, they often need to materialize the intermediate data in your PCollections, which requires converting elements to and from byte strings. The Beam SDKs use objects called Coders to describe how the elements of a given PCollection may be encoded and decoded.
For the most part, coders are used when Beam transfers intermediate data between workers over the network. They may also be used by Beam to test instances for equality. Anytime you create an SCollection[T], Beam needs to know how to go from an instance of T to an array of bytes, and from that array of bytes back to an instance of T.
The Beam SDK defines a class called Coder that roughly looks like this:
public abstract class Coder<T> implements Serializable {
  public abstract void encode(T value, OutputStream outStream);
  public abstract T decode(InputStream inStream);
}
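To make the encode/decode contract concrete, here is a minimal sketch of a round trip through a coder. It assumes a hypothetical SimpleCoder trait that mirrors the two methods above; it is a stand-in for illustration, not Beam's actual class, and the byte layout chosen for Foo is arbitrary.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, InputStream, OutputStream}

object CoderRoundTrip {
  // Hypothetical stand-in that mirrors the shape of Beam's Coder (NOT the real class).
  trait SimpleCoder[T] extends Serializable {
    def encode(value: T, out: OutputStream): Unit
    def decode(in: InputStream): T
  }

  final case class Foo(x: Int, s: String)

  // Writes the Int as 4 bytes, then the String as length-prefixed modified UTF-8.
  object FooCoder extends SimpleCoder[Foo] {
    def encode(value: Foo, out: OutputStream): Unit = {
      val data = new DataOutputStream(out)
      data.writeInt(value.x)
      data.writeUTF(value.s)
      data.flush()
    }

    // Reads the fields back in the same order they were written.
    def decode(in: InputStream): Foo = {
      val data = new DataInputStream(in)
      val x = data.readInt()
      val s = data.readUTF()
      Foo(x, s)
    }
  }

  // Helper: value -> bytes
  def toBytes[T](coder: SimpleCoder[T], value: T): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    coder.encode(value, out)
    out.toByteArray
  }

  // Helper: bytes -> value
  def fromBytes[T](coder: SimpleCoder[T], bytes: Array[Byte]): T =
    coder.decode(new ByteArrayInputStream(bytes))
}
```

Decoding the bytes produced by encoding a value should give back an equal value; that round-trip property is what a runner relies on when it moves elements between workers.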
Beam provides built-in Coders for various basic Java types (Integer, Long, Double, etc.). But anytime you create a new class, and that class is used in an SCollection, a Beam coder needs to be provided.
import com.spotify.scio.values.SCollection
case class Foo(x: Int, s: String)
def coll: SCollection[Foo] = ??? // Beam will need an org.apache.beam.sdk.coders.Coder[Foo]
When are Coders used?
Shuffling data
Whenever intermediate data is shuffled, Beam will need to serialize and deserialize that data to transfer it between workers. In Scio any *byKey (groupByKey, reduceByKey, etc.) transform will trigger a shuffle.
Cluster scaling up and down
When the runner scales up and down your cluster size, data needs to be redistributed between workers. Beam therefore needs to transfer data over the network, which means serializing and deserializing it by using a Coder.
GroupByKey
Grouping by key uses Coders for two reasons. First, as we have already seen, a GroupByKey (GBK) triggers a shuffle, so the data goes through a serialization/deserialization cycle.
Second, grouping elements by key means that Beam needs to compare them and be able to decide whether two instances are equal. In Beam, and in the context of a groupByKey (or any *byKey operation), the equality of keys is tested by comparing their serialized form.
Let’s say we have defined a class Identifier and we use it as the key in a groupByKey transform:
import com.spotify.scio.values.SCollection
val coll: SCollection[(Identifier, Foo)] = ???
val grouped: SCollection[(Identifier, Iterable[Foo])] = coll.groupByKey()
To decide whether two instances of Identifier, id1 and id2, are equal, Beam will compare them after serialization. For example:
// (pseudo-code)
coder.encode(id1) // 00010011
coder.encode(id2) // 00010010
// -> id1 and id2 are NOT equal
When they are used to test equality, coders are required to be deterministic. If a non-deterministic Coder is used to test equality, an exception is thrown:
import com.spotify.scio.ScioContext
val sc = ScioContext.forTest()
val grouped =
  sc.parallelize(List((1.2, "foo"), (42.5, "bar"), (1.2, "baz")))
    .groupByKey
sc.run()
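The keys in the example above are Double values, and floating-point encodings are generally not treated as deterministic. One way to see why byte-level comparison is risky for such keys is the sketch below, which uses only the Java standard library. The 8-byte encoding shown is an illustrative assumption, not necessarily the exact encoding Beam or Scio uses.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.util.Arrays

object DoubleKeyBytes {
  // Encodes a Double as its 8-byte IEEE 754 bit pattern (illustrative encoding only).
  def encode(d: Double): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    new DataOutputStream(out).writeDouble(d)
    out.toByteArray
  }

  // Numerically, positive and negative zero compare as equal...
  val valueEqual: Boolean = 0.0 == -0.0

  // ...but their serialized forms differ in the sign bit.
  val bytesEqual: Boolean = Arrays.equals(encode(0.0), encode(-0.0))
}
```

Here valueEqual is true while bytesEqual is false, so two keys that are equal as values would land in different groups if equality were decided on bytes alone.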
Scio Coder vs Beam Coder
Both Scio and Beam define a class called Coder. For the most part when writing a job, you will be interacting with com.spotify.scio.coders.Coder.
Scio Coder and its implementations simply form an ADT where each implementation is a building block that covers one of the possible cases:
- Beam: A simple wrapper around a Beam Coder.
- Singleton: A coder for a static object. It is for example used to serialize Unit.
- Disjunction: Represents a Coder that makes a choice between different possible implementations. It is for example used to serialize ADTs and Either.
- Record: A Coder for record-like structures like case classes and tuples.
- Transform: A Coder implemented by “transforming” the encoded/decoded value of another Coder.
- CoderTransform: A Coder implemented by “transforming” a Beam Coder to a new Coder.
- Fallback: A default Coder. Used when there is no better option.
There is also a “special” coder called KVCoder. It is a specific coder for Key-Value pairs. Internally Beam treats KV differently from other types so Scio needs to do the same.
It is important to note that Scio’s coders are only representations of those cases and do not actually implement any serialization logic. Before the job starts, those coders will be materialized, meaning they will be converted to instances of org.apache.beam.sdk.coders.Coder. Thanks to this technique, Scio can dynamically change the behavior of coders depending on the execution context. For example, coders may handle nullable values differently depending on options passed to the job.
org.apache.beam.sdk.coders.Coder instances, on the other hand, are the actual implementations of the serialization and deserialization logic. Among other things, each instance of org.apache.beam.sdk.coders.Coder[T] defines two methods:
class ExampleCoder extends org.apache.beam.sdk.coders.Coder[Example] {
  def decode(inStream: InputStream): Example = ???
  def encode(value: Example, outStream: OutputStream): Unit = ???
}
How Scio picks a Coder instance
Every method in Scio that may potentially need to serialize and deserialize data takes an implicit Coder argument. See for example the definition of SCollection.map:
def map[U: Coder](f: T => U): SCollection[U] = // implementation
//      ↑
//      Implicit Coder[U] lookup
This type signature means the following: the method map (defined in SCollection[T]), applied to a function from T to U, will return an SCollection[U]. On top of that, this method has a context bound U: Coder, meaning a Coder[U] needs to be available in the implicit context.
So at compile time the Scala compiler will try to find an appropriate Coder. If it fails to find one, the compilation will fail.
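The context-bound mechanism is plain Scala and can be sketched without Scio. The example below uses a hypothetical toy typeclass, Describe, standing in for Coder; all names in it are illustrative assumptions. It shows that a context bound is shorthand for an extra implicit parameter, and that instances for composite types can be assembled from instances for their parts.

```scala
object ContextBoundSketch {
  // Hypothetical toy typeclass standing in for Scio's Coder.
  trait Describe[T] {
    def describe(value: T): String
  }

  object Describe {
    // Summoner: Describe[T] fetches the implicit instance, like Coder[Int] does for coders.
    def apply[T](implicit d: Describe[T]): Describe[T] = d

    // Instance for a primitive type.
    implicit val intDescribe: Describe[Int] = new Describe[Int] {
      def describe(value: Int): String = s"Int($value)"
    }

    // Instance for List[T], available whenever an instance for T is available.
    implicit def listDescribe[T: Describe]: Describe[List[T]] = new Describe[List[T]] {
      def describe(value: List[T]): String =
        value.map(v => Describe[T].describe(v)).mkString("List(", ", ", ")")
    }
  }

  // Context-bound form, as used by SCollection.map ...
  def show[T: Describe](value: T): String = Describe[T].describe(value)

  // ... is equivalent to an explicit implicit parameter list.
  def showExplicit[T](value: T)(implicit d: Describe[T]): String = d.describe(value)
}
```

Calling show on a type with no instance in scope, for example a String here, is rejected at compile time, which is the same failure mode described above for a missing Coder.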
When the compiler looks for an implicit for a concrete type Foo, three cases can happen:
There exists a Coder[Foo] in scope
Scio comes with a number of implementations for common types like primitives (Int, Float, String, etc.), common Scala and Java types (Option, Either, List, etc.) and some types from the Beam API. In that case the available Coder will simply be used. It is also possible for the user (aka you) to provide an implementation. Here’s an example REPL session that demonstrates it:
import com.spotify.scio.coders._
Coder[Int] // Try to find a Coder instance for Int
Here the compiler just found a proper Coder for integers. Scio also provides Coders for common collection types:
Coder[List[String]] // Try to find a Coder instance for List[String]
No Coder[Foo] is available but the compiler can derive one
For certain types (for example case classes with a public constructor), Scio can derive an inline Coder implementation at compile time. Note that it does not generate source code.
case class Demo(i: Int, s: String, xs: List[Double])
Coder[Demo]
Sealed class hierarchies are also supported:
sealed trait Top
final case class TA(anInt: Int, aString: String) extends Top
final case class TB(anDouble: Double) extends Top
Coder[Top]
No Coder[Foo] is available and the compiler cannot derive one
Sometimes, no Coder instance can be found, and it’s impossible to automatically derive one. In that case, Scio can fall back to a Kryo coder for that type if you import com.spotify.scio.coders.kryo._. Note that it might negatively impact the performance of your job.
If the scalac flag -Xmacro-settings:show-coder-fallback=true is set, a warning message will be displayed at compile time. This message should help you keep track of where implicit Kryo coders are used.
When compiling the following example with -Xmacro-settings:show-coder-fallback=true:
import com.spotify.scio.coders._
import com.spotify.scio.coders.kryo._
val localeCoder = Coder[java.util.Locale]
Scalac will output:
Warning: No implicit Coder found for the following type:
>> java.util.Locale
using Kryo fallback instead.
Scio will use a fallback Kryo coder instead.
If a type is not supported, consider implementing your own implicit Coder for this type.
It is recommended to declare this Coder in your class companion object:
object Locale {
import com.spotify.scio.coders.Coder
import org.apache.beam.sdk.coders.AtomicCoder
implicit def coderLocale: Coder[Locale] =
Coder.beam(new AtomicCoder[Locale] {
def decode(in: InputStream): Locale = ???
def encode(ts: Locale, out: OutputStream): Unit = ???
})
}
If you do want to use a Kryo coder, be explicit about it:
implicit def coderLocale: Coder[Locale] = Coder.kryo[Locale]
Additional info at:
- https://spotify.github.io/scio/internals/Coders
In this example, the compiler could not find a proper instance of Coder[Locale], and suggests you implement one yourself.
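The encode and decode bodies in the suggested Coder are left as ???. As a rough sketch of what they might contain, the standalone functions below serialize a Locale through its IETF BCP 47 language tag using only the Java standard library. The object name LocaleCodec and the roundTrip helper are hypothetical; plugging the two functions into an AtomicCoder is left as shown in the compiler message. Note that toLanguageTag can drop information for ill-formed locales, so this is only suitable when locales are well-formed.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, InputStream, OutputStream}
import java.util.Locale

object LocaleCodec {
  // Writes the language tag (e.g. "en-US") as a length-prefixed string.
  def encode(value: Locale, out: OutputStream): Unit = {
    val data = new DataOutputStream(out)
    data.writeUTF(value.toLanguageTag)
    data.flush()
  }

  // Reads the language tag back and rebuilds the Locale from it.
  def decode(in: InputStream): Locale =
    Locale.forLanguageTag(new DataInputStream(in).readUTF())

  // Hypothetical helper: encode then decode in memory.
  def roundTrip(value: Locale): Locale = {
    val out = new ByteArrayOutputStream()
    encode(value, out)
    decode(new ByteArrayInputStream(out.toByteArray))
  }
}
```

Because the same locale always yields the same tag, an encoding along these lines is also deterministic, which matters if the type is ever used as a key.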
Note that this message is not limited to direct invocation of fallback. For example, if you declare a case class that uses Locale internally, the compiler will show the same warning:
import com.spotify.scio.coders.Coder
import com.spotify.scio.coders.kryo._
case class Demo2(i: Int, s: String, xs: List[java.util.Locale])
val demoCoder = Coder[Demo2]
Int, String and List all have predefined Coder instances but Locale does not. The serialization of Locale instances is delegated to Kryo.
Compiler flags and warnings
When Scio automatically derives a Coder for a given type, it may issue warnings about potential performance issues. For example, the default implementation of Coder[GenericRecord] is very inefficient and Scio will issue a message if it is used:
[info] Using a fallback coder for Avro's GenericRecord is discouraged as it is VERY inefficient.
[info] It is highly recommended to define a proper Coder[GenericRecord] using:
[info]
[info] Coder.avroGenericRecordCoder(schema)
It is also possible to pass a flag to the compiler to issue a message anytime the fallback coder is used:
[info] Warning: No implicit Coder found for the following type:
[info]
[info] >> com.google.common.collect.SetMultimap[String,String]
[info]
[info] using Kryo fallback instead.
To activate this feature, pass -Xmacro-settings:show-coder-fallback=true to scalac in your build file:
scalacOptions += "-Xmacro-settings:show-coder-fallback=true"
How to build a custom Coder
It is possible for the user to define their own Coder implementation. Scio provides builder functions in the Coder object. If you want to create a custom Coder, you should use one of these three builders:
- Coder.beam: Create a Scio Coder that simply wraps a Beam implementation. For example:
  import com.spotify.scio.coders._
  import org.apache.beam.sdk.coders.DoubleCoder
  implicit def doubleCoder = Coder.beam(DoubleCoder.of())
- Coder.transform: Create a Coder for a type B by transforming the Beam implementation for a type A. Usually useful for a Coder that depends on another Coder:
  import java.io.{InputStream, OutputStream}
  import org.apache.beam.sdk.coders.AtomicCoder
  class ListCoder[T](bc: org.apache.beam.sdk.coders.Coder[T]) extends AtomicCoder[List[T]] {
    override def encode(value: List[T], outStream: OutputStream): Unit = ???
    override def decode(inStream: InputStream): List[T] = ???
  }
  implicit def listCoder[T: Coder]: Coder[List[T]] =
    Coder.transform(Coder[T])(bc => Coder.beam(new ListCoder[T](bc)))
- Coder.xmap: Create a Coder for a type B by reusing a Coder[A]. xmap simply applies the provided functions to convert B to A and back. See for example a possible Coder[Char] based on an existing Coder[Byte]:
  implicit def charCoder: Coder[Char] = Coder.xmap(Coder[Byte])(_.toChar, _.toByte)
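The idea behind xmap can be sketched independently of Scio. The example below assumes a hypothetical minimal Codec trait (not Scio's Coder) and shows how a codec for a wrapper type can be obtained from an existing codec plus a pair of conversion functions; UserId and stringCodec are illustrative names.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object XmapSketch {
  // Hypothetical minimal codec; Scio's real Coder is more involved.
  trait Codec[A] { self =>
    def encode(a: A): Array[Byte]
    def decode(bytes: Array[Byte]): A

    // Derives a Codec[B] from this Codec[A], given conversions in both directions.
    def xmap[B](from: A => B, to: B => A): Codec[B] = new Codec[B] {
      def encode(b: B): Array[Byte] = self.encode(to(b))
      def decode(bytes: Array[Byte]): B = from(self.decode(bytes))
    }
  }

  // Base codec for String using UTF-8 bytes.
  val stringCodec: Codec[String] = new Codec[String] {
    def encode(a: String): Array[Byte] = a.getBytes(UTF_8)
    def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
  }

  // Wrapper type whose codec simply reuses the String codec.
  final case class UserId(value: String)

  val userIdCodec: Codec[UserId] =
    stringCodec.xmap[UserId](s => UserId(s), id => id.value)
}
```

The two functions should be inverses of each other on the values actually used; otherwise decoding an encoded value will not give back the original.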
⚠ Serialization ⚠
Coder instances have to be Serializable. You do not need to extend Serializable explicitly since the Coder trait already does, but you do need to make sure that your implementation is not referencing a non-serializable object in any way.
Note that in test mode (when you use JobTest), Scio will make sure that all the coders used in the job are serializable.
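To illustrate how a class that extends Serializable can still fail to serialize, here is a small sketch based on plain Java serialization. It is a generic illustration and not a description of how Scio performs its own check; isJavaSerializable and Holder are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializableCheck {
  // Returns true if Java serialization accepts the whole object graph rooted at obj.
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Serializable by declaration, but only as serializable as the field it holds.
  final class Holder(val resource: AnyRef) extends Serializable
}
```

A Holder wrapping a String passes the check, while a Holder wrapping a plain java.lang.Object does not, even though Holder itself extends Serializable. A coder that captures such a reference would run into the same problem.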
Testing custom coders
Scio provides a few assertions specific to coders. See CoderAssertions.
Null values support
By default, and for performance reasons, Scio coders expect the values to be serialized to never be null.
This may cause the following exception to be thrown:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException:
Exception while trying to `encode` an instance of scala.Tuple3:
Can't encode field _3 value null
There are two ways to fix this issue:
- Recommended: Replace null values with Option in your job code.
- NOT recommended: Pass the following flag when you start the job:
  --nullableCoders=true
  If you pass this option, Scio will assume that every value is potentially null. This includes every single field in your case classes and every element in collections. It introduces overhead and may slow down your job execution.
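As a small sketch of the recommended approach, the example below converts a record with a possibly-null field into one that uses Option. The case classes Raw and Clean are hypothetical; the conversion relies only on the standard Option.apply, which returns None for null.

```scala
object NullToOption {
  // Input shape where nickname may be null (e.g. coming from a Java API).
  final case class Raw(id: Int, name: String, nickname: String)

  // Null-free shape: the absence of a nickname is made explicit in the type.
  final case class Clean(id: Int, name: String, nickname: Option[String])

  // Option(x) yields None when x is null and Some(x) otherwise.
  def clean(r: Raw): Clean = Clean(r.id, r.name, Option(r.nickname))
}
```

Applying a conversion like this early in the job, for instance in the first map after reading the input, keeps null values out of every later encoding step.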
Upgrading to v0.7.0 or above: Migrating to static coder
Migrating to Scio 0.7.x from an older version is likely to break a few things at compile time in your project. See the complete v0.7.0 Migration Guide for more information.