Spark Tuning and Performance
A Spark job that runs in 10 minutes on default settings might run in 2 minutes after tuning. Performance tuning covers three areas: memory configuration, parallelism settings, and query optimization. This topic walks through practical techniques in each area.
The Performance Diagnostic Workflow
Job is slow
|
v
Open Spark UI (localhost:4040)
|
+-- Is one stage much slower than others?
| Yes: shuffle or skew problem
|
+-- Is one task in a stage 10x slower than others?
| Yes: data skew
|
+-- Are executors running low on memory?
| Yes: increase executor memory or reduce data
|
+-- Is GC (garbage collection) time high?
    Yes: too many short-lived objects; use Kryo serialization,
         cache in serialized form, or add executor memory
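The "one task 10x slower than others" check in the workflow can be made concrete by comparing each task's duration with the stage's median. This is a hypothetical helper, not part of Spark: the durations would be copied from the task table on a stage's detail page in the Spark UI, and using the median as the baseline is an assumption (AQE's own skew-join detection likewise compares partition sizes against the median).

```python
from statistics import median


def find_skewed_tasks(durations_s, ratio=10.0):
    """Return indices of tasks that ran more than `ratio` times the median duration.

    Hypothetical helper: `durations_s` is a list of task durations in seconds.
    """
    typical = median(durations_s)
    return [i for i, d in enumerate(durations_s) if d > ratio * typical]


# Task durations (seconds) from one stage; the fifth task is the straggler
durations = [12.0, 11.5, 13.2, 12.8, 140.0, 12.1]
print(find_skewed_tasks(durations))  # [4]
```

The median is used rather than the mean because a single straggler inflates the mean and can hide itself.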
Memory Configuration
Spark divides executor memory into regions. Tuning these regions prevents out-of-memory errors and reduces garbage collection pauses.
# Submit a job with increased memory
spark-submit \
--executor-memory 8g \
--driver-memory 4g \
--num-executors 10 \
--executor-cores 4 \
my_job.py
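# In code (takes effect only if set before the session's JVM starts)
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()
# In local mode the executors run inside the driver JVM,
# so spark.driver.memory is the setting that applies there.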
Unified Memory Model
Executor Memory (e.g., 8 GB heap; percentages apply after the 300 MB reserve):
+----------------------------------+
| Reserved (300 MB fixed)          | -- Spark internal overhead
+----------------------------------+
| User Memory (40% of the rest)    | -- your data structures
| e.g., ~3.1 GB                    |
+----------------------------------+
| Spark Memory (60% of the rest)   | -- split between:
|   Execution (shuffles, joins,    |
|     sorts, aggregations)         |
|   Storage (cached data)          |
| e.g., ~4.6 GB (shared pool)      |
+----------------------------------+
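Adjust the split when the application starts (defaults: spark.memory.fraction 0.6, spark.memory.storageFraction 0.5). These are core settings, so spark.conf.set() on a running session cannot change them. Raising spark.memory.fraction gives joins and caching more room but shrinks user memory.
spark = SparkSession.builder \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.3") \
    .getOrCreate()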
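The arithmetic behind the memory diagram can be checked with a small calculator. The function name and dictionary keys are hypothetical; the 300 MB reserve and the two fractions are the settings discussed above. Treat the results as approximate: Spark starts from the heap size the JVM reports, which is slightly below the configured value, and off-heap overhead (spark.executor.memoryOverhead) is allocated separately.

```python
RESERVED_MB = 300  # fixed reserved memory


def memory_regions(heap_mb, fraction=0.6, storage_fraction=0.5):
    """Approximate unified-memory split for one executor heap, in MB.

    fraction         -> spark.memory.fraction
    storage_fraction -> spark.memory.storageFraction
    """
    usable = heap_mb - RESERVED_MB
    spark_memory = usable * fraction
    storage = spark_memory * storage_fraction
    return {
        "user": usable - spark_memory,
        "spark": spark_memory,
        # Cached blocks in this part are immune to eviction by execution
        "storage_protected": storage,
        # The rest of the pool; each side can borrow unused memory from the other
        "execution": spark_memory - storage,
    }


defaults = memory_regions(8 * 1024)
tuned = memory_regions(8 * 1024, fraction=0.8, storage_fraction=0.3)
print(round(defaults["user"]), round(defaults["spark"]))  # 3157 4735
print(round(tuned["user"]), round(tuned["spark"]))        # 1578 6314
```

The second line shows the trade-off of fraction 0.8: Spark's pool grows by about 1.5 GB, taken directly from user memory.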
Shuffle Tuning
Shuffles move data across the network and often dominate job time for joins and groupBy operations. Two key settings control shuffle behavior.
# Number of partitions after a DataFrame/SQL shuffle (default: 200)
# Too high for small data = per-task scheduling overhead
# Too low for large data = giant partitions that spill to disk or run out of memory
# For a 10 GB shuffle on 20 cores: 40 partitions of ~256 MB (2 per core)
spark.conf.set("spark.sql.shuffle.partitions", "40")
# For a 1 TB shuffle: 2000 partitions of ~500 MB
spark.conf.set("spark.sql.shuffle.partitions", "2000")
# Enable Adaptive Query Execution (Spark 3.0+; on by default since 3.2)
# AQE automatically adjusts shuffle partitions at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
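The partition-sizing arithmetic can be sketched as a small helper. This is a hypothetical function, not a Spark API: it divides the expected shuffle size by a target partition size you choose, then rounds up to a whole number of "waves" across the cores so none sit idle in the last wave. With a 256 MB target it reproduces the 40 partitions used above for 10 GB on 20 cores.

```python
import math

MIB = 1024 ** 2
GIB = 1024 ** 3


def suggest_shuffle_partitions(shuffle_bytes, total_cores, target_partition_bytes):
    """Hypothetical helper: pick a value for spark.sql.shuffle.partitions.

    The target partition size is an assumption you supply; the result is
    rounded up to a multiple of total_cores.
    """
    by_size = math.ceil(shuffle_bytes / target_partition_bytes)
    waves = max(1, math.ceil(by_size / total_cores))
    return waves * total_cores


# 10 GB shuffle, 20 cores, ~256 MB per partition
print(suggest_shuffle_partitions(10 * GIB, 20, 256 * MIB))  # 40
```

The size that matters is the shuffle output (visible as "Shuffle Write" in the Stages tab), which can differ a lot from the input size after filters and projections.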
Adaptive Query Execution (AQE) — Spark 3.0+
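AQE re-optimizes the query plan at runtime using statistics from completed shuffle stages. It handles three problems automatically: too many small shuffle partitions (by coalescing them), a poor join strategy (by switching to a broadcast join when one side turns out to be small), and skewed joins (by splitting oversized partitions). AQE is enabled by default since Spark 3.2.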
# Enable all AQE features
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
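# What partition coalescing does:
# Without AQE: spark.sql.shuffle.partitions (200) tasks run regardless of data size
# With AQE: if most of those partitions hold < 1 MB each, AQE merges adjacent ones
#   toward spark.sql.adaptive.advisoryPartitionSizeInBytes (default 64 MB)
# Result: a few well-sized partitions = fewer tasks = less scheduling overhead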
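A simplified model of the coalescing step: walk the post-shuffle partitions in order and merge neighbors until the next one would push the group past the target size. The 64 MB target mirrors the default of spark.sql.adaptive.advisoryPartitionSizeInBytes; the function itself is a hypothetical sketch, and Spark's real implementation also honors a minimum partition size and coalesces the partitions of several shuffles (for example, both sides of a join) together.

```python
def coalesce_partitions(sizes_mb, target_mb=64):
    """Group adjacent partition indices so each group stays within target_mb.

    Simplified model: a partition larger than the target still forms its own
    group; it is never split here.
    """
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes_mb):
        # Close the current group if adding this partition would overshoot
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# 190 tiny partitions (1 MB) followed by 10 larger ones (60 MB)
sizes = [1] * 190 + [60] * 10
groups = coalesce_partitions(sizes)
print(len(sizes), "->", len(groups))  # 200 -> 13
```

The 190 tiny partitions collapse into three tasks, while each 60 MB partition stays on its own because any merge would exceed the target.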
Kryo Serialization
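By default, Spark serializes JVM objects with Java serialization, which is slow and produces large output. Kryo is significantly faster and more compact (the Spark tuning guide says often as much as 10x). It mainly helps RDD workloads, such as shuffled RDD records and RDDs cached in serialized form; DataFrames use Spark's own binary format and gain little.
from pyspark.sql import SparkSession

# The serializer must be chosen before the SparkContext starts
spark = SparkSession.builder \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrationRequired", "false") \
    .getOrCreate()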
Predicate Pushdown — Filter Early
Predicate pushdown passes filter conditions to the data source, so Spark can skip data that cannot match instead of loading everything and filtering afterward. Parquet and ORC support this natively: they store min/max statistics for each block of rows, and the reader skips blocks whose statistics rule out a match.
# Spark pushes this filter down to the Parquet file reader.
# Row groups whose statistics rule out region = 'North' are never read;
# Spark still applies the filter to the rows it does read.
df = spark.read.parquet("sales.parquet").filter("region = 'North'")
df.explain()
# Physical plan (abridged) shows:
# Filter: region = 'North'
# PushedFilters: [IsNotNull(region), EqualTo(region,North)]
# ReadSchema: struct<region:string, amount:double>
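The skipping mechanism can be modeled in plain Python. In this sketch, `RowGroup` and `scan_region` are hypothetical stand-ins: each group carries min/max statistics for the `region` column, a group is skipped when the wanted value falls outside that range, and rows in the surviving groups still pass through the row-level filter, just as Spark keeps its Filter operator alongside PushedFilters. A real Parquet reader applies the same idea to encoded column chunks.

```python
from dataclasses import dataclass, field


@dataclass
class RowGroup:
    """Hypothetical stand-in for a Parquet row group with stats on 'region'."""
    min_region: str
    max_region: str
    rows: list = field(default_factory=list)  # (region, amount) tuples


def scan_region(row_groups, wanted):
    """Return matching rows and how many row groups had to be read."""
    matches, groups_read = [], 0
    for group in row_groups:
        # Statistics prove no row can equal `wanted`: skip the whole group
        if not (group.min_region <= wanted <= group.max_region):
            continue
        groups_read += 1
        # Rows in a surviving group still need the row-level filter
        matches.extend(row for row in group.rows if row[0] == wanted)
    return matches, groups_read


file_groups = [
    RowGroup("East", "East", [("East", 120.0)]),
    RowGroup("North", "South", [("North", 75.0), ("South", 30.0)]),
    RowGroup("West", "West", [("West", 40.0)]),
]
print(scan_region(file_groups, "North"))  # ([('North', 75.0)], 1)
```

Skipping works best when the data is sorted or partitioned on the filter column, so each row group covers a narrow min/max range.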
Column Pruning — Read Only What You Need
Parquet and ORC store each column separately, so Spark can read only the columns a query uses.
# Select only the columns you need
df = spark.read.parquet("sales.parquet").select("region", "amount")
# Spark's optimizer prunes the scan automatically for Parquet:
# only 'region' and 'amount' are read from disk,
# and the other columns are never loaded into memory
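Because each column is stored separately, the saving from pruning is roughly the share of on-disk bytes the unselected columns would have cost. The sketch below does only that arithmetic; the helper, the extra column names, and the sizes are all hypothetical, and real sizes depend on each column's encoding and compression.

```python
def scanned_mb(column_mb, selected=None):
    """Size a columnar reader touches: only the selected columns, or all of them."""
    if selected is None:
        return sum(column_mb.values())
    return sum(column_mb[name] for name in selected)


# Hypothetical on-disk size of each column in sales.parquet, in MB
sales_columns = {"region": 40, "amount": 80, "customer_id": 120, "notes": 760}
full = scanned_mb(sales_columns)
pruned = scanned_mb(sales_columns, ["region", "amount"])
print(full, pruned, f"{pruned / full:.0%}")  # 1000 120 12%
```

A row-oriented format such as CSV gets no such saving: every byte of every row is read, then the unused fields are discarded.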
Avoid Wide Transformations When Possible
Wide transformations (cause shuffles):
groupBy(), join() (unless broadcast), distinct(), repartition(), orderBy()/sortBy()
Narrow transformations (no shuffle, fast):
map(), filter(), flatMap(), select(), withColumn()
Strategy: Apply as many narrow transformations as possible
BEFORE wide transformations to reduce the data volume
entering the shuffle.
GOOD:
df.filter("amount > 1000") <-- narrow: reduce rows first
.select("region", "amount") <-- narrow: reduce columns
.groupBy("region") <-- wide: now shuffles less data
.sum("amount")
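BAD:
df.groupBy("region") <-- wide: shuffles ALL data
.sum("amount")
.filter("`sum(amount)` > 1000") <-- filter comes after shuffle
The two versions answer different questions: GOOD totals only orders above 1000, while BAD totals every order and then keeps regions whose total exceeds 1000. A filter on an aggregated value can only run after the shuffle, so the strategy applies to row-level filters. Catalyst already pushes many row-level DataFrame filters below shuffles; ordering them yourself matters most in RDD code, which has no optimizer.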
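To see why the position of a row-level filter matters, here is a plain-Python model in which the "shuffled" records are simply the ones handed to the grouping step. Both hypothetical functions compute the same per-region totals of orders above 1000; only the number of records moved differs.

```python
from collections import defaultdict


def totals_filter_first(rows):
    """Row-level filter before the 'shuffle': only large orders are moved."""
    shuffled = [(region, amount) for region, amount in rows if amount > 1000]
    totals = defaultdict(float)
    for region, amount in shuffled:
        totals[region] += amount
    return dict(totals), len(shuffled)


def totals_filter_late(rows):
    """Same result, but every row is moved before the filter runs."""
    shuffled = list(rows)
    totals = defaultdict(float)
    for region, amount in shuffled:
        if amount > 1000:
            totals[region] += amount
    return dict(totals), len(shuffled)


orders = [("North", 1500.0), ("North", 200.0), ("South", 3000.0), ("South", 50.0)]
print(totals_filter_first(orders))  # ({'North': 1500.0, 'South': 3000.0}, 2)
print(totals_filter_late(orders))   # ({'North': 1500.0, 'South': 3000.0}, 4)
```

In a real job the second count is network and disk traffic, so halving it roughly halves the shuffle cost.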
Spark UI — Your Tuning Dashboard
| Tab | What to Look For |
|---|---|
| Jobs | Which job is slowest |
| Stages | Which stage takes the most time |
| Stages (detail page) | Task duration summary: a max task 10x slower than the median signals skew |
| Storage | Cached data, memory usage |
| Executors | GC time, memory used per executor |
| SQL | Physical plan, filter pushdown confirmation |
Quick Tuning Checklist
- Enable AQE: spark.sql.adaptive.enabled = true (the default since Spark 3.2)
- Set shuffle partitions to match data size (not the default 200)
- Use broadcast joins for small lookup tables
- Use Parquet format for columnar reads and predicate pushdown
- Cache DataFrames used more than once
- Filter and select columns before joins and groupBy
- Use Kryo serialization for RDD workloads
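The broadcast-join item works by shipping a copy of the small table to every executor, so each partition of the large table is joined in place and the large side is never shuffled. In PySpark the hint is pyspark.sql.functions.broadcast, as in large_df.join(broadcast(small_df), "region") (the DataFrame names are placeholders), and Spark also broadcasts tables smaller than spark.sql.autoBroadcastJoinThreshold (10 MB by default) on its own. The plain-Python model below, with hypothetical names, shows the mechanics of the inner join.

```python
def broadcast_hash_join(partitions, small_table):
    """Inner-join every partition of a large table against a small lookup table.

    The small table becomes a dict once (the 'broadcast'); each partition
    probes it locally, so rows of the large table never change partitions.
    """
    lookup = dict(small_table)
    return [
        [(key, value, lookup[key]) for key, value in partition if key in lookup]
        for partition in partitions
    ]


sales_partitions = [[("N", 10), ("S", 20)], [("E", 30), ("N", 40)]]
region_names = [("N", "North"), ("S", "South")]
print(broadcast_hash_join(sales_partitions, region_names))
# [[('N', 10, 'North'), ('S', 20, 'South')], [('N', 40, 'North')]]
```

The partition structure of the output matches the input, which is exactly what makes this join narrow.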
