Use of pipelines in Apache Spark in Python

Example of pipeline concatenation

This example shows how several pipelines can be used as stages of another pipeline, so that their outputs finally converge in a single column, which we call “features”.

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

# Define the Spark DataFrame to use ('spark' is the active SparkSession)
df = spark.createDataFrame([
    ('line_1', 1, 2, 3, 4),
    ('line_2', 5, 6, 7, 8),
    ('line_3', 9, 9, 9, 9)
], ("label", "x1", "x2", "x3", "x4"))

# Define an assembler of the columns 'x1' and 'x2' and take as output 'features1'
assembler12 = VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
# Create the pipeline
pipeline12 = Pipeline()
# Define the stages that the pipeline is made of
pipeline12.setStages([assembler12])

# Define an assembler of the columns 'x3' and 'x4' and take as output 'features2'
assembler34 = VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
# Create the pipeline
pipeline34 = Pipeline()
# Define the stages that the pipeline is made of
pipeline34.setStages([assembler34])

# Define an assembler of the columns 'features1' and 'features2' and take as output 'features'
assemblerResult = VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
# Create the pipeline
pipelineResult = Pipeline()
# Define the stages that the pipeline is made of (a pipeline can itself be a stage)
pipelineResult.setStages([pipeline12, pipeline34, assemblerResult])

# Fit the pipeline to the input data 'df'
modelResult = pipelineResult.fit(df)
# Transform the data using the fitted model
result_df = modelResult.transform(df)
# Show the results (display() is a Databricks notebook function; elsewhere use result_df.show())
display(result_df)


Example of using a single pipeline

# Create the pipeline (the assemblers are the ones defined in the previous example)
pipelineResult = Pipeline()
# Define the stages that the pipeline is made of
pipelineResult.setStages([assembler12, assembler34, assemblerResult])

# Fit the pipeline to the input data 'df'
modelResult = pipelineResult.fit(df)
# Transform the data using the fitted model
result_df2 = modelResult.transform(df)
# Show the results
display(result_df2)
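
The two examples above build the same "features" column, once with nested pipelines and once with a single flat pipeline. The following is a minimal sketch in plain Python of why that works, assuming only that a pipeline applies its stages in order and can itself be used as a stage. ToyAssembler and ToyPipeline are hypothetical stand-ins written for this illustration, not the pyspark classes, and the sketch skips the fit step because the assemblers have nothing to learn from the data.

```python
# Toy stand-ins, NOT the pyspark classes: they only mimic the composition idea.

class ToyAssembler:
    """Concatenates the values of input_cols into one list stored in output_col."""

    def __init__(self, input_cols, output_col):
        self.input_cols = input_cols
        self.output_col = output_col

    def transform(self, row):
        merged = []
        for col in self.input_cols:
            value = row[col]
            # A column may hold a scalar or an already assembled list.
            merged.extend(value if isinstance(value, list) else [value])
        return {**row, self.output_col: merged}


class ToyPipeline:
    """Applies its stages in order; a ToyPipeline is itself a valid stage."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, row):
        for stage in self.stages:
            row = stage.transform(row)
        return row


# One row of the example data, represented as a dict (an assumption of this sketch).
row = {"label": "line_1", "x1": 1, "x2": 2, "x3": 3, "x4": 4}

a12 = ToyAssembler(["x1", "x2"], "features1")
a34 = ToyAssembler(["x3", "x4"], "features2")
a_all = ToyAssembler(["features1", "features2"], "features")

# Nested pipelines, as in the concatenation example.
nested = ToyPipeline([ToyPipeline([a12]), ToyPipeline([a34]), a_all])
# A single flat pipeline, as in the second example.
flat = ToyPipeline([a12, a34, a_all])

nested_row = nested.transform(row)
flat_row = flat.transform(row)
print(nested_row["features"])  # [1, 2, 3, 4]
print(nested_row == flat_row)  # True
```

In this toy version, nesting changes only how the stages are grouped, not the order in which they run, which is why both layouts end with the same row.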


Example of pipeline concatenation 2 (more concise code)

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([
    ('line_1', 1, 2, 3, 4),
    ('line_2', 5, 6, 7, 8),
    ('line_3', 9, 9, 9, 9)
], ("label", "x1", "x2", "x3", "x4"))

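# Pipeline that assembles 'x1' and 'x2' into 'features1'
pipeline1 = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])

# Pipeline that assembles 'x3' and 'x4' into 'features2'
pipeline2 = Pipeline(stages=[
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])

# Chain both pipelines with a final assembler, then fit and transform in one statement
result = Pipeline(stages=[
    pipeline1, pipeline2,
    VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df)

# Show the results
display(result)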
