Conectar con Scala al HDFS de Hadoop

Escribir datos en HDFS

// Borrar el fichero si es que existe
import scala.sys.process._
"hdfs dfs -rm -r /pruebas" !

// Grabar un RDD en HDFS
val rdd = sc.parallelize(List(
    (0, 60),
    (0, 56),
    (0, 54),
    (0, 62),
    (0, 61),
    (0, 53),
    (0, 55),
    (0, 62),
    (0, 64), 
    (1, 73),
    (1, 78),
    (1, 67),
    (1, 68),
    (1, 78)
))
rdd.saveAsTextFile("hdfs:///pruebas/prueba1.csv")
rdd.collect

Escribir datos en HDFS (2ª forma)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.PrintWriter;

object App {

println( "Prueba de escritura en HDFS..." )
val conf = new Configuration()
val fs= FileSystem.get(conf)
val output = fs.create(new Path("hdfs://sandbox-hdp.hortonworks.com:8020/pruebas/prueba2.txt"))
val writer = new PrintWriter(output)
try {
    writer.write("Hola mundo") 
    writer.write("\n")
}
finally {
    writer.close()
}
print("Finalizado!")
}

 

Leer RDDs desde HDFS

val rdd2 = sc.textFile("hdfs:///pruebas/prueba1.csv")
rdd2.collect()

 

Leer Dataframes desde HDFS

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.DataFrame

val df: DataFrame = spark
  .read
  .format("csv")
  .option("header", false)
  .option("inferSchema", true)
  .load("hdfs:///pruebas/prueba1.csv")

df.show()

Autor: Diego Calvo