Beam

RowType[T] provides conversion between Scala type T and a Beam Row, backed by a Beam Schema. Custom support for type T can be added with an implicit instance of RowField[T].

import java.net.URI

case class Inner(long: Long, str: String, uri: URI)
case class Outer(inner: Inner)
val record = Outer(Inner(1L, "hello", URI.create("https://www.spotify.com")))

import magnolify.beam.*
// Encode custom type URI as String
implicit val uriField: RowField[URI] = RowField.from[String](URI.create)(_.toString)

val rowType = RowType[Outer]
val row = rowType.to(record)
val copy: Outer = rowType.from(row)

// Beam Schema
val schema = rowType.schema

Enums

Enum-like types map to the Beam logical Enum type. See EnumType for more details. UnsafeEnum[T] instances are available from import magnolify.beam.unsafe.*.
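As a rough sketch of the two options above, the following derives a RowType for a strict enum field and for an UnsafeEnum field. The Color ADT and the Paint and LenientPaint case classes are hypothetical names introduced only for illustration. It assumes that a sealed trait of case objects qualifies as an enum-like type in your Scala version, and that UnsafeEnum lives in magnolify.shared as it does in other magnolify modules.

```scala
import magnolify.beam.*
import magnolify.beam.unsafe.*
import magnolify.shared.UnsafeEnum

// Hypothetical enum-like ADT, used only for illustration
sealed trait Color
object Color {
  case object Red extends Color
  case object Green extends Color
  case object Blue extends Color
}

// Strict field: values must be known members of Color
case class Paint(color: Color)
// Lenient field: UnsafeEnum can also carry names that are not members of Color
case class LenientPaint(color: UnsafeEnum[Color])

val strictType = RowType[Paint]
val lenientType = RowType[LenientPaint]

// Round-trip a known value through a Beam Row
val paint = LenientPaint(UnsafeEnum(Color.Red))
val paintRow = lenientType.to(paint)
val paintCopy: LenientPaint = lenientType.from(paintRow)
```

The lenient variant is mainly useful when reading data that may contain enum names your code does not know about yet.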

Time and dates

Java and joda LocalDate types are available via import magnolify.beam.logical.date.*.

For date-time, instants, and durations, use import magnolify.beam.logical.millis.*, import magnolify.beam.logical.micros.* or import magnolify.beam.logical.nanos.* as appropriate for your use-case. Note that joda types have only millisecond resolution, so excess precision will be discarded when used with micros or nanos.
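A minimal sketch of how these imports might be combined, assuming millisecond precision is sufficient. The Event case class and its field names are hypothetical and exist only for this example.

```scala
import java.time.{Instant, LocalDate}

import magnolify.beam.*
import magnolify.beam.logical.date.*   // LocalDate support
import magnolify.beam.logical.millis.* // millisecond-precision Instant support

// Hypothetical record type, used only for illustration
case class Event(day: LocalDate, at: Instant)

val eventType = RowType[Event]

// The Instant is built from epoch milliseconds, so it carries no sub-millisecond digits
val event = Event(LocalDate.of(2024, 1, 15), Instant.ofEpochMilli(1705312800000L))
val eventRow = eventType.to(event)
val eventCopy: Event = eventType.from(eventRow)
```

Swapping the millis import for micros or nanos should change only the precision of the time fields; the case class itself stays the same.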

Where possible, Beam logical types are used and joda types defer to these implementations:

  • Beam’s DATETIME primitive type maps to the millisecond-precision java and joda Instants and the joda DateTime.
  • The DateTime logical type is used for millisecond-precision java and joda LocalDateTime.
  • The NanosInstant logical type is used for nanosecond-precision java and joda Instant.
  • The Time logical type is used for nanosecond-precision java and joda LocalTime.
  • The NanosDuration logical type is used for java and joda Duration.

Beam’s MicrosInstant should not be used, as it throws exceptions when presented with data that has finer-than-microsecond precision (for example, nanosecond-precision values).

SQL types

SQL-compatible logical types are supported via import magnolify.beam.logical.sql.*.

Case mapping

To use a different field case format in target records, add an optional CaseMapper argument to RowType:

import magnolify.beam.*
import magnolify.shared.CaseMapper
import com.google.common.base.CaseFormat

case class LowerCamel(firstName: String, lastName: String)

val toSnakeCase = CaseFormat.LOWER_CAMEL.converterTo(CaseFormat.LOWER_UNDERSCORE).convert _
val rowType = RowType[LowerCamel](CaseMapper(toSnakeCase))
rowType.to(LowerCamel("John", "Doe")) // Row(first_name: John, last_name: Doe)