Redis

Scio provides support for Redis in the scio-redis artifact.

Batch read

Key-value pairs whose keys match a given pattern can be read from Redis with the redis method on ScioContext:

import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.redis._

val sc: ScioContext = ???
val connectionOptions = RedisConnectionOptions("redisHost", 6379)
val keyPattern = "foo*"

val elements: SCollection[(String, String)] = sc.redis(connectionOptions, keyPattern)
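To make the effect of a key pattern such as "foo*" concrete, the following is a minimal sketch of glob-style matching in plain Scala. It assumes the pattern is handed to Redis unchanged and interpreted with Redis's glob-style rules, which is an assumption here rather than something this page states. KeyPatternSketch, globToRegex and matches are hypothetical names, and only `*` and `?` are handled; character classes and escapes are left out.

```scala
import java.util.regex.Pattern

object KeyPatternSketch {
  // Translate a simplified glob into an anchored regex:
  // `*` matches any run of characters, `?` matches exactly one character,
  // every other character is matched literally.
  def globToRegex(glob: String): Pattern = {
    val sb = new StringBuilder
    glob.foreach {
      case '*' => sb.append(".*")
      case '?' => sb.append('.')
      case c   => sb.append(Pattern.quote(c.toString))
    }
    Pattern.compile(sb.toString, Pattern.DOTALL)
  }

  // True when the whole key matches the glob.
  def matches(glob: String, key: String): Boolean =
    globToRegex(glob).matcher(key).matches()
}
```

Under these assumptions, "foo*" selects keys such as "foo" and "foo:1" but not "bar" or "xfoo".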

Lookups

Individual keys can be looked up in Redis with RedisDoFn:

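import scala.concurrent.{ExecutionContext, Future}

// Keys to look up; connectionOptions is defined as in the batch read example.
val source: SCollection[String] = ???

source.parDo(
  new RedisDoFn[String, (String, Option[String])](connectionOptions, 1000) {
    // Issue a GET for the key and pair the key with the first result, if any.
    override def request(value: String, client: Client)(implicit
      ec: ExecutionContext
    ): Future[(String, Option[String])] =
      client
        .request(p => p.get(value) :: Nil)
        .map { case r: List[String @unchecked] => (value, r.headOption) }
  }
)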

Write

Writing to Redis requires an SCollection whose elements are a subtype of RedisMutation. The saveAsRedis method works in both batch and streaming modes:

import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.redis._
import com.spotify.scio.redis.types._

val connectionOptions = RedisConnectionOptions("redisHost", 6379)

val keys: SCollection[String] = ???
keys.map(IncrBy(_, 1)).saveAsRedis(connectionOptions)
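When the same key occurs many times, one possible refinement is to aggregate first and write a single increment per distinct key. The sketch below assumes IncrBy accepts a Long amount, as the call above suggests, and uses SCollection's countByValue; RedisCountSketch and incrementByCount are hypothetical names, not part of scio-redis.

```scala
import com.spotify.scio._
import com.spotify.scio.redis._
import com.spotify.scio.redis.types._
import com.spotify.scio.values.SCollection

object RedisCountSketch {
  // Hypothetical helper: adds the number of occurrences of each key
  // to that key's counter in Redis.
  def incrementByCount(
    keys: SCollection[String],
    connectionOptions: RedisConnectionOptions
  ): Unit = {
    keys
      .countByValue                            // (key, number of occurrences)
      .map { case (key, n) => IncrBy(key, n) } // one mutation per distinct key
      .saveAsRedis(connectionOptions)
    ()
  }
}
```

This trades many small increments for one aggregation step. In streaming mode the input would likely need to be windowed before countByValue, since the aggregation is computed per window.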