Prerequisitos
Limpiar todos los hdfs generados, utilizando la linea de comandos
hdfs dfs -rm -r /streaming hdfs dfs -mkdir /streaming hdfs dfs -ls /streaming
Generar datos usados para iniciar el análisis
Se genera una serie de datos en formato json que se almacenan en un fichero hdfs.
val events = sc.parallelize( """ [{"accion":"Abrir", "tiempo":"2018-08-01T00:01:17Z"}, {"accion":"Cerrar","tiempo":"2018-08-01T00:01:17Z"}, {"accion":"Abrir", "tiempo":"2018-08-02T00:01:17Z"}, {"accion":"Abrir", "tiempo":"2018-08-02T00:01:17Z"}, {"accion":"Abrir", "tiempo":"2018-08-03T00:01:17Z"}, {"accion":"Cerrar","tiempo":"2018-08-03T00:01:17Z"}, {"accion":"Abrir", "tiempo":"2018-08-03T00:01:17Z"}] """ :: Nil) val df = sqlContext.read.json(events) df.write.mode("overwrite").format("json").save("/streaming/events_diego1.json")
Definir la lectura en tiempo real
import org.apache.spark.sql.functions._ val inputPath = "/streaming/*.json" val jsonSchema = new StructType().add("accion", StringType).add("tiempo", TimestampType) val streamingInputDF = spark .readStream // Usado para lecturas en tiempo real .schema(jsonSchema) // Definir el esquema json a utilizar .option("maxFilesPerTrigger", 1) // Tratar las secuencias de archivos seleccionando una por vez .json(inputPath
Definir la consulta
// Definir la consulta como si fuera estática val streamingCountsDF = streamingInputDF .groupBy($"accion", window($"tiempo", "1 day")) .count() // Comprobar que la estructura de datos es de tiempo real streamingCountsDF.isStreaming
Establecer la configuración de flujo de datos del cluster
spark.conf.set("spark.sql.shuffle.partitions", "1") // mantener el tamaño de las mezclas pequeñas
val query = streamingCountsDF .writeStream .format("memory") // almacena la tabla de memoria (en Spark 2.0) .queryName("mi_tabla") // nombre de la tabla generada .outputMode("complete") //todos los recuentos deben de estar en la tabla .start()
Visualizar los datos almacenados hasta el momento
%sql select accion, window.end as tiempo, count from mi_tabla order by tiempo, accion
Generar nuevos datos a incluir en el análisis
[{"accion":"Abrir", "tiempo":"2018-08-04T00:01:17Z"}, {"accion":"Cerrar","tiempo":"2018-08-04T00:01:17Z"}, {"accion":"Abrir", "tiempo":"2018-08-04T00:01:17Z"}, {"accion":"Abrir", "tiempo":"2018-08-05T00:01:17Z"}, {"accion":"Abrir", "tiempo":"2018-08-05T00:01:17Z"}, {"accion":"Cerrar","tiempo":"2018-08-06T00:01:17Z"}, {"accion":"Abrir", "tiempo":"2018-08-06T00:01:17Z"}] """ :: Nil) val df = sqlContext.read.json(events) df.write.mode("overwrite").format("json").save("/streaming/events_diego2.json")
Visualizar los datos acumulados
%sql select accion, window.end as tiempo, count from mi_tabla order by tiempo, accion
0 comentarios