Escribir dataframe en Elastic con Scala

Prerequisitos

Tener configurado la paquetería de Spark para IntelliJ IDEA

Incluir en el fichero pon la paquetería propia de Elastic:

<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20 -->
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-spark-20_2.11</artifactId>
  <version>6.4.2</version>
</dependency>

 

Ejemplo de como escribir un dataframe transformado a Mapa en un Elastic 6.4.2 local en Scala 2.1.1.

package com.scala

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import scala.util.Random

object App {

  def main(args : Array[String]) {

    val conf = new SparkConf()
      .setAppName("App")
      .setMaster("local")
      .set("es.nodes", "localhost")
      .set("es.port", "9200")
      .set("es.scheme", "http")


    val sc = new SparkContext(conf)

    val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()

    // Create a RDD
    val rdd = spark.sparkContext.parallelize(
      Seq.fill(10000) {
        (Math.abs(Random.nextLong % 100L),
          Math.abs(Random.nextLong % 1000000000000L))
      })

    import spark.implicits._
    val df = rdd.toDF("duration", "timestamp").cache()

    df.show()
    
    val mapa = df.collect.map(r => Map(df.columns.zip(r.toSeq):_*))
    sc.makeRDD(mapa).saveToEs("elastic_df/docs")
  }
}

Comprobar datos escritos en Elastic

Mostrar todos los indices almacenados en Elastic

http://localhost:9200/_cat/indices?v

Mostrar datos del índice “elastic_df” en concreto

http://localhost:9200/elastic_df/_search?pretty=true&q=*

Si quisieramos borrar el indice generado, tendríamos que ir a la linea de comandos y ejecutar:

curl -XDELETE 'https://localhost:9200/elastic_df'

Otros artículos que pueden ser de interés:

Autor: Diego Calvo