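Moving average in a Scala DataFrame
Computes a moving average over a window of 3 periods. For each reading, it averages the previous, current, and next readings of the same sensor, ordered by date. The periods are rows, not calendar days, so gaps between dates (for example, from 2016-05-01 to 2016-05-03) do not change the frame.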
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Intended for spark-shell, where sc and the implicits required by toDF are already in scope
val df = sc.parallelize(
  List(("Sensor1", "2016-05-01", 50.00),
       ("Sensor1", "2016-05-03", 45.00),
       ("Sensor1", "2016-05-04", 55.00),
       ("Sensor2", "2016-05-01", 25.00),
       ("Sensor2", "2016-05-04", 29.00),
       ("Sensor2", "2016-05-06", 27.00))
).toDF("sensor", "fecha", "valor")
// Window per sensor, ordered by date, spanning the previous, current, and next row
val window = Window.partitionBy("sensor").orderBy("fecha").rowsBetween(-1, 1)
// Compute the moving average
df.withColumn("media_movil", avg(df("valor")).over(window)).show()
+-------+----------+-----+-----------+
| sensor|     fecha|valor|media_movil|
+-------+----------+-----+-----------+
|Sensor2|2016-05-01| 25.0|       27.0|
|Sensor2|2016-05-04| 29.0|       27.0|
|Sensor2|2016-05-06| 27.0|       28.0|
|Sensor1|2016-05-01| 50.0|       47.5|
|Sensor1|2016-05-03| 45.0|       50.0|
|Sensor1|2016-05-04| 55.0|       50.0|
+-------+----------+-----+-----------+
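The following is a minimal plain-Scala sketch of what rowsBetween(-1, 1) computes on the same data. It uses only standard collections, not Spark. The Reading case class and the centeredMovingAverage helper are hypothetical names introduced for illustration. At the edges of each partition the frame is clipped, so the first and last rows of each sensor average only two values.

```scala
// Plain Scala sketch (no Spark) of the rowsBetween(-1, 1) frame:
// per sensor, ordered by date, average the previous, current and next row.
case class Reading(sensor: String, fecha: String, valor: Double)

val readings = List(
  Reading("Sensor1", "2016-05-01", 50.00),
  Reading("Sensor1", "2016-05-03", 45.00),
  Reading("Sensor1", "2016-05-04", 55.00),
  Reading("Sensor2", "2016-05-01", 25.00),
  Reading("Sensor2", "2016-05-04", 29.00),
  Reading("Sensor2", "2016-05-06", 27.00)
)

// Hypothetical helper: maps (sensor, fecha) to the moving average for that row
def centeredMovingAverage(rows: List[Reading]): Map[(String, String), Double] =
  rows.groupBy(_.sensor).values.flatMap { group =>
    // ISO dates (yyyy-MM-dd) sort correctly as plain strings
    val sorted = group.sortBy(_.fecha).toVector
    sorted.indices.map { i =>
      // Frame = rows i-1 .. i+1, clipped to the bounds of the partition
      val frame = sorted.slice(math.max(0, i - 1), math.min(sorted.size, i + 2))
      (sorted(i).sensor, sorted(i).fecha) -> (frame.map(_.valor).sum / frame.size)
    }
  }.toMap

centeredMovingAverage(readings).toList.sortBy(_._1).foreach(println)
```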
Cumulative sum in a Scala DataFrame
Computes a cumulative (running) sum of each sensor's values. The sensor ID partitions the window and the date orders it.
// Frame from the first row of the partition (Long.MinValue means unbounded) to the current row
val window = Window.partitionBy("sensor").orderBy("fecha").rowsBetween(Long.MinValue, 0)
df.withColumn("suma_acumulada", sum(df("valor")).over(window)).show()
+-------+----------+-----+--------------+
| sensor|     fecha|valor|suma_acumulada|
+-------+----------+-----+--------------+
|Sensor2|2016-05-01| 25.0|          25.0|
|Sensor2|2016-05-04| 29.0|          54.0|
|Sensor2|2016-05-06| 27.0|          81.0|
|Sensor1|2016-05-01| 50.0|          50.0|
|Sensor1|2016-05-03| 45.0|          95.0|
|Sensor1|2016-05-04| 55.0|         150.0|
+-------+----------+-----+--------------+
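The same running total can be expressed in plain Scala as a scanLeft over each sensor's values, sorted by date. This is an illustrative sketch with hypothetical names (Reading, cumulativeSum), not Spark code. It mirrors a frame that runs from the first row of the partition up to the current row.

```scala
// Plain Scala sketch (no Spark) of a per-sensor running total, mirroring the
// frame from the first row of the partition up to the current row.
case class Reading(sensor: String, fecha: String, valor: Double)

val readings = List(
  Reading("Sensor1", "2016-05-01", 50.00),
  Reading("Sensor1", "2016-05-03", 45.00),
  Reading("Sensor1", "2016-05-04", 55.00),
  Reading("Sensor2", "2016-05-01", 25.00),
  Reading("Sensor2", "2016-05-04", 29.00),
  Reading("Sensor2", "2016-05-06", 27.00)
)

// Hypothetical helper: maps (sensor, fecha) to the cumulative sum up to that row
def cumulativeSum(rows: List[Reading]): Map[(String, String), Double] =
  rows.groupBy(_.sensor).values.flatMap { group =>
    val sorted = group.sortBy(_.fecha)
    // scanLeft emits the 0.0 seed followed by each running total; tail drops the seed
    val totals = sorted.map(_.valor).scanLeft(0.0)(_ + _).tail
    sorted.zip(totals).map { case (r, total) => (r.sensor, r.fecha) -> total }
  }.toMap

cumulativeSum(readings).toList.sortBy(_._1).foreach(println)
```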
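Previous value in a Scala DataFrame
Gets the previous value for each sensor with lag. The sensor ID partitions the window and the date orders it. The first reading of each sensor has no predecessor, so its previous value is null.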
val window = Window.partitionBy("sensor").orderBy("fecha")
df.withColumn("valor_previo", lag(df("valor"), 1).over(window)).show()
+-------+----------+-----+------------+
| sensor|     fecha|valor|valor_previo|
+-------+----------+-----+------------+
|Sensor2|2016-05-01| 25.0|        null|
|Sensor2|2016-05-04| 29.0|        25.0|
|Sensor2|2016-05-06| 27.0|        29.0|
|Sensor1|2016-05-01| 50.0|        null|
|Sensor1|2016-05-03| 45.0|        50.0|
|Sensor1|2016-05-04| 55.0|        45.0|
+-------+----------+-----+------------+
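Here is a plain-Scala sketch of lag with offset 1, under the same assumptions: hypothetical Reading and previousValue names, standard collections only. It shifts each sensor's date-ordered values down by one position and uses None where Spark returns null.

```scala
// Plain Scala sketch (no Spark) of lag(valor, 1) per sensor, ordered by date.
case class Reading(sensor: String, fecha: String, valor: Double)

val readings = List(
  Reading("Sensor1", "2016-05-01", 50.00),
  Reading("Sensor1", "2016-05-03", 45.00),
  Reading("Sensor1", "2016-05-04", 55.00),
  Reading("Sensor2", "2016-05-01", 25.00),
  Reading("Sensor2", "2016-05-04", 29.00),
  Reading("Sensor2", "2016-05-06", 27.00)
)

// Hypothetical helper: maps (sensor, fecha) to the previous value, if any
def previousValue(rows: List[Reading]): Map[(String, String), Option[Double]] =
  rows.groupBy(_.sensor).values.flatMap { group =>
    val sorted = group.sortBy(_.fecha)
    // Shift values down one position; the first row has no predecessor (null in Spark)
    val previous: List[Option[Double]] = None :: sorted.init.map(r => Some(r.valor))
    sorted.zip(previous).map { case (r, prev) => (r.sensor, r.fecha) -> prev }
  }.toMap

previousValue(readings).toList.sortBy(_._1).foreach(println)
```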
Ranking in a Scala DataFrame
Ranks each sensor's readings by date with rank. The sensor ID partitions the window.
val window = Window.partitionBy("sensor").orderBy("fecha")
df.withColumn("ranking", rank().over(window)).show()
+-------+----------+-----+-------+
| sensor|     fecha|valor|ranking|
+-------+----------+-----+-------+
|Sensor2|2016-05-01| 25.0|      1|
|Sensor2|2016-05-04| 29.0|      2|
|Sensor2|2016-05-06| 27.0|      3|
|Sensor1|2016-05-01| 50.0|      1|
|Sensor1|2016-05-03| 45.0|      2|
|Sensor1|2016-05-04| 55.0|      3|
+-------+----------+-----+-------+
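Every date in the example is unique within its sensor, so rank yields 1, 2, 3 here, just as row_number would. The difference shows up with ties. rank gives tied rows the same rank and then skips positions, while dense_rank does not skip. The sketch below uses plain Scala with hypothetical helper names (rankOf, denseRankOf) to illustrate both rules on a single partition in ascending order. The tied integer keys are made-up sample data.

```scala
// Plain Scala sketch (no Spark) of two ranking rules within one partition.
// rank: 1 + number of keys strictly smaller, so ties share a rank and leave gaps.
def rankOf[K](keys: Seq[K])(implicit ord: Ordering[K]): Seq[Int] =
  keys.map(k => keys.count(x => ord.lt(x, k)) + 1)

// dense_rank: 1 + number of distinct keys strictly smaller, so there are no gaps.
def denseRankOf[K](keys: Seq[K])(implicit ord: Ordering[K]): Seq[Int] = {
  val distinctKeys = keys.distinct
  keys.map(k => distinctKeys.count(x => ord.lt(x, k)) + 1)
}

// Sensor1's dates from the example: unique keys, so both rules agree
val fechas = Seq("2016-05-01", "2016-05-03", "2016-05-04")
println(rankOf(fechas))
println(denseRankOf(fechas))

// Made-up keys with a tie, where the two rules diverge
val tied = Seq(10, 20, 20, 30)
println(rankOf(tied))
println(denseRankOf(tied))
```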