Testing

To write Scio unit tests, you will need to add the following dependency to your build.sbt:

libraryDependencies ++= Seq(
  // .......
  "com.spotify" %% "scio-test-core" % scioVersion % Test,
  // .......
)

To run the tests, run the following commands. You can skip the first two if you have already run them and are still in the sbt shell.

$ sbt
> project scio-examples
> test
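
To run a single test suite, use sbt’s testOnly task (the suite name below is illustrative):

> testOnly com.spotify.scio.examples.WordCountTest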

See the sbt documentation for more Scala testing tasks.

Test entire pipeline

We will use the WordCountTest to explain how Scio tests work. WordCount is the pipeline under test. Full example code for WordCountTest and other test examples can be found here.

Let’s walk through the details of the test. The test class should extend PipelineSpec, a trait for unit testing pipelines.
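
A minimal test class skeleton looks like this (a sketch; the full WordCountTest linked above has the complete version):

import com.spotify.scio.testing.PipelineSpec

class WordCountTest extends PipelineSpec {
  // Individual tests go here, e.g. "WordCount" should "work" in { ... }
}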

The inData variable holds the input data for your test, and the expected variable contains the expected results after your pipeline processes inData. The WordCount pipeline counts the occurrences of each word; given the input data, we should expect counts of a: 3, b: 3, c: 1, and so on.

 val inData = Seq("a b c d e", "a b a b", "")
 val expected = Seq("a: 3", "b: 3", "c: 1", "d: 1", "e: 1")

Using JobTest, you can test the entire pipeline. Specify the type of the class under test, in this case com.spotify.scio.examples.WordCount.type. The args function takes the list of command-line arguments that are passed to WordCount’s main function, which expects input and output arguments.

"WordCount" should "work" in {
  JobTest[com.spotify.scio.examples.WordCount.type]
    .args("--input=in.txt", "--output=out.txt")
    .input(TextIO("in.txt"), inData)
    .output(TextIO("out.txt"))(coll => coll should containInAnyOrder(expected))
    .run()
}

The input function injects your input test data. Note that the TestIO[T] should match the input source used in the pipeline, e.g. TextIO for sc.textFile, AvroIO for sc.avroFile. The TextIO id (“in.txt”) should match the one specified in args.
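
For reference, here is a condensed sketch of the pipeline under test; note how args("input") and args("output") line up with the TextIO ids in the JobTest (the real WordCount example may differ in detail):

sc.textFile(args("input"))
  .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
  .countByValue
  .map { case (word, count) => s"$word: $count" }
  .saveAsTextFile(args("output"))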

The output function evaluates the output of the pipeline using the provided assertion from SCollectionMatchers. More info on SCollectionMatchers can be found here. In this example, we assert that the output of the pipeline is an SCollection whose elements match the expected variable in any order. Also note that the TestIO[T] should match the output sink used in the pipeline, e.g. TextIO for saveAsTextFile.
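
Other matchers from SCollectionMatchers follow the same pattern inside the assertion block, for example (a sketch):

coll should haveSize(5)           // exact number of elements
coll should containValue("a: 3")  // contains at least this element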

The run function will run the pipeline.

Test for pipeline with side input

We will use the SideInputJoinExamples test in JoinExamplesTest to illustrate how to write a test for a pipeline with side inputs. The SideInputJoinExamples pipeline has two input sources, one for eventsInfo and the other for countryInfo; countryInfo is used as a side input to join with eventsInfo.

Since we have two input sources, we have to specify both in the JobTest. Note that the injected data type should match the one expected by the corresponding source.

"SideInputJoinExamples" should "work" in {
  JobTest[com.spotify.scio.examples.cookbook.SideInputJoinExamples.type]
    .args("--output=out.txt")
    .input(BigQueryIO(ExampleData.EVENT_TABLE), eventData)
    .input(BigQueryIO(ExampleData.COUNTRY_TABLE), countryData)
    .output(TextIO("out.txt"))(coll => coll should containInAnyOrder(expected))
    .run()
}
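
The injected values are plain TableRows. A minimal sketch of what eventData and countryData could look like (the field names are illustrative; the real test matches the schemas of the tables the pipeline reads):

import com.google.api.services.bigquery.model.TableRow

val eventData = Seq(
  new TableRow().set("ActionGeo_CountryCode", "US").set("SOURCEURL", "url1")
)
val countryData = Seq(
  new TableRow().set("FIPSCC", "US").set("HumanName", "United States")
)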

Test for pipeline with side output

SideInOutExampleTest shows an example of how to test a pipeline with side outputs. Each side output is evaluated using the output function. The TextIO ids (e.g. “out1.txt”) should match the ones specified in args.

val inData: Seq[String] = Seq("The quick brown fox jumps over the lazy dog.")

"SideInOutExample" should "work" in {
  JobTest[SideInOutExample.type]
    .args(
      "--input=in.txt",
      "--stopWords=stop.txt",
      "--output1=out1.txt",
      "--output2=out2.txt",
      "--output3=out3.txt",
      "--output4=out4.txt"
    )
    .input(TextIO("in.txt"), inData)
    .input(TextIO("stop.txt"), Seq("the"))
    .output(TextIO("out1.txt"))(coll => coll should containInAnyOrder(Seq.empty[String]))
    .output(TextIO("out2.txt"))(coll => coll should containInAnyOrder(Seq.empty[String]))
    .output(TextIO("out3.txt"))(coll => coll should containInAnyOrder(Seq("dog: 1", "fox: 1")))
    .output(TextIO("out4.txt")) {
      _ should containInAnyOrder(Seq("brown: 1", "jumps: 1", "lazy: 1", "over: 1", "quick: 1"))
    }
    .run()
}
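
For context, side outputs in a Scio pipeline are produced with SideOutput and withSideOutputs. A minimal sketch (not the exact SideInOutExample code) of a pipeline writing a main output and one side output:

import com.spotify.scio.values.SideOutput

val short = SideOutput[String]()
// words is assumed to be an SCollection[String]
val (main, sides) = words
  .withSideOutputs(short)
  .map { (w, ctx) =>
    if (w.length < 4) ctx.output(short, w)
    w
  }
main.saveAsTextFile(args("output1"))
sides(short).saveAsTextFile(args("output2"))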

Test partial pipeline

To test a section of a pipeline, use runWithContext. The TriggerExample.extractFlowInfo test in TriggerExampleTest tests only the extractFlowInfo part of the pipeline.

The data variable holds the test data, and sc.parallelize transforms the input Iterable into an SCollection of strings. TriggerExample.extractFlowInfo is executed using the ScioContext, and you can then specify assertions against the result of the pipeline.

"TriggerExample.extractFlowInfo" should "work" in {
  val data = Seq(
    "01/01/2010 00:00:00,1108302,94,E,ML,36,100,29,0.0065,66,9,1,0.001,74.8,1,9,3,0.0028,71,1,9,"
      + "12,0.0099,67.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,,,,0",
    "01/01/2010 00:00:00,"
      + "1100333,5,N,FR,9,0,39,,,9,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,"
  )
  runWithContext { sc =>
    val r = TriggerExample.extractFlowInfo(sc.parallelize(data))
    r should haveSize(1)
    r should containSingleValue(("94", 29))
  }
}

When your pipeline section contains input and/or output, you can also create an anonymous JobTest to inject the test data.

If we have the following pipeline section:

def pipeline(sc: ScioContext, input: String, output: String): Unit = {
  sc.textFile(input)
    .map(_ + "X")
    .saveAsTextFile(output)
}

It can be tested with:

JobTest(pipeline(_, "in.txt", "out.txt"))
  .input(TextIO("in.txt"), Seq("a", "b", "c"))
  .output(TextIO("out.txt"))(_ should containInAnyOrder(Seq("aX", "bX", "cX")))
  .run()

Test for pipeline with windowing

We will use the LeaderBoardTest to explain how to test windowing in Scio. The full example code is found here. LeaderBoardTest also extends PipelineSpec. The function under test is LeaderBoard.calculateTeamScores. This function calculates team scores within a fixed window, using the following window options (see the sketch after this list):

  • Calculate the scores every time the window ends
  • Calculate an early/“speculative” result from partial data, 5 minutes after the first element in our window is processed (withEarlyFiring)
  • Accept late entries (and recalculate based on them) only if they arrive within the allowedLateness duration.
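
In code, this windowing corresponds roughly to the following (a sketch, not the exact LeaderBoard implementation; infos stands for an SCollection[GameActionInfo]):

import com.spotify.scio.values.WindowOptions
import org.apache.beam.sdk.transforms.windowing.{AfterProcessingTime, AfterWatermark}
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES
import org.joda.time.Duration

infos.withFixedWindows(
  teamWindowDuration,
  options = WindowOptions(
    trigger = AfterWatermark
      .pastEndOfWindow()
      .withEarlyFirings(
        AfterProcessingTime
          .pastFirstElementInPane()
          .plusDelayOf(Duration.standardMinutes(5))
      ),
    accumulationMode = ACCUMULATING_FIRED_PANES,
    allowedLateness = allowedLateness
  )
)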

In this test, we are testing calculateTeamScores for when all of the elements arrive on time, i.e. before the watermark.

First, we have to create an input stream representing an unbounded SCollection of type GameActionInfo using testStreamOf. Each element is assigned a timestamp representing when the event occurred. In the code snippet below, we start at the epoch by advancing the watermark to baseTime (time zero) with advanceWatermarkTo.

We then add GameActionInfo elements with varying timestamps. These elements are on time because they are added before the watermark advances to 3 minutes.

val stream = testStreamOf[GameActionInfo]
  // Start at the epoch
  .advanceWatermarkTo(baseTime)
  // add some elements ahead of the watermark
  .addElements(
    event(blueOne, 3, Duration.standardSeconds(3)),
    event(blueOne, 2, Duration.standardMinutes(1)),
    event(redTwo, 3, Duration.standardSeconds(22)),
    event(blueTwo, 5, Duration.standardSeconds(3))
  )

We then advance the watermark to 3 minutes, add more GameActionInfo elements, and finally advance the watermark to infinity by calling advanceWatermarkToInfinity. These elements are also on time because their timestamps are still ahead of the watermark when they are added.

// The watermark advances slightly, but not past the end of the window
.advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(3)))
.addElements(
  event(redOne, 1, Duration.standardMinutes(4)),
  event(blueOne, 2, Duration.standardSeconds(270))
)
// The window should close and emit an ON_TIME pane
.advanceWatermarkToInfinity

To run the test, we use runWithContext; this runs calculateTeamScores using the ScioContext. We pass calculateTeamScores the SCollection we created above with testStreamOf. The IntervalWindow specifies the window for which we want to assert on the elements produced by calculateTeamScores; here we assert on the initial window of 0 to 20 minutes. Using inOnTimePane, we then assert that the on-time elements of the SCollection are equal to the expected sums.

runWithContext { sc =>
  val teamScores =
    LeaderBoard.calculateTeamScores(sc.testStream(stream), teamWindowDuration, allowedLateness)

  val window = new IntervalWindow(baseTime, teamWindowDuration)
  teamScores should inOnTimePane(window) {
    containInAnyOrder(Seq((blueOne.team, 12), (redOne.team, 4)))
  }
}

Scio provides more SCollection assertions such as inWindow, inCombinedNonLatePanes, inFinalPane, and inOnlyPane. You can find the full list here. More information on testing unbounded pipelines can be found here.
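
The pane assertions all follow the same shape as inOnTimePane. For example, an inFinalPane assertion over the same window (a sketch; the expected values depend on the trigger configuration):

teamScores should inFinalPane(window) {
  containInAnyOrder(Seq((blueOne.team, 12), (redOne.team, 4)))
}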

Test with transform overrides

Scio provides a method to replace arbitrary named PTransforms in a test context; this is primarily useful for mocking requests to external services.

In this example, the GuavaLookupDoFn stands in for a transform that contacts an external service. A ParDo PTransform is created from the DoFn (ParDo.of), then applied to the pipeline (applyTransform) with a unique name (myTransform).

sc.textFile(args("input"))
  .map(_.toInt)
  .applyTransform("myTransform", ParDo.of(new GuavaLookupDoFn))
  .map((i: KV[Int, BaseAsyncLookupDoFn.Try[String]]) => i.getValue.get())
  .saveAsTextFile(args("output"))

In a JobTest, a PTransformOverride can be passed to the transformOverride method to replace transforms in the original pipeline. Scio provides convenience methods for constructing PTransformOverrides in the com.spotify.scio.testing.TransformOverride object. Continuing the example above, TransformOverride.ofAsyncLookup can be used to map static mock data into the expected output format for the transform, here KV[Int, BaseAsyncLookupDoFn.Try[String]].

.transformOverride(
  TransformOverride.ofAsyncLookup[Int, String](
    "myTransform",
    Map(1 -> "10", 2 -> "20", 3 -> "30")
  )
)

It is also possible to provide a function rather than a static map:

TransformOverride.ofAsyncLookup[Int, String](
  "myTransform",
  (i: Int) => s"${i * 10}"
)

In a scenario where the PTransform generates more output elements than input elements, e.g. when there is a flatMap inside the transform:

.withName("myTransform")
.transform { c: SCollection[Int] =>
  c.applyTransform(ParDo.of(new GuavaLookupDoFn))
    .flatMap(_.getValue.get())
    .map(_.toString)
}

The transform can be mocked with one of the flavours of the ofIter method, which maps each element to an Iterable[U]:

TransformOverride.ofIter[Int, String](
  "myTransform",
  Map(1 -> Seq("10"), 2 -> Seq("20", "21"), 3 -> Seq())
)

or similarly provide a function rather than a static map:

TransformOverride.ofIter[Int, String](
  "myTransform",
  // map fn equivalent to: Map(1 -> Seq(), 2 -> Seq("1"), 3 -> Seq("1", "2"))
  (i: Int) => { (1 until i).map(String.valueOf(_)) }
)

TransformOverride.of overrides transforms of type PTransform[PCollection[T], PCollection[U]] as in the case of BaseAsyncDoFn subclasses. TransformOverride.ofKV overrides transforms of type PTransform[PCollection[T], PCollection[KV[T, U]]].
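
For instance, a KV-producing transform could be mocked like this (a sketch, assuming a transform named "myTransform" that takes Int input; as with the other flavours, a function can be supplied in place of the static map):

TransformOverride.ofKV[Int, String](
  "myTransform",
  Map(1 -> "10", 2 -> "20", 3 -> "30")
)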

Sources can also be overridden with TransformOverride.ofSource. For example, this source:

sc.withName("ReadInput")
  .textFile(args("input"))

Can be overridden with static mock data:

TransformOverride.ofSource[String]("ReadInput", List("10", "11", "12"))

It is also possible to override a named PTransform during partial pipeline testing with runWithOverrides.

runWithOverrides(
  TransformOverride.of("multiply", (v: Int) => v * v),
  TransformOverride.ofIter("append", (v: Int) => Seq(v + "c", v + "d"))
) { sc =>
  val result = sc
    .parallelize(Seq(1, 2, 3))
    .withName("multiply")
    .map(_ * 2)
    .withName("append")
    .flatMap(v => Seq(v + "a", v + "b"))

  result should containInAnyOrder(Seq("1c", "1d", "4c", "4d", "9c", "9d"))
}

Due to type erasure, it is possible to provide incorrect types for the transform; the error will not be caught until runtime.

If you’ve specified the incorrect input type, Scio will attempt to detect the error and throw an IllegalArgumentException, which will be wrapped in a PipelineExecutionException at runtime:

org.apache.beam.sdk.Pipeline$PipelineExecutionException:
  java.lang.IllegalArgumentException:
    Input for override transform myTransform does not match pipeline transform. Expected: class java.lang.Integer Found: class java.lang.String

If you’ve specified the incorrect output type, there is little Scio can do to detect the error. Typically, a coder will throw a ClassCastException whose message contains the correct type:

org.apache.beam.sdk.Pipeline$PipelineExecutionException:
  java.lang.ClassCastException:
    java.lang.String cannot be cast to java.lang.Integer