Procesamiento en tiempo real (streaming) Apache Spark en Python

Datos json usados para el análisis

%fs head /databricks-datasets/structured-streaming/events/file-0.json

{“time”:1469501107,”action”:”Open”}
{“time”:1469501147,”action”:”Open”}
{“time”:1469501202,”action”:”Open”}
{“time”:1469501219,”action”:”Open”}
{“time”:1469501225,”action”:”Open”}
{“time”:1469501234,”action”:”Open”}

Fuente: www.batabricks.com

 

Definir DataFrame especifico de Spark

# El DataFrame propio de Spark "pyspark" acelera el procesamiento de los datos
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Definir la ruta de donde extraer los ficheros
pathText = "/databricks-datasets/structured-streaming/events/"

# Definir la estrucutra del formato jsom a leer
jsonSchema = StructType([ 
                 StructField("time", TimestampType(), True), 
                 StructField("action", StringType(), True) 
             ])

# DataFrame que representa datos en los archivos JSON
df = (
  spark
    .readStream  # Cambiar `readStream` en lugar de `read` para leer en Streaming                    
    .schema(jsonSchema)   # Cargar la estructura json a leer            
    .option("maxFilesPerTrigger", 1)  # Tratar los archivo como si fuera una secuencia seleccionando cada vez 1
    .json(pathText)
)

# Visualizar el DataFrame
display(df)

Agrupar un DataFrame por minuto y tipo

# Importar librería para usar la función window
from pyspark.sql.functions import *

# Definir el DataFrame agrupado por minuto a partir del original
df_by_min = (                 
  df
    .groupBy(
      df.action, 
      window(df.time, "1 minute"))
    .count()
)

Establecer la configuración de flujo de datos del cluster

# Fijamos un tamaño de shuffle pequeño
spark.conf.set("spark.sql.shuffle.partitions", "2")  

query = (
  df_by_min
    .writeStream
    .format("memory") # memory = store in-memory table
    .queryName("table_streaming")     # Poner nombre a la tabla en memoria
    .outputMode("complete")  # complete = todos los contadores deben guardarse en la tabla
    .start()
)