Spark MLlib Machine Learning

MLlib is Spark's built-in machine learning library. It trains models on datasets spread across hundreds of machines, making it possible to work with training data that is too large for a single computer. MLlib includes algorithms for classification, regression, clustering, recommendation, and more.

ML Pipeline — The Assembly Line for Models

MLlib organizes machine learning workflows into Pipelines. A Pipeline chains data preparation steps and the model algorithm into one object. This keeps code clean and makes it easy to apply the same pipeline to new data.

Raw Data
   |
   v
[Stage 1: StringIndexer]  -- converts category text to numbers
   |
   v
[Stage 2: VectorAssembler] -- combines all features into one column
   |
   v
[Stage 3: StandardScaler]  -- scales numbers to similar ranges
   |
   v
[Stage 4: RandomForest]    -- the model algorithm
   |
   v
Predictions

Classification Example — Predicting Loan Default

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Sample data: each row is a loan application
# label = 1 (defaulted) or 0 (paid on time)
data = spark.read.csv("loans.csv", header=True, inferSchema=True)
data.printSchema()
# root
#  |-- age: integer
#  |-- income: double
#  |-- loan_amount: double
#  |-- credit_score: integer
#  |-- employment: string
#  |-- label: integer  (1=default, 0=no default)

# Step 1: Convert 'employment' text to a number
indexer = StringIndexer(inputCol="employment", outputCol="employment_idx")

# Step 2: Combine all numeric features into a single "features" vector
assembler = VectorAssembler(
    inputCols=["age", "income", "loan_amount", "credit_score", "employment_idx"],
    outputCol="features"
)

# Step 3: Define the model
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)

# Step 4: Build the pipeline
pipeline = Pipeline(stages=[indexer, assembler, rf])

# Step 5: Split data into training (80%) and test (20%)
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Step 6: Train the model
model = pipeline.fit(train_data)

# Step 7: Make predictions on test data
predictions = model.transform(test_data)
predictions.select("age", "income", "label", "prediction", "probability").show(5)

# Step 8: Evaluate accuracy
evaluator = BinaryClassificationEvaluator(labelCol="label")
auc = evaluator.evaluate(predictions)
print(f"AUC Score: {auc:.4f}")
# AUC Score: 0.8731  (closer to 1.0 = better model)

Regression Example — Predicting House Prices

from pyspark.ml.regression import LinearRegression

assembler = VectorAssembler(
    inputCols=["bedrooms", "bathrooms", "sqft", "age"],
    outputCol="features"
)

lr = LinearRegression(labelCol="price", featuresCol="features")
pipeline = Pipeline(stages=[assembler, lr])

train_data, test_data = data.randomSplit([0.8, 0.2])
model = pipeline.fit(train_data)

# View model coefficients
lr_model = model.stages[-1]
print("Intercept:", lr_model.intercept)
print("Coefficients:", lr_model.coefficients)

# Evaluate
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="price", metricName="rmse")
rmse = evaluator.evaluate(model.transform(test_data))
print(f"Root Mean Squared Error: ${rmse:,.0f}")

Clustering — K-Means

Clustering groups records by similarity without labeled data. K-Means divides data into K clusters, where each cluster contains records closest to that cluster's center.

from pyspark.ml.clustering import KMeans

assembler = VectorAssembler(inputCols=["spend", "visits"], outputCol="features")
kmeans = KMeans(k=4, seed=1)  # create 4 customer segments

pipeline = Pipeline(stages=[assembler, kmeans])
model = pipeline.fit(customer_data)

# Add cluster assignment to each row
clustered = model.transform(customer_data)
clustered.select("customer_id", "spend", "visits", "prediction").show()

# prediction column = cluster number (0, 1, 2, or 3)
# Segment 0 = low spend, low visits (casual)
# Segment 3 = high spend, high visits (VIP)

Feature Engineering Tools in MLlib

ToolWhat It Does
VectorAssemblerCombines multiple columns into one feature vector
StringIndexerConverts category text to numeric index
OneHotEncoderConverts category index to binary vector
StandardScalerScales features to zero mean and unit variance
MinMaxScalerScales features to a 0–1 range
PCAReduces many features to a smaller set
TokenizerSplits text into words
HashingTF / IDFConverts text into numeric features for NLP

Saving and Loading Models

# Save a trained model to disk or cloud storage
model.save("/models/loan_default_v1")

# Load it back in another job or service
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("/models/loan_default_v1")

# Make predictions immediately
new_predictions = loaded_model.transform(new_loan_applications)

Cross-Validation — Choosing the Best Model

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Try different combinations of hyperparameters
param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 200]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=5   # 5-fold cross-validation
)

cv_model = cv.fit(train_data)
# Spark tests 6 combinations x 5 folds = 30 training runs
# Returns the best-performing configuration automatically
print("Best model AUC:", cv_model.avgMetrics)

Leave a Comment