Modelo de predicción con Spark

por | Jul 6, 2020 | Big data, R, Spark | 0 Comentarios

Spark es un motor ultrarrápido para el almacenamiento, procesamiento y análisis de grandes volúmenes de datos.
Es de código abierto y se encuentra gestionado por la Apache Software Foundation.

Apache Spark está especialmente diseñado para su implementación en Big data y Machine Learning.Pues su potencia de procesamiento agiliza la detección de patrones en los datos, la clasificación organizada de la información, la ejecución de cómputo intensivo sobre los datos y el procesamiento paralelo en clústers.

RStudio ha publicado sparklyr, un nuevo paquete R que ofrece un interfaz entre R y pache Spark.

1. Instalación y carga de librerías

#Instalar paquetes

install.packages("sparklyr")
install.packages("tidyverse")
install.packages("nycflights13")

#Cargar librerias

library(sparklyr)
library(nycflights13)
library(tidyverse)

spark_install("3.0.0")

2. Conexión al entorno Spark

#Nos conectamos al entorno de Spark en local
sesion_spark <- spark_connect(master="local", version="3.0.0")

3. Operaciones con datos

#Guardamos los datos del dataframe en una variable en el entorno Spark
vuelos <- sdf_copy_to(sesion_spark,flights)

#Extraemos el numero de registros que tiene 
vuelos %>%
  tally()

## Source: spark<?> [?? x 1]
##     n
##   <dbl>
## 1 336776

#Extraemos el número de registros que tiene en función al aeropuerto desde donde sale
vuelos %>%
  group_by(origin)%>%
  tally()

## Source: spark<?> [?? x 2]
##  origin      n
##  <chr>   <dbl>
## 1 JFK    111279
## 2 EWR    120835
## 3 LGA    104662

#Obtenemos el tiempo medio en horas de retraso en los vuelos por aeropuerto
vuelos %>%
  group_by(origin)%>%
 summarise(retraso=mean(dep_delay))

## Source: spark<?> [?? x 2]
##  origin retraso
##   <chr>    <dbl>
## 1 JFK       12.1
## 2 EWR       15.1
## 3 LGA       10.3

Extrae si un vuelo ha llegado tarde o no, de manera que todos los que reflejen un numero de horas en la columna arr_delay mayor a 0 será que han llegado tarde

Todo el que refleje un número 1 en la columna tarde será una afirmación de que ha llegado tarde

vuelos %>%
  ft_binarizer(
    input_col = "arr_delay",
    output_col= "tarde",
    threshold= 0
  )%>%
  select(
    arr_delay,
    tarde
  ) #Comprobamos que se realiza de forma correcta

## Source: spark<?> [?? x 2]
##   arr_delay tarde
##       <dbl> <dbl>
## 1        11     1
## 2        20     1
## 3        33     1
## 4       -18     0
## 5       -25     0
## 6        12     1
## 7        19     1
## 8       -14     0
## 9        -8     0
## 10        8     1
## ... with more rows

Ahora vamos a extraer los rangos de las horas en las que llegaron los vuelos

vuelos %>%
  mutate(sched_dep_time=as.numeric(sched_dep_time)) %>% #Trasformamos a dato numérico las horas de llegadas
  ft_bucketizer(
    input_col = "sched_dep_time",
    output_col= "hora",
    splits= c(0,800,1600,2400)
  )%>%
  select(
    sched_dep_time,
    hora
  )#Comprobamos que se realiza de forma correcta

## Source: spark<?> [?? x 2]
##   sched_dep_time  hora
##            <dbl> <dbl>
## 1            515     0
## 2            529     0
## 3            540     0
## 4            545     0
## 5            600     0
## 6            558     0
## 7            600     0
## 8            600     0
## 9            600     0
## 10           600     0
## ... with more rows

Ahora Extraemos el número de registros en cada rango de horas en las que llegaron los vuelos

vuelos %>%
  mutate(sched_dep_time=as.numeric(sched_dep_time)) %>% #Trasformamos a dato numérico las horas de llegadas
  ft_bucketizer(
    input_col = "sched_dep_time",
    output_col= "hora",
    splits= c(0,800,1600,2400)
  )%>%
  group_by(hora)%>%
  tally()%>%
  arrange(hora)

## Source:     spark<?> [?? x 2]
## Ordered by: hora
##   hora      n
##  <dbl>  <dbl>
## 1     0  50726
## 2     1 164026
## 3     2 122024

El rango 0 sera de 0 a 800, que se traduce como desde 00:00 hasta las 8:00, el rango 1 es de 800 a 1600 y el rango 2 es el que va de 1600 a 2400

#Obtenemos número de vuelos por meses
Datos_meses <- vuelos %>%
  group_by(month) %>%
  tally() %>%
  collect()

Datos_meses

## A tibble: 12 x 2
##  month     n
##  <int> <dbl>
## 1    12 28135
## 2    10 28889
## 3     5 28796
## 4     1 27004
## 5     3 28834
## 6     2 24951
## 7     6 28243
## 8     9 27574
## 9    11 27268
## 10     7 29425
## 11     4 28330
## 12     8 29327

4. Modelización

#Entrenamos un modelo

muestra_vuelos <- vuelos %>%
  filter(!is.na(arr_delay))%>%
  mutate(sched_dep_time=as.numeric(sched_dep_time)) %>% #Trasformamos a dato numérico las horas de llegadas
  ft_binarizer(
    input_col = "arr_delay",
    output_col= "tarde",
    threshold= 0
  )%>%
  ft_bucketizer(
    input_col = "sched_dep_time",
    output_col= "hora",
    splits= c(0,800,1600,2400)
  )%>%
  mutate(dephour=paste0("h", as.integer(hora)))%>%
  sdf_random_split(entrenar=0.07, examinar=0.03, otros=0.9)
  
muestra_vuelos$entrenar

## Source: spark<?> [?? x 22]
##    year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time arr_delay carrier flight
##   <int> <int> <int>    <int>          <dbl>     <dbl>    <int>          <int>     <dbl> <chr>    <int>
## 1  2013     1     1      558            600        -2      849            851        -2 B6          49
## 2  2013     1     1      613            610         3      925            921         4 B6         135
## 3  2013     1     1      646            645         1     1023           1030        -7 UA        1496
## 4  2013     1     1      656            659        -3      949            959       -10 AA        1815
## 5  2013     1     1      658            700        -2      944            939         5 DL        1547
## 6  2013     1     1      659            705        -6      907            913        -6 DL         831
## 7  2013     1     1      743            749        -6     1043           1054       -11 B6         341
## 8  2013     1     1      804            810        -6     1103           1116       -13 DL        1959
## 9  2013     1     1      810            815        -5     1100           1128       -28 DL        2395
## 10 2013     1     1      828            830        -2     1027           1012        15 B6         905
## ... with more rows, and 11 more variables: tailnum <chr>, origin <chr>, dest <chr>, air_time <dbl>,
##   distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>, tarde <dbl>, hora <dbl>, dephour <chr>

Modelizamos con una regresión logística

modelo <- muestra_vuelos$entrenar %>%
  ml_logistic_regression(tarde ~.)#Predecimos si llegará tarde un vuelo en función del resto de variables

Seleccionamos las columnas que más nos interesan para nuestro modelo de entrenamiento

entrenar <- muestra_vuelos$entrenar %>%
  mutate(
    arr_delay= ifelse(arr_delay=="NaN",0,arr_delay)
  )%>%
  select(
    month,
    sched_dep_time,
    arr_delay,
    distance
  )%>%
  mutate_all(as.numeric)

Creamos un Pipeline desarrollando los parámetros que vamos a utilizar en nuestro modelo


Modelo_ML_vuelos <- ml_pipeline(sesion_spark) %>%
  ft_dplyr_transformer(
    tbl=entrenar
  ) %>%
  ft_binarizer(
     input_col = "arr_delay",
    output_col= "tarde",
    threshold= 0
  )%>%
  ft_bucketizer(
    input_col = "sched_dep_time",
    output_col= "hora",
    splits= c(0,800,1600,2400)
  )%>%
  ft_r_formula(tarde ~ hora + distance + arr_delay)%>%
  ml_logistic_regression()

Modelo_ML_vuelos

## Pipeline (Estimator) with 5 stages
## <pipeline_428c5e382011> 
##  Stages 
  |--1 SQLTransformer (Transformer)
  |    <dplyr_transformer_428c8fa56a0> 
  |     (Parameters -- Column Names)
  |--2 Binarizer (Transformer)
  |    <binarizer_428c792442a7> 
  |     (Parameters -- Column Names)
  |      input_col: arr_delay
  |      output_col: tarde
  |--3 Bucketizer (Transformer)
  |    <bucketizer_428c5d3e6a14> 
  |     (Parameters -- Column Names)
  |      input_col: sched_dep_time
  |      output_col: hora
  |--4 RFormula (Estimator)
  |    <r_formula_428c366f683f> 
  |     (Parameters -- Column Names)
  |      features_col: features
  |      label_col: label
  |     (Parameters)
  |      force_index_label: FALSE
  |      formula: tarde ~ hora + distance + arr_delay
  |      handle_invalid: error
  |      stringIndexerOrderType: frequencyDesc
  |--5 LogisticRegression (Estimator)
  |    <logistic_regression_428c7fff6838> 
  |     (Parameters -- Column Names)
  |      features_col: features
  |      label_col: label
  |      prediction_col: prediction
  |      probability_col: probability
  |      raw_prediction_col: rawPrediction
  |     (Parameters)
  |      aggregation_depth: 2
  |      elastic_net_param: 0
  |      family: auto
  |      fit_intercept: TRUE
  |      max_iter: 100
  |      reg_param: 0
  |      standardization: TRUE
  |      threshold: 0.5
  |      tol: 1e-06

Ajustamos el modelo a los datos

Modelo_fit_vuelos <- ml_fit(Modelo_ML_vuelos,muestra_vuelos$entrenar)
Modelo_fit_vuelos

## PipelineModel (Transformer) with 5 stages
## <pipeline_428c5e382011> 
##  Stages 
  |--1 SQLTransformer (Transformer)
  |    <dplyr_transformer_428c8fa56a0> 
  |     (Parameters -- Column Names)
  |--2 Binarizer (Transformer)
  |    <binarizer_428c792442a7> 
  |     (Parameters -- Column Names)
  |      input_col: arr_delay
  |      output_col: tarde
  |--3 Bucketizer (Transformer)
  |    <bucketizer_428c5d3e6a14> 
  |     (Parameters -- Column Names)
  |      input_col: sched_dep_time
  |      output_col: hora
  |--4 RFormulaModel (Transformer)
  |    <r_formula_428c366f683f> 
  |     (Parameters -- Column Names)
  |      features_col: features
  |      label_col: label
  |     (Transformer Info)
  |      formula:  chr "tarde ~ hora + distance + arr_delay" 
  |--5 LogisticRegressionModel (Transformer)
  |    <logistic_regression_428c7fff6838> 
  |     (Parameters -- Column Names)
  |      features_col: features
  |      label_col: label
  |      prediction_col: prediction
  |      probability_col: probability
  |      raw_prediction_col: rawPrediction
  |     (Transformer Info)
  |      coefficient_matrix:  num [1, 1:3] -0.5802 -0.0003 28.7768 
  |      coefficients:  num [1:3] -0.5802 -0.0003 28.7768 
  |      intercept:  num -13.1 
  |      intercept_vector:  num -13.1 
  |      num_classes:  int 2 
  |      num_features:  int 3 
  |      threshold:  num 0.5 
  |      thresholds:  num [1:2] 0.5 0.5 

5. Predicciones

#Hacemos las predicciones

prediccion <- ml_transform(Modelo_fit_vuelos,muestra_vuelos$examinar)
prediccion

## Source: spark<?> [?? x 11]
##   month sched_dep_time arr_delay distance tarde  hora features label rawPrediction probability prediction
##   <dbl>          <dbl>     <dbl>    <dbl> <dbl> <dbl> <list>   <dbl> <list>        <list>           <dbl>
## 1     1            545       -18     1576     0     0 <dbl [3~     0 <dbl [2]>     <dbl [2]>            0
## 2     1            615        -9      746     0     0 <dbl [3~     0 <dbl [2]>     <dbl [2]>            0
## 3     1            730        31     1389     1     0 <dbl [3~     1 <dbl [2]>     <dbl [2]>            1
## 4     1            810        11     1029     1     1 <dbl [3~     1 <dbl [2]>     <dbl [2]>            1
## 5     1            830         2      888     1     1 <dbl [3~     1 <dbl [2]>     <dbl [2]>            1
## 6     1            851       -25     2402     0     1 <dbl [3~     0 <dbl [2]>     <dbl [2]>            0
## 7     1            733       123      200     1     0 <dbl [3~     1 <dbl [2]>     <dbl [2]>            1
## 8     1           1155         8      397     1     1 <dbl [3~     1 <dbl [2]>     <dbl [2]>            1
## 9     1           1200        -3      213     0     1 <dbl [3~     0 <dbl [2]>     <dbl [2]>            0
## 10    1           1159        -8     1608     0     1 <dbl [3~     0 <dbl [2]>     <dbl [2]>            0

Se puede observar como las predicciones realizadas coinciden con la realidad, es decir, que la columna tarde coincide con la columna prediction, esto quiere decir que nuestro modelo de predicción tiene un alto porcentaje de acertar.

Vamos a ver el resultado total de predicción con una matriz de confusión

#Creamos la matriz de confusión 
prediccion%>%
  group_by(tarde,prediction)%>%
  tally()

## Source: spark<?> [?? x 3]
## Groups: tarde
## tarde prediction     n
##  <dbl>      <dbl> <dbl>
## 1    1          1  3973
## 2    0          0  5744

0 comentarios

Enviar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *