A Datastore Entity is a Protobuf type and is very verbose to work with directly. By using Magnolify, one can seamlessly convert between case classes and Datastore Entity types.

package com.spotify.scio.examples.extra

import com.google.datastore.v1.Query
import com.spotify.scio._
import com.spotify.scio.datastore._
import com.spotify.scio.examples.common.ExampleData

// Holder for the data model shared by the write and read examples; both bring its
// members into scope with `import MagnolifyDatastoreExample._`.
object MagnolifyDatastoreExample {
  // Define case class representation of Datastore entities:
  // `word` is the token and `count` is the number of times it was seen.
  case class WordCount(word: String, count: Long)
}

Magnolify Datastore Write Example

Count words and save result to Datastore

 

Usage:

 

sbt "runMain com.spotify.scio.examples.extra.MagnolifyDatastoreWriteExample --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --input=gs://apache-beam-samples/shakespeare/kinglear.txt --output=[PROJECT]"

// Entry point that counts words in a text input and stores the counts in Datastore.
object MagnolifyDatastoreWriteExample {

  // Builds and runs the pipeline.
  //
  // Recognized command-line arguments (parsed by `ContextAndArgs`):
  //   --input   text to read; falls back to `ExampleData.KING_LEAR` when absent
  //   --output  required; handed to `saveAsDatastore` as-is
  def main(cmdlineArgs: Array[String]): Unit = {
    import MagnolifyDatastoreExample._

    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Break every line on runs of characters that are neither ASCII letters nor
    // apostrophes, and drop the empty tokens that splitting can leave behind.
    val words = sc
      .textFile(args.getOrElse("input", ExampleData.KING_LEAR))
      .flatMap(line => line.split("[^a-zA-Z']+").filter(token => token.nonEmpty))

    // Pair each distinct word with its tally and wrap the pair in the case class.
    val wordCounts = words.countByValue
      .map { case (word, count) => WordCount(word, count) }

    wordCounts.saveAsDatastore(args("output"))

    sc.run()
    // Make the Unit result of `main` explicit instead of ending on the value of `run()`.
    ()
  }
}

Magnolify Datastore Read Example

Read word count result back from Datastore

 

Usage:

 

sbt "runMain com.spotify.scio.examples.extra.MagnolifyDatastoreReadExample --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --input=[PROJECT] --output=gs://[BUCKET]/[PATH]/wordcount"

// Entry point that reads `WordCount` records from Datastore and writes them out as text.
object MagnolifyDatastoreReadExample {

  // Builds and runs the pipeline.
  //
  // Recognized command-line arguments (parsed by `ContextAndArgs`):
  //   --input   required; handed to `typedDatastore` as-is
  //   --output  required; destination passed to `saveAsTextFile`
  def main(cmdlineArgs: Array[String]): Unit = {
    import MagnolifyDatastoreExample._

    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Read using the default `Query` instance, decoding each result as a `WordCount`.
    val wordCounts = sc.typedDatastore[WordCount](args("input"), Query.getDefaultInstance)

    // Render every record as "<word>: <count>".
    val lines = wordCounts.map(entry => s"${entry.word}: ${entry.count}")

    lines.saveAsTextFile(args("output"))

    sc.run()
    // Make the Unit result of `main` explicit instead of ending on the value of `run()`.
    ()
  }
}