Redis
Scio provides support for Redis in the scio-redis artifact.
Batch read
Reading key-value pairs from Redis for a specific key pattern is supported via the redis method on ScioContext:
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.redis._
val sc: ScioContext = ???
val connectionOptions = RedisConnectionOptions("redisHost", 6379)
val keyPattern = "foo*"
val elements: SCollection[(String, String)] = sc.redis(connectionOptions, keyPattern)
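To make it clearer which keys a pattern such as "foo*" would select, here is a minimal sketch in plain Scala. It assumes the pattern follows Redis's glob-style matching and only handles the `*` and `?` wildcards. The helpers `globToRegex` and `matchesGlob` are hypothetical and written for illustration only; they are not part of scio-redis.

```scala
import java.util.regex.Pattern

// Hypothetical helper (not part of scio-redis): translate a simple glob
// containing only `*` and `?` wildcards into an equivalent regex string.
def globToRegex(glob: String): String = {
  val sb = new StringBuilder
  glob.foreach {
    case '*' => sb.append(".*")                       // any run of characters
    case '?' => sb.append(".")                        // exactly one character
    case c   => sb.append(Pattern.quote(c.toString))  // literal character
  }
  sb.toString
}

// Hypothetical helper: true if the whole key matches the glob.
def matchesGlob(glob: String, key: String): Boolean =
  Pattern.compile(globToRegex(glob)).matcher(key).matches()

val sampleKeys = List("foo", "foo:1", "foobar", "bar:foo")

// Keeps the keys that start with "foo"; "bar:foo" is not selected.
val selected = sampleKeys.filter(matchesGlob("foo*", _))
```

Under these assumptions, a pattern of "foo*" acts as a prefix filter, so a narrower pattern is one way to limit how many pairs the read brings into the pipeline.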
Lookups
Looking up specific keys from Redis can be done with RedisDoFn:
import com.spotify.scio._
import com.spotify.scio.redis._
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.ParDo
import scala.concurrent.{ExecutionContext, Future}
val redisHost: String = ???
val redisPort: Int = ???
val batchSize: Int = ???
val connectionOptions = RedisConnectionOptions(redisHost, redisPort)
val keys: SCollection[String] = ???
keys
  .applyTransform(
    ParDo.of(
      new RedisDoFn[String, (String, Option[String])](connectionOptions, batchSize) {
        override def request(value: String, client: Client)(
          implicit ec: ExecutionContext
        ): Future[(String, Option[String])] =
          client
            .request(p => p.get(value) :: Nil)
            .map { case r: List[String @unchecked] => (value, r.headOption) }
      }
    )
  )
Write
Writes to Redis require an SCollection of a subclass of RedisMutation. Writes work in both batch and streaming modes via saveAsRedis:
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.redis._
import com.spotify.scio.redis.types._
val connectionOptions = RedisConnectionOptions("redisHost", 6379)
val keys: SCollection[String] = ???
keys.map(IncrBy(_, 1)).saveAsRedis(connectionOptions)
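The effect of mapping each key to IncrBy(_, 1) can be sketched without a Redis server. The following plain-Scala model uses an in-memory Map as a stand-in for Redis counters and assumes the usual Redis INCRBY behavior, where a missing key is treated as 0 before the increment. The `applyIncrBy` helper is hypothetical and not part of scio-redis.

```scala
// Hypothetical in-memory stand-in for Redis counters (not part of scio-redis).
// Mirrors INCRBY: a key that does not exist yet is treated as 0.
def applyIncrBy(store: Map[String, Long], key: String, amount: Long): Map[String, Long] =
  store.updated(key, store.getOrElse(key, 0L) + amount)

// Keys as they might arrive in the SCollection, duplicates included.
val incoming = List("a", "b", "a", "c", "a")

// Apply one increment of 1 per element, as keys.map(IncrBy(_, 1)) would request.
val counters = incoming.foldLeft(Map.empty[String, Long]) { (store, key) =>
  applyIncrBy(store, key, 1L)
}
```

In this model each counter ends up equal to the number of times its key appeared, which is why the pattern is a convenient way to count occurrences. In a real pipeline, any values already stored under those keys would be added to rather than replaced.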