FAQ
- General questions
- Programming questions
- How do I set up a new SBT project?
- How do I deploy Scio jobs to Dataflow?
- How do I use the SNAPSHOT builds of Scio?
- How do I unit test pipelines?
- How do I combine multiple input sources?
- How do I log in a job?
- How do I use Beam’s Java API in Scio?
- What are the different types of joins and their performance implications?
- How to create a Dataflow job template?
- How do I cancel a job after a certain time period?
- Why can’t I have an SCollection inside another SCollection?
- BigQuery questions
- What is BigQuery dataset location?
- How stable is the type safe BigQuery API?
- How do I work with nested Options in type safe BigQuery?
- How do I unit test BigQuery queries?
- How do I stream to a partitioned BigQuery table?
- How do I invalidate cached BigQuery results or disable cache?
- How does BigQuery determine job priority?
- Streaming questions
- Other IO components
- Development environment issues
- Common issues
- What does “Cannot prove that T1 <:< T2” mean?
- How do I fix invalid default BigQuery credentials?
- Why are my typed BigQuery case classes not up to date?
- How do I fix “SocketTimeoutException” with BigQuery?
- Why do I see names like “main@{NativeMethodAccessorImpl...}” in the UI?
- How do I fix “RESOURCE_EXHAUSTED” error?
- Can I use “scala.App” trait instead of “main” method?
- How to inspect the content of an SCollection?
- How do I improve side input performance?
- How do I control concurrency (number of DoFn threads) in Dataflow workers
- How to manually investigate a Cloud Dataflow worker
General questions
What’s the status of Scio?
Scio is widely used for production data pipelines at Spotify and is our preferred framework for building new pipelines on Google Cloud. We run Scio on the Google Cloud Dataflow service in both batch and streaming modes. It is still under development and there may be minor breaking API changes.
Who’s using Scio?
Spotify uses Scio for all new data pipelines running on Google Cloud Platform, including music recommendation, monetization, artist insights and business analysis. We also use BigQuery, Bigtable and Datastore heavily with Scio. We use Scio in both batch and streaming mode.
As of mid 2017, there are 200+ developers and 700+ production pipelines. The largest batch job we’ve seen uses 800 n1-highmem-32 workers (25600 CPUs, 166.4TB RAM) and processes 325 billion rows from Bigtable (240TB). We also have numerous jobs that process 10TB+ of BigQuery data daily. On the streaming front, we have many jobs with 30+ n1-standard-16 workers (480 CPUs, 1.8TB RAM) and SSD disks for real time machine learning or reporting.
What’s the relationship between Scio and Apache Beam?
Scio is a Scala API built on top of Apache Beam’s Java SDK. Scio offers a concise, idiomatic Scala API for a subset of Beam’s features, plus extras we find useful, like REPL, type safe BigQuery, and IO taps.
What’s the relationship between Scio and Google Cloud Dataflow?
Scio (versions before 0.3.0) was originally built on top of Google Cloud Dataflow's Java SDK. Google donated the code base to Apache, where it was renamed Beam, and Cloud Dataflow became one of the supported runners, alongside Apache Flink and Apache Spark. Scio 0.3.x is built on top of Beam 0.6.0 and 0.4.x is built on top of Beam 2.x. Many users run Scio on the Dataflow runner today.
How does Scio compare to Scalding or Spark?
Check out the wiki page on Scio, Scalding and Spark. Also check out Big Data Rosetta Code for some snippets.
What are GCE availability zone and GCS bucket location?
- GCE availability zone is where the Google Cloud Dataflow service spins up VM instances for your job, e.g. us-east1-a.
- Each GCS bucket (gs://bucket) has a storage class and bucket location that affect availability, latency and price. The location should be close to the GCE availability zone. Dataflow uses --stagingLocation for job jars, temporary files and BigQuery I/O.
Programming questions
How do I set up a new SBT project?
Read the documentation.
How do I deploy Scio jobs to Dataflow?
When developing locally, you can do sbt "runMain MyClass ..." or just runMain MyClass ... in the SBT console, without building any artifacts.
When deploying to the cloud, we recommend using sbt-pack or sbt-native-packager plugin instead of sbt-assembly. Unlike assembly, they pack dependency jars in a directory instead of merging them, so that we don’t have to deal with merge strategy and dependency jars can be cached by Dataflow service.
At Spotify we pack jars with sbt-pack, build docker images with sbt-docker together with orchestration components e.g. Luigi or Airflow and deploy them with Styx.
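As a rough sketch of the sbt-pack route (the plugin version below is illustrative; check sbt-pack's documentation for the current release):
// project/plugins.sbt
addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.14")

// build.sbt
enablePlugins(PackPlugin)
Running sbt pack then collects the job jar and all dependency jars under target/pack/lib, which can be copied into a Docker image or referenced directly when submitting the job.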
How do I use the SNAPSHOT builds of Scio?
Commits to Scio master are automatically published to Sonatype via continuous integration. To use the latest SNAPSHOT artifact, add the following line to your build.sbt.
resolvers += Resolver.sonatypeRepo("snapshots")
Or you can configure SBT globally by adding the following to ~/.sbt/1.0/global.sbt.
resolvers ++= Seq(
Resolver.sonatypeRepo("snapshots")
// other resolvers
)
How do I unit test pipelines?
Any Scala or Java unit testing framework can be used with Scio, but we provide some utilities for ScalaTest.
- PipelineTestUtils - utilities for testing parts of a pipeline
- JobTest - for testing pipelines end-to-end with complete arguments and IO coverage
- SCollectionMatchers - ScalaTest matchers for SCollection
- PipelineSpec - shortcut for ScalaTest FlatSpec with utilities and matchers
The best place to find example usage of JobTest and SCollectionMatchers is their respective tests, JobTestTest and SCollectionMatchersTest. For more examples see:
- scio-examples
- https://github.com/spotify/big-data-rosetta-code/tree/master/src/test/scala/com/spotify/bdrc/testing
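As a minimal sketch, a JobTest for a hypothetical WordCount job (the job object, arguments and expected output here are assumptions for illustration) might look like this:
import com.spotify.scio.io._
import com.spotify.scio.testing._

class WordCountTest extends PipelineSpec {
  "WordCount" should "count words" in {
    JobTest[com.example.WordCount.type]          // hypothetical job under test
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), Seq("a b", "b"))  // stub the input source
      .output(TextIO("out.txt")) { output =>     // assert on the output sink
        output should containInAnyOrder(Seq("a: 1", "b: 2"))
      }
      .run()
  }
}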
How do I combine multiple input sources?
How do I combine multiple input sources, e.g. different BigQuery tables, files located in different GCS buckets? You can combine SCollections from different sources into one using the companion method SCollection.unionAll, for example:
import com.spotify.scio._
import com.spotify.scio.avro._
import com.spotify.scio.values._
import com.spotify.scio.avro.TestRecord
object MyJob {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    val collections = Seq(
      "gs://bucket1/data/",
      "gs://bucket2/data/"
    ).map(path => sc.avroFile[TestRecord](path, suffix = ".avro"))
    val all = SCollection.unionAll(collections)
  }
}
How do I log in a job?
You can use most common logging libraries in a Scio job; slf4j is included as a dependency. Define the logger instance as a member of the job object and use it inside a lambda.
import com.spotify.scio._
import org.slf4j.LoggerFactory
object MyJob {
  private val logger = LoggerFactory.getLogger(this.getClass)
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.parallelize(1 to 100)
      .map { i =>
        logger.info(s"Element $i")
        i * i
      }
    // ...
  }
}
How do I use Beam’s Java API in Scio?
Scio exposes a few things to allow easy integration with the native Beam Java API, notably:
- ScioContext#customInput to apply a PTransform[_ >: PBegin, PCollection[T]] (source) and get an SCollection[T]
- SCollection#applyTransform to apply a PTransform[_ >: PCollection[T], PCollection[U]] and get an SCollection[U]
- SCollection#saveAsCustomOutput to apply a PTransform[_ >: PCollection[T], PDone] (sink) and get a ClosedTap[T]
See BeamExample for more details. Custom I/O can also be tested via the JobTest harness.
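For instance, a small sketch mixing Scio with Beam's built-in TextIO transforms (input/output arguments are placeholders):
import com.spotify.scio._
import org.apache.beam.sdk.io.TextIO

object BeamInteropJob {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    // Native Beam source transform -> SCollection[String]
    sc.customInput("ReadLines", TextIO.read().from(args("input")))
      .map(_.toUpperCase)
      // Native Beam sink transform used as a custom output
      .saveAsCustomOutput("WriteLines", TextIO.write().to(args("output")))
    sc.run()
  }
}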
What are the different types of joins and their performance implications?
- Inner (a.join(b)), left (a.leftOuterJoin(b)) and outer (a.fullOuterJoin(b)) joins perform better with a large LHS, so a should be the larger data set with potentially more hot keys, i.e. keys with many values. Every key-value pair from every input is shuffled.
- join/leftOuterJoin may be replaced by hashJoin/leftHashJoin if the RHS is small enough to fit in memory (e.g. < 1GB). The RHS is used as a multi-map side input for the LHS. No shuffle is performed (see the sketch after this list).
- Consider skewedJoin if some keys on the LHS are extremely hot.
- Consider sparseOuterJoin if you want a full outer join where the RHS is much smaller than the LHS but may not fit in memory.
- Consider cogroup if you need to access value groups of each key.
- MultiJoin supports inner, left and outer joins as well as cogroup of up to 22 inputs.
- For multi-joins, larger inputs should be on the left, e.g. size(a) >= size(b) >= size(c) >= size(d) in MultiJoin(a, b, c, d).
- Check out these slides for more information on joins.
- Also see this section on Cloud Dataflow Shuffle service.
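As a small sketch of the shuffle vs. side-input trade-off (the types and data here are made up; a is assumed to be the larger side, b small enough to fit in memory):
import com.spotify.scio.values.SCollection

def joinExamples(a: SCollection[(String, Int)], b: SCollection[(String, String)]): Unit = {
  // Shuffle join: every key-value pair of both inputs is shuffled
  val shuffled = a.join(b)
  // Hash join: b is used as a multi-map side input, no shuffle
  val sideInput = a.hashJoin(b)
}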
How to create a Dataflow job template?
For Apache Beam based Scio (version >= 0.3.0), use DataflowRunner and specify the templateLocation option, e.g. --templateLocation=gs://<bucket>/job1 on the command line. Read more about templates here.
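For an SBT-based project this could look something like the following (project, bucket and job class are placeholders):
sbt "runMain MyJob --project=<project> --runner=DataflowRunner --templateLocation=gs://<bucket>/templates/my-job"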
How do I cancel a job after certain time period?
You can wait on the result of sc.run() with a timeout and have the job cancelled when the timeout is exceeded, via waitUntilFinish with cancelJob = true:
import com.spotify.scio._
import scala.concurrent.duration._
object MyJob {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    // ...
    val closedSc: ScioExecutionContext = sc.run()
    val result: ScioResult = closedSc.waitUntilFinish(1.minute, cancelJob = true)
  }
}
Why can’t I have an SCollection inside another SCollection?
You cannot have an SCollection inside another SCollection, i.e. anything with type SCollection[SCollection[T]]. To explain this we have to go back to the relationship between ScioContext and SCollection. Every ScioContext represents a unique pipeline and every SCollection represents a stage in the pipeline execution, i.e. the state of the pipeline after some transforms have been applied. We start pipeline code with val sc = ..., create new SCollections with methods on sc, e.g. sc.textFile, and transform them with methods like .map, .filter and .join. Therefore each SCollection can trace its root to one single sc. The pipeline is submitted for execution when we call sc.run(). Hence we cannot have an SCollection inside another SCollection, just as we cannot have a pipeline inside another pipeline.
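When you find yourself reaching for a nested SCollection, the usual fix is to restructure the computation as a key-based join (or a side input) over flat SCollections. A hedged sketch with made-up types:
import com.spotify.scio.values.SCollection

case class User(id: String, country: String)
case class Event(userId: String, name: String)

// Instead of an SCollection of per-user SCollections, key both sides and join them
def enrich(users: SCollection[User], events: SCollection[Event]): SCollection[(Event, User)] =
  events
    .keyBy(_.userId)
    .join(users.keyBy(_.id))
    .values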
BigQuery questions
What is BigQuery dataset location?
- Each BigQuery dataset has a location (e.g. US, EU) and every table inside it is stored in that location. Tables in a JOIN must be from the same region. Also one can only import/export tables to a GCS bucket in the same location. Starting from v0.2.1, Scio will detect the dataset location of a query and create a staging dataset for ScioContext#bigQuerySelect and @BigQueryType.fromQuery. This location should be the same as that of your --stagingLocation GCS bucket. The old -Dbigquery.staging_dataset.location flag is removed.
Because of these limitations and for performance reasons, make sure --zone, --stagingLocation and the location of your BigQuery datasets are consistent.
How stable is the type safe BigQuery API?
The type safe BigQuery API is considered stable and widely used at Spotify. There are several caveats however:
- Both legacy and standard SQL syntax are supported, although standard SQL is highly recommended
- The system will detect legacy or standard SQL syntax and choose the correct one
- To override auto-detection, start the query with either a #legacysql or #standardsql comment line
- Legacy syntax is less predictable, especially for complex queries, and may be disabled in the future
- Case classes generated by @BigQueryType.fromTable or @BigQueryType.fromQuery are not recognized in IntelliJ IDEA, but see this section for a workaround
How do I work with nested Options in type safe BigQuery?
Any nullable field in BigQuery is translated to Option[T] by the type safe BigQuery API, and it can be clunky to work with rows with multiple or nested fields. For example:
def doSomething(s: String): Unit = ()
import com.spotify.scio.bigquery.types.BigQueryType
@BigQueryType.fromSchema("""{
|"fields": [{
| "type":"RECORD",
| "mode": "NULLABLE",
| "name":"user",
| "fields":[
| {"mode": "NULLABLE", "name":"email", "type": "STRING"},
| {"mode": "REQUIRED","name":"name","type":"STRING"}]
|}]}""".stripMargin)
class Row
def doSomethingWithRow(row: Row) = {
  if (row.user.isDefined) { // Option[User]
    val email = row.user.get.email // Option[String]
    if (email.isDefined) {
      doSomething(email.get)
    }
  }
}
A for comprehension is a nicer alternative in these cases:
def doSomethingWithRowUsingFor(row: Row) = {
  val e: Option[String] =
    for {
      u <- row.user
      e <- u.email
    } yield e
  e.foreach(doSomething)
}
Also see these slides and this blog article.
How do I unit test BigQuery queries?
BigQuery doesn’t provide a way to unit test query logic locally, but we can query the service directly in an integration test. Take a look at BigQueryIT.scala. MockBigQuery will create temporary tables on the service, feed them with mock data, and substitute table references in your query string with the mocked ones.
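A rough sketch of what such an integration test might look like (the table spec, query and data here are illustrative, and the exact table-reference substitution may differ; see BigQueryIT.scala for the authoritative usage):
import com.spotify.scio.bigquery._
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class MyQueryIT extends AnyFlatSpec with Matchers {
  "my query" should "count users per country" in {
    val mbq = MockBigQuery()
    // Hypothetical source table, mocked with a temporary table on the service
    mbq.mockTable("my-project:my_dataset.users").withData(Seq(
      TableRow("country" -> "SE", "user" -> "a"),
      TableRow("country" -> "SE", "user" -> "b")
    ))
    // References to the original table in the query are substituted with the mock
    val result = mbq.queryResult(
      "SELECT country, COUNT(1) AS n FROM `my-project.my_dataset.users` GROUP BY country")
    result should have size 1
  }
}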
How do I stream to a partitioned BigQuery table?
Currently, there is no way to create a partitioned BigQuery table via Scio/Beam when streaming, however it is possible to stream to a partitioned table if it is already created.
This can be done by using fixed windows and using the window bounds to infer the date. As of Scio 0.4.0-beta2 this looks as follows:
import com.spotify.scio._
import com.spotify.scio.pubsub._
import org.apache.beam.sdk.values.ValueInSingleWindow
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.transforms.windowing.IntervalWindow
import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, TableDestination}
import BigQueryIO.Write.{CreateDisposition, WriteDisposition}
import org.joda.time.format.DateTimeFormat
import org.joda.time.{DateTimeZone, Duration}
class DayPartitionFunction() extends SerializableFunction[ValueInSingleWindow[TableRow], TableDestination] {
  override def apply(input: ValueInSingleWindow[TableRow]): TableDestination = {
    val partition = DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC)
      .print(input.getWindow.asInstanceOf[IntervalWindow].start())
    new TableDestination("project:dataset.partitioned$" + partition, "")
  }
}

object BQPartitionedJob {
  def myStringToTableRowConversion: String => TableRow = ???
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.read(PubsubIO.string("projects/data-university/topics/data-university"))(PubsubIO.ReadParam(PubsubIO.Subscription))
      .withFixedWindows(Duration.standardSeconds(30))
      // Convert to `TableRow`
      .map(myStringToTableRowConversion)
      .saveAsCustomOutput(
        "SaveAsDayPartitionedBigQuery",
        BigQueryIO.writeTableRows()
          .to(new DayPartitionFunction())
          .withWriteDisposition(WriteDisposition.WRITE_APPEND)
          .withCreateDisposition(CreateDisposition.CREATE_NEVER)
      )
    sc.run()
  }
}
In Scio 0.3.x it is possible to achieve the same behaviour using SerializableFunction[BoundedWindow, String] and BigQueryIO.Write.to. It is also possible to stream to separate tables with a date suffix by modifying DayPartitionFunction, specifying the schema, and changing the create disposition to CreateDisposition.CREATE_IF_NEEDED.
How do I invalidate cached BigQuery results or disable cache?
Scio’s BigQuery client caches query results in the directory given by the system property bigquery.cache.directory, which defaults to $PWD/.bigquery. Use rm -rf .bigquery to invalidate all cached results. To disable caching, set the system property bigquery.cache.enabled to false.
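For example, to relocate or disable the cache for a single SBT invocation (the directory is just an example):
sbt -Dbigquery.cache.directory=/tmp/bigquery-cache compile
sbt -Dbigquery.cache.enabled=false compile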
How does BigQuery determine job priority?
By default, Scio runs BigQuery jobs with BATCH priority except when in the REPL, where it runs with INTERACTIVE. To override this, set the system property bigquery.priority to either BATCH or INTERACTIVE.
Streaming questions
How do I update a streaming job?
Dataflow allows streaming jobs to be updated on the fly by specifying --update along with --jobName=[your_job] on the command line. See https://cloud.google.com/dataflow/pipelines/updating-a-pipeline for detailed docs. Note that for this to work, Dataflow needs to be able to identify which transformations from the original job map to those in the replacement job. The easiest way to do so is to give unique names to transforms in the code itself. In Scio, this can be achieved by calling .withName() before applying the transform. For example:
import com.spotify.scio._
def main(cmdlineArgs: Array[String]): Unit = {
  val (sc, args) = ContextAndArgs(cmdlineArgs)
  sc.textFile(args("input"))
    .withName("MakeUpper").map(_.toUpperCase)
    .withName("BigWords").filter(_.length > 6)
}
In this example, the map’s transform name is “MakeUpper” and the filter’s is “BigWords”. If we later decide that we want to count 6-letter words as “big” too, we can change it to _.length > 5, and because the transform name is the same, the job can be updated on the fly.
Other IO components
How do I access various files outside of a ScioContext?
- For Scio version >= 0.4.0
Starting from Scio 0.4.0 you can use Apache Beam’s FileSystems abstraction:
import org.apache.beam.sdk.io.FileSystems
// the path can be any of the supported Filesystems, e.g. local, GCS, HDFS
def readmeResource = FileSystems.matchNewResource("gs://<bucket>/README.md", false)
def readme = FileSystems.open(readmeResource)
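FileSystems.open returns a ReadableByteChannel, which can be wrapped in a standard InputStream to read the contents, e.g.:
import java.nio.channels.Channels
import scala.io.Source

// read the whole file into a String (fine for small files)
def readmeText: String =
  Source.fromInputStream(Channels.newInputStream(readme)).mkString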
- For Scio version < 0.4.0
This part is GCS specific. You can get a GcsUtil instance from ScioContext, which can be used to open GCS files in read or write mode.
import com.spotify.scio.ContextAndArgs
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions
def main(cmdlineArgs: Array[String]): Unit = {
  val (sc, args) = ContextAndArgs(cmdlineArgs)
  val gcsUtil = sc.optionsAs[GcsOptions].getGcsUtil
  // ...
}
How do I reduce Datastore boilerplate?
The Datastore Entity class is actually generated from Protobuf, which uses the builder pattern and is very boilerplate heavy. You can use the Magnolify library to seamlessly convert between case classes and Entity instances. See MagnolifyDatastoreExample.scala for an example job and MagnolifyDatastoreExampleTest.scala for tests.
How do I throttle Bigtable writes?
Currently, Dataflow autoscaling may not work well with large writes to BigtableIO. Specifically, it does not take Bigtable rate limits into account, so it may scale up to more workers, hit the limit and eventually fail the job. As a workaround, you can enable throttling for Bigtable writes in Scio 0.4.0-alpha2 or later.
val btProjectId = ""
val btInstanceId = ""
val btTableId = ""
import com.spotify.scio.values._
import com.spotify.scio.bigtable._
import com.google.cloud.bigtable.config.{BigtableOptions, BulkOptions}
import com.google.bigtable.v2.Mutation
import com.google.protobuf.ByteString
def main(cmdlineArgs: Array[String]): Unit = {
  // ...
  val data: SCollection[(ByteString, Iterable[Mutation])] = ???
  val btOptions =
    BigtableOptions.builder()
      .setProjectId(btProjectId)
      .setInstanceId(btInstanceId)
      .setBulkOptions(BulkOptions.builder()
        .enableBulkMutationThrottling()
        .setBulkMutationRpcTargetMs(10) // lower latency threshold, default is 100
        .build())
      .build()
  data.saveAsBigtable(btOptions, btTableId)
  // ...
}
How do I use custom Kryo serializers?
See Kryo for more.
Define a registrar class that extends IKryoRegistrar and annotate it with @KryoRegistrar. Note that the class name must end with KryoRegistrar, i.e. MyKryoRegistrar, for Scio to find it.
trait UserRecord
trait AccountRecord
import com.twitter.chill.KSerializer
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
class UserRecordSerializer extends KSerializer[UserRecord] {
  def read(kryo: Kryo, input: Input, cls: Class[UserRecord]): UserRecord = ???
  def write(kryo: Kryo, output: Output, obj: UserRecord): Unit = ???
}
class AccountRecordSerializer extends KSerializer[AccountRecord] {
  def read(kryo: Kryo, input: Input, cls: Class[AccountRecord]): AccountRecord = ???
  def write(kryo: Kryo, output: Output, obj: AccountRecord): Unit = ???
}
import com.twitter.chill._
import com.esotericsoftware.kryo.Kryo
import com.spotify.scio.coders.KryoRegistrar
import com.twitter.chill.IKryoRegistrar
@KryoRegistrar
class MyKryoRegistrar extends IKryoRegistrar {
  override def apply(k: Kryo): Unit = {
    // register serializers for additional classes here
    k.forClass(new UserRecordSerializer)
    k.forClass(new AccountRecordSerializer)
    // ...
  }
}
Registering just the classes (without custom serializers) can also improve Kryo performance. Registered classes are serialized as numeric IDs instead of fully qualified class names, saving space and network IO while shuffling.
trait MyRecord1
trait MyRecord2
import com.twitter.chill._
import com.esotericsoftware.kryo.Kryo
import com.spotify.scio.coders.KryoRegistrar
import com.twitter.chill.IKryoRegistrar
@KryoRegistrar
class MyKryoRegistrar extends IKryoRegistrar {
  override def apply(k: Kryo): Unit = {
    k.registerClasses(List(classOf[MyRecord1], classOf[MyRecord2]))
  }
}
What Kryo tuning options are there?
See KryoOptions.java for a complete list of available Kryo tuning options. These can be passed via command line, for example:
--kryoBufferSize=1024 --kryoMaxBufferSize=8192 --kryoReferenceTracking=false --kryoRegistrationRequired=true
Among these, --kryoRegistrationRequired=true might be useful when developing to ensure that all data types in the pipeline are registered.
Development environment issues
How do I keep SBT from running out of memory?
SBT might sometimes run out of memory and show an OutOfMemoryError: Metaspace error. Override the default memory setting with -mem <integer>, e.g. sbt -mem 1024.
How do I fix SBT heap size error in IntelliJ?
If you encounter an SBT error with the message “Initial heap size set to a larger value than the maximum heap size”, that is because IntelliJ has a lower default -Xmx for SBT than the -Xms in our .jvmopts. To fix that, open Preferences -> Build, Execution, Deployment -> Build Tools -> sbt, and update Maximum heap size, MB to 2048.
How do I fix “Unable to create parent directories” error in IntelliJ?
You might get an error message like java.io.IOException: Unable to create parent directories of /Applications/IntelliJ IDEA CE.app/Contents/bin/.bigquery/012345abcdef.schema.json. This usually happens to people who run IntelliJ IDEA with its bundled JVM. There are two solutions.
- Install JDK from java.com and switch to it by following the “All platforms: switch between installed runtimes” section in this page.
- Override the BigQuery cache directory as a JVM compiler parameter. On the bottom right of the IntelliJ window, click the icon that looks like a clock, and then “Configure…”. Edit the JVM parameters to include the line -Dbigquery.cache.directory=</path/to/repository>/.bigquery. Then restart the compile server by clicking on the clock icon -> Stop, and then Start.
How to make IntelliJ IDEA work with type safe BigQuery classes?
Due to issue SCL-8834, case classes generated by @BigQueryType.fromTable or @BigQueryType.fromQuery are not recognized in IntelliJ IDEA. There are two workarounds. The first, the IDEA plugin solution, is highly recommended.
- IDEA Plugin
Inside IntelliJ, go to Preferences -> Plugins -> Browse repositories... and search for Scio. Install the plugin, restart IntelliJ and recompile the project (use SBT or IntelliJ). You have to recompile the project each time you add/edit a @BigQueryType macro. The plugin requires Scio >= 0.2.2. Documentation.
- Use case class from @BigQueryType.toTable
First start the Scio REPL and generate case classes from your query or table.
import com.spotify.scio.bigquery.types.BigQueryType
@BigQueryType.fromQuery("SELECT tornado, month FROM [bigquery-public-data:samples.gsod]")
class Tornado
Next, print the Scala code of the generated classes.
Tornado.toPrettyString()
You can then paste the @BigQueryType.fromQuery code into your pipeline and use it with sc.typedBigQuery.
import com.spotify.scio._
import com.spotify.scio.values._
import com.spotify.scio.bigquery._
def main(cmdlineArgs: Array[String]): Unit = {
  val (sc, args) = ContextAndArgs(cmdlineArgs)
  val data: SCollection[Tornado] = sc.typedBigQuery[Tornado]()
  // ...
}
Common issues
What does “Cannot prove that T1 <:< T2” mean?
Sometimes you get an error message like Cannot prove that T1 <:< T2 when saving an SCollection. This is because some sink methods have an implicit argument like the one below, which means the element type T of SCollection[T] must be a subtype of TableRow in order to save it to BigQuery. You have to map elements to the required type before saving.
def saveAsBigQuery(tableSpec: String)(implicit ev: T <:< TableRow)
In the case of saveAsTypedBigQuery you might get a Cannot prove that T <:< com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation error message. This API requires an SCollection[T] where T is a case class annotated with @BigQueryType.toTable. For example:
import com.spotify.scio._
import com.spotify.scio.values._
import com.spotify.scio.bigquery._
import com.spotify.scio.bigquery.types.BigQueryType
@BigQueryType.toTable
case class Result(user: String, score: Int)
def main(cmdlineArgs: Array[String]): Unit = {
  val (sc, args) = ContextAndArgs(cmdlineArgs)
  val p: SCollection[(String, Int)] = ???
  p.map(kv => Result(kv._1, kv._2))
    .saveAsTypedBigQueryTable(Table.Spec(args("output")))
}
Scio uses macro annotations and the Macro Paradise plugin to implement annotations. You need to add the Macro Paradise plugin to your Scala compiler as described here.
How do I fix invalid default BigQuery credentials?
If you don’t specify a secret credential file for BigQuery [1], Scio will use your default credentials (via GoogleCredential.getApplicationDefault), which:
Returns the Application Default Credentials which are used to identify and authorize the whole application. The following are searched (in order) to find the Application Default Credentials:
- Credentials file pointed to by the GOOGLE_APPLICATION_CREDENTIALS environment variable
- Credentials provided by the Google Cloud SDK gcloud auth application-default login command
- Google App Engine built-in credentials
- Google Cloud Shell built-in credentials
- Google Compute Engine built-in credentials
The easiest way to configure it on your local machine is to use the gcloud auth application-default login command.
[1] Keep in mind that you can specify your credential file via -Dbigquery.secret.
Why are my typed BigQuery case classes not up to date?
Case classes generated by @BigQueryType.fromTable or other macros might not update after a table schema change. To solve this problem, remove the cached BigQuery metadata by deleting the .bigquery directory in your project root. If you would rather avoid any issues resulting from caching and schema evolution entirely, you can disable caching by setting the system property bigquery.cache.enabled to false.
How do I fix “SocketTimeoutException” with BigQuery?
BigQuery requests may sometimes time out, e.g. for complex queries over many tables.
exception during macro expansion:
[error] java.net.SocketTimeoutException: Read timed out
It can be fixed by increasing the timeout settings (default 20s).
sbt -Dbigquery.connect_timeout=30000 -Dbigquery.read_timeout=30000
Why do I see names like “main@{NativeMethodAccessorImpl...}” in the UI?
Scio traverses the JVM stack trace to figure out the proper name of each transform, e.g. flatMap@{UserAnalysis.scala:30}, but may get confused if your jobs are under the com.spotify.scio package. Move them to a different package, e.g. com.spotify.analytics, to fix the issue.
How do I fix “RESOURCE_EXHAUSTED” error?
You might see errors like RESOURCE_EXHAUSTED: IO error: No space left on disk in a job. They usually indicate that you have allocated insufficient local disk space to process your job. If you are running your job with default settings, it runs on 3 workers, each with 250 GB of local disk space. Consider modifying the default settings to increase the number of workers available to your job (via --numWorkers) or to increase the default disk size per worker (via --diskSizeGb).
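For example (the values here are illustrative, not recommendations):
sbt "runMain MyJob --project=<project> --runner=DataflowRunner --numWorkers=10 --diskSizeGb=500"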
Can I use “scala.App” trait instead of “main” method?
Your Scio applications should define a main method instead of extending scala.App. Applications extending scala.App may not work properly due to delayed initialization and closure cleaning.
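A minimal sketch of the recommended shape:
import com.spotify.scio._

// Prefer an explicit main method...
object MyJob {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    // ... build the pipeline ...
    sc.run()
  }
}

// ...rather than object MyJob extends App { ... }, whose delayed initialization
// can interfere with closure serialization.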
How to inspect the content of an SCollection?
There are multiple options here:
- Use the debug() method on an SCollection to print its content as the data flows through the DAG during execution (after run or runAndCollect); see the sketch after this list
- Use a debugger and set breakpoints - make sure to break inside your functions so you stop at execution time, not at pipeline construction time
- In the Scio REPL, use runAndCollect() to execute the pipeline and materialize the contents of an SCollection
How do I improve side input performance?
By default, Dataflow workers allocate 100MB (see DataflowWorkerHarnessOptions#getWorkerCacheMb) of memory for caching side inputs and fall back to disk or network beyond that. Therefore jobs with large side inputs may be slow. To override this default, register DataflowWorkerHarnessOptions before parsing command line arguments and then pass --workerCacheMb=N when submitting the job.
import com.spotify.scio._
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions
def main(cmdlineArgs: Array[String]): Unit = {
  PipelineOptionsFactory.register(classOf[DataflowWorkerHarnessOptions])
  val (sc, args) = ContextAndArgs(cmdlineArgs)
  // ...
}
How do I control concurrency (number of DoFn threads) in Dataflow workers
By default, Google Cloud Dataflow will use as many threads (concurrent DoFns) per worker as appropriate (the precise definition is an implementation detail); in some cases you might want to control this. Use the numberOfWorkerHarnessThreads option from DataflowPipelineDebugOptions. For example, to use a single thread per worker on an 8 vCPU machine, simply specify an 8 vCPU worker machine type and pass --numberOfWorkerHarnessThreads=1 on the command line, or set the corresponding option in DataflowPipelineDebugOptions.
How to manually investigate a Cloud Dataflow worker
First find the worker VM; the easiest way is through the GCE instance groups:
gcloud compute ssh --project=<project> --zone=<zone> <VM>
To find the id of the batch (for a batch job) or streaming container:
docker ps | grep "batch\|streaming" | awk '{print $1}'
To get into the harness container:
docker exec -it <container-id> /bin/bash
To install JDK tools:
apt-get update
apt-get install default-jdk -y
To find java process:
jps
To get GC stats:
jstat -gcutil <pid> 1000 1000
To get stacktrace:
jstack <pid>