Avro
Read Avro files
Scio comes with support for reading Avro files. Avro supports both generic and specific records, and Scio reads both through the same method, avroFile: pass a specific record class as the type parameter, or pass a schema to read generic records.
Read Specific records
import com.spotify.scio.ScioContext
import com.spotify.scio.avro._
import org.apache.avro.specific.SpecificRecord
val sc: ScioContext = ???
// In practice, use a class generated (compiled) from your Avro schema files in place of SpecificRecord
def result = sc.avroFile[SpecificRecord]("gs://path-to-data/lake/part-*.avro")
Read Generic records
import com.spotify.scio.ScioContext
import com.spotify.scio.avro._
import org.apache.avro.generic.GenericRecord
import org.apache.avro.Schema
def yourAvroSchema: Schema = ???
val sc: ScioContext = ???
def result = sc.avroFile("gs://path-to-data/lake/part-*.avro", yourAvroSchema)
// `result` is an SCollection[GenericRecord]
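Once generic records are read, their fields are accessed by name. The sketch below is only an illustration built on Avro's Java classes, not a Scio API: the `toFoo` helper and the schema (an `int` field `x` and a `string` field `s`) are hypothetical. `GenericRecord.get` returns an untyped value, and string fields decoded from Avro files are usually `org.apache.avro.util.Utf8` rather than `String`, so the helper calls `toString`.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.avro.util.Utf8

// Hypothetical case class mirroring the Avro record
case class Foo(x: Int, s: String)

// Hypothetical schema for illustration: an int field `x` and a string field `s`
val fooSchema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"Foo","namespace":"example","fields":[
    {"name":"x","type":"int"},
    {"name":"s","type":"string"}]}""")

// Hypothetical helper: GenericRecord.get returns AnyRef, so each field is converted explicitly.
// `x` is stored as a boxed java.lang.Integer; `s` may be a Utf8 or a String, and toString handles both.
def toFoo(r: GenericRecord): Foo =
  Foo(r.get("x").asInstanceOf[Int], r.get("s").toString)

// An in-memory record whose string field is a Utf8, as Avro's datum readers produce by default
val example: GenericRecord =
  new GenericRecordBuilder(fooSchema).set("x", Int.box(1)).set("s", new Utf8("a")).build()
```

In a pipeline, such a helper could be applied as `sc.avroFile(path, yourAvroSchema).map(toFoo)`.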
Write Avro files
Scio comes with support for writing Avro files. Avro supports both generic and specific records, and Scio writes both through the same method, saveAsAvroFile; which variant applies depends on the element type of the SCollection.
Write Specific records
import com.spotify.scio.values.SCollection
import com.spotify.scio.avro._
import org.apache.avro.specific.SpecificRecord
case class Foo(x: Int, s: String)
val foos: SCollection[Foo] = ???
// convert to an Avro SpecificRecord (in practice, a class generated from your schema)
def fn(f: Foo): SpecificRecord = ???
// the type of an Avro specific record carries its schema,
// therefore Scio figures out the schema by itself
def result = foos.map(fn).saveAsAvroFile("gs://path-to-data/lake/output")
Write Generic records
import com.spotify.scio.values.SCollection
import com.spotify.scio.coders.Coder
import com.spotify.scio.avro._
import org.apache.avro.generic.GenericRecord
import org.apache.avro.Schema
case class Foo(x: Int, s: String)
val foos: SCollection[Foo] = ???
lazy val yourAvroSchema: Schema = ???
// generic records carry no compile-time schema, so provide a Coder built from the schema
implicit lazy val coder: Coder[GenericRecord] = avroGenericRecordCoder(yourAvroSchema)
// convert to an Avro GenericRecord
def fn(f: Foo): GenericRecord = ???
// writing Avro generic records requires the additional argument `schema`
def result = foos.map(fn).saveAsAvroFile("gs://path-to-data/lake/output", schema = yourAvroSchema)
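The `fn` placeholder above can be filled in many ways. A minimal sketch, assuming the hypothetical schema below mirrors `Foo` (a real job would more likely load it from an `.avsc` file), copies each case-class field into a record with Avro's `GenericRecordBuilder`:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}

case class Foo(x: Int, s: String)

// Hypothetical schema mirroring Foo, for illustration only
lazy val yourAvroSchema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"Foo","namespace":"example","fields":[
    {"name":"x","type":"int"},
    {"name":"s","type":"string"}]}""")

// One possible body for `fn`: copy each case-class field into the record.
// Int.box makes the Scala Int -> java.lang.Integer boxing explicit for the Java API.
def fn(f: Foo): GenericRecord =
  new GenericRecordBuilder(yourAvroSchema)
    .set("x", Int.box(f.x))
    .set("s", f.s)
    .build()
```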
Rules for schema evolution
- Whenever possible, provide default values for your fields.
- New fields must have a default value.
- You can only delete a field that has a default value.
- Do not change the data type of existing fields. If needed, add a new field to the schema.
- Do not rename existing fields. If needed, use aliases.
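These rules can be checked mechanically. The sketch below uses Avro's `SchemaCompatibility.checkReaderWriterCompatibility` to ask whether data written with one schema can be decoded with another; the `Foo` schema versions and the `canRead` helper are hypothetical. Adding a field with a default keeps both directions readable, while adding one without a default, or changing a field's type, breaks reading of existing data.

```scala
import org.apache.avro.{Schema, SchemaCompatibility}
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType

// A fresh parser per call, so the same record name can be defined in several versions
def parse(json: String): Schema = new Schema.Parser().parse(json)

// Hypothetical v1 of the record
val v1 = parse("""{"type":"record","name":"Foo","fields":[
  {"name":"x","type":"int"},{"name":"s","type":"string"}]}""")

// v2 adds a field WITH a default value (follows the rules)
val v2WithDefault = parse("""{"type":"record","name":"Foo","fields":[
  {"name":"x","type":"int"},{"name":"s","type":"string"},
  {"name":"email","type":"string","default":""}]}""")

// v2 adds a field WITHOUT a default value (breaks the rules)
val v2NoDefault = parse("""{"type":"record","name":"Foo","fields":[
  {"name":"x","type":"int"},{"name":"s","type":"string"},
  {"name":"email","type":"string"}]}""")

// v2 changes the type of `x` from int to string (breaks the rules)
val v2TypeChange = parse("""{"type":"record","name":"Foo","fields":[
  {"name":"x","type":"string"},{"name":"s","type":"string"}]}""")

// Can a reader using `reader` decode data that was written with `writer`?
def canRead(reader: Schema, writer: Schema): Boolean =
  SchemaCompatibility.checkReaderWriterCompatibility(reader, writer).getType ==
    SchemaCompatibilityType.COMPATIBLE
```

With these definitions, `canRead(v2WithDefault, v1)` and `canRead(v1, v2WithDefault)` both hold, while `canRead(v2NoDefault, v1)` and `canRead(v2TypeChange, v1)` do not.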
Common issues/guidelines
- Follow the Avro guidelines, especially those about schema evolution
- Use specific records wherever possible
- Use the Builder pattern to construct Avro records
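One reason to prefer a builder is that it applies schema defaults and refuses to build a record with a missing required field, whereas filling a record directly checks nothing. The sketch uses Avro's `GenericRecordBuilder` so it runs without code generation; classes generated for specific records expose a similar builder through `newBuilder()`. The schema, with a required `x` and a defaulted `s`, is hypothetical.

```scala
import org.apache.avro.{AvroRuntimeException, Schema}
import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder}
import scala.util.Try

// Hypothetical schema: `x` has no default (required), `s` defaults to "unknown"
val schema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"Foo","fields":[
    {"name":"x","type":"int"},
    {"name":"s","type":"string","default":"unknown"}]}""")

// Builder: the unset field `s` falls back to its schema default
val withDefault: GenericRecord =
  new GenericRecordBuilder(schema).set("x", Int.box(1)).build()

// Builder: leaving out `x`, which has no default, fails at build()
val missingRequired: Try[GenericRecord] =
  Try(new GenericRecordBuilder(schema).set("s", "a").build())

// Direct construction: nothing checks that `x` was set, so it silently stays null
val unchecked: GenericRecord = {
  val r = new GenericData.Record(schema)
  r.put("s", "a")
  r
}
```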
0.14.8-23-c45685a-20241105T161920Z*