Procesamiento por lotes (batch) Apache Spark en Python

Datos json usados para el análisis

%fs head /databricks-datasets/structured-streaming/events/file-0.json

{“time”:1469501107,”action”:”Open”}
{“time”:1469501147,”action”:”Open”}
{“time”:1469501202,”action”:”Open”}
{“time”:1469501219,”action”:”Open”}
{“time”:1469501225,”action”:”Open”}
{“time”:1469501234,”action”:”Open”}

Fuente: www.batabricks.com

 

Definir DataFrame especifico de Spark

# El DataFrame propio de Spark "pyspark" acelera el procesamiento de los datos
from pyspark.sql.types import *

pathText = "/databricks-datasets/structured-streaming/events/"
jsonSchema = StructType([ 
                 StructField("time", TimestampType(), True), 
                 StructField("action", StringType(), True) 
             ])

# DataFrame que representa datos en los archivos JSON
df = (
  spark
    .read
    .schema(jsonSchema)
    .json(pathText)
)

# Visualizar el DataFrame
display(df)
time action
2016-07-28T04:19:28.000+0000 Close
2016-07-28T04:19:28.000+0000 Close
2016-07-28T04:19:29.000+0000 Open
2016-07-28T04:19:31.000+0000 Close

 

Agrupar un DataFrame por minuto y tipo

# Importar librería para usar la función window
from pyspark.sql.functions import *

# Definir el DataFrame agrupado por minuto a partir del original
df_by_min2 = (
  df
    .groupBy(
       df.action, 
       window(df.time, "1 minute"))   # 1 hour ...   
    .count()
)
# Visualizar el DataFrame
display(df_by_min2)


Realizar consultas SQL

# Almacenar el DataFrame como una tabla de SparkQL
df.createOrReplaceTempView("view_df")

# Realizar la consulta
result = spark.sql("SELECT action, COUNT(time) as total FROM view_df group by action")

# Visualizar el resultado de la consulta SQL
result.show()

+------+-----+ 
|action|total| 
+------+-----+ 
|  Open|50000| 
| Close|50000| 
+------+-----+

Otros artículos que pueden ser de interés:

Autor: Diego Calvo