Transforms

The com.spotify.scio.transforms package provides a selection of transforms with additional functionality.

WithResource

The WithResource syntax provides a convenient wrapper around DoFnWithResource that allows a resource, for example an API client, to be reused across elements according to the specified ResourceType. It is available on variants of map, filter, flatMap, and collect:

import com.spotify.scio.values.SCollection
import com.spotify.scio.transforms._
import com.spotify.scio.transforms.DoFnWithResource.ResourceType

class Client(val name: String)
class ClientNotThreadSafe() {
  private var state: Int = 0
  def name(): String = {
    val out = s"c$state"
    state = state + 1
    out
  }
}

val elements: SCollection[String] = ???

elements.mapWithResource(new Client("c1"), ResourceType.PER_CLASS) { 
  case (client, s) => s + client.name
}
elements.filterWithResource(new Client("c2"), ResourceType.PER_INSTANCE) { 
  case (client, s) => s.nonEmpty
}
elements.collectWithResource(new Client("c3"), ResourceType.PER_INSTANCE) {
  case (client, s) if s.nonEmpty => s + client.name
}
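// PER_CLONE suits a client that is not thread-safe.
// The result is wrapped in a Seq so that flatMap emits whole strings rather than single characters.
elements.flatMapWithResource(new ClientNotThreadSafe(), ResourceType.PER_CLONE) {
  case (client, s) => Seq(s + client.name())
}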

Custom Parallelism

By default, a worker in a Dataflow batch pipeline has a number of threads equal to its number of vCPUs. In Dataflow streaming, the default number of threads is 300.

To limit the number of items being processed concurrently by a worker, the CustomParallelism syntax allows setting a parallelism argument on variants of map, filter, flatMap, and collect:

import com.spotify.scio.values.SCollection
import com.spotify.scio.transforms._

val elements: SCollection[String] = ???
elements.mapWithParallelism(5) { s => s + "_append" }
elements.filterWithParallelism(5) { s => s.nonEmpty }
elements.flatMapWithParallelism(5) { s => s.split(",") }
elements.collectWithParallelism(5) { case s if s.nonEmpty => s + "_append" }
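
The effect of a parallelism limit can be pictured with plain JVM primitives. The following is a minimal local sketch, not Scio's implementation: it assumes a pool of 20 threads standing in for a worker, and uses a Semaphore so that at most 5 calls run at once. The names appendLimited, inFlight, and peak are hypothetical and exist only for this illustration.

```scala
import java.util.concurrent.{Executors, Semaphore}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

val maxParallelism = 5                      // plays the role of the parallelism argument
val permits = new Semaphore(maxParallelism)
val inFlight = new AtomicInteger(0)         // calls currently running
val peak = new AtomicInteger(0)             // highest concurrency observed

// Hypothetical per-element function guarded by the semaphore.
def appendLimited(s: String): String = {
  permits.acquire()                         // wait until one of the 5 slots is free
  try {
    val now = inFlight.incrementAndGet()
    peak.updateAndGet(p => math.max(p, now))
    Thread.sleep(5)                         // stand-in for slow per-element work
    s + "_append"
  } finally {
    inFlight.decrementAndGet()
    permits.release()
  }
}

// 20 threads compete, but the semaphore admits only 5 at a time.
val pool = Executors.newFixedThreadPool(20)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
val results: List[String] =
  try Await.result(
    Future.traverse((1 to 50).toList)(i => Future(appendLimited(s"e$i"))),
    30.seconds
  )
  finally pool.shutdown()
```

After the run, peak.get never exceeds maxParallelism even though the pool has more threads available, which is the behavior a parallelism argument of 5 is meant to provide on a worker.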

FileDownload

The FileDownload syntax provides support for downloading arbitrary URIs to a local file, then handling the results:

import com.spotify.scio.values.SCollection
import com.spotify.scio.transforms._
import scala.jdk.CollectionConverters._
import java.net.URI
import java.nio.file.Files
import java.nio.charset.StandardCharsets
  
val uris: SCollection[URI] = ???
val fileContents: SCollection[String] = uris.mapFile { path =>
  new String(Files.readAllBytes(path), StandardCharsets.UTF_8) 
}
val lines: SCollection[String] = uris.flatMapFile { path => 
  Files.readAllLines(path).asScala
}
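
Because the function passed to mapFile or flatMapFile receives a local java.nio.file.Path, it can be written and checked against a temporary file before it is used in a pipeline. The sketch below assumes a hypothetical handler, countNonEmptyLines, and uses only the JDK file API:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

// Hypothetical handler: counts the non-empty lines of a downloaded file.
def countNonEmptyLines(path: Path): Int =
  Files.readAllLines(path, StandardCharsets.UTF_8).asScala.count(_.trim.nonEmpty)

// Exercise the handler against a temporary file instead of a remote URI.
val tmp: Path = Files.createTempFile("file-download-sketch", ".txt")
val counted: Int =
  try {
    Files.write(tmp, "a\n\nb\n".getBytes(StandardCharsets.UTF_8))
    countNonEmptyLines(tmp)
  } finally {
    Files.deleteIfExists(tmp)
  }
```

The same handler could then be passed to the pipeline as uris.mapFile { path => countNonEmptyLines(path) }.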

Safe flatMap

The Safe syntax provides a safeFlatMap function that captures any exception thrown by the body of the transform and partitions the output into two SCollections: one of the successfully output elements, and one pairing each input element that threw with the Throwable it produced:

import com.spotify.scio.values.SCollection
import com.spotify.scio.transforms._

val elements: SCollection[String] = ???
val (ok: SCollection[Int], bad: SCollection[(String, Throwable)]) = elements
  .safeFlatMap { in =>
    in.split(",").map { s => s.toInt }
  }
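
The partitioning can be modeled on an in-memory Seq. This is a minimal sketch of the semantics described above, not Scio's implementation: safeFlatMapLocal is a hypothetical helper, it treats each input as all-or-nothing, and scala.util.Try only captures non-fatal exceptions.

```scala
import scala.util.{Failure, Success, Try}

// Local model: each input either contributes all of its outputs,
// or is paired with the Throwable raised while processing it.
def safeFlatMapLocal[A, B](inputs: Seq[A])(f: A => Iterable[B]): (Seq[B], Seq[(A, Throwable)]) = {
  val results = inputs.map(a => a -> Try(f(a).toList))
  val succeeded = results.collect { case (_, Success(bs)) => bs }.flatten
  val failed = results.collect { case (a, Failure(t)) => (a, t) }
  (succeeded, failed)
}

// "3,x" fails on "x".toInt, so the whole input lands on the failure side.
val (parsed, rejected) = safeFlatMapLocal(Seq("1,2", "3,x", "4")) { in =>
  in.split(",").map(_.toInt).toSeq
}
```

Here parsed holds the integers from the two well-formed inputs, while rejected holds the single pair of "3,x" and its NumberFormatException.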

Pipe

The Pipe syntax provides a method to pass elements of an SCollection[String] to a specified command-line program. Additional arguments allow configuration of the working directory, application environment, and setup & teardown commands.

import com.spotify.scio.values.SCollection
import com.spotify.scio.transforms._

val elements: SCollection[String] = ???
val upperElements: SCollection[String] = elements.pipe("tr [:lower:] [:upper:]")
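
To check what a command will do to the elements before running a pipeline, the same command can be run locally with scala.sys.process. This sketch assumes that tr is available on the PATH and that elements reach the command one per line on standard input, with each line of standard output becoming an output element. It does not use Scio.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import scala.sys.process._

// Stand-in for the elements of an SCollection[String], one per line.
val sample = Seq("hello", "world")
val stdin = new ByteArrayInputStream(
  sample.mkString("", "\n", "\n").getBytes(StandardCharsets.UTF_8)
)

// Passing the command as a Seq avoids shell interpretation of the brackets.
val stdout: String = (Seq("tr", "[:lower:]", "[:upper:]") #< stdin).!!

// Split the captured output back into one element per line.
val upperSample: List[String] = stdout.split("\n").toList
```

If the local result matches expectations, the same command string can be passed to elements.pipe as in the example above.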