REPL

The Scio REPL is an extension of the Scala REPL, with added functionality that allows you to interactively experiment with Scio. Think of it as a playground to try out things.

Quick start

You can either install Scio REPL via our Homebrew tap on a Mac or download the pre-built jar on other platforms.

Homebrew

brew tap spotify/public
brew install scio
scio-repl

Pre-built jar

To download pre-built jar of Scio REPL, find version you are interested in on the release page, and download the REPL jar from Downloads section.

$ java -jar scio-repl-<version>.jar
Welcome to
                 _____
    ________________(_)_____
    __  ___/  ___/_  /_  __ \
    _(__  )/ /__ _  / / /_/ /
    /____/ \___/ /_/  \____/   version 0.7.0

Using Scala version 2.12.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

BigQuery client available as 'bq'
Scio context available as 'sc'

scio>

A ScioContext is created on REPL startup as sc and a starting point to most operations. Use tab completion, history and other REPL goodies to play around.

Start from SBT console

$ git clone git@github.com:spotify/scio.git
Cloning into 'scio'...
remote: Counting objects: 9336, done.
remote: Compressing objects: 100% (275/275), done.
remote: Total 9336 (delta 139), reused 0 (delta 0), pack-reused 8830
Receiving objects: 100% (9336/9336), 1.76 MiB | 0 bytes/s, done.
Resolving deltas: 100% (3509/3509), done.
Checking connectivity... done.
$ cd scio
$ sbt scio-repl/run

Build REPL jar manually

You can also build REPL jar from source.

$ git clone git@github.com:spotify/scio.git
Cloning into 'scio'...
remote: Counting objects: 9336, done.
remote: Compressing objects: 100% (275/275), done.
remote: Total 9336 (delta 139), reused 0 (delta 0), pack-reused 8830
Receiving objects: 100% (9336/9336), 1.76 MiB | 0 bytes/s, done.
Resolving deltas: 100% (3509/3509), done.
Checking connectivity... done.
$ cd scio
$ sbt scio-repl/assembly

sbt project from scio-template

Projects generated from scio-template.g8 have built-in REPL. Run sbt repl/run from the project root.

Tutorial

Local pipeline

Let’s start with simple local-mode word count example:

// The REPL loads the following for you.
import com.spotify.scio._

def sc: ScioContext = ???
val wordCount = sc
    .textFile("README.md")
    .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
    .countByValue
    .map(_.toString)
    .saveAsTextFile("/tmp/local_wordcount")

val scioResult = sc.run().waitUntilDone()

val values = scioResult.tap(wordCount).value.take(3)

Make sure README.md is in the current directory. This example counts words in local file using a local runner (DirectRunner and writes result in a local file. The pipeline and actual computation starts on sc.run(). The last command take 3 lines from results and prints them.

Local pipeline ++

In the next example we will spice things up a bit and read data from GCS:

:newScio
import com.spotify.scio._

def sc: ScioContext = ???
val shakespeare = sc.textFile("gs://dataflow-samples/shakespeare/hamlet.txt")

val wordCount = shakespeare
    .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
    .countByValue
    .map(_.toString)
    .saveAsTextFile("/tmp/gcs-wordcount")

val result = sc
    .run()
    .waitUntilDone()
    .tap(wordCount)
    .value
    .take(3)

Each Scio context is associated with one and only one pipeline. The previous instance of sc was used for the local pipeline example and cannot be reused anymore. The first magic command, :newScio creates a new context as sc. The pipeline still performs computation locally, but reads data from Google Cloud Storage (it could also be BigQuery, Datastore, etc). This example may take a bit longer due to additional network overhead.

Dataflow service pipeline

To create a Scio context for Google Cloud Dataflow service, add Dataflow pipeline options when starting the REPL. The same options will also be used by :newScio when creating new context. For example:

$ java -jar scio-repl-0.7.0.jar \
> --project=<project-id> \
> --stagingLocation=<staging-dir> \
> --tempLocation=<temp-dir> \
> --runner=DataflowRunner
Welcome to
                 _____
    ________________(_)_____
    __  ___/  ___/_  /_  __ \
    _(__  )/ /__ _  / / /_/ /
    /____/ \___/ /_/  \____/   version 0.7.0

Using Scala version 2.12.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

BigQuery client available as 'bq'
Scio context available as 'sc'
import com.spotify.scio._

def sc: ScioContext = ???
val shakespeare = sc.textFile("gs://dataflow-samples/shakespeare/*")

val wordCount = shakespeare
    .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
    .countByValue
    .map(_.toString)
    .saveAsTextFile("gs://<gcs-output-dir>")

val result = sc
    .run()
    .waitUntilDone()
    .tap(wordCount)
    .value
    .take(3)

In this case we are reading data from GCS and performing computation in GCE virtual machines managed by Dataflow service. The last line is an example of reading data from GCS files to local memory after a context is closed. Most write operations in Scio return Future[Tap[T]] where a Tap[T] encapsulates some dataset that can be re-opened in another context or directly.

Use :scioOpts to view or update Dataflow options inside the REPL. New options will be applied the next time you create a context.

Ad-hoc local mode

You may start the REPL in distributed mode and run pipelines to aggregate from large datasets, and play around the results in local mode. You can create a local Scio context any time with :newLocalScio <name> and use it for local computations.

scio> :newLocalScio lsc
Local Scio context available as 'lsc'

BigQuery example

In this example we will read some data from BigQuery and process it in Dataflow. We shall count number of tornadoes per month from a public sample dataset. Scio will do its best to find your configured Google Cloud project, but you can also specify it explicitly via -Dbigquery.project option.

$ java -jar -Dbigquery.project=<project-id> scio-repl-0.7.0.jar \
> --project=<project-id> \
> --stagingLocation=<staging-dir> \
> --tempLocation=<temp-dir> \
> --runner=DataflowRunner
Welcome to
                 _____
    ________________(_)_____
    __  ___/  ___/_  /_  __ \
    _(__  )/ /__ _  / / /_/ /
    /____/ \___/ /_/  \____/   version 0.7.0

Using Scala version 2.12.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

BigQuery client available as 'bq'
Scio context available as 'sc'
import com.spotify.scio._
import com.spotify.scio.bigquery._

def sc: ScioContext = ???
val tornadoes = sc.bigQuerySelect(Query("SELECT tornado, month FROM [apache-beam-testing:samples.weather_stations]"))
 
val counts = tornadoes
    .flatMap(r => if (r.getBoolean("tornado")) Seq(r.getLong("month")) else Nil)
    .countByValue
    .map(kv => TableRow("month" -> kv._1, "tornado_count" -> kv._2))
    .take(3)
    .materialize

val result = sc
    .run()
    .waitUntilDone()
    .tap(counts)
    .value

In this example we combine power of BigQuery and flexibility of Dataflow. We first query BigQuery table, perform a couple of transformations and take (take(3)) some data back locally (materialize) to view the results.

BigQuery project id

Scio REPL will do its best to find your configured Google Cloud project, without the need to explicitly specifying bigquery.project property. It will search for project-id in this specific order:

  1. bigquery.project java system property
  2. GCLOUD_PROJECT java system property
  3. GCLOUD_PROJECT environmental variable
  4. gcloud config files:
  5. scio named configuration
  6. default configuration

This means that you can always set bigquery.project and it will take precedence over other configurations. Read more about gcloud configuration here.

I/O Commands

There are few built-in commands for simple file I/O.

import scala.reflect._
import kantan.csv._

// Read from an Avro, text, CSV or TSV file on local filesystem or GCS.
def readAvro[T : ClassTag](path: String): Iterator[T] = ???
def readText(path: String): Iterator[String] = ???
def readCsv[T: RowDecoder](path: String,
                           sep: Char = ',',
                           header: Boolean = false): Iterator[T] = ???
def readTsv[T: RowDecoder](path: String,
                           sep: Char = '\t',
                           header: Boolean = false): Iterator[T] = ???

// Write to an Avro, text, CSV or TSV file on local filesystem or GCS.
def writeAvro[T: ClassTag](path: String, data: Seq[T]): Unit = ???
def writeText(path: String, data: Seq[String]): Unit = ???
def writeCsv[T: RowEncoder](path: String, data: Seq[T],
                            sep: Char = ',',
                            header: Seq[String] = Seq.empty): Unit = ???
def writeTsv[T: RowEncoder](path: String, data: Seq[T],
                            sep: Char = '\t',
                            header: Seq[String] = Seq.empty): Unit = ???

Tips

Multi-line code

While in the REPL, use :paste magic command to paste or write multi-line code

:paste
import com.spotify.scio._

def sc: ScioContext = ???
// Entering paste mode (ctrl-D to finish)
import com.spotify.scio.io.ClosedTap
import com.spotify.scio.values.SCollection

def evenNumber(x: Int): Boolean = x % 2 == 0
def evenNumbers: SCollection[Int] = sc.parallelize(1 to 100).filter(evenNumber)

// Exiting paste mode, now interpreting.

def tap: ClosedTap[String] = evenNumbers.saveAsTextFile("/tmp/even")

def result = sc.run()

Running jobs asynchronously

When using REPL and Dataflow service consider using the non-blocking DataflowRunner for a more interactive experience. To start:

java -jar scio-repl-0.7.0.jar \
> --project=<project-id> \
> --stagingLocation=<staging-dir> \
> --tempLocation=<temp-dir> \
> --runner=DataflowRunner
Welcome to
                 _____
    ________________(_)_____
    __  ___/  ___/_  /_  __ \
    _(__  )/ /__ _  / / /_/ /
    /____/ \___/ /_/  \____/   version 0.7.0

Using Scala version 2.12.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

BigQuery client available as 'bq'
Scio context available as 'sc'
import com.spotify.scio._

def sc: ScioContext = ???
import com.spotify.scio.io.ClosedTap

def closedTap: ClosedTap[String] = sc
    .parallelize(1 to 100)
    .map( _.toString )
    .saveAsTextFile("gs://<output>")

def result = sc.run()
// [main] INFO org.apache.beam.runners.dataflow.DataflowRunner - Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.
// [main] INFO org.apache.beam.runners.dataflow.util.PackageUtil - Uploading 3 files from PipelineOptions.filesToStage to staging location to prepare for execution.
// [main] INFO org.apache.beam.runners.dataflow.util.PackageUtil - Uploading PipelineOptions.filesToStage complete: 2 files newly uploaded, 1 files cached
// Dataflow SDK version: 2.9.0

def state = result.state

Note that now sc.run() doesn’t block and wait until job completes and gives back control of the REPL right away. Use ScioExecutionContext to check for progress, results and orchestrate jobs.

Multiple Scio contexts

You can use multiple Scio context objects to work with several pipelines at the same time, simply use magic :newScio <context name>, for example:

scio> :newScio c1
Scio context available as 'c1'
scio> :newScio c2
Scio context available as 'c2'
scio> :newLocalScio lc
Scio context available as 'lc'

You can use those in combination with DataflowRunner to run multiple pipelines in the same session or wire them with for comprehension over futures.

BigQuery client

Whenever possible leverage BigQuery! @BigQueryType annotations enable type safe and civilized integration with BigQuery inside Scio. Here is example of using the annotations and BigQuery client to read and write typed data directly without Scio context.

$ java -jar -Dbigquery.project=<project-id> scio-repl-0.7.0.jar
Welcome to
                 _____
    ________________(_)_____
    __  ___/  ___/_  /_  __ \
    _(__  )/ /__ _  / / /_/ /
    /____/ \___/ /_/  \____/   version 0.7.0

Using Scala version 2.12.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

BigQuery client available as 'bq'
Scio context available as 'sc'
import com.spotify.scio.bigquery._
import com.spotify.scio.bigquery.client._

def bq: BigQuery = ???
@BigQueryType.fromQuery("SELECT tornado, month FROM [apache-beam-testing:samples.weather_stations]") class Row

def tornadoes = bq.getTypedRows[Row]()

def result = tornadoes.next().month

def write = bq.writeTypedRows("project-id:dataset-id.table-id", tornadoes.take(100).toList)

Out of memory exception

In case of OOM exceptions, like for example:

import com.spotify.scio._
import com.spotify.scio.io._

def sc: ScioContext = ???
def closedTap: ClosedTap[String] = ???

def result = sc.run().waitUntilDone().tap(closedTap).value.next()
// Exception in thread "main"
// Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"

simply increase the size of the heap - be reasonable about the amount of data and heap size though.

Example of REPL startup with 2GiB of heap size:

$ java -Xmx2g -jar scio-repl-0.7.0.jar
Welcome to
                 _____
    ________________(_)_____
    __  ___/  ___/_  /_  __ \
    _(__  )/ /__ _  / / /_/ /
    /____/ \___/ /_/  \____/   version 0.7.0

Using Scala version 2.12.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)

Type in expressions to have them evaluated.
Type :help for more information.

BigQuery client available as 'bq'
Scio context available as 'sc'
Runtime.getRuntime().maxMemory()
// res1: Long = 1908932608

What is the type of an expression?

Use the built in :t! :t displays the type of an expression without evaluating it. Example:

scio> :t sc.textFile("README").flatMap(_.split("[^a-zA-Z']+")).filter(_.nonEmpty).map(_.length)
com.spotify.scio.values.SCollection[Int]

Learn more about magic keywords via scio> :help