Escribir datos en HDFS
Ejemplo de como escribir datos RDD en un HDFS de Hadoop.
// Borrar el fichero si es que existe
import scala.sys.process._
"hdfs dfs -rm -r /pruebas" !
// Grabar un RDD en HDFS
val rdd = sc.parallelize(List(
(0, 60),
(0, 56),
(0, 54),
(0, 62),
(0, 61),
(0, 53),
(0, 55),
(0, 62),
(0, 64),
(1, 73),
(1, 78),
(1, 67),
(1, 68),
(1, 78)
))
rdd.saveAsTextFile("hdfs:///pruebas/prueba1.csv")
rdd.collect
Escribir datos en HDFS (2ª forma)
Ejemplo de como escribir datos de texto plano en un HDFS de Hadoop.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.PrintWriter;
object App {
println( "Prueba de escritura en HDFS..." )
val conf = new Configuration()
val fs= FileSystem.get(conf)
val output = fs.create(new Path("hdfs://sandbox-hdp.hortonworks.com:8020/pruebas/prueba2.txt"))
val writer = new PrintWriter(output)
try {
writer.write("Hola mundo")
writer.write("\n")
}
finally {
writer.close()
}
print("Finalizado!")
}
Añadir datos a HDFS
Ejemplo de como añadir datos de tipo dataframe a un HDFS
val df = Seq((1, 2), (3, 4), (5,6), (0,0)).toDF("Col_0", "Col_1")
df.show()
df.write.mode("overwrite").format("parquet").save("hdfs:///incrementar_datos.parquet")
df.write.mode("append").format("parquet").save("hdfs:///incrementar_datos.parquet")
val df2 = spark
.read
.format("parquet")
.option("inferSchema", true)
.load("hdfs:///incrementar_datos.parquet")
df2.show()
+-----+-----+ df |Col_0|Col_1| +-----+-----+ | 1| 2| | 3| 4| | 5| 6| | 0| 0| +-----+-----+ +-----+-----+ df2 |Col_0|Col_1| +-----+-----+ | 1| 2| | 3| 4| | 1| 2| | 3| 4| | 5| 6| | 0| 0| | 5| 6| | 0| 0| +-----+-----+
Leer RDDs desde HDFS
Ejemplo simple de como leer datos de un HDFS.
val rdd2 = sc.textFile("hdfs:///pruebas/prueba1.csv")
rdd2.collect()Nota: sc se refiere a SparkContext, en multitud de entorno de desarrollo big data viene ya instanciado sino deberíamos instanciar el objeto.
Leer Dataframes desde HDFS
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
val df: DataFrame = spark
.read
.format("csv")
.option("header", false)
.option("inferSchema", true)
.load("hdfs:///pruebas/prueba1.csv")
df.show()Nota: spark se refiere a SparkSession, en multitud de entorno de desarrollo big data viene ya instanciado sino deberíamos instanciar el objeto.




0 comentarios