Managed IO
Beam’s Managed transforms move responsibility for the creation of transform classes from user code to the runner, allowing runner-specific optimizations like hot-swapping an instance of a transform with an updated one. Beam currently supports Iceberg and Kafka managed transforms. See also Dataflow’s supported transforms.
A Scio Coder must be defined for Beam's Row, derived from the Beam Schema that the data source is expected to produce. A Coder built from one schema only describes rows of that schema, so if a pipeline reads more than one type of data into Beam Rows, you will need to pass the matching coder explicitly at each use instead of relying on a single implicit one.
The source and sink identifiers (for example, Managed.ICEBERG) should be taken from Beam's Managed class.
import com.spotify.scio.ScioContext
import com.spotify.scio.coders.Coder
import com.spotify.scio.managed._
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.managed.Managed
import org.apache.beam.sdk.schemas.Schema
import org.apache.beam.sdk.values.Row
val sc: ScioContext = ???
val rowSchema: Schema = ???
implicit val rowCoder: Coder[Row] = Coder.row(rowSchema)
val config: Map[String, Object] = ???
val rows: SCollection[Row] = sc.managed(Managed.ICEBERG, rowSchema, config)
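The rowSchema placeholder above can be built with Beam's Schema builder. A minimal sketch, assuming a table with the hypothetical fields id, name, and an optional note; the field names and types are illustrative only and must match what the data source actually provides:

```scala
import org.apache.beam.sdk.schemas.Schema
import org.apache.beam.sdk.schemas.Schema.FieldType

// Hypothetical field names and types, for illustration only.
val rowSchema: Schema = Schema
  .builder()
  .addInt64Field("id")
  .addStringField("name")
  .addNullableField("note", FieldType.STRING) // this field may be null
  .build()
```

The resulting schema can then be passed both to Coder.row and to sc.managed, as in the snippet above.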
Saving data to a Managed IO is similar:
import com.spotify.scio.managed._
import com.spotify.scio.coders.Coder
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.managed.Managed
import org.apache.beam.sdk.schemas.Schema
import org.apache.beam.sdk.values.Row
val rows: SCollection[Row] = ???
val config: Map[String, Object] = ???
rows.saveAsManaged(Managed.ICEBERG, config)
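The config placeholder is a plain map of options for the chosen managed transform. As a rough sketch for Iceberg, assuming the keys table, catalog_name, and catalog_properties from Beam's managed Iceberg configuration (check the Beam documentation for the exact set supported by your version); all values below are hypothetical:

```scala
// Hypothetical catalog settings; replace with values for your environment.
// A Java map is used for the nested value on the assumption that the
// configuration is handed to Beam's Java API as-is.
val catalogProperties = new java.util.HashMap[String, String]()
catalogProperties.put("type", "hadoop")
catalogProperties.put("warehouse", "gs://example-bucket/warehouse")

// Keys assumed from Beam's managed Iceberg configuration.
val config: Map[String, Object] = Map(
  "table" -> "db.events",
  "catalog_name" -> "example_catalog",
  "catalog_properties" -> catalogProperties
)
```

The same map shape is used for reading and writing; only the keys differ between managed transforms.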
Magnolify’s RowType (available as part of the magnolify-beam artifact) provides automatically derived mappings between Beam’s Row and Scala case classes. See full documentation here.
import com.spotify.scio.ScioContext
import com.spotify.scio.coders.Coder
import com.spotify.scio.managed._
import org.apache.beam.sdk.managed.Managed
import org.apache.beam.sdk.schemas.Schema
import org.apache.beam.sdk.values.Row
import magnolify.beam._
val config: Map[String, Object] = ???
case class Record(a: Int, b: String)
val rt = RowType[Record]
implicit val recordRowCoder: Coder[Row] = Coder.row(rt.schema)
val sc: ScioContext = ???
sc.managed(Managed.ICEBERG, rt.schema, config)
  // convert the Row instance to a Record
  .map(rt.apply)
  .map(r => r.copy(a = r.a + 1))
  // convert the Record to a Row
  .map(rt.apply)
  .saveAsManaged(Managed.ICEBERG, config)
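For comparison, the conversion that RowType automates can be written by hand against Beam's Row API. This is only a sketch to show what is being derived, not how Magnolify implements it; recordSchema, toRow, and fromRow are hypothetical names introduced here:

```scala
import org.apache.beam.sdk.schemas.Schema
import org.apache.beam.sdk.values.Row

case class Record(a: Int, b: String)

// A hand-written schema for Record, one Beam field per case class field.
val recordSchema: Schema = Schema
  .builder()
  .addInt32Field("a")
  .addStringField("b")
  .build()

// Hypothetical helper: Record -> Row. The Int is boxed explicitly because
// Row.Builder.addValues takes Java objects.
def toRow(r: Record): Row =
  Row.withSchema(recordSchema).addValues(Int.box(r.a), r.b).build()

// Hypothetical helper: Row -> Record, reading each field by name.
def fromRow(row: Row): Record =
  Record(row.getInt32("a"), row.getString("b"))
```

Every added or renamed field has to be updated in three places here (the case class, the schema, and both helpers), which is the bookkeeping the derived RowType avoids.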