Cálculos sobre ventana temporal en Dataframe Scala

Cálculo de media móvil en Dataframe Scalalogo scala

Calcula la media móvil a partir de una ventana temporal de 3 periodos

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = sc.parallelize(
   List(("Sensor1", "2016-05-01", 50.00),
        ("Sensor1", "2016-05-03", 45.00),
        ("Sensor1", "2016-05-04", 55.00),
        ("Sensor2", "2016-05-01", 25.00),
        ("Sensor2", "2016-05-04", 29.00),
        ("Sensor2", "2016-05-06", 27.00))
    ).toDF("sensor", "fecha", "valor")

// Crea una ventana tempora de 3 periodos sobre la que realizar calculos
val window = Window.partitionBy("sensor").orderBy("fecha").rowsBetween(-1, 1)


// Calcula la media movil
df.withColumn("media_movil", avg(df("valor")).over(window) ).show()
+-------+----------+-----+-----------+
| sensor|     fecha|valor|media_movil|
+-------+----------+-----+-----------+
|Sensor2|2016-05-01| 25.0|       27.0|
|Sensor2|2016-05-04| 29.0|       27.0|
|Sensor2|2016-05-06| 27.0|       28.0|
|Sensor1|2016-05-01| 50.0|       47.5|
|Sensor1|2016-05-03| 45.0|       50.0|
|Sensor1|2016-05-04| 55.0|       50.0|
+-------+----------+-----+-----------+

Suma acumulativa en Dataframe Scala

Calcula una suma acumulativa usando separador de ventana el identificador del sensor

val window = Window.partitionBy("sensor").orderBy("fecha").rowsBetween(Long.MinValue, 0)
df.withColumn( "suma_acumulada", sum(df("valor")).over(window)).show()
+-------+----------+-----+--------------+
| sensor|     fecha|valor|suma_acumulada|
+-------+----------+-----+--------------+
|Sensor2|2016-05-01| 25.0|          25.0|
|Sensor2|2016-05-04| 29.0|          54.0|
|Sensor2|2016-05-06| 27.0|          81.0|
|Sensor1|2016-05-01| 50.0|          50.0|
|Sensor1|2016-05-03| 45.0|          95.0|
|Sensor1|2016-05-04| 55.0|         150.0|
+-------+----------+-----+--------------+

 

Valor previo en un Dataframe Scala

Calcula una suma acumulativa usando separador de ventana el identificador del sensor

val window = Window.partitionBy("sensor").orderBy("fecha")
df.withColumn("valor_previo", lag(df("valor"), 1).over(window) ).show()
+-------+----------+-----+------------+
| sensor|     fecha|valor|valor_previo|
+-------+----------+-----+------------+
|Sensor2|2016-05-01| 25.0|        null|
|Sensor2|2016-05-04| 29.0|        25.0|
|Sensor2|2016-05-06| 27.0|        29.0|
|Sensor1|2016-05-01| 50.0|        null|
|Sensor1|2016-05-03| 45.0|        50.0|
|Sensor1|2016-05-04| 55.0|        45.0|
+-------+----------+-----+------------+

 

Ranking en Dataframe Scala

Calcula una suma acumulativa usando separador de ventana el identificador del sensor

val window = Window.partitionBy("sensor").orderBy("fecha")
df.withColumn("ranking", rank().over(window) ).show()
+-------+----------+-----+-------+
| sensor|     fecha|valor|ranking|
+-------+----------+-----+-------+
|Sensor2|2016-05-01| 25.0|      1|
|Sensor2|2016-05-04| 29.0|      2|
|Sensor2|2016-05-06| 27.0|      3|
|Sensor1|2016-05-01| 50.0|      1|
|Sensor1|2016-05-03| 45.0|      2|
|Sensor1|2016-05-04| 55.0|      3|
+-------+----------+-----+-------+

 

Otros artículos que pueden ser de interés:

Autor: Diego Calvo