Spark MLlib Machine Learning
MLlib is Spark's built-in machine learning library. Because it runs on a Spark cluster, it can train models on datasets partitioned across many machines, which makes it practical to use training data that is too large for a single computer. MLlib includes algorithms for classification, regression, clustering, recommendation (collaborative filtering), and more.
ML Pipeline — The Assembly Line for Models
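MLlib organizes machine learning workflows into Pipelines. A Pipeline chains data preparation steps and the model algorithm into one object. Stages that only transform data (Transformers, such as VectorAssembler) are applied as-is, while stages that must first learn from the data (Estimators, such as StringIndexer or RandomForestClassifier) are fitted when you call fit() on the Pipeline. The result is a PipelineModel that replays every step in order. This keeps code clean and makes it easy to apply exactly the same preparation to new data.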
```text
Raw Data
   |
   v
[Stage 1: StringIndexer]    -- converts category text to numbers
   |
   v
[Stage 2: VectorAssembler]  -- combines all features into one column
   |
   v
[Stage 3: StandardScaler]   -- scales numbers to similar ranges
   |
   v
[Stage 4: RandomForest]     -- the model algorithm
   |
   v
Predictions
```
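The fit/transform contract behind a Pipeline can be sketched in plain Python. This is a toy model for intuition only: the class names (MeanCenterer, CenterModel, Doubler, ToyPipeline, ToyPipelineModel) are hypothetical stand-ins, not pyspark.ml classes. During fit(), each stage that learns something is fitted on the output of the previous stage and replaced by the model it produces; the resulting pipeline model then replays all stages in order.

```python
# Toy imitation of the Pipeline fit/transform contract.
# All classes here are hypothetical stand-ins, not the pyspark.ml API.

class MeanCenterer:
    """Estimator: learns the mean of the training rows in fit()."""
    def fit(self, rows):
        return CenterModel(sum(rows) / len(rows))


class CenterModel:
    """Transformer produced by MeanCenterer: subtracts the learned mean."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [x - self.mean for x in rows]


class Doubler:
    """Transformer with nothing to learn."""
    def transform(self, rows):
        return [2 * x for x in rows]


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):      # Estimator: fit it, keep the model
                stage = stage.fit(rows)
            rows = stage.transform(rows)   # feed this stage's output forward
            fitted.append(stage)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


model = ToyPipeline([MeanCenterer(), Doubler()]).fit([1.0, 2.0, 3.0])
print(model.transform([4.0, 5.0]))  # [4.0, 6.0]
```

The mean (2.0) is learned once from the training rows and reused on the new rows. That reuse is what makes it safe to apply a fitted pipeline to data it has never seen.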
Classification Example — Predicting Loan Default
```python
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = SparkSession.builder.appName("loan-default").getOrCreate()

# Sample data: each row is a loan application
# label = 1 (defaulted) or 0 (paid on time)
data = spark.read.csv("loans.csv", header=True, inferSchema=True)
data.printSchema()
# root
#  |-- age: integer
#  |-- income: double
#  |-- loan_amount: double
#  |-- credit_score: integer
#  |-- employment: string
#  |-- label: integer (1=default, 0=no default)

# Step 1: Convert 'employment' text to a numeric index.
# handleInvalid="keep" maps categories unseen during training to one extra
# index instead of failing at prediction time.
indexer = StringIndexer(inputCol="employment", outputCol="employment_idx",
                        handleInvalid="keep")

# Step 2: Combine all numeric features into a single "features" vector
assembler = VectorAssembler(
    inputCols=["age", "income", "loan_amount", "credit_score", "employment_idx"],
    outputCol="features"
)

# Step 3: Define the model
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)

# Step 4: Build the pipeline
pipeline = Pipeline(stages=[indexer, assembler, rf])

# Step 5: Split data into training (~80%) and test (~20%); split sizes are approximate
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Step 6: Train the model (the indexer and the forest are fitted on training data only)
model = pipeline.fit(train_data)

# Step 7: Make predictions on test data
predictions = model.transform(test_data)
predictions.select("age", "income", "label", "prediction", "probability").show(5)

# Step 8: Evaluate with area under the ROC curve (AUC), the evaluator's default metric
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"AUC Score: {auc:.4f}")
# Example output: AUC Score: 0.8731 (1.0 = perfect ranking, 0.5 = random guessing)
```
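The AUC reported above has a simple interpretation: it is the probability that a randomly chosen defaulted loan (label 1) receives a higher score than a randomly chosen repaid loan (label 0), with ties counting as half. A plain-Python sketch of that pairwise definition, using made-up labels and scores, shows what the number measures. Spark derives the value from the ROC curve rather than comparing every pair, so this is for intuition, not a reimplementation of the evaluator.

```python
def roc_auc(labels, scores):
    """AUC as P(score of a random positive > score of a random negative),
    counting ties as half a win."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Made-up example: 1 = defaulted, 0 = repaid; scores = predicted default probability
labels = [1, 1, 0, 0, 1, 0]
scores = [0.9, 0.7, 0.6, 0.2, 0.4, 0.1]
print(roc_auc(labels, scores))
```

Here 8 of the 9 positive/negative pairs are ranked correctly (only 0.4 vs 0.6 is out of order), giving an AUC of 8/9, about 0.889.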
Regression Example — Predicting House Prices
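```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# house_data: a DataFrame with bedrooms, bathrooms, sqft, age, and price columns
# (assumed to be loaded already, e.g. with spark.read.csv as in the loan example)
assembler = VectorAssembler(
    inputCols=["bedrooms", "bathrooms", "sqft", "age"],
    outputCol="features"
)
lr = LinearRegression(labelCol="price", featuresCol="features")
pipeline = Pipeline(stages=[assembler, lr])

train_data, test_data = house_data.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train_data)

# View model coefficients (one per input column, in inputCols order)
lr_model = model.stages[-1]
print("Intercept:", lr_model.intercept)
print("Coefficients:", lr_model.coefficients)

# Evaluate on the held-out test set; RMSE is in the same units as price
evaluator = RegressionEvaluator(labelCol="price", metricName="rmse")
rmse = evaluator.evaluate(model.transform(test_data))
print(f"Root Mean Squared Error: ${rmse:,.0f}")
```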
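To see what the intercept, coefficients, and RMSE represent, here is a plain-Python sketch of how a fitted linear model turns a house's features into a price, and how RMSE summarizes the prediction errors. The intercept, coefficients, houses, and prices are made-up numbers for illustration, not output from the pipeline above.

```python
import math


def predict(intercept, coefficients, features):
    """Linear model prediction: intercept + sum(coef_i * x_i)."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))


def rmse(labels, predictions):
    """Root mean squared error: sqrt of the mean squared residual."""
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))


# Hypothetical fitted model; feature order: bedrooms, bathrooms, sqft, age
intercept = 50_000.0
coefficients = [10_000.0, 5_000.0, 100.0, -500.0]
houses = [(3, 2, 1500, 10), (4, 3, 2000, 5)]
prices = [240_000.0, 310_000.0]

preds = [predict(intercept, coefficients, h) for h in houses]
print(preds)                                # [235000.0, 302500.0]
print(f"RMSE: ${rmse(prices, preds):,.0f}")  # RMSE: $6,374
```

Because the residuals are squared before averaging, a few large misses raise RMSE more than many small ones. Taking the square root brings the result back into dollars.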
Clustering — K-Means
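Clustering groups records by similarity without needing labeled data. K-Means divides the data into K clusters. Each record is assigned to the cluster whose center is nearest, each center is then moved to the mean of its assigned records, and the two steps repeat until the centers stop moving.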
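```python
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler

# customer_data: a DataFrame with customer_id, spend, and visits columns (assumed loaded)
assembler = VectorAssembler(inputCols=["spend", "visits"], outputCol="raw_features")
# Scale both columns so spend (dollars) does not dominate the distance over visits (counts)
scaler = StandardScaler(inputCol="raw_features", outputCol="features", withMean=True, withStd=True)
kmeans = KMeans(k=4, seed=1, featuresCol="features")  # create 4 customer segments
pipeline = Pipeline(stages=[assembler, scaler, kmeans])
model = pipeline.fit(customer_data)

# Add a cluster assignment to each row
clustered = model.transform(customer_data)
clustered.select("customer_id", "spend", "visits", "prediction").show()
# prediction column = cluster number (0, 1, 2, or 3)
# Cluster numbers are arbitrary: inspect model.stages[-1].clusterCenters()
# (in scaled units) before naming segments, e.g. "low spend, low visits
# (casual)" or "high spend, high visits (VIP)".
```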
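The assign-then-update loop described above (Lloyd's algorithm) fits in a few lines of plain Python. This sketch starts from hand-picked centers on made-up, already-scaled (spend, visits) points. By default MLlib picks its starting centers with k-means|| initialization and runs distributed, so treat this as an illustration of the idea only. It also shows why cluster numbers carry no meaning on their own: they simply follow the order of the starting centers.

```python
import math


def kmeans(points, centers, iterations=10):
    """Lloyd's algorithm: assign each point to its nearest center, then move
    each center to the mean of its assigned points; stop when centers settle."""
    for _ in range(iterations):
        clusters = [[] for _ in centers]
        for p in points:
            nearest = min(range(len(centers)), key=lambda i: math.dist(p, centers[i]))
            clusters[nearest].append(p)
        new_centers = []
        for center, members in zip(centers, clusters):
            if members:
                new_centers.append(tuple(sum(dim) / len(members) for dim in zip(*members)))
            else:
                new_centers.append(center)  # keep an empty cluster's center in place
        if new_centers == centers:
            break  # converged: the update step no longer moves any center
        centers = new_centers
    labels = [min(range(len(centers)), key=lambda i: math.dist(p, centers[i])) for p in points]
    return centers, labels


# Hypothetical scaled (spend, visits) points and hand-picked starting centers
points = [(1.0, 1.0), (1.5, 2.0), (8.0, 8.0), (9.0, 9.5)]
centers, labels = kmeans(points, [(1.0, 1.0), (9.0, 9.0)])
print(centers)  # [(1.25, 1.5), (8.5, 8.75)]
print(labels)   # [0, 0, 1, 1]
```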
Feature Engineering Tools in MLlib
| Tool | What It Does |
|---|---|
| VectorAssembler | Combines multiple columns into one feature vector |
| StringIndexer | Converts category text to numeric index |
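| OneHotEncoder | Converts a category index to a one-hot binary vector |
| StandardScaler | Scales features to unit standard deviation; also centers them to zero mean when withMean=True |
| MinMaxScaler | Scales features to a 0–1 range (configurable via min and max) |
| PCA | Projects features onto a smaller number of principal components |
| Tokenizer | Lowercases text and splits it into words on whitespace |
| HashingTF / IDF | Converts text into numeric term-frequency and TF-IDF features for NLP |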
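To make three rows of the table concrete, here is a plain-Python sketch of what StringIndexer, OneHotEncoder, and StandardScaler compute on a single column. The helper names and sample values are hypothetical. The behavior in the comments follows Spark's documented defaults: the most frequent label gets index 0 (with ties broken alphabetically in recent Spark versions), OneHotEncoder uses dropLast=True, and StandardScaler uses withMean=False and withStd=True with the corrected (n-1) sample standard deviation.

```python
from collections import Counter
import statistics


def string_index(values):
    """Most frequent string -> 0, next -> 1, ...; ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Binary vector with 1.0 at `index`. With drop_last=True the vector has
    num_categories - 1 slots, so the last category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]


def standard_scale(column, with_mean=False):
    """Divide by the sample standard deviation; subtract the mean only if with_mean."""
    mean = statistics.mean(column)
    std = statistics.stdev(column)  # sample (n - 1) standard deviation
    shift = mean if with_mean else 0.0
    return [(x - shift) / std for x in column]


employment = ["salaried", "self-employed", "salaried", "retired", "salaried", "self-employed"]
idx = string_index(employment)
print(idx)                                        # {'salaried': 0, 'self-employed': 1, 'retired': 2}
print(one_hot(idx["retired"], len(idx)))          # [0.0, 0.0]
print(standard_scale([2.0, 4.0, 6.0], with_mean=True))  # [-1.0, 0.0, 1.0]
```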
Saving and Loading Models
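```python
from pyspark.ml import PipelineModel

# Assumes `model` is the fitted loan-default PipelineModel from the classification example.
# Save it to disk or cloud storage; overwrite() replaces an existing copy instead of failing.
model.write().overwrite().save("/models/loan_default_v1")

# Load it back in another job or service
loaded_model = PipelineModel.load("/models/loan_default_v1")

# Make predictions immediately; new_loan_applications needs the same input columns as loans.csv
new_predictions = loaded_model.transform(new_loan_applications)
```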
Cross-Validation — Choosing the Best Model
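```python
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Uses rf, pipeline, and train_data from the loan-default example; re-run it
# first, since the regression and clustering examples reuse those names.

# Try different combinations of hyperparameters
param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 200]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=BinaryClassificationEvaluator(labelCol="label"),
    numFolds=5  # 5-fold cross-validation
)
cv_model = cv.fit(train_data)
# 6 combinations x 5 folds = 30 training runs, plus one final refit of the
# best combination on all of train_data

# avgMetrics holds one average AUC per grid combination (in grid order)
print(f"Best cross-validated AUC: {max(cv_model.avgMetrics):.4f}")
best_model = cv_model.bestModel  # PipelineModel with the winning hyperparameters
```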
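The bookkeeping CrossValidator does (split into k folds, average the metric per combination, keep the best) can be sketched in plain Python. Two assumptions to note: rows are assigned to folds round-robin here, whereas Spark assigns them randomly, and toy_evaluate is a hypothetical scoring function standing in for "train on the training folds, compute AUC on the validation fold".

```python
from itertools import product


def k_fold_indices(n_rows, k):
    """Yield (train, validation) row-index lists; row r validates in fold r % k."""
    for i in range(k):
        validation = [r for r in range(n_rows) if r % k == i]
        train = [r for r in range(n_rows) if r % k != i]
        yield train, validation


def cross_validate(param_grid, n_rows, k, evaluate):
    """Average the metric over k folds for every combination; return the
    combination with the highest average plus all averages (like avgMetrics)."""
    avg_metrics = []
    for params in param_grid:
        scores = [evaluate(params, train, val) for train, val in k_fold_indices(n_rows, k)]
        avg_metrics.append(sum(scores) / k)
    best_index = max(range(len(param_grid)), key=lambda i: avg_metrics[i])
    return param_grid[best_index], avg_metrics


# Same grid shape as above: numTrees x maxDepth
param_grid = list(product([50, 100, 200], [5, 10]))


def toy_evaluate(params, train, val):
    """Hypothetical stand-in for training on `train` and scoring AUC on `val`."""
    num_trees, max_depth = params
    return 0.8 + max_depth / 100 - abs(num_trees - 100) / 10_000


best, avg_metrics = cross_validate(param_grid, n_rows=20, k=5, evaluate=toy_evaluate)
fits = len(param_grid) * 5 + 1  # every combination on every fold, plus the final refit
print(best, fits)  # (100, 10) 31
```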
