Datastore `Entity` is a Protobuf type and is very verbose to work with directly. With Magnolify, you can seamlessly convert between Scala case classes and Datastore `Entity` types.

package com.spotify.scio.examples.extra

import com.google.datastore.v1.client.DatastoreHelper.makeKey
import com.google.datastore.v1.Query
import com.spotify.scio._
import com.spotify.scio.datastore._
import com.spotify.scio.examples.common.ExampleData
import magnolify.datastore._

/** Definitions shared by the Magnolify Datastore write and read examples. */
object MagnolifyDatastoreExample {

  /** Datastore kind name used for the word-count entities. */
  val kind: String = "magnolify"

  // Define a case class representation of Datastore entities
  case class WordCount(word: String, count: Long)

  // `EntityType` provides the mapping between case classes and Datastore entities
  val wordCountType: EntityType[WordCount] = EntityType[WordCount]
}

Magnolify Datastore Write Example

Count words and save the results to Datastore.

 

Usage:

 

sbt "runMain com.spotify.scio.examples.extra.MagnolifyDatastoreWriteExample --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --input=gs://apache-beam-samples/shakespeare/kinglear.txt --output=[PROJECT]"

/** Counts words in a text input and upserts one Datastore entity per word. */
object MagnolifyDatastoreWriteExample {

  /**
   * Pipeline entry point.
   *
   * @param cmdlineArgs `--input` text file (defaults to King Lear) and `--output` Datastore
   *                    project, plus any runner options
   */
  def main(cmdlineArgs: Array[String]): Unit = {
    import MagnolifyDatastoreExample._

    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.textFile(args.getOrElse("input", ExampleData.KING_LEAR))
      .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
      .countByValue
      .map { case (word, count) =>
        // Convert case class to `Entity.Builder`
        wordCountType
          .to(WordCount(word, count))
          // Set entity key; the word itself is used as the key name
          .setKey(makeKey(kind, word))
          .build()
      }
      .saveAsDatastore(args("output"))
    sc.run()
    ()
  }
}

Magnolify Datastore Read Example

Read the word count results back from Datastore.

 

Usage:

 

sbt "runMain com.spotify.scio.examples.extra.MagnolifyDatastoreReadExample --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] --input=[PROJECT] --output=gs://[BUCKET]/[PATH]/wordcount"

/** Reads the word-count entities back from Datastore and writes them out as text. */
object MagnolifyDatastoreReadExample {

  /**
   * Pipeline entry point.
   *
   * @param cmdlineArgs `--input` Datastore project and `--output` text path prefix, plus any
   *                    runner options
   */
  def main(cmdlineArgs: Array[String]): Unit = {
    import com.google.datastore.v1.KindExpression
    import MagnolifyDatastoreExample._

    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Restrict the read to the kind written by MagnolifyDatastoreWriteExample. An empty
    // `Query.getDefaultInstance` is kindless and would return entities of every kind, which
    // `wordCountType` cannot decode.
    val query: Query = Query
      .newBuilder()
      .addKind(KindExpression.newBuilder().setName(kind))
      .build()

    sc.datastore(args("input"), query)
      // Convert `Entity` to case class
      .map(e => wordCountType(e))
      .map(wc => s"${wc.word}: ${wc.count}")
      .saveAsTextFile(args("output"))
    sc.run()
    ()
  }
}