CSV
Scio supports reading and writing typed CSV via kantan. Kantan provides a CsvConfiguration that lets users configure how CSV is parsed and written. Scio’s default config is:
import kantan.csv._
import kantan.csv.CsvConfiguration.{Header, QuotePolicy}
CsvConfiguration(
cellSeparator = ',',
quote = '"',
quotePolicy = QuotePolicy.WhenNeeded,
header = Header.Implicit
)
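As a minimal sketch of adjusting these defaults, the snippet below derives two variants from Scio's default using kantan's `CsvConfiguration` builder methods. The value names `semicolonConfig` and `quotedConfig` are illustrative only and are not part of Scio.

```scala
import com.spotify.scio.extra.csv._
import kantan.csv._

// Start from Scio's default and override only the cell separator.
val semicolonConfig: CsvConfiguration =
  CsvIO.DefaultCsvConfiguration.withCellSeparator(';')

// Start from Scio's default and quote every cell, not just those that need it.
val quotedConfig: CsvConfiguration =
  CsvIO.DefaultCsvConfiguration.quoteAll
```

A configuration built this way can be passed to a read through `CsvIO.ReadParam(csvConfiguration = ...)`, in the same way the headerless read on this page passes its config.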
Read CSV
Reading CSV is supported via csvFile. Note that each file must be read into memory in its entirety, since CSV files are not trivially splittable.
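One reason CSV is hard to split is that a quoted cell may contain line breaks, so a line boundary is not necessarily a record boundary. The following is a small illustration in plain Scala with no Scio or kantan involved. The helper `countRecords` is a hypothetical name used only for this sketch, and it ignores details such as `\r\n` line endings.

```scala
// Two logical records; the second has a quoted cell containing a line break.
val csv: String = "1,plain\n2,\"spans\ntwo lines\"\n"

// Naive line-based splitting yields three fragments instead of two records.
val naiveLines: Array[String] = csv.split("\n")

// Counting records correctly requires tracking quote state from the start
// of the input, which is why a reader cannot begin at an arbitrary offset.
def countRecords(input: String): Int = {
  var inQuotes = false
  var records = 0
  input.foreach {
    case '"'                => inQuotes = !inQuotes
    case '\n' if !inQuotes  => records += 1
    case _                  => ()
  }
  records
}

val logicalRecords: Int = countRecords(csv)
```

Here `naiveLines` has three elements while `logicalRecords` is two, so a worker starting in the middle of the file cannot tell which line breaks end a record.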
Read with a header
For CSV files with a header, reading requires an implicit HeaderDecoder for your type.
import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.csv._
import kantan.csv._
case class A(i: Int, s: String)
implicit val decoder: HeaderDecoder[A] = HeaderDecoder.decoder("col1", "col2")(A.apply)
val sc: ScioContext = ???
val elements: SCollection[A] = sc.csvFile("gs://<input-path>/*.csv")
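To check a `HeaderDecoder` without running a pipeline, kantan can decode an in-memory string directly. This is a sketch that assumes kantan's `kantan.csv.ops._` syntax is on the classpath; the `sample` data is made up for illustration, and its columns are deliberately in a different order than the case class fields.

```scala
import kantan.csv._
import kantan.csv.ops._

case class A(i: Int, s: String)
implicit val decoder: HeaderDecoder[A] = HeaderDecoder.decoder("col1", "col2")(A.apply)

// Illustrative input: the header lists col2 before col1.
val sample: String = "col2,col1\na,1\nb,2\n"

// Each row decodes to Right(value) on success or Left(error) on failure.
// Columns are matched by header name, not by position.
val decoded: List[ReadResult[A]] = sample.asCsvReader[A](rfc.withHeader).toList
```

Because the decoder resolves columns by name, the reordered header still yields `A(1, "a")` and `A(2, "b")`.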
Read without a header
For CSV files without a header, an implicit RowDecoder must be in scope, and the read must be given a config specifying that there is no header:
import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.csv._
import kantan.csv._
case class A(i: Int, s: String)
implicit val decoder: RowDecoder[A] = RowDecoder.ordered { (col1: Int, col2: String) => A(col1, col2) }
val config = CsvIO.DefaultCsvConfiguration.withoutHeader
val sc: ScioContext = ???
val elements: SCollection[A] = sc.csvFile("gs://<input-path>/*.csv", CsvIO.ReadParam(csvConfiguration = config))
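The effect of the header setting can be sketched locally with kantan alone. This assumes kantan's `kantan.csv.ops._` syntax and uses kantan's `rfc` configuration, not Scio's default; `rfc.withHeader` stands in for a config that expects a header. The `headerless` data and the value names are illustrative only.

```scala
import kantan.csv._
import kantan.csv.ops._

case class A(i: Int, s: String)
implicit val decoder: RowDecoder[A] =
  RowDecoder.ordered { (col1: Int, col2: String) => A(col1, col2) }

// Illustrative input with no header row.
val headerless: String = "1,a\n2,b\n"

// With a header expected, the first data row is consumed as the header.
val expectingHeader: List[ReadResult[A]] =
  headerless.asCsvReader[A](rfc.withHeader).toList

// With no header expected, every row is decoded.
val noHeader: List[ReadResult[A]] =
  headerless.asCsvReader[A](rfc.withoutHeader).toList
```

In this sketch `expectingHeader` contains only the second row, which is the kind of silent data loss that passing a `withoutHeader` config avoids.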
Write CSV
Writing to CSV is supported via saveAsCsvFile.
Write with a header
Writing with a header requires an implicit HeaderEncoder to be in scope:
import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.csv._
import kantan.csv._
case class A(i: Int, s: String)
implicit val encoder: HeaderEncoder[A] = HeaderEncoder.caseEncoder("col1", "col2")(A.unapply)
val elements: SCollection[A] = ???
elements.saveAsCsvFile("gs://<output-path>/")
Write without a header
Writing without a header requires an implicit RowEncoder to be in scope:
import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection
import com.spotify.scio.extra.csv._
import kantan.csv._
case class A(i: Int, s: String)
implicit val encoder: RowEncoder[A] = RowEncoder.encoder(0, 1)((a: A) => (a.i, a.s))
val elements: SCollection[A] = ???
elements.saveAsCsvFile("gs://<output-path>/")