ReadFiles

Scio can read files whose paths or glob patterns are the elements of an SCollection[String], and can return their contents in several formats.

Read as text lines

Read files as String text lines, one element per line, via readTextFiles:

import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection

val sc: ScioContext = ???
val paths: SCollection[String] = ???

val lines: SCollection[String] = paths.readTextFiles
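Because readTextFiles emits one element per line and does not record which file a line came from, line-oriented logic applies directly to its output. The sketch below models a word count in plain Scala, using a Seq[String] as a stand-in for the SCollection[String]. WordCount and countWords are hypothetical names, and the Scio chain appears only in a comment.

```scala
// Hypothetical helper: a plain-Scala model of per-line processing.
object WordCount {
  // In Scio the equivalent chain would be roughly:
  //   paths.readTextFiles
  //     .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
  //     .countByValue
  def countWords(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty)) // one element per word
      .groupBy(identity)                                        // gather identical words
      .map { case (word, occurrences) => word -> occurrences.size.toLong }
}

// Lines that came from different files end up in one flat collection.
val lines = Seq("the quick brown fox", "jumps over", "the lazy dog")
val theCount = WordCount.countWords(lines)("the") // 2
```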

Read entire file as String

Read each entire file as a single String, one element per file, via readFilesAsString:

import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection

val sc: ScioContext = ???
val paths: SCollection[String] = ???

val files: SCollection[String] = paths.readFilesAsString
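Reading whole files keeps each file's content in one element, which matters when the meaning of a line depends on its position within its file. As an illustration, assume a hypothetical format where each file starts with a header line followed by data lines: with readTextFiles, headers from different files would be mixed in with all the data lines, whereas with readFilesAsString each file can be split on its own. HeaderedFile and splitHeader are illustrative names, not Scio APIs.

```scala
// Hypothetical helper for a hypothetical "header line + data lines" file format.
object HeaderedFile {
  // Splits one whole-file string into its first line (the header) and the
  // remaining non-empty lines. In Scio this could run once per file, e.g.:
  //   paths.readFilesAsString.map(HeaderedFile.splitHeader)
  def splitHeader(contents: String): (String, Seq[String]) = {
    val lines = contents.split("\r?\n").toSeq
    val header = lines.headOption.getOrElse("")
    (header, lines.drop(1).filter(_.nonEmpty))
  }
}

val (header, rows) = HeaderedFile.splitHeader("id,name\n1,alice\n2,bob\n")
```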

Read entire file as binary

Read each entire file as a binary Array[Byte], one element per file, via readFilesAsBytes:

import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection

val sc: ScioContext = ???
val paths: SCollection[String] = ???

val files: SCollection[Array[Byte]] = paths.readFilesAsBytes
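Binary reads suit content that is not text, or cases where the exact bytes matter. One possible downstream step, sketched here as an assumption rather than anything Scio provides, is fingerprinting each file with a SHA-256 digest, for example to spot duplicate files. Fingerprint.sha256Hex is a hypothetical helper built on the JDK's MessageDigest.

```scala
import java.security.MessageDigest

// Hypothetical helper, not part of Scio.
object Fingerprint {
  // Hex-encoded SHA-256 digest of a file's raw bytes. In Scio this could be applied as:
  //   paths.readFilesAsBytes.map(Fingerprint.sha256Hex)
  def sha256Hex(bytes: Array[Byte]): String =
    MessageDigest
      .getInstance("SHA-256")
      .digest(bytes)
      .map(b => f"${b & 0xff}%02x") // two lowercase hex digits per byte
      .mkString
}
```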

Read entire file as a custom type

Read each entire file into a custom type via readFiles, passing a user-defined function from Beam’s FileIO.ReadableFile to the output type:

import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.{io => beam}

case class A(i: Int, s: String)
val sc: ScioContext = ???
val paths: SCollection[String] = ???

val userFn: beam.FileIO.ReadableFile => A = ???
val records: SCollection[A] = paths.readFiles(userFn)
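The user function receives a Beam FileIO.ReadableFile, which provides methods such as readFullyAsBytes() and readFullyAsUTF8String(). Keeping the parsing itself in a plain function makes it testable without running a pipeline. This sketch assumes a hypothetical file format holding a single `<int>,<string>` record per file; ParseA is an illustrative name, and the Beam wiring is shown only in a comment.

```scala
final case class A(i: Int, s: String)

// Hypothetical parser for a hypothetical "<int>,<string>" file format, e.g. "42,hello".
object ParseA {
  // The userFn from the example above could delegate to it:
  //   val userFn: beam.FileIO.ReadableFile => A =
  //     file => ParseA.parse(file.readFullyAsUTF8String())
  def parse(contents: String): A =
    contents.trim.split(",", 2) match {
      case Array(i, s) => A(i.trim.toInt, s.trim)
      case _           => throw new IllegalArgumentException(s"Malformed file contents: $contents")
    }
}
```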

Read with a Beam transform

Files can also be read with a Beam PTransform from PCollection[FileIO.ReadableFile] to PCollection[T] (for example, Beam’s TextIO.readFiles()) via another variant of readFiles:

import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.{io => beam}
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.PCollection

case class Record(i: Int, s: String)

val sc: ScioContext = ???
val paths: SCollection[String] = ???

val userTransform: PTransform[PCollection[beam.FileIO.ReadableFile], PCollection[Record]] = ???
val records: SCollection[Record] = paths.readFiles(userTransform)
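A concrete transform of the required shape is Beam’s TextIO.readFiles(), which turns ReadableFiles into a PCollection[String] of lines; its output can then be mapped into Records. The sketch below shows only that mapping step, as a plain function that skips lines it cannot parse. The `<int>,<string>` line format and RecordParser.parseLine are assumptions for illustration.

```scala
import scala.util.Try

final case class Record(i: Int, s: String)

// Hypothetical line parser for a hypothetical "<int>,<string>" line format.
object RecordParser {
  // Returns None for malformed lines instead of failing the whole pipeline.
  // In Scio it could follow Beam's TextIO transform, e.g.:
  //   paths.readFiles(beam.TextIO.readFiles()).flatMap(RecordParser.parseLine)
  def parseLine(line: String): Option[Record] =
    line.split(",", 2) match {
      case Array(i, s) => Try(i.trim.toInt).toOption.map(n => Record(n, s.trim))
      case _           => None
    }
}
```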

Read with a Beam source

Files can also be read with a Beam FileBasedSource[T] (for example, Beam’s TextSource) via another variant of readFiles.

For each file path, a source is created with the user-provided function and then split into sub-ranges according to desiredBundleSizeBytes.

The readFilesWithPath variant works the same way, but pairs every element emitted by the source with the path of the file it came from, producing (String, T) tuples.

import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.{io => beam}

case class Record(i: Int, s: String)

val sc: ScioContext = ???
val paths: SCollection[String] = ???

val desiredBundleSizeBytes: Long = ???
val directoryTreatment: beam.FileIO.ReadMatches.DirectoryTreatment = ???
val compression: beam.Compression = ???
val createSource: String => beam.FileBasedSource[Record] = ???

val records: SCollection[Record] = paths.readFiles(
  desiredBundleSizeBytes,
  directoryTreatment,
  compression
) { file => createSource(file) }

val recordsWithPath: SCollection[(String, Record)] = paths.readFilesWithPath(
  desiredBundleSizeBytes,
  directoryTreatment,
  compression
) { file => createSource(file) }
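To make the bundle-size parameter more concrete, here is a simplified, standalone model of splitting a file of sizeBytes bytes into contiguous offset ranges of roughly desiredBundleSizeBytes each. It is patterned on the behaviour of Beam’s OffsetRange.split, where a very small trailing range is merged into the one before it, but it is an illustration under that assumption, not Beam’s or Scio’s actual code. ByteRanges.split is a hypothetical name.

```scala
// Hypothetical, simplified model of byte-range splitting; not Beam's implementation.
object ByteRanges {
  // Splits [0, sizeBytes) into half-open ranges of about desiredBundleSizeBytes.
  // If what would remain after a range is less than a quarter of the desired size,
  // it is folded into that range instead of becoming a tiny bundle of its own.
  def split(sizeBytes: Long, desiredBundleSizeBytes: Long): List[(Long, Long)] = {
    require(desiredBundleSizeBytes > 0, "desiredBundleSizeBytes must be positive")
    val ranges = List.newBuilder[(Long, Long)]
    var start = 0L
    while (start < sizeBytes) {
      var end = math.min(start + desiredBundleSizeBytes, sizeBytes)
      if (sizeBytes - end < desiredBundleSizeBytes / 4) end = sizeBytes
      ranges += ((start, end))
      start = end
    }
    ranges.result()
  }
}
```

With a 100-byte file and desiredBundleSizeBytes = 30, this model yields four ranges: (0, 30), (30, 60), (60, 90), (90, 100). With a 105-byte file and a size of 50, the 5-byte tail is merged, giving (0, 50) and (50, 105).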