Linear Regression over Data Segments in Scala

An example of linear regression over segments of data.

The program fits a separate regression for each group of rows, segmenting by the composite key formed by the identifier and the type columns.
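
With the default parameters (no regularization) LinearRegression solves ordinary least squares, and with a single feature the solution has a closed form; writing $x$ for timestamp and $y$ for duration:

$$\hat{\beta} = \frac{\operatorname{cov}(x, y)}{\operatorname{var}(x)}, \qquad \hat{\alpha} = \bar{y} - \hat{\beta}\,\bar{x}$$

so each segment's result is just a slope $\hat{\beta}$ and an intercept $\hat{\alpha}$.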

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StructField, StructType, StringType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

object App {


  // Fit duration ~ timestamp with Spark ML and return List(slope, intercept)
  def lineal_regresion(df: DataFrame): List[Double] = {

    // Assemble the timestamp column into the feature vector
    val features = new VectorAssembler()
      .setInputCols(Array("timestamp"))
      .setOutputCol("features")

    // Define the model: a linear regression with duration as the label
    val lr = new LinearRegression().setLabelCol("duration")

    // Build a pipeline that chains the feature assembler with the model
    val pipeline = new Pipeline().setStages(Array(features, lr))

    // Fit the pipeline on the input DataFrame
    val model = pipeline.fit(df)

    // Extract the fitted regression model from the pipeline stages
    val linRegModel = model.stages(1).asInstanceOf[LinearRegressionModel]

    // Return the slope and the intercept as a two-element list
    linRegModel.coefficients(0) :: linRegModel.intercept :: Nil
  }


  // Compute aggregate statistics and a regression for a single (id, type) segment
  def calculation(id: Row, main_df: DataFrame) = {

    // Keep only the rows that belong to this segment
    val df = main_df.filter(col("id") === id(0) && col("type") === id(1))

    val regresion = lineal_regresion(df)

    (df.agg(max(df("duration")),
      avg(df("duration")),
      variance(df("duration")),
      min(df("timestamp")),
      max(df("timestamp"))).head(),
      regresion(0), regresion(1), id(0), id(1))
  }

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("DiegoWeb")
      .setMaster("local[2]")
      .set("spark.executor.memory", "1g")

    val sparkSession = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()

    // Reuse the context managed by the session instead of creating a second one
    val sc = sparkSession.sparkContext

    // Sample rows: (id, type, duration, timestamp)
    val data = List(
      Row("1","A",100,1541113010),
      Row("1","A",200,1541123010),
      Row("1","B",300,1541133010),
      Row("2","A",400,1541143010),
      Row("2","A",500,1541153010),
      Row("2","B",600,1541163010)
    )

    val rdd = sc.parallelize(data)
    val schema = StructType(
      List(
        StructField("id", StringType),
        StructField("type", StringType),
        StructField("duration", IntegerType),
        StructField("timestamp", IntegerType)
      )
    )

    val df = sparkSession.createDataFrame(rdd, schema)

    df.show()

    // First, a single regression over the whole, unsegmented dataset
    val regression = lineal_regresion(df)

    println(regression(0))
    println(regression(1))

    // Collect the distinct (id, type) keys to the driver
    val ids = df.select(df("id"), df("type")).distinct.rdd.collect().toList

    // Run one regression per segment
    val x = ids.map(key => calculation(key, df))
    println(x)
  }
}
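
To judge the quality of each fit, the fitted model exposes a training summary. A minimal sketch of how it could be inspected, assuming the `model` value from inside `lineal_regresion` (this helper is illustrative, not part of the program above):

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.regression.LinearRegressionModel

// Assumes `model: PipelineModel` as fitted in lineal_regresion
def printFitMetrics(model: PipelineModel): Unit = {
  val fitted = model.stages(1).asInstanceOf[LinearRegressionModel]
  val summary = fitted.summary // training summary populated during fit
  println(s"r2   = ${summary.r2}")
  println(s"RMSE = ${summary.rootMeanSquaredError}")
  println(s"rows = ${summary.numInstances}")
}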

Running the program prints output like the following. Each tuple holds the aggregate Row (max duration, avg duration, variance, min timestamp, max timestamp), the slope, the intercept, and the (id, type) key; note that single-row segments yield a NaN variance and a zero slope:

List(([200,150.0,5000.0,1541113010,1541123010],0.00999966655456395,-1.5410516221233152E7,1,A),
([600,600.0,NaN,1541163010,1541163010],0.0,600.0,2,B),
([300,300.0,NaN,1541133010,1541133010],0.0,300.0,1,B),
([500,450.0,5000.0,1541143010,1541153010],0.010000331975887176,-1.5411541723977894E7,2,A))
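
Collecting the distinct keys and filtering the DataFrame once per key launches a separate Spark job for every segment. Because simple linear regression has the closed form shown at the top, the same slopes and intercepts could also be computed in a single grouped pass with the built-in SQL aggregates. A hedged sketch of this alternative (it should agree with the pipeline results up to numerical precision; single-row segments produce a null slope here because var_pop is zero):

import org.apache.spark.sql.functions._

// slope = cov(x, y) / var(x); intercept = avg(y) - slope * avg(x)
val perGroup = df.groupBy("id", "type")
  .agg(
    (covar_pop(col("timestamp"), col("duration")) / var_pop(col("timestamp"))).as("slope"),
    avg(col("duration")).as("avg_duration"),
    avg(col("timestamp")).as("avg_timestamp"))
  .withColumn("intercept", col("avg_duration") - col("slope") * col("avg_timestamp"))

perGroup.show()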

 

SBT build file configuration

Spark 2.1.x is published for Scala 2.11, so scalaVersion must be 2.11.x, the three Spark artifacts should share the same version, and mainClass must point at the App object defined above:

name := "regression"
version := "0.1"
scalaVersion := "2.11.8"
mainClass := Some("App")
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.1"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.1.1"
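
When the job is launched with sbt run, Spark tends to behave better in a forked JVM with a larger heap; a tentative addition to the same build.sbt (the heap size is illustrative):

fork in run := true
javaOptions in run += "-Xmx2g"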