Datos json usados para el análisis
%fs head /databricks-datasets/structured-streaming/events/file-0.json
{“time”:1469501107,”action”:”Open”}
{“time”:1469501147,”action”:”Open”}
{“time”:1469501202,”action”:”Open”}
{“time”:1469501219,”action”:”Open”}
{“time”:1469501225,”action”:”Open”}
{“time”:1469501234,”action”:”Open”}
Fuente: www.batabricks.com
Definir DataFrame especifico de Spark
# El DataFrame propio de Spark "pyspark" acelera el procesamiento de los datos
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Definir la ruta de donde extraer los ficheros
pathText = "/databricks-datasets/structured-streaming/events/"
# Definir la estrucutra del formato jsom a leer
jsonSchema = StructType([
StructField("time", TimestampType(), True),
StructField("action", StringType(), True)
])
# DataFrame que representa datos en los archivos JSON
df = (
spark
.readStream # Cambiar `readStream` en lugar de `read` para leer en Streaming
.schema(jsonSchema) # Cargar la estructura json a leer
.option("maxFilesPerTrigger", 1) # Tratar los archivo como si fuera una secuencia seleccionando cada vez 1
.json(pathText)
)
# Visualizar el DataFrame
display(df)
Agrupar un DataFrame por minuto y tipo
# Importar librería para usar la función window from pyspark.sql.functions import *
# Definir el DataFrame agrupado por minuto a partir del original df_by_min = ( df .groupBy( df.action, window(df.time, "1 minute")) .count() )
Establecer la configuración de flujo de datos del cluster
# Fijamos un tamaño de shuffle pequeño
spark.conf.set("spark.sql.shuffle.partitions", "2")
query = (
df_by_min
.writeStream
.format("memory") # memory = store in-memory table
.queryName("table_streaming") # Poner nombre a la tabla en memoria
.outputMode("complete") # complete = todos los contadores deben guardarse en la tabla
.start()
)



0 comentarios