Datos json usados para el análisis
%fs head /databricks-datasets/structured-streaming/events/file-0.json
{“time”:1469501107,”action”:”Open”}
{“time”:1469501147,”action”:”Open”}
{“time”:1469501202,”action”:”Open”}
{“time”:1469501219,”action”:”Open”}
{“time”:1469501225,”action”:”Open”}
{“time”:1469501234,”action”:”Open”}
Fuente: www.batabricks.com
Definir DataFrame especifico de Spark
# El DataFrame propio de Spark "pyspark" acelera el procesamiento de los datos from pyspark.sql.types import * from pyspark.sql.functions import * # Definir la ruta de donde extraer los ficheros pathText = "/databricks-datasets/structured-streaming/events/" # Definir la estrucutra del formato jsom a leer jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ]) # DataFrame que representa datos en los archivos JSON df = ( spark .readStream # Cambiar `readStream` en lugar de `read` para leer en Streaming .schema(jsonSchema) # Cargar la estructura json a leer .option("maxFilesPerTrigger", 1) # Tratar los archivo como si fuera una secuencia seleccionando cada vez 1 .json(pathText) ) # Visualizar el DataFrame display(df)
Agrupar un DataFrame por minuto y tipo
# Importar librería para usar la función window from pyspark.sql.functions import *
# Definir el DataFrame agrupado por minuto a partir del original df_by_min = ( df .groupBy( df.action, window(df.time, "1 minute")) .count() )
Establecer la configuración de flujo de datos del cluster
# Fijamos un tamaño de shuffle pequeño spark.conf.set("spark.sql.shuffle.partitions", "2") query = ( df_by_min .writeStream .format("memory") # memory = store in-memory table .queryName("table_streaming") # Poner nombre a la tabla en memoria .outputMode("complete") # complete = todos los contadores deben guardarse en la tabla .start() )
0 comentarios