Conectar con Scala al HDFS de Hadoop

hdfsEscribir datos en HDFS

Ejemplo de como escribir datos RDD en un HDFS de Hadoop.

// Borrar el fichero si es que existe
import scala.sys.process._
"hdfs dfs -rm -r /pruebas" !

// Grabar un RDD en HDFS
val rdd = sc.parallelize(List(
    (0, 60),
    (0, 56),
    (0, 54),
    (0, 62),
    (0, 61),
    (0, 53),
    (0, 55),
    (0, 62),
    (0, 64), 
    (1, 73),
    (1, 78),
    (1, 67),
    (1, 68),
    (1, 78)
))
rdd.saveAsTextFile("hdfs:///pruebas/prueba1.csv")
rdd.collect

Escribir datos en HDFS (2ª forma)

Ejemplo de como escribir datos de texto plano en un HDFS de Hadoop.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.PrintWriter;

object App {

println( "Prueba de escritura en HDFS..." )
val conf = new Configuration()
val fs= FileSystem.get(conf)
val output = fs.create(new Path("hdfs://sandbox-hdp.hortonworks.com:8020/pruebas/prueba2.txt"))
val writer = new PrintWriter(output)
try {
    writer.write("Hola mundo") 
    writer.write("\n")
}
finally {
    writer.close()
}
print("Finalizado!")
}

Añadir datos a HDFS

Ejemplo de como añadir datos de tipo dataframe a un HDFS

val df = Seq((1, 2), (3, 4), (5,6), (0,0)).toDF("Col_0", "Col_1")
df.show()

df.write.mode("overwrite").format("parquet").save("hdfs:///incrementar_datos.parquet")

df.write.mode("append").format("parquet").save("hdfs:///incrementar_datos.parquet")

val df2 = spark
  .read
  .format("parquet")
  .option("inferSchema", true)
  .load("hdfs:///incrementar_datos.parquet")

df2.show()
+-----+-----+ df
|Col_0|Col_1|
+-----+-----+
|    1|    2|
|    3|    4|
|    5|    6|
|    0|    0|
+-----+-----+

+-----+-----+ df2
|Col_0|Col_1|
+-----+-----+
|    1|    2|
|    3|    4|
|    1|    2|
|    3|    4|
|    5|    6|
|    0|    0|
|    5|    6|
|    0|    0|
+-----+-----+

 

Leer RDDs desde HDFS

Ejemplo simple de como leer datos de un HDFS.

val rdd2 = sc.textFile("hdfs:///pruebas/prueba1.csv")
rdd2.collect()

Nota: sc se refiere a SparkContext, en multitud de entorno de desarrollo big data viene ya instanciado sino deberíamos instanciar el objeto.

 

Leer Dataframes desde HDFS

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.DataFrame

val df: DataFrame = spark
  .read
  .format("csv")
  .option("header", false)
  .option("inferSchema", true)
  .load("hdfs:///pruebas/prueba1.csv")

df.show()

Nota: spark se refiere a SparkSession, en multitud de entorno de desarrollo big data viene ya instanciado sino deberíamos instanciar el objeto.

Otros artículos que pueden ser de interés:

Autor: Diego Calvo