# AsyncDoFn
Scio's `BaseAsyncDoFn` provides standard handling for sending asynchronous requests and capturing the responses for a bundle of pipeline elements. `BaseAsyncDoFn` is a subclass of `DoFnWithResource`, which handles the creation and reuse of client classes. Scio provides several future-specific subclasses to choose from, depending on the return type of the client:
- `GuavaAsyncDoFn` for clients that return Guava's `ListenableFuture`
- `JavaAsyncDoFn` for clients that return `CompletableFuture`
- `ScalaAsyncDoFn` for clients that return a Scala `Future`
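As a rough illustration of the non-Guava cases, the sketch below defines two hypothetical mock clients (`MyJavaClient` and `MyScalaClient`, not part of Scio) that mirror the Guava mock client used later on this page: one returns a `CompletableFuture`, the kind `JavaAsyncDoFn` expects, and one returns a Scala `Future`, the kind `ScalaAsyncDoFn` expects. It uses only the standard library and assumes Scala 2.13 for `scala.jdk.FutureConverters`.

```scala
import java.util.concurrent.CompletableFuture
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.jdk.FutureConverters._

// Hypothetical mock client returning a JDK CompletableFuture
// (the future type JavaAsyncDoFn is meant for).
case class MyJavaClient(value: String) {
  def request(i: Int): CompletableFuture[String] =
    CompletableFuture.completedFuture(s"$value$i")
}

// Hypothetical mock client returning a Scala Future
// (the future type ScalaAsyncDoFn is meant for).
case class MyScalaClient(value: String) {
  def request(i: Int): Future[String] = Future.successful(s"$value$i")
}

object FutureTypesDemo {
  def main(args: Array[String]): Unit = {
    val javaResult = MyJavaClient("foo").request(1).get()
    val scalaResult = Await.result(MyScalaClient("foo").request(2), 1.second)

    // If a client's future type does not match the subclass you want,
    // the standard library can convert a CompletionStage to a Scala Future.
    val converted: Future[String] = MyJavaClient("bar").request(3).asScala
    val convertedResult = Await.result(converted, 1.second)

    println(javaResult)      // foo1
    println(scalaResult)     // foo2
    println(convertedResult) // bar3
  }
}
```

Picking the subclass that matches the client's native future type avoids the conversion step entirely; the conversion is shown only as a fallback.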
`BaseAsyncDoFn` waits for the futures of all elements in a bundle to complete before completing the bundle. If the request for any element in the bundle fails, the entire bundle is retried, so requests should be idempotent.
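The wait-for-all and retry-the-whole-bundle behavior can be modeled with plain Scala futures. This is a simplified, hypothetical model (`BundleModel` and `processBundle` are illustrative names), not Scio's implementation: each element's request yields a future, the bundle succeeds only when all of them succeed, and a single failure fails the bundle, which is why every request in it may be sent again.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical model of bundle semantics; NOT Scio's implementation.
object BundleModel {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Issues one request per element, then waits for all of them.
  // Any single failed future makes the whole bundle a Failure.
  def processBundle(bundle: Seq[Int])(request: Int => Future[String]): Try[Seq[String]] =
    Try(Await.result(Future.sequence(bundle.map(request)), 10.seconds))

  def main(args: Array[String]): Unit = {
    // Idempotent request: the same input always yields the same result,
    // so re-running the whole bundle after a failure is safe.
    def idempotent(i: Int): Future[String] = Future(s"foo$i")
    println(processBundle(Seq(1, 2, 3))(idempotent)) // Success(List(foo1, foo2, foo3))

    // One failing element fails the bundle; a retry would re-send the
    // requests for elements 1 and 3 even though they already succeeded.
    def flaky(i: Int): Future[String] =
      if (i == 2) Future.failed(new RuntimeException("boom")) else Future(s"foo$i")
    println(processBundle(Seq(1, 2, 3))(flaky).isFailure) // true
  }
}
```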
Given this Guava-based mock client:
```scala
import com.google.common.util.concurrent.{ListenableFuture, Futures}

case class MyClient(value: String) {
  // Returns an already-completed future, standing in for a real async call.
  def request(i: Int): ListenableFuture[String] = Futures.immediateFuture(s"$value$i")
}
```
For a client that returns a `ListenableFuture`, a custom `DoFn` can be defined using `GuavaAsyncDoFn`. Note the configured `ResourceType.PER_CLASS`, which reuses a single client across all threads on a worker; see `ResourceType` for more details.
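To make the effect of the resource scope concrete, the sketch below is a hypothetical, standard-library-only model (`ResourceScopeModel`, `ModelDoFn`, and `FooDoFn` are illustrative names, not Scio classes). It contrasts a client shared by every instance of a `DoFn` class, as with `PER_CLASS`, against one client per `DoFn` instance, as with `PER_INSTANCE`. Because a shared client is used from several threads at once, it needs to be thread-safe.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model of resource scoping; NOT Scio's implementation.
object ResourceScopeModel {
  // Counts how many clients have been constructed.
  val created = new AtomicInteger(0)

  final class MyClient(val value: String) {
    created.incrementAndGet()
    def request(i: Int): String = s"$value$i"
  }

  // Cache keyed by DoFn class name: models sharing one client per class.
  private val perClass = new ConcurrentHashMap[String, MyClient]()

  abstract class ModelDoFn(shareAcrossInstances: Boolean) {
    def createResource(): MyClient
    // Per-instance scope: each DoFn instance lazily creates its own client.
    private lazy val own: MyClient = createResource()
    def getResource: MyClient =
      if (shareAcrossInstances) perClass.computeIfAbsent(getClass.getName, _ => createResource())
      else own
  }

  class FooDoFn(shareAcrossInstances: Boolean) extends ModelDoFn(shareAcrossInstances) {
    override def createResource(): MyClient = new MyClient("foo")
  }

  def main(args: Array[String]): Unit = {
    // Four DoFn instances, e.g. one per worker thread, sharing a client.
    Seq.fill(4)(new FooDoFn(shareAcrossInstances = true)).foreach(_.getResource.request(1))
    println(created.get()) // 1

    created.set(0)
    // Four DoFn instances, each with its own client.
    Seq.fill(4)(new FooDoFn(shareAcrossInstances = false)).foreach(_.getResource.request(1))
    println(created.get()) // 4
  }
}
```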
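```scala
import com.google.common.util.concurrent.ListenableFuture
import com.spotify.scio.transforms._
import com.spotify.scio.transforms.DoFnWithResource.ResourceType
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.ParDo

class MyDoFn() extends GuavaAsyncDoFn[Int, String, MyClient] {
  // Share one client across all threads on a worker.
  override def getResourceType: ResourceType = ResourceType.PER_CLASS

  // Creates the client; DoFnWithResource handles reuse according to getResourceType.
  override def createResource(): MyClient = MyClient("foo")

  // Sends the asynchronous request for a single element; the returned future
  // is awaited before the bundle completes.
  override def processElement(input: Int): ListenableFuture[String] =
    getResource.request(input)
}

val elements: SCollection[Int] = ???
val result: SCollection[String] = elements.applyTransform(ParDo.of(new MyDoFn()))
```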