Redis

Scio provides support for Redis in the scio-redis artifact.

Batch read

Reading key-value pairs from Redis for all keys matching a pattern is supported via redis, which becomes available on ScioContext after importing com.spotify.scio.redis._:

import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.redis._

val sc: ScioContext = ???
val connectionOptions = RedisConnectionOptions("redisHost", 6379)
val keyPattern = "foo*"

val elements: SCollection[(String, String)] = sc.redis(connectionOptions, keyPattern)
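Assuming the pattern is passed to Redis as a SCAN MATCH pattern, it follows Redis's glob-style syntax, so "foo*" selects every key that begins with foo. The sketch below is a simplified, hypothetical model of that matching in plain Scala (GlobSketch is an illustration, not part of Scio). It treats only * and ? as wildcards, whereas Redis also supports [...] character classes and backslash escapes.

```scala
import java.util.regex.Pattern

// Hypothetical helper: a simplified model of Redis glob-style key matching.
// Only '*' (any run of characters, including none) and '?' (exactly one
// character) are wildcards here; every other character, including '[', ']'
// and '\', is matched literally, unlike real Redis patterns.
object GlobSketch {
  // Translate the glob into an equivalent java.util.regex pattern,
  // quoting each literal character so regex metacharacters like '.' are safe.
  def globToRegex(glob: String): String =
    glob.toSeq.map {
      case '*' => ".*"
      case '?' => "."
      case c   => Pattern.quote(c.toString)
    }.mkString

  // DOTALL lets the wildcards also match newline characters inside keys.
  def matches(glob: String, key: String): Boolean =
    Pattern.compile(globToRegex(glob), Pattern.DOTALL).matcher(key).matches()
}
```

With this model, "foo*" matches foo and foobar but not barfoo, because the whole key must match the pattern.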

Lookups

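Looking up specific keys in Redis can be done with RedisDoFn. Subclass it and implement request, which issues commands through the provided Client and returns the result as a Future: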

import com.spotify.scio._
import com.spotify.scio.redis._
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.ParDo
import scala.concurrent.{ExecutionContext, Future}

val redisHost: String = ???
val redisPort: Int = ???
val batchSize: Int = ???
val connectionOptions = RedisConnectionOptions(redisHost, redisPort)

val keys: SCollection[String] = ???

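val results: SCollection[(String, Option[String])] = keys
  .applyTransform(
    ParDo.of(
      new RedisDoFn[String, (String, Option[String])](connectionOptions, batchSize) {
        override def request(value: String, client: Client)(
          implicit ec: ExecutionContext
        ): Future[(String, Option[String])] =
          client
            // Queue a single GET for this key
            .request(p => p.get(value) :: Nil)
            // GET yields null for a missing key; Option(_) turns that into None
            .map { case r: List[String @unchecked] =>
              (value, r.headOption.flatMap(Option(_)))
            }
      }
    )
  )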
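Conceptually, batchSize trades a little latency for fewer round trips: lookups are queued and answered together. The following is a hypothetical, in-memory model of that idea in plain Scala (BatchingLookup and its Map-backed store are illustrations, not Scio's RedisDoFn internals, which may differ). Each get returns a Future that completes only when its batch is flushed, either because batchSize lookups are queued or because flush is called for the remainder.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Future, Promise}

// Hypothetical in-memory model of batched lookups; `store` stands in for Redis.
// It illustrates the batching idea only and is not Scio's RedisDoFn implementation.
final class BatchingLookup(store: Map[String, String], batchSize: Int) {
  require(batchSize > 0, "batchSize must be positive")

  private val pending = ArrayBuffer.empty[(String, Promise[Option[String]])]
  private var trips = 0

  // Number of simulated round trips to the store so far.
  def roundTrips: Int = trips

  // Queue a lookup; the returned Future completes when its batch is flushed.
  def get(key: String): Future[Option[String]] = {
    val promise = Promise[Option[String]]()
    pending += (key -> promise)
    if (pending.size >= batchSize) flush()
    promise.future
  }

  // Answer every queued lookup in a single simulated round trip.
  def flush(): Unit =
    if (pending.nonEmpty) {
      trips += 1
      pending.foreach { case (key, promise) => promise.success(store.get(key)) }
      pending.clear()
    }
}
```

With batchSize = 2, looking up a, b and c costs one round trip for a and b together, while c waits until flush is called.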

Write

Writes to Redis require an SCollection whose elements are a subclass of RedisMutation, such as IncrBy from com.spotify.scio.redis.types. Writes work in both batch and streaming modes via saveAsRedis:

import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.redis._
import com.spotify.scio.redis.types._

val connectionOptions = RedisConnectionOptions("redisHost", 6379)

val keys: SCollection[String] = ???
// Increment the integer stored at each key by 1
keys.map(IncrBy(_, 1)).saveAsRedis(connectionOptions)
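Because Redis's INCRBY treats a missing key as 0 before adding, the pipeline above leaves each key holding the number of times it appeared in keys, added to any value already stored there. The hypothetical in-memory sketch below (IncrByOp and IncrBySketch are illustrative stand-ins, not Scio's IncrBy type or a Redis client) models that effect in plain Scala.

```scala
// Hypothetical stand-in for an INCRBY mutation; not Scio's IncrBy class.
final case class IncrByOp(key: String, value: Long)

object IncrBySketch {
  // Apply INCRBY-style mutations to an in-memory "store": a missing key
  // counts as 0, so one IncrByOp(k, 1) per element yields occurrence counts.
  def applyAll(initial: Map[String, Long], ops: Seq[IncrByOp]): Map[String, Long] =
    ops.foldLeft(initial) { (store, op) =>
      store.updated(op.key, store.getOrElse(op.key, 0L) + op.value)
    }
}
```

Since addition commutes, the final values in this model do not depend on the order in which the mutations are applied.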