Avro

Read Avro files

Scio supports reading Avro files. Avro records can be generic or specific, and Scio reads both with the same method (avroFile): pass a specific record class as the type parameter to read specific records, or pass a schema to read generic records.

Read Specific records

import com.spotify.scio.ScioContext
import com.spotify.scio.avro._

import org.apache.avro.specific.SpecificRecord

val sc: ScioContext = ???

// SpecificRecord stands in for a class compiled from your Avro schema files;
// substitute your own generated class as the type parameter
def result = sc.avroFile[SpecificRecord]("gs://path-to-data/lake/part-*.avro")

Read Generic records

import com.spotify.scio.ScioContext
import com.spotify.scio.avro._

import org.apache.avro.generic.GenericRecord
import org.apache.avro.Schema

def yourAvroSchema: Schema = ???

val sc: ScioContext = ???

def result = sc.avroFile("gs://path-to-data/lake/part-*.avro", yourAvroSchema)
// `result` is an SCollection of GenericRecord
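
The snippet above leaves yourAvroSchema as ???. As a minimal sketch, one way to obtain it is to parse a JSON schema definition with Avro's Schema.Parser. The Foo schema, the com.example namespace, and the describe helper below are hypothetical and only illustrate the idea; they are not part of Scio.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord

object GenericSchemaSketch {
  // Hypothetical schema, for illustration only
  val fooSchemaJson: String =
    """{
      |  "type": "record",
      |  "name": "Foo",
      |  "namespace": "com.example",
      |  "fields": [
      |    {"name": "x", "type": "int"},
      |    {"name": "s", "type": "string"}
      |  ]
      |}""".stripMargin

  // Parse the JSON definition into an Avro Schema
  lazy val yourAvroSchema: Schema = new Schema.Parser().parse(fooSchemaJson)

  // Fields of a GenericRecord are looked up by name and come back untyped
  def describe(r: GenericRecord): String =
    s"x=${r.get("x")}, s=${r.get("s")}"
}
```

A helper such as describe could then be passed to map on the collection of generic records. Because generic records are untyped, a misspelled field name only shows up at runtime.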

Write Avro files

Scio supports writing Avro files. Avro records can be generic or specific, and Scio writes both with the same method (saveAsAvroFile); which one is written depends on the element type of the SCollection.

Write Specific records

import com.spotify.scio.values.SCollection
import com.spotify.scio.avro._

import org.apache.avro.specific.SpecificRecord

case class Foo(x: Int, s: String)
val sc: SCollection[Foo] = ???

// convert to avro SpecificRecord
def fn(f: Foo): SpecificRecord = ???

// The Avro specific record type carries its schema,
// therefore Scio can determine the schema by itself

def result = sc.map(fn).saveAsAvroFile("gs://path-to-data/lake/output")

Write Generic records

import com.spotify.scio.values.SCollection
import com.spotify.scio.coders.Coder
import com.spotify.scio.avro._

import org.apache.avro.generic.GenericRecord
import org.apache.avro.Schema

case class Foo(x: Int, s: String)
val sc: SCollection[Foo] = ???

lazy val yourAvroSchema: Schema = ???
implicit lazy val coder: Coder[GenericRecord] = avroGenericRecordCoder(yourAvroSchema)

// convert to avro GenericRecord
def fn(f: Foo): GenericRecord = ???

// writing Avro generic records requires the additional `schema` argument
def result = sc.map(fn).saveAsAvroFile("gs://path-to-data/lake/output", schema = yourAvroSchema)
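
The conversion function fn is left as ??? above. A minimal sketch of one possible implementation follows, assuming a schema that mirrors Foo field by field. The schema, built here with Avro's SchemaBuilder, and the com.example namespace are assumptions made for illustration.

```scala
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}

object FooToGenericSketch {
  case class Foo(x: Int, s: String)

  // Assumed schema: one Avro field per case class field
  lazy val yourAvroSchema: Schema =
    SchemaBuilder
      .record("Foo")
      .namespace("com.example")
      .fields()
      .requiredInt("x")
      .requiredString("s")
      .endRecord()

  // Convert to an Avro GenericRecord; the builder checks field names
  // against the schema when each value is set
  def fn(f: Foo): GenericRecord =
    new GenericRecordBuilder(yourAvroSchema)
      .set("x", Int.box(f.x))
      .set("s", f.s)
      .build()
}
```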

Rules for schema evolution

  • Unless impossible, provide default values for your fields.
  • A new field must have a default value.
  • You can only delete a field that has a default value.
  • Do not change the data type of existing fields. If needed, add a new field to the schema.
  • Do not rename existing fields. If needed, use aliases.
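
The rules above can be checked mechanically. The sketch below uses Avro's SchemaCompatibility helper to test whether a reader schema can decode data written with an older writer schema. The Foo schemas, the note and label fields, and the helper names are hypothetical.

```scala
import org.apache.avro.{Schema, SchemaCompatibility}
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType

object SchemaEvolutionSketch {
  // Build a hypothetical Foo record schema from JSON field definitions.
  // A fresh parser is used each time because a parser rejects a second
  // definition of the same record name.
  def fooSchema(fields: String*): Schema =
    new Schema.Parser().parse(
      s"""{"type":"record","name":"Foo","namespace":"com.example","fields":[${fields.mkString(",")}]}"""
    )

  val x = """{"name": "x", "type": "int"}"""
  val s = """{"name": "s", "type": "string"}"""

  // Schema the existing data was written with
  val writer: Schema = fooSchema(x, s)

  // New field with a default value
  val addedWithDefault: Schema =
    fooSchema(x, s, """{"name": "note", "type": "string", "default": ""}""")

  // New field without a default value
  val addedWithoutDefault: Schema =
    fooSchema(x, s, """{"name": "note", "type": "string"}""")

  // Field `s` renamed to `label`, with the old name kept as an alias
  val renamedWithAlias: Schema =
    fooSchema(x, """{"name": "label", "type": "string", "aliases": ["s"]}""")

  // True if data written with `writer` can be read using `reader`
  def canRead(reader: Schema, writer: Schema): Boolean =
    SchemaCompatibility
      .checkReaderWriterCompatibility(reader, writer)
      .getType == SchemaCompatibilityType.COMPATIBLE
}
```

With these definitions, canRead(addedWithDefault, writer) and canRead(renamedWithAlias, writer) hold, while canRead(addedWithoutDefault, writer) does not, because the reader has no value to fill in for the missing field.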

Common issues/guidelines

  • Follow the Avro guidelines, especially the one about schema evolution.
  • Use specific records wherever possible.
  • Use the builder pattern to construct Avro records.
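
As a sketch of why the builder pattern helps, the example below uses Avro's GenericRecordBuilder with a hypothetical Event schema. The builder fills in declared defaults and fails at build time when a field without a default was never set. Classes generated from schema files expose a similar newBuilder() API.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import scala.util.Try

object BuilderSketch {
  // Hypothetical schema: `id` is required, `source` has a default value
  val schema: Schema = new Schema.Parser().parse(
    """{
      |  "type": "record",
      |  "name": "Event",
      |  "namespace": "com.example",
      |  "fields": [
      |    {"name": "id", "type": "long"},
      |    {"name": "source", "type": "string", "default": "unknown"}
      |  ]
      |}""".stripMargin
  )

  // `source` is not set, so the builder applies the schema default
  val withDefault: GenericRecord =
    new GenericRecordBuilder(schema)
      .set("id", Long.box(1L))
      .build()

  // `id` has no default and is not set, so build() throws;
  // Try captures the exception instead of propagating it
  val missingRequired: Try[GenericRecord] =
    Try(new GenericRecordBuilder(schema).build())
}
```

Failing at build time surfaces an incomplete record where it is constructed, rather than later when the record is serialized.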