Regresión Lineal sobre segmentos de datos en Scala

por | Nov 23, 2018 | Big data, Scala | 0 Comentarios

Ejemplo de regresión lineal sobre segmentos de datos

Realiza diferentes regresiones de datos tomando como grupo de segmentación la clave formada por el identificador y el tipo.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StructField, StructType, StringType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

object App {

  /**
   * Fits a simple linear regression of `duration` on `timestamp`.
   *
   * @param df input DataFrame; must contain numeric columns "timestamp" and "duration"
   * @return two-element list: List(slope, intercept)
   */
  def lineal_regresion(df: DataFrame): List[Double] = {

    // Assemble the single predictor column into the "features" vector Spark ML expects.
    val features = new VectorAssembler()
      .setInputCols(Array("timestamp"))
      .setOutputCol("features")

    // Linear regression with "duration" as the label column.
    val lr = new LinearRegression().setLabelCol("duration")

    // Pipeline ties feature preparation to the estimator.
    val pipeline = new Pipeline().setStages(Array(features, lr))

    // Fit the whole pipeline on the data.
    val model = pipeline.fit(df)

    // The fitted LinearRegressionModel is the second pipeline stage (index 1),
    // right after the VectorAssembler.
    val linRegModel = model.stages(1).asInstanceOf[LinearRegressionModel]

    List(linRegModel.coefficients(0), linRegModel.intercept)
  }

  /**
   * Computes summary statistics and a per-segment regression for one (id, type) key.
   *
   * @param id      Row holding the segment key: id at position 0, type at position 1
   * @param main_df full DataFrame, filtered down to this segment before fitting
   * @return single-element list with (aggregates Row, slope, intercept, id, type)
   */
  def calculation(id: Row, main_df: DataFrame) = List {

    // was `var`: the reference is never reassigned, so `val` is correct
    val df = main_df.filter(col("id") === id(0) && col("type") === id(1))

    val regresion = lineal_regresion(df)

    (df.agg(max(df("duration")),
      avg(df("duration")),
      variance(df("duration")),
      min(df("timestamp")),
      max(df("timestamp"))).head()

    , regresion(0), regresion(1), id(0), id(1))
  }

  /** Entry point: builds a sample DataFrame and runs one regression per (id, type) segment. */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("DiegoWeb")
      .setMaster("local[2]")
      .set("spark.executor.memory", "1g")

    // In Spark 2.x the SparkSession is the single entry point; reuse its
    // SparkContext instead of constructing a second, separate one.
    val sparkSession = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()

    val sc = sparkSession.sparkContext

    // Sample data: (id, type, duration, timestamp).
    val data = List(
      Row("1", "A", 100, 1541113010),
      Row("1", "A", 200, 1541123010),
      Row("1", "B", 300, 1541133010),
      Row("2", "A", 400, 1541143010),
      Row("2", "A", 500, 1541153010),
      Row("2", "B", 600, 1541163010)
    )

    val rdd = sc.parallelize(data)
    val schema = StructType(
      List(
        StructField("id", StringType),
        StructField("type", StringType),
        StructField("duration", IntegerType),
        StructField("timestamp", IntegerType)
      )
    )

    val df = sparkSession.createDataFrame(rdd, schema)

    df.show()

    // Global regression over the full dataset.
    val regression = lineal_regresion(df)

    println(regression(0))
    println(regression(1))

    // Distinct (id, type) keys, collected to the driver so we can iterate locally.
    val ids = df.select(df("id"), df("type")).distinct.rdd.collect().toList

    // One regression per segment key (renamed lambda param: it shadowed `ids`).
    val x = ids.flatMap(key => calculation(key, df))
    println(x)

    // Release cluster resources before exiting.
    sparkSession.stop()
  }
}

List(([200,150.0,5000.0,1541113010,1541123010],0.00999966655456395,-1.5410516221233152E7,1,A), 
([600,600.0,NaN,1541163010,1541163010],0.0,600.0,2,B), 
([300,300.0,NaN,1541133010,1541133010],0.0,300.0,1,B), 
([500,450.0,5000.0,1541143010,1541153010],0.010000331975887176,-1.5411541723977894E7,2,A))

 

Configuración del fichero SBT

name := "regression"
version := "0.1"
// Spark 2.1.x is published for Scala 2.11 only; with 2.10.6 the %% operator
// would try to resolve nonexistent *_2.10 artifacts.
scalaVersion := "2.11.8"
// Must name the object actually defined in the example (object App, no package).
mainClass := Some("App")
// All Spark modules must share one version: spark-core was pinned to 1.0.0
// while spark-sql/spark-mllib were 2.1.1, a binary-incompatible mix.
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.1"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.1.1"

0 comentarios

Enviar un comentario

Tu dirección de correo electrónico no será publicada.