Redis
Scio provides support for Redis in the scio-redis artifact.
Batch read
Reading key-value pairs from Redis whose keys match a given pattern is supported via the redis method:
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.redis._
val sc: ScioContext = ???
val connectionOptions = RedisConnectionOptions("redisHost", 6379)
val keyPattern = "foo*"
val elements: SCollection[(String, String)] = sc.redis(connectionOptions, keyPattern)
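Redis key patterns are glob-style, so "foo*" selects every key that starts with "foo". The following is a minimal, library-free sketch of that matching rule, assuming the pattern is handed to Redis unchanged. The KeyPatternSketch object, its helpers, and the sample keys are hypothetical and not part of scio-redis. Only the `*` and `?` wildcards are handled here.

```scala
import java.util.regex.Pattern

// Hypothetical helper, not part of scio-redis: illustrates which keys a
// glob-style pattern such as "foo*" would select.
object KeyPatternSketch {
  // Translate a glob into a regex; only `*` and `?` are treated as wildcards.
  def globToRegex(glob: String): Pattern = {
    val sb = new StringBuilder
    glob.foreach {
      case '*' => sb.append(".*")                     // any run of characters
      case '?' => sb.append(".")                      // exactly one character
      case c   => sb.append(Pattern.quote(c.toString)) // literal character
    }
    Pattern.compile(sb.toString, Pattern.DOTALL)
  }

  // True when the whole key matches the glob.
  def matches(glob: String, key: String): Boolean =
    globToRegex(glob).matcher(key).matches()

  // Stand-in for the keys stored in a Redis instance (made-up data).
  val storedKeys: List[String] = List("foo", "foo:1", "foobar", "bar", "xfoo")

  // Keys that a read with the pattern "foo*" would be expected to cover.
  val selected: List[String] = storedKeys.filter(matches("foo*", _))
}
```

Because the match is anchored to the whole key, "xfoo" is not selected by "foo*".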
Lookups
Looking up specific keys in Redis can be done with RedisDoFn:
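import scala.concurrent.{ExecutionContext, Future}
// The keys to look up; connectionOptions is defined as in the batch read example
val source: SCollection[String] = ???
source.parDo(
  new RedisDoFn[String, (String, Option[String])](connectionOptions, 1000) {
    override def request(value: String, client: Client)(implicit
      ec: ExecutionContext
    ): Future[(String, Option[String])] =
      client
        // Issue a single GET for the input key
        .request(p => p.get(value) :: Nil)
        // Pair the key with the first response, if any
        .map { case r: List[String @unchecked] => (value, r.headOption) }
  }
)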
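The lookup above emits each input key paired with an Option, so downstream code typically has to decide what to do with keys that were not found. The following is a minimal sketch of that handling in plain Scala, using a List as a stand-in for the resulting SCollection. The LookupResultSketch object, the sample data, and the default value "0" are all hypothetical.

```scala
// Hypothetical post-processing of lookup results; a List stands in for the
// SCollection[(String, Option[String])] produced by the lookup.
object LookupResultSketch {
  // Made-up lookup output: None marks a key that was absent.
  val results: List[(String, Option[String])] =
    List("a" -> Some("1"), "b" -> None, "c" -> Some("3"))

  // Keep only the keys that were found, unwrapping the Option.
  val hits: List[(String, String)] =
    results.collect { case (key, Some(value)) => (key, value) }

  // Keep only the keys that were missing.
  val misses: List[String] =
    results.collect { case (key, None) => key }

  // Alternatively, substitute a default value for missing keys.
  val withDefault: List[(String, String)] =
    results.map { case (key, value) => (key, value.getOrElse("0")) }
}
```

Whether to drop misses or fill in a default depends on the pipeline. Keeping the Option in the output type, as the lookup does, leaves that choice to the caller.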
Write
Writes to Redis require an SCollection of a subclass of RedisMutation. Writes work in both batch and streaming modes via saveAsRedis:
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.redis._
import com.spotify.scio.redis.types._
val connectionOptions = RedisConnectionOptions("redisHost", 6379)
val keys: SCollection[String] = ???
keys.map(IncrBy(_, 1)).saveAsRedis(connectionOptions)
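To make the effect of the write above concrete: the Redis INCRBY command adds the given amount to the integer stored at a key and treats a missing key as 0, so mapping every element to an increment of 1 amounts to counting occurrences per key. The following is a minimal in-memory sketch of that behaviour, assuming each mutation is applied exactly once. The Incr case class and applyAll function are hypothetical stand-ins, not scio-redis types.

```scala
// Hypothetical in-memory model of increment mutations; not scio-redis code.
object IncrBySketch {
  // Stand-in for an increment mutation: a key and the amount to add.
  final case class Incr(key: String, amount: Long)

  // Apply the mutations in order, treating a missing key as 0 the way
  // INCRBY does.
  def applyAll(initial: Map[String, Long], mutations: Seq[Incr]): Map[String, Long] =
    mutations.foldLeft(initial) { (state, m) =>
      state.updated(m.key, state.getOrElse(m.key, 0L) + m.amount)
    }

  // Made-up input keys, with "a" appearing three times.
  val keys: Seq[String] = Seq("a", "b", "a", "c", "a")

  // One increment of 1 per element, as in the write example.
  val counts: Map[String, Long] = applyAll(Map.empty, keys.map(Incr(_, 1)))
}
```

Starting from an empty store, the key "a" ends at 3 and "b" and "c" at 1. Starting from existing values, the increments are added on top of them.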