Elasticsearch
Scio supports writing to Elasticsearch.
Writes
An `SCollection` of arbitrary elements can be saved to Elasticsearch with `saveAsElasticsearch`. The `ElasticsearchOptions`-typed `esOptions` argument requires a `mapperFactory` argument capable of mapping the element type to JSON. `saveAsElasticsearch` also takes a second argument list whose single argument, `f`, can be provided as a block; it maps each input element to Elasticsearch `BulkOperation`s.
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.elasticsearch._
import co.elastic.clients.elasticsearch.core.bulk.{BulkOperation, IndexOperation}
import co.elastic.clients.json.jackson.JacksonJsonpMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import org.apache.http.HttpHost
import java.time.LocalDate
// Placeholders: fill in the Elasticsearch connection details and the target index name.
val host: String = ???
val port: Int = ???
val esIndex: String = ???

case class Document(user: String, postDate: LocalDate, word: String, count: Long)

val primaryESHost = new HttpHost(host, port)

// Builds the JSON mapper used to serialize each element. ElasticsearchOptions takes
// a factory function rather than a ready-made mapper instance.
val mapperFactory = () => {
  import com.fasterxml.jackson.databind.SerializationFeature

  val mapper = new JacksonJsonpMapper()
  // DefaultScalaModule lets Jackson serialize Scala types such as the Document case class;
  // JavaTimeModule adds support for java.time types such as LocalDate.
  // With WRITE_DATES_AS_TIMESTAMPS left at its default (enabled), JavaTimeModule writes a
  // LocalDate as a numeric array, e.g. [2024,11,5]. Disabling it emits ISO-8601 strings
  // ("2024-11-05") instead, which Elasticsearch can index as dates.
  mapper
    .objectMapper()
    .registerModule(DefaultScalaModule)
    .registerModule(new JavaTimeModule())
    .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
  mapper
}

val esOptions = ElasticsearchOptions(
  nodes = Seq(primaryESHost),
  mapperFactory = mapperFactory
)

val elements: SCollection[Document] = ???

// The second argument list maps each element to the bulk operations to send;
// here every Document becomes a single index request into `esIndex`.
elements.saveAsElasticsearch(esOptions) { d =>
  List(
    BulkOperation.of { bulkBuilder =>
      bulkBuilder.index(
        IndexOperation.of[Document] { indexBuilder =>
          indexBuilder
            .index(esIndex)
            .document(d)
        }
      )
    }
  )
}
0.14.8-23-c45685a-20241105T161920Z*