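// Usage:
//
// `sbt "runMain com.spotify.scio.examples.extra.AvroExample
// --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME]
// --input=[INPUT].avro --output=[OUTPUT].avro --method=[METHOD]"`
//
// [METHOD] is one of: specificOut, specificIn, genericOut, genericIn, typedOut, typedIn.
// The *Out methods ignore --input and write 100 dummy records to --output as Avro; the *In
// methods read Avro records from --input and write them to --output as text.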

package com.spotify.scio.examples.extra

import com.spotify.scio._
import com.spotify.scio.avro._
import com.spotify.scio.avro.types.AvroType
import com.spotify.scio.coders.Coder
import com.spotify.scio.io.ClosedTap
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}

/**
 * Reads and writes Avro files with Scio using three kinds of records: specific records (the
 * `Account` class), generic records built against an explicit [[org.apache.avro.Schema]], and
 * typed records declared with `@AvroType` annotations.
 *
 * The `--method` argument selects which single read or write pipeline is built.
 */
object AvroExample {

  /** Explicit schema used to build, encode, write and read the generic records. */
  val schema: Schema = SchemaBuilder
    .record("GenericAccountRecord")
    .fields()
    .requiredInt("id")
    .requiredDouble("amount")
    .requiredString("name")
    .requiredString("type")
    .endRecord()

  /** Typed record whose fields come from the inline JSON schema via `@AvroType.fromSchema`. */
  @AvroType.fromSchema("""{
      | "type":"record",
      | "name":"Account",
      | "namespace":"com.spotify.scio.avro",
      | "doc":"Record for an account",
      | "fields":[
      |   {"name":"id","type":"int"},
      |   {"name":"type","type":"string"},
      |   {"name":"name","type":"string"},
      |   {"name":"amount","type":"double"}]}
    """.stripMargin)
  class AccountFromSchema

  /** Case class whose Avro schema is derived via `@AvroType.toSchema`. */
  @AvroType.toSchema
  case class AccountToSchema(id: Int, `type`: String, name: String, amount: Double)

  /**
   * Builds, but does not run, the pipeline selected by `--method`.
   *
   * @param cmdlineArgs raw command-line arguments; `--method` is read here, and `--input` /
   *                    `--output` are read by the selected method
   * @return the configured [[ScioContext]]
   * @throws IllegalArgumentException if `--method` names no known pipeline
   */
  def pipeline(cmdlineArgs: Array[String]): ScioContext = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    val m = args("method")
    m match {
      // write dummy specific records
      case "specificOut" => specificOut(sc, args)
      // read dummy specific records
      case "specificIn" => specificIn(sc, args)
      // write dummy generic records
      case "genericOut" => genericOut(sc, args)
      // read dummy generic records
      case "genericIn" => genericIn(sc, args)
      // write typed generic records
      case "typedOut" => typedOut(sc, args)
      // read typed generic records
      case "typedIn" => typedIn(sc, args)
      // IllegalArgumentException is a RuntimeException, so existing handlers still match.
      case _ => throw new IllegalArgumentException(s"Invalid method $m")
    }
    sc
  }

  /** Entry point: builds the selected pipeline, runs it and blocks until it finishes. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val sc = pipeline(cmdlineArgs)
    sc.run().waitUntilDone()
    ()
  }

  /** Writes 100 `Account` specific records (ids 1 to 100) to `--output` as Avro. */
  private def specificOut(sc: ScioContext, args: Args): ClosedTap[Account] =
    sc.parallelize(1 to 100)
      .map { i =>
        Account
          .newBuilder()
          .setId(i)
          .setAmount(i.toDouble)
          .setName("account" + i)
          .setType("checking")
          .build()
      }
      .saveAsAvroFile(args("output"))

  /** Reads `Account` records from `--input` and writes their `toString` to `--output`. */
  private def specificIn(sc: ScioContext, args: Args): ClosedTap[String] =
    sc.avroFile[Account](args("input"))
      .map(_.toString)
      .saveAsTextFile(args("output"))

  /** Writes 100 generic records conforming to [[schema]] to `--output` as Avro. */
  private def genericOut(sc: ScioContext, args: Args): ClosedTap[GenericRecord] = {
    // Avro generic record encoding is more efficient with an explicit schema.
    // A typed val (rather than an untyped implicit def) builds the coder once and keeps the
    // implicit's type explicit, as Scala 3 requires.
    implicit val genericCoder: Coder[GenericRecord] = avroGenericRecordCoder(schema)
    sc.parallelize(1 to 100)
      .map[GenericRecord] { i =>
        new GenericRecordBuilder(schema)
          .set("id", i)
          .set("amount", i.toDouble)
          .set("name", "account" + i)
          .set("type", "checking")
          .build()
      }
      .saveAsAvroFile(args("output"), schema = schema)
  }

  /** Reads generic records from `--input` using [[schema]] and writes them as text. */
  private def genericIn(sc: ScioContext, args: Args): ClosedTap[String] =
    sc.avroFile(args("input"), schema)
      .map(_.toString)
      .saveAsTextFile(args("output"))

  /** Writes 100 `AccountToSchema` values to `--output` as typed Avro. */
  private def typedOut(sc: ScioContext, args: Args): ClosedTap[AccountToSchema] =
    sc.parallelize(1 to 100)
      .map { i =>
        AccountToSchema(id = i, amount = i.toDouble, name = "account" + i, `type` = "checking")
      }
      .saveAsTypedAvroFile(args("output"))

  /** Reads `AccountFromSchema` records from `--input` and writes them to `--output` as text. */
  private def typedIn(sc: ScioContext, args: Args): ClosedTap[String] =
    sc.typedAvroFile[AccountFromSchema](args("input"))
      .saveAsTextFile(args("output"))
}