package com.spotify.scio.examples.extra import com.google.bigtable.v2.{Mutation, Row} import com.google.protobuf.ByteString import com.spotify.scio._ import com.spotify.scio.bigtable._ import com.spotify.scio.examples.common.ExampleData import org.joda.time.Duration  

This example depends on APIs from `scio-bigtable` and on the implicit syntax imported from `com.spotify.scio.bigtable._`.

/** Helpers shared by `BigtableWriteExample` (writing) and `BigtableReadExample` (reading). */
object BigtableExample {

  /** Column family in which the word counts are stored. */
  val FAMILY_NAME: String = "count"

  /** Column qualifier, within [[FAMILY_NAME]], under which each count is stored. */
  val COLUMN_QUALIFIER: ByteString = ByteString.copyFromUtf8("long")

  /**
   * Convert a key-value pair to a Bigtable `Mutation` for writing.
   *
   * @param key   row key; encoded as UTF-8 bytes
   * @param value count; stored as its decimal string representation, UTF-8 encoded
   * @return the encoded row key paired with a single set-cell mutation on
   *         `FAMILY_NAME` / `COLUMN_QUALIFIER`, using the fixed timestamp argument `0L`
   */
  def toMutation(key: String, value: Long): (ByteString, Iterable[Mutation]) = {
    val m = Mutations.newSetCell(
      FAMILY_NAME,
      COLUMN_QUALIFIER,
      ByteString.copyFromUtf8(value.toString),
      0L
    )
    (ByteString.copyFromUtf8(key), Iterable(m))
  }

  /**
   * Convert a Bigtable `Row` from reading to a formatted `"key: value"` string.
   *
   * Both the row key and the cell value are decoded as UTF-8.
   *
   * @param r row read from Bigtable
   * @return `"<row key>: <value of FAMILY_NAME/COLUMN_QUALIFIER>"`
   * @throws NoSuchElementException if the row has no value in `FAMILY_NAME` / `COLUMN_QUALIFIER`
   */
  def fromRow(r: Row): String = {
    val key = r.getKey.toStringUtf8
    // Fail with a message naming the offending row and column rather than a bare `None.get`;
    // the exception type is unchanged.
    val value = r
      .getValue(FAMILY_NAME, COLUMN_QUALIFIER)
      .getOrElse(
        throw new NoSuchElementException(
          s"Row '$key' has no value in column $FAMILY_NAME:${COLUMN_QUALIFIER.toStringUtf8}"
        )
      )
    s"$key: ${value.toStringUtf8}"
  }
}

Bigtable Write example

Count words and save the results to Bigtable.

 

Usage:

 

sbt "runMain com.spotify.scio.examples.extra.BigtableWriteExample --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --input=gs://apache-beam-samples/shakespeare/kinglear.txt --bigtableProjectId=[BIG_TABLE_PROJECT_ID] --bigtableInstanceId=[BIG_TABLE_INSTANCE_ID] --bigtableTableId=[BIG_TABLE_TABLE_ID]"

/**
 * Word count over a text input whose results are written to Bigtable.
 *
 * The Bigtable instance is scaled up for the duration of the load and scaled back down
 * afterwards, including when the job fails.
 */
object BigtableWriteExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    val btProjectId = args("bigtableProjectId")
    val btInstanceId = args("bigtableInstanceId")
    val btTableId = args("bigtableTableId")

    // Bump up the number of Bigtable nodes before writing so that the extra traffic does not
    // affect production service. A sleep period is inserted to ensure all new nodes are online
    // before the ingestion starts.
    sc.updateNumberOfBigtableNodes(btProjectId, btInstanceId, 15)

    // Everything after the scale-up runs inside `try`, so the node count is always restored,
    // even if table setup, pipeline construction or the job itself throws.
    try {
      // Ensure that destination tables and column families exist.
      sc.ensureTables(
        btProjectId,
        btInstanceId,
        Map(btTableId -> List(BigtableExample.FAMILY_NAME))
      )

      sc.textFile(args.getOrElse("input", ExampleData.KING_LEAR))
        .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
        .countByValue
        .map { case (word, count) => BigtableExample.toMutation(word, count) }
        .saveAsBigtable(btProjectId, btInstanceId, btTableId)

      // Block until the job ends so the scale-down below happens only after ingestion.
      sc.run().waitUntilDone()
      ()
    } finally {
      // Bring down the number of nodes after the job ends to save cost. There is no need to
      // wait after bumping the nodes down.
      sc.updateNumberOfBigtableNodes(btProjectId, btInstanceId, 3, Duration.ZERO)
    }
  }
}

Bigtable Read example

Read the word count results back from Bigtable.

 

Usage:

 

sbt "runMain com.spotify.scio.examples.extra.BigtableReadExample --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --bigtableProjectId=[BIG_TABLE_PROJECT_ID] --bigtableInstanceId=[BIG_TABLE_INSTANCE_ID] --bigtableTableId=[BIG_TABLE_TABLE_ID] --output=gs://[BUCKET]/[PATH]/wordcount"

/**
 * Reads rows from a Bigtable table, formats each one with `BigtableExample.fromRow`,
 * and saves the resulting lines as text to the `--output` location.
 */
object BigtableReadExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (context, parsedArgs) = ContextAndArgs(cmdlineArgs)

    val projectId = parsedArgs("bigtableProjectId")
    val instanceId = parsedArgs("bigtableInstanceId")
    val tableId = parsedArgs("bigtableTableId")

    // One formatted text line per Bigtable row.
    val formatted = context
      .bigtable(projectId, instanceId, tableId)
      .map(row => BigtableExample.fromRow(row))
    formatted.saveAsTextFile(parsedArgs("output"))

    context.run()
    // Explicit Unit result, so the value returned by `run()` is not main's result.
    ()
  }
}