CSV

Scio supports reading and writing typed CSV via kantan.

Kantan provides a CsvConfiguration that lets users configure how CSV is handled. Scio's default configuration is:

import kantan.csv._
import kantan.csv.CsvConfiguration.{Header, QuotePolicy}

CsvConfiguration(
  cellSeparator = ',',
  quote = '"',
  quotePolicy = QuotePolicy.WhenNeeded,
  header = Header.Implicit
)
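To deviate from these defaults, one option is to build a CsvConfiguration with different field values. The sketch below assumes tab-separated input with every cell quoted. The name tsvConfig is hypothetical, and only the constructor form shown above is used; kantan.csv is assumed to be on the classpath.

```scala
import kantan.csv._
import kantan.csv.CsvConfiguration.{Header, QuotePolicy}

// Hypothetical configuration for tab-separated files (name chosen for illustration).
// Same fields as the default above; only the separator and quote policy differ.
val tsvConfig: CsvConfiguration = CsvConfiguration(
  cellSeparator = '\t',
  quote = '"',
  quotePolicy = QuotePolicy.Always,
  header = Header.Implicit
)
```

A value like this can then be handed to a read through CsvIO.ReadParam(csvConfiguration = tsvConfig).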

Read CSV

Reading CSV is supported via csvFile. Note that the entire file must be read into memory since CSVs are not trivially splittable.
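One common reason CSV is hard to split is that a quoted cell may itself contain a line break, so a newline does not always mark a record boundary. The following plain-Scala sketch illustrates this with a made-up two-record sample. The records helper is a hypothetical, simplified splitter written for this illustration; it is not how Scio or kantan parse CSV.

```scala
// Made-up sample: two records, the first has a line break inside a quoted cell.
val csv = "1,\"hello\nworld\"\n2,plain\n"

// Naive approach: treat every newline as a record boundary.
val naiveLines: List[String] = csv.split("\n").toList

// Simplified quote-aware splitter (hypothetical helper, for illustration only).
// It tracks whether the current position is inside quotes and only ends a
// record on a newline that is outside quotes.
def records(input: String): List[String] = {
  val out = List.newBuilder[String]
  val cur = new StringBuilder
  var inQuotes = false
  input.foreach { c =>
    if (c == '"') inQuotes = !inQuotes
    if (c == '\n' && !inQuotes) {
      out += cur.toString
      cur.clear()
    } else {
      cur += c
    }
  }
  if (cur.nonEmpty) out += cur.toString
  out.result()
}

val quoteAware: List[String] = records(csv)

println(s"naive split: ${naiveLines.size} pieces")      // 3 pieces
println(s"quote-aware split: ${quoteAware.size} records") // 2 records
```

Because deciding whether a newline ends a record depends on the quote state accumulated from the start of the file, a reader that starts in the middle of a file cannot tell where the next record begins without extra work.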

Read with a header

For CSV files with a header, reading requires an implicit HeaderDecoder for your type.

import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.csv._
import kantan.csv._

case class A(i: Int, s: String)
implicit val decoder: HeaderDecoder[A] = HeaderDecoder.decoder("col1", "col2")(A.apply)

val sc: ScioContext = ???
val elements: SCollection[A] = sc.csvFile("gs://<input-path>/*.csv")
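Before running a pipeline, it can help to check a decoder against a small in-memory sample. The sketch below uses kantan's own readCsv from kantan.csv.ops._ rather than Scio, so it only approximates what csvFile does. The sample string and the names sample, results, and decoded are made up for illustration, and kantan.csv is assumed to be on the classpath.

```scala
import kantan.csv._
import kantan.csv.ops._

case class A(i: Int, s: String)
implicit val decoder: HeaderDecoder[A] = HeaderDecoder.decoder("col1", "col2")(A.apply)

// Made-up sample with a header row matching the decoder's column names.
val sample = "col1,col2\n1,a\n2,b\n"

// rfc.withHeader tells kantan that the first row is a header.
// Each element is a ReadResult[A]: either a read error or a decoded A.
val results: List[ReadResult[A]] = sample.readCsv[List, A](rfc.withHeader)

// Keep only the rows that decoded successfully.
val decoded: List[A] = results.collect { case Right(a) => a }
```

Inspecting the Left values in results is a quick way to see which rows a decoder rejects and why.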

Read without a header

For CSV files without a header, an implicit RowDecoder must be in scope and the read must be provided with a config specifying that there is no header:

import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.csv._
import kantan.csv._

case class A(i: Int, s: String)

implicit val decoder: RowDecoder[A] = RowDecoder.ordered { (col1: Int, col2: String) => A(col1, col2) }
val config = CsvIO.DefaultCsvConfiguration.withoutHeader

val sc: ScioContext = ???
val elements: SCollection[A] = sc.csvFile("gs://<input-path>/*.csv", CsvIO.ReadParam(csvConfiguration = config))

Write CSV

Writing to CSV is supported via saveAsCsvFile.

Write with a header

Writing with a header requires an implicit HeaderEncoder to be in scope:

import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.csv._
import kantan.csv._

case class A(i: Int, s: String)

implicit val encoder: HeaderEncoder[A] = HeaderEncoder.caseEncoder("col1", "col2")(A.unapply)

val elements: SCollection[A] = ???
elements.saveAsCsvFile("gs://<output-path>/")

Write without a header

Writing without a header requires an implicit RowEncoder to be in scope:

import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.csv._
import kantan.csv._

case class A(i: Int, s: String)

implicit val encoder: RowEncoder[A] = RowEncoder.encoder(0, 1)((a: A) => (a.i, a.s))

val elements: SCollection[A] = ???
elements.saveAsCsvFile("gs://<output-path>/")