PubSub
Scio supports Google Cloud PubSub
Read from PubSub
Use the appropriate PubsubIO
method with ScioContext.read
to read into strings, avro, protobuf, beam’s PubsubMessage
, or into any type supported by a scio Coder
. Pass a PubsubIO.ReadParam
to configure whether reading from a topic or subscription.
import com.spotify.scio._
import com.spotify.scio.values._
import com.spotify.scio.pubsub._
val a: PubsubIO[String] = PubsubIO.string("strings")
import com.spotify.scio.avro.Account
val b: PubsubIO[Account] = PubsubIO.avro[Account]("avros")
import com.spotify.scio.proto.Track.TrackPB
val c: PubsubIO[TrackPB] = PubsubIO.proto[TrackPB]("protos")
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
val d: PubsubIO[PubsubMessage] = PubsubIO.pubsub[PubsubMessage]("messages")
case class MyClass(s: String, i: Int)
val e: PubsubIO[MyClass] = PubsubIO.coder[MyClass]("myclasses")
val sc: ScioContext = ???
// read strings from a subscription
val in1: SCollection[String] = sc.read(a)(PubsubIO.ReadParam(PubsubIO.Subscription))
// or from a topic
val in2: SCollection[String] = sc.read(a)(PubsubIO.ReadParam(PubsubIO.Topic))
The withAttributes
methods give access to the PubSub attributes within the SCollection:
import com.spotify.scio._
import com.spotify.scio.values._
import com.spotify.scio.pubsub._
val sc: ScioContext = ???
val in: SCollection[(String, Map[String, String])] =
sc.read(PubsubIO.withAttributes[String]("strings"))(PubsubIO.ReadParam(PubsubIO.Subscription))
.map { case (element, attributes) =>
attributes.get("name")
???
}
Write to PubSub
PubSub write methods use the same PubSubIO methods as reading:
import com.spotify.scio._
import com.spotify.scio.values._
import com.spotify.scio.pubsub._
val strings: SCollection[String] = ???
strings.write(PubsubIO.string("strings"))(PubsubIO.WriteParam())
import com.spotify.scio.avro.Account
val accounts: SCollection[Account] = ???
accounts.write(PubsubIO.avro[Account]("accounts"))(PubsubIO.WriteParam())
import com.spotify.scio.proto.Track.TrackPB
val tracks: SCollection[TrackPB] = ???
tracks.write(PubsubIO.proto[TrackPB]("tracks"))(PubsubIO.WriteParam())
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
val messages: SCollection[PubsubMessage] = ???
messages.write(PubsubIO.pubsub[PubsubMessage]("messages"))(PubsubIO.WriteParam())
case class MyClass(s: String, i: Int)
val myClasses: SCollection[MyClass] = ???
myClasses.write(PubsubIO.coder[MyClass]("myClasses"))(PubsubIO.WriteParam())
Writing attributes:
import com.spotify.scio._
import com.spotify.scio.values._
import com.spotify.scio.pubsub._
val strings: SCollection[(String, Map[String, String])] = ???
strings.write(PubsubIO.withAttributes[String]("strings"))(PubsubIO.WriteParam())
0.14.8-23-c45685a-20241105T161920Z*