Prerequisitos
Tener configurado la paquetería de Spark para IntelliJ IDEA
Incluir en el fichero pon la paquetería propia de Elastic:
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20 --> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>6.4.2</version> </dependency>
Ejemplo de como escribir un dataframe transformado a Mapa en un Elastic 6.4.2 local en Scala 2.1.1.
package com.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import scala.util.Random
object App {
def main(args : Array[String]) {
val conf = new SparkConf()
.setAppName("App")
.setMaster("local")
.set("es.nodes", "localhost")
.set("es.port", "9200")
.set("es.scheme", "http")
val sc = new SparkContext(conf)
val spark = SparkSession
.builder()
.config(conf)
.getOrCreate()
// Create a RDD
val rdd = spark.sparkContext.parallelize(
Seq.fill(10000) {
(Math.abs(Random.nextLong % 100L),
Math.abs(Random.nextLong % 1000000000000L))
})
import spark.implicits._
val df = rdd.toDF("duration", "timestamp").cache()
df.show()
val mapa = df.collect.map(r => Map(df.columns.zip(r.toSeq):_*))
sc.makeRDD(mapa).saveToEs("elastic_df/docs")
}
}Comprobar datos escritos en Elastic
Mostrar todos los indices almacenados en Elastic
http://localhost:9200/_cat/indices?v
Mostrar datos del índice “elastic_df” en concreto
http://localhost:9200/elastic_df/_search?pretty=true&q=*
Si quisieramos borrar el indice generado, tendríamos que ir a la linea de comandos y ejecutar:
curl -XDELETE 'https://localhost:9200/elastic_df'




0 comentarios