Example of pipeline concatenation
This example shows how the stages of a pipeline can be chained so that their outputs finally converge in a single column, which we call "features". Note that a Pipeline can itself be used as a stage of another Pipeline, which is what makes this concatenation possible.
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
# Define the Spark DF to use
df = spark.createDataFrame([
('line_1', 1, 2, 3, 4),
('line_2', 5, 6, 7, 8),
('line_3', 9, 9, 9, 9)
], ("label", "x1", "x2", "x3", "x4"))
# Define an assembler that combines the columns 'x1' and 'x2' into the output column 'features1'
assembler12 = VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
# Create the pipeline
pipeline12 = Pipeline()
# Define the stages that the pipeline is made of
pipeline12.setStages([assembler12])
# Define an assembler that combines the columns 'x3' and 'x4' into the output column 'features2'
assembler34 = VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
# Create the pipeline
pipeline34 = Pipeline()
# Define the stages that the pipeline is made of
pipeline34.setStages([assembler34])
# Define an assembler that combines the columns 'features1' and 'features2' into the output column 'features'
assemblerResult = VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
# Create the pipeline
pipelineResult = Pipeline()
# Define the stages that the pipeline is made of
pipelineResult.setStages([pipeline12, pipeline34, assemblerResult])
# Fit the pipeline to the input data 'df' to obtain a model
modelResult = pipelineResult.fit(df)
# Transform the data using the fitted model
result_df = modelResult.transform(df)
# Show the results
display(result_df)
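After the transformation, result_df keeps the intermediate columns 'features1' and 'features2' alongside the final 'features' column. Assuming the code above ran in an active Spark session, you can inspect the concatenation directly:
# 'features' is the 4-dimensional concatenation of 'features1' and 'features2'
result_df.select("label", "features1", "features2", "features").show(truncate=False)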
Example of pipeline use
# Create the pipeline
pipelineResult = Pipeline()
# Define the stages that the pipeline is made of
pipelineResult.setStages([assembler12, assembler34, assemblerResult])
# Fit the pipeline to the input data 'df' to obtain a model
modelResult = pipelineResult.fit(df)
# Transform the data using the fitted model
result_df2 = modelResult.transform(df)
# Show the results
display(result_df2)
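Whether the assemblers are nested inside sub-pipelines (first example) or listed flat in a single pipeline (this example), the resulting columns are identical. A quick sanity check, assuming both blocks above were run in the same session:
# The nested and flat pipelines produce the same 'features' vectors
assert result_df.select("features").collect() == result_df2.select("features").collect()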
Example of pipeline concatenation 2 (optimizing the statements)
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
df = spark.createDataFrame([
('line_1', 1, 2, 3, 4),
('line_2', 5, 6, 7, 8),
('line_3', 9, 9, 9, 9)
], ("label", "x1", "x2", "x3", "x4"))
# Sub-pipeline that assembles 'x1' and 'x2' into 'features1'
pipeline1 = Pipeline(stages=[
VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])
# Sub-pipeline that assembles 'x3' and 'x4' into 'features2'
pipeline2 = Pipeline(stages=[
VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])
# Nest both sub-pipelines, merge their outputs into 'features', and fit and transform in a single statement
result = Pipeline(stages=[
pipeline1, pipeline2,
VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df)
display(result)
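Note that chaining fit(df).transform(df) discards the fitted PipelineModel. If the same transformation must be applied to new data, keep a reference to the model first; a minimal sketch, where 'new_df' and its rows are made up for illustration:
# Keep the fitted model so it can be reused on new data
model = Pipeline(stages=[
pipeline1, pipeline2,
VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df)
# Hypothetical new rows with the same schema as 'df'
new_df = spark.createDataFrame([
('line_4', 2, 4, 6, 8)
], ("label", "x1", "x2", "x3", "x4"))
display(model.transform(new_df))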