Parquet
Scio supports reading and writing Parquet files as Avro records or Scala case classes. Also see the Avro page on reading and writing regular Avro files.
Avro
Read Parquet files as Avro
When reading Parquet as Avro specific records, you can use parquet-extra macros to generate column projections and row predicates with idiomatic Scala syntax. To read a Parquet file as Avro specific records with column projections and row predicates:
import com.spotify.scio._
import com.spotify.scio.parquet.avro._
import com.spotify.scio.avro.TestRecord

object ParquetJob {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Macros for generating column projections and row predicates
    val projection = Projection[TestRecord](_.getIntField, _.getLongField, _.getBooleanField)
    val predicate = Predicate[TestRecord](x => x.getIntField > 0 && x.getBooleanField)

    sc.parquetAvroFile[TestRecord]("input.parquet", projection, predicate)
      // Map out projected fields right after reading
      .map(r => (r.getIntField, r.getLongField, r.getBooleanField))

    sc.run()
    ()
  }
}
Note that the resulting TestRecords are not complete Avro objects. Only the projected columns (intField, longField, booleanField) are present, while the rest are null. These objects may fail serialization, so it’s recommended that you map them to tuples or case classes right after reading.
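For instance, a minimal sketch of the case class approach, building on the read above (Projected, projectedRead, and safeToSerialize are hypothetical names, not part of the Scio API):

import com.spotify.scio.values.SCollection
import com.spotify.scio.avro.TestRecord

// Hypothetical case class holding only the projected columns
case class Projected(intField: Int, longField: Long, booleanField: Boolean)

// e.g. the projected parquetAvroFile read from the example above
def projectedRead: SCollection[TestRecord] = ???

// Copy projected fields into a complete, serializable value immediately
def safeToSerialize: SCollection[Projected] =
  projectedRead.map(r => Projected(r.getIntField, r.getLongField, r.getBooleanField))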
Also note that predicate logic is only applied when reading actual Parquet files, not in JobTest. To retain the filter behavior while using mock input, it’s recommended that you do the following:
import com.spotify.scio._
import com.spotify.scio.parquet.avro._
import com.spotify.scio.avro.TestRecord

object ParquetJob {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    val projection = Projection[TestRecord](_.getIntField, _.getLongField, _.getBooleanField)
    // Build both native filter function and Parquet FilterPredicate
    // case class Predicates[T](native: T => Boolean, parquet: FilterPredicate)
    val predicate = Predicate.build[TestRecord](x => x.getIntField > 0 && x.getBooleanField)

    sc.parquetAvroFile[TestRecord]("input.parquet", projection, predicate.parquet)
      // filter natively with the same logic in case of mock input in `JobTest`
      .filter(predicate.native)

    sc.run()
    ()
  }
}
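For reference, a rough JobTest sketch of this pattern, assuming the job above is wired to a ParquetAvroIO test input (the spec name and the empty mock records are hypothetical placeholders):

import com.spotify.scio.avro.TestRecord
import com.spotify.scio.parquet.avro.ParquetAvroIO
import com.spotify.scio.testing._

class ParquetJobTest extends PipelineSpec {
  "ParquetJob" should "apply the predicate to mock input" in {
    // Hypothetical mock records; only those passing predicate.native survive the .filter
    val records: Seq[TestRecord] = Seq.empty

    JobTest[ParquetJob.type]
      .input(ParquetAvroIO[TestRecord]("input.parquet"), records)
      .run()
  }
}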
You can also read Avro generic records by specifying a reader schema.
import com.spotify.scio._
import com.spotify.scio.parquet.avro._
import com.spotify.scio.avro.TestRecord
import org.apache.avro.generic.GenericRecord

object ParquetJob {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.parquetAvroFile[GenericRecord]("input.parquet", TestRecord.getClassSchema)
      // Map out projected fields into something type safe
      .map(r => (r.get("int_field").asInstanceOf[Int], r.get("string_field").toString))

    sc.run()
    ()
  }
}
Write Avro to Parquet files
Both Avro generic and specific records are supported when writing.
Avro specific record types carry their schema, so Scio can figure out the schema by itself:
import com.spotify.scio.values._
import com.spotify.scio.parquet.avro._
import com.spotify.scio.avro.TestRecord
def input: SCollection[TestRecord] = ???
def result = input.saveAsParquetAvroFile("gs://path-to-data/lake/output")
Writing Avro generic records requires an additional schema argument:
import com.spotify.scio.values._
import com.spotify.scio.coders.Coder
import com.spotify.scio.avro._
import com.spotify.scio.parquet.avro._
import org.apache.avro.generic.GenericRecord
def input: SCollection[GenericRecord] = ???
lazy val yourAvroSchema: org.apache.avro.Schema = ???
implicit lazy val coder: Coder[GenericRecord] = avroGenericRecordCoder(yourAvroSchema)
def result = input.saveAsParquetAvroFile("gs://path-to-data/lake/output", schema = yourAvroSchema)
Logical Types
As of Scio 0.14.0, Scio supports logical types for Avro specific records in parquet-avro out of the box.
When using generic records, you’ll need to supply the additional Configuration parameter AvroReadSupport.AVRO_DATA_SUPPLIER for reads, or AvroWriteSupport.AVRO_DATA_SUPPLIER for writes, to use logical types.
import com.spotify.scio._
import com.spotify.scio.avro._
import com.spotify.scio.coders.Coder
import com.spotify.scio.parquet.avro._
import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.values.SCollection
import org.apache.avro.Conversions
import org.apache.avro.generic.GenericRecord
import org.apache.avro.data.TimeConversions
import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}

val sc: ScioContext = ???
implicit val coder: Coder[GenericRecord] = ???
val data: SCollection[GenericRecord] = ???

class AvroLogicalTypeSupplier extends AvroDataSupplier {
  override def get(): GenericData = {
    val data = GenericData.get()

    // Add conversions as needed
    data.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion())

    data
  }
}

// Reads
sc.parquetAvroFile(
  "somePath",
  conf = ParquetConfiguration.of(AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[AvroLogicalTypeSupplier])
)

// Writes
data.saveAsParquetAvroFile(
  "somePath",
  conf = ParquetConfiguration.of(AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[AvroLogicalTypeSupplier])
)
Case classes
Scio uses magnolify-parquet to derive Parquet readers and writers for case classes at compile time, similar to how coders work. See this mapping table for how Scala and Parquet types map; enum type mapping is also specifically documented.
Read Parquet files as case classes
When reading Parquet files as case classes, all fields in the case class definition are read. It is therefore desirable to construct a case class type containing only the fields needed for processing.
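For instance, a minimal sketch under the assumption that the files were written with a wider schema (FullRecord and RecordProjection are hypothetical names); only the columns declared in the read type are fetched:

import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection
import com.spotify.scio.parquet.types._
import magnolify.parquet._

// Hypothetical full schema the files were written with
case class FullRecord(int_field: Int, string_field: String, large_payload_field: String)
// Narrower read type: only int_field and string_field columns are read
case class RecordProjection(int_field: Int, string_field: String)

def sc: ScioContext = ???
def projected: SCollection[RecordProjection] =
  sc.typedParquetFile[RecordProjection]("input.parquet")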
Starting with Magnolify 0.4.8 (corresponding to Scio 0.11.6 and above), Magnolify supports predicates for case classes at the field level only. You can use Parquet’s FilterApi.or and FilterApi.and to chain them:
import com.spotify.scio._
import com.spotify.scio.parquet.types._
import magnolify.parquet._
import org.apache.parquet.filter2.predicate.FilterApi

object ParquetJob {
  case class MyRecord(int_field: Int, string_field: String)

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.typedParquetFile[MyRecord]("input.parquet", predicate = FilterApi.and(
      Predicate.onField[String]("string_field")(_.startsWith("a")),
      Predicate.onField[Int]("int_field")(_ % 2 == 0))
    )

    sc.run()
    ()
  }
}
Write case classes to Parquet files
When writing case classes as Parquet files, the schema is derived from the case class and all fields are written.
import com.spotify.scio.values._
import com.spotify.scio.parquet.types._
case class MyRecord(int_field: Int, string_field: String)
def input: SCollection[MyRecord] = ???
def result = input.saveAsTypedParquetFile("gs://path-to-data/lake/output")
Compatibility
Note that parquet-avro writes Avro array fields differently than most other Parquet submodules. For example, my_field: List[Int] would normally map to something like this:

repeated int32 my_field;

While parquet-avro would map it to this:

required group my_field {
  repeated int32 array;
}
Add the following import to handle typed Parquet in a way compatible with Parquet Avro:
import magnolify.parquet.ParquetArray.AvroCompat._
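For example, a sketch of a typed write with the compatibility import in scope (Event is a hypothetical record); the derived schema then nests the repeated field the way parquet-avro expects:

import com.spotify.scio.values.SCollection
import com.spotify.scio.parquet.types._
// Changes how magnolify-parquet encodes repeated fields, for parquet-avro compatibility
import magnolify.parquet.ParquetArray.AvroCompat._

// Hypothetical record with a repeated field
case class Event(id: String, tags: List[String])

def events: SCollection[Event] = ???
def result = events.saveAsTypedParquetFile("gs://path-to-data/lake/output")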
The same Avro schema evolution principles apply to Parquet, i.e. only append OPTIONAL or REPEATED fields with default null or []. See this test for some common scenarios w.r.t. Parquet schema evolution.
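As an illustration with hypothetical record types, an evolution that only appends an optional field should stay read-compatible with files written under the older schema:

// Writer schema used for existing files
case class MyRecordV1(int_field: Int, string_field: String)
// Evolved schema: appends an OPTIONAL field; values missing from old files are expected to read as None
case class MyRecordV2(int_field: Int, string_field: String, new_field: Option[String] = None)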
Configuring Parquet
The Parquet Java library relies heavily on Hadoop’s Job API. Therefore, both the Parquet library and scio-parquet use Hadoop’s Configuration class to manage most Parquet read and write options.
The Configuration class, when initialized, will load default values from the first available core-site.xml found on the classpath. Scio-parquet provides a default core-site.xml implementation: if your Scio pipeline has a dependency on scio-parquet, these default options will be picked up in your pipeline.
Overriding the Default Configuration
You can override the default configuration in two ways:
- Declare a core-site.xml file of your own in your project’s src/main/resources folder. Note that Hadoop can only pick one core-site.xml to read: if you override the file in your project, Hadoop will not read Scio’s default core-site.xml at all, and none of its default options will be loaded.
- Create an in-memory Configuration object for use with scio-parquet’s ReadParam and WriteParam. Any options provided this way will be appended to Scio’s default configuration.
You can create and pass in a custom Configuration using our ParquetConfiguration helper, available in Scio 0.12.x and above:
import com.spotify.scio.parquet.ParquetConfiguration

data
  .saveAsParquetAvroFile(args("output"), conf = ParquetConfiguration.of("parquet.block.size" -> 536870912))
If you’re on Scio 0.11.x or below, you’ll have to create a Configuration object directly:
import org.apache.hadoop.conf.Configuration

val parquetConf: Configuration = {
  val conf: Configuration = new Configuration()
  conf.setInt("parquet.block.size", 536870912)
  conf
}
Common Configuration Options
- parquet.block.size - This determines the HDFS block size and the Parquet row group size. 1 GiB is recommended over the default 128 MiB, although you’ll have to weigh the tradeoffs: a larger block size means fewer seek operations on blob storage, at the cost of having to load a larger row group into memory.
- fs.gs.inputstream.fadvise - “Fadvise allows applications to provide a hint to the Linux kernel with the intended I/O access pattern, indicating how it intends to read a file, whether for sequential scans or random seeks.” According to this blog post, “traditional MapReduce jobs” are ideal use cases for setting this config to SEQUENTIAL, and Scio jobs fit in this category (see the sketch after this list).
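As a sketch, both keys above could be supplied through ParquetConfiguration, assuming ParquetConfiguration.of accepts multiple key/value pairs (values here are illustrative):

import com.spotify.scio.parquet.ParquetConfiguration

// 1 GiB row groups for writes, sequential fadvise for GCS reads (illustrative values)
val conf = ParquetConfiguration.of(
  "parquet.block.size" -> 1073741824,
  "fs.gs.inputstream.fadvise" -> "SEQUENTIAL"
)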
Here are some other recommended settings.
- numShards - This should be explicitly set so that the size of each output file is smaller than, but close to, parquet.block.size, i.e. 1 GiB. This guarantees that each file contains only one row group and reduces seeks.
- compression - Parquet defaults to ZSTD compression with a level of 3; the compression level can be set to any integer from 1 to 22 using the configuration option parquet.compression.codec.zstd.level. SNAPPY and GZIP compression types also work out of the box; Snappy is less CPU intensive but has a lower compression ratio. In our benchmarks, GZIP seems to work better than Snappy on GCS. A write sketch setting both numShards and compression follows this list.
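For instance, a hedged sketch of a write that sets both options; the parameter names assume the current saveAsParquetAvroFile write parameters, and the values are illustrative:

import com.spotify.scio.values.SCollection
import com.spotify.scio.parquet.avro._
import com.spotify.scio.avro.TestRecord
import org.apache.parquet.hadoop.metadata.CompressionCodecName

def input: SCollection[TestRecord] = ???
// Shard count sized so each file stays close to one row group; codec chosen explicitly
def result = input.saveAsParquetAvroFile(
  "gs://path-to-data/lake/output",
  numShards = 100,
  compression = CompressionCodecName.ZSTD
)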
A full list of Parquet configuration options can be found here.
Parquet Reads in Scio 0.12.0+
Parquet read internals have been reworked in Scio 0.12.0. As of 0.12.0, you can opt into the new Parquet read implementation, backed by the new Beam SplittableDoFn API, by following the instructions here.
Testing
In addition to JobTest support for Avro, Typed, and Tensorflow models, Scio 0.14.5 and above include utilities for testing projections and predicates. Just import the desired module, com.spotify.scio.testing.parquet.{avro|types|tensorflow}._, from the scio-test-parquet artifact. For example, test utilities for Avro are available in com.spotify.scio.testing.parquet.avro._:
import com.spotify.scio.avro.TestRecord
import com.spotify.scio.testing.parquet.avro._
import org.apache.avro.Schema
import org.apache.parquet.filter2.predicate.FilterPredicate

// Inside a ScalaTest spec with Matchers in scope; any SpecificRecord type works in place of TestRecord
val projection: Schema = ???
val predicate: FilterPredicate = ???
val records: Iterable[TestRecord] = ???
val expected: Iterable[TestRecord] = ???

records withProjection projection withPredicate predicate should contain theSameElementsAs expected
You can also test a case class projection against an Avro writer schema to ensure writer/reader compatibility:
import com.spotify.scio.avro.TestRecord
import com.spotify.scio.testing.parquet.avro._

// Case class projection containing a subset of the Avro record's fields
case class MyProjection(int_field: Int)

val records: Iterable[TestRecord] = ???
val expected: Iterable[MyProjection] = ???

records.withProjection[MyProjection] should contain theSameElementsAs expected