Escribir en Elastic remoto con Scala y seguridad Searchguard

Prerequisitos

Tener configurado la paquetería de Spark para IntelliJ IDEA

Tener un Elastic con Searchguard instalado

Incluir en el fichero pon la paquetería propia de Elastic:

<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20 -->
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-spark-20_2.11</artifactId>
  <version>6.4.2</version>
</dependency>

 

Ejemplo de como escribir un dataframe transformado a Mapa en un Elastic 5.2.0 local en Scala 2.1.1.

package com.scala

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.apache.spark.sql.SparkSession
import scala.util.Random

object App {

  def main(args : Array[String]) {

    val conf = new SparkConf()
      .setAppName("ElasticSearch")
      .setMaster("local")
      .set("es.index.auto.create", "true")
      .set("es.nodes", "servidor_elastic_remoto")
      .set("es.port","9200")
      .set("es.net.http.auth.user","admin")
      .set("es.net.http.auth.pass","admin")
      .set("es.net.ssl","true")
      .set("es.net.ssl.cert.allow.self.signed","true")
      .set("es.http.timeout","5m")
      .set("es.net.ssl.truststore.location","truststore.jks")
      .set("es.net.ssl.truststore.pass","changeit")


    val sc = new SparkContext(conf)

    val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()

    // Create a RDD
    val rdd = spark.sparkContext.parallelize(
      Seq.fill(10000) {
        (Math.abs(Random.nextLong % 100L),
          Math.abs(Random.nextLong % 1000000000000L))
      })

    import spark.implicits._
    val df = rdd.toDF("duration", "timestamp").cache()

    df.show()

    val mapa = df.collect.map(r => Map(df.columns.zip(r.toSeq):_*))
    sc.makeRDD(mapa).saveToEs("elastic_df/docs")

  }
}

 

Compilar

Generar la claves ssl segura «truststore.jks» siguiendo las instrucciones de Elastic para Searchguard

Incluir la clave generada «truststore.jks» al nivel del fichero App.scala

 

Ejecutar el jar generado

Incluir el fichero «truststore.jks» en el jar a generar

Ejecutar el comando siguiente donde se indica las librerías necesarias que no existan en una maquina con spark estandar:

spark-submit --jars "/tmp/elasticsearch-spark-13_2.11-5.2.2.jar" --class com.scala.App /tmp/elastic_conexion.jar

 

Comprobar datos escritos en Elastic

Mostrar todos los indices almacenados en Elastic

http://localhost:9200/_cat/indices?v

Mostrar datos del índice “elastic_df” en concreto

http://localhost:9200/elastic_df/_search?pretty=true&q=*

Si quisieramos borrar el indice generado, tendriamos que ir a la linea de comandos y ejecutar:

curl --insecure -u usuario:password -XDELETE 'https://localhost:9200/nombre_de_indice'