Spark Tuning and Performance

A Spark job that runs in 10 minutes on default settings can sometimes finish in 2 minutes after tuning. Performance tuning involves three areas: memory configuration, parallelism settings, and query optimization. This topic covers practical techniques in each area.

The Performance Diagnostic Workflow

Job is slow
    |
    v
Open the Spark UI (port 4040 on the driver; localhost:4040 in local mode)
    |
    +-- Is one stage much slower than others?
    |       Yes: shuffle or skew problem
    |
    +-- Is one task in a stage 10x slower than others?
    |       Yes: data skew
    |
    +-- Are executors running low on memory?
    |       Yes: increase executor memory or reduce data
    |
    +-- Is GC (garbage collection) time high?
            Yes: too many small objects; cache in serialized form, use Kryo
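The "one task is 10x slower" check from the workflow can be automated once you have a stage's task durations. The sketch below is plain Python, not a Spark API: `find_skewed_tasks` is a hypothetical helper, the durations are made-up numbers, and the 10x factor is the same rule of thumb used in the workflow rather than a Spark setting.

```python
from statistics import median


def find_skewed_tasks(durations_s, factor=10.0):
    """Return indexes of tasks that ran at least `factor` times longer than the median.

    durations_s: task durations in seconds for ONE stage (hypothetical input,
    e.g. copied by hand from the stage detail page of the Spark UI).
    """
    if not durations_s:
        return []
    typical = median(durations_s)
    if typical == 0:
        # Median of zero: treat any task that took measurable time as an outlier.
        return [i for i, d in enumerate(durations_s) if d > 0]
    threshold = factor * typical
    return [i for i, d in enumerate(durations_s) if d >= threshold]


# Hypothetical stage: nine tasks take about 4 s, one straggler takes 52 s.
stage = [4.1, 3.9, 4.0, 4.2, 3.8, 4.0, 52.0, 4.1, 3.9, 4.0]
print(find_skewed_tasks(stage))  # [6]
```

Comparing against the median rather than the mean keeps a single straggler from inflating the baseline it is judged against.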

Memory Configuration

Spark divides executor memory into regions. Tuning these regions prevents out-of-memory errors and reduces garbage collection pauses.

# Submit a job with increased memory
spark-submit \
    --executor-memory 8g \
    --driver-memory 4g \
    --num-executors 10 \
    --executor-cores 4 \
    my_job.py

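# In code (for local testing)
# Memory settings only apply if set before the JVM starts: use them in a fresh
# Python process, not in an already-running shell or notebook session.
# In local mode, executors run inside the driver JVM, so spark.driver.memory counts.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()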
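To see what the spark-submit command above actually requests, the sketch below does the resource arithmetic in plain Python. It assumes the default executor memory overhead of max(10% of the heap, 384 MiB) that YARN and Kubernetes add on top of --executor-memory (some cluster managers use a larger factor for PySpark jobs), and it ignores other off-heap settings; `executor_footprint` is a hypothetical helper.

```python
def executor_footprint(executor_memory_mib, executor_cores, num_executors,
                       overhead_factor=0.10, overhead_min_mib=384):
    """Rough resource math for a spark-submit command (a sketch, not Spark code)."""
    # Assumed default for spark.executor.memoryOverhead when it is not set:
    # max(10% of the executor heap, 384 MiB), requested on top of the heap.
    overhead_mib = max(int(overhead_factor * executor_memory_mib), overhead_min_mib)
    container_mib = executor_memory_mib + overhead_mib
    return {
        "container_mib": container_mib,                    # per executor
        "task_slots": executor_cores * num_executors,      # concurrent tasks
        "heap_per_slot_mib": executor_memory_mib // executor_cores,
        "cluster_memory_mib": container_mib * num_executors,
    }


# The spark-submit example above: --executor-memory 8g, 4 cores, 10 executors.
print(executor_footprint(8192, 4, 10))
# {'container_mib': 9011, 'task_slots': 40, 'heap_per_slot_mib': 2048, 'cluster_memory_mib': 90110}
```

Each of the 40 task slots gets roughly 2 GiB of heap to share among joins, sorts, and cached blocks, which is a handy number to compare against your partition sizes.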

Unified Memory Model

Executor Memory (e.g., spark.executor.memory = 8 GB):

+--------------------------------+
| Reserved (300 MB fixed)        |  -- Spark internal overhead
+--------------------------------+
| User Memory (40% of the rest)  |  -- your data structures
|  e.g., ~3.1 GB                 |
+--------------------------------+
| Spark Memory (60% of the rest) |  -- split between:
|  Execution (joins, sorts)      |
|  Storage (cached data)         |
|  e.g., ~4.6 GB (shared pool)   |
+--------------------------------+

Both percentages apply to what is left after the 300 MB reserve. The Execution/Storage boundary is soft: each side can borrow the other's unused space.

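Adjust the split when building the session (or with spark-submit --conf). These are startup settings, so spark.conf.set() on a running session cannot change them:
spark = (SparkSession.builder
         .config("spark.memory.fraction", "0.8")         # default 0.6
         .config("spark.memory.storageFraction", "0.3")  # default 0.5
         .getOrCreate())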
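The region sizes in the diagram follow from simple arithmetic. This plain-Python sketch (the `memory_regions` helper is hypothetical, not part of Spark) applies the percentages to the heap minus the 300 MB reserve, first with the defaults and then with the adjusted 0.8 / 0.3 values. Real numbers come out slightly lower, because the JVM reports a little less usable heap than -Xmx.

```python
RESERVED_MIB = 300  # fixed reserved memory in Spark's unified memory model


def memory_regions(heap_mib, fraction=0.6, storage_fraction=0.5):
    """Approximate executor memory regions in MiB (a sketch of the unified model).

    fraction         -> spark.memory.fraction (default 0.6)
    storage_fraction -> spark.memory.storageFraction (default 0.5)
    """
    usable = heap_mib - RESERVED_MIB
    spark_pool = usable * fraction           # shared execution + storage pool
    user = usable - spark_pool               # your own data structures
    storage = spark_pool * storage_fraction  # cached data immune to eviction
    execution = spark_pool - storage         # soft boundary: each side can borrow free space
    return {k: round(v) for k, v in
            {"user": user, "spark": spark_pool,
             "storage": storage, "execution": execution}.items()}


print(memory_regions(8192))
# {'user': 3157, 'spark': 4735, 'storage': 2368, 'execution': 2368}
print(memory_regions(8192, fraction=0.8, storage_fraction=0.3))
# {'user': 1578, 'spark': 6314, 'storage': 1894, 'execution': 4420}
```

Raising spark.memory.fraction to 0.8 halves User Memory, so jobs that build large structures of their own inside the JVM (for example, in Scala UDFs) can run short of memory instead.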

Shuffle Tuning

Shuffles move data across the network and often dominate job time for joins and groupBy operations. Two key settings control shuffle behavior.

# Number of partitions after a shuffle (default: 200)
# Too high for small data = overhead
# Too low for large data = giant partitions

# For a 10 GB dataset with 20 cores (~250 MB per partition, 2 per core):
spark.conf.set("spark.sql.shuffle.partitions", "40")

# For a 1 TB dataset (~500 MB per partition; use more if tasks spill to disk):
spark.conf.set("spark.sql.shuffle.partitions", "2000")
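The two values above work out to roughly 250 MB and 500 MB per shuffle partition. The sketch below turns that reasoning into a formula: divide the expected shuffle size (shown as Shuffle Write in the Spark UI's Stages tab) by a target partition size, then optionally round up to a whole number of task waves. `suggest_shuffle_partitions`, the target sizes, and the rounding rule are illustrative assumptions, not Spark defaults.

```python
import math

GIB = 1024 ** 3
MIB = 1024 ** 2


def suggest_shuffle_partitions(shuffle_bytes, target_partition_bytes, total_cores=None):
    """Rule-of-thumb partition count: shuffle size / target size per partition."""
    partitions = max(1, math.ceil(shuffle_bytes / target_partition_bytes))
    if total_cores:
        # Round up to whole "waves" of tasks so no cores sit idle in the last wave.
        partitions = math.ceil(partitions / total_cores) * total_cores
    return partitions


print(suggest_shuffle_partitions(10 * GIB, 256 * MIB, total_cores=20))  # 40
print(suggest_shuffle_partitions(1024 * GIB, 512 * MIB))               # 2048
print(suggest_shuffle_partitions(10 * GIB, 128 * MIB, total_cores=20))  # 80
```

Shuffle size can differ a lot from input size (column pruning, filtering, compression), so measuring it on a sample run is more reliable than guessing from the raw dataset.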

# Enable Adaptive Query Execution (Spark 3.0+; on by default since 3.2)
# AQE merges small shuffle partitions at runtime, so the value above acts as a starting count
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

Adaptive Query Execution (AQE) — Spark 3.0+

AQE changes the execution plan at runtime based on actual data statistics. It handles three problems automatically: too many small shuffle partitions, join strategy selection (switching a sort-merge join to a broadcast join when one side turns out to be small), and skewed joins.

# Enable all AQE features
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

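# What AQE does with shuffle partitions:
# Without AQE: 200 shuffle partitions are created regardless of data size
# With AQE:    adjacent small partitions are merged toward a target size
#              (spark.sql.adaptive.advisoryPartitionSizeInBytes, 64 MB by default)
#              e.g., 200 partitions of ~3 MB each could become ~10 partitions
#              Result: fewer, well-sized tasks = less scheduling overhead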
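A simplified model of the coalescing step helps explain why many tiny partitions collapse into a few tasks. The sketch below greedily merges adjacent partitions until adding the next one would exceed a target size. The 64 MB target mirrors the advisory partition size default, but `coalesce_partitions` is a hypothetical helper, and Spark's real algorithm applies extra rules (such as a minimum partition size) that are not modeled here.

```python
def coalesce_partitions(sizes_mb, target_mb=64):
    """Greedily group adjacent partition indexes so each group stays <= target_mb.

    Simplified model of AQE partition coalescing, not Spark's implementation.
    A single partition larger than the target ends up alone in its own group.
    """
    groups, current, current_size = [], [], 0
    for i, size in enumerate(sizes_mb):
        if current and current_size + size > target_mb:
            groups.append(current)          # close the group before it overflows
            current, current_size = [], 0
        current.append(i)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Hypothetical shuffle: 200 partitions of 3 MB each (600 MB total).
groups = coalesce_partitions([3] * 200, target_mb=64)
print(len(groups))     # 10
print(len(groups[0]))  # 21 original partitions merged into the first task
```

Partitions larger than the target are left alone by this step; splitting oversized partitions is the job of AQE's separate skew-join handling.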

Kryo Serialization

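By default, Spark uses Java serialization, which is slow and produces bulky output. Kryo is significantly faster and more compact, often by as much as 10x. It mainly helps RDD workloads (shuffling and caching JVM objects); DataFrames already use Spark SQL's own binary format.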

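from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrationRequired", "false") \
    .getOrCreate()
# registrationRequired=false (the default) accepts unregistered classes,
# but Kryo then writes the full class name with each object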

Predicate Pushdown — Filter Early

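Predicate pushdown passes filter conditions to the data source, so the reader can skip data that cannot match instead of Spark loading everything and filtering afterward. Parquet and ORC support this natively: the reader uses per-block min/max statistics to skip whole row groups, and Spark still applies the filter to the rows it does read.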

# Spark pushes this filter down to the Parquet file reader
# Row groups whose min/max statistics rule out 'North' are skipped on disk
df = spark.read.parquet("sales.parquet").filter("region = 'North'")
df.explain()

# Physical plan includes (abridged; exact text varies by Spark version):
# Filter: region = 'North'   (still applied to the rows that are read)
# PushedFilters: [IsNotNull(region), EqualTo(region,North)]
# ReadSchema: struct<region:string,amount:double>
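Parquet's pushdown works at the level of row groups: the file footer stores min/max statistics per column chunk, and the reader skips any row group whose range cannot contain the filter value. The sketch below models that decision in plain Python with hypothetical statistics; `RowGroupStats` and `row_groups_to_read` are illustrative names, not a Parquet or Spark API.

```python
from dataclasses import dataclass


@dataclass
class RowGroupStats:
    # Hypothetical per-row-group statistics for the 'region' column,
    # similar in spirit to what a Parquet footer stores.
    min_value: str
    max_value: str
    num_rows: int


def row_groups_to_read(stats, value):
    """Indexes of row groups whose [min, max] range might contain `value`."""
    return [i for i, s in enumerate(stats) if s.min_value <= value <= s.max_value]


# Hypothetical file sorted by region, split into four row groups.
stats = [
    RowGroupStats("East", "East", 1_000_000),
    RowGroupStats("East", "North", 1_000_000),
    RowGroupStats("North", "South", 1_000_000),
    RowGroupStats("South", "West", 1_000_000),
]
selected = row_groups_to_read(stats, "North")
rows_read = sum(stats[i].num_rows for i in selected)
print(selected, rows_read)  # [1, 2] 2000000
```

Row groups 1 and 2 are read even though they also hold East and South rows, which is why the physical plan keeps a Filter node. Pushdown pays off most when the data is sorted or clustered by the filtered column, so matching values land in few row groups.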

Column Pruning — Read Only What You Need

# Select only the columns you need
df = spark.read.parquet("sales.parquet").select("region", "amount")

# Because Parquet is columnar, Spark's optimizer pushes this projection
# into the scan: only 'region' and 'amount' are read from disk,
# and the other columns are never loaded into memory
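Because Parquet stores each column separately, the bytes a scan reads depend largely on which columns are selected. The sketch below adds up hypothetical per-column sizes for a made-up sales.parquet layout; `bytes_scanned` and the sizes are illustrative, not measured.

```python
def bytes_scanned(column_sizes, selected=None):
    """Size read from a columnar file for the selected columns (all if None)."""
    if selected is None:
        return sum(column_sizes.values())
    missing = set(selected) - set(column_sizes)
    if missing:
        raise KeyError(f"unknown columns: {sorted(missing)}")
    return sum(column_sizes[c] for c in selected)


# Hypothetical on-disk sizes (MB) of each column in sales.parquet.
sizes = {"region": 40, "amount": 80, "customer_id": 120, "notes": 900, "ts": 80}
print(bytes_scanned(sizes))                        # 1220
print(bytes_scanned(sizes, ["region", "amount"]))  # 120
```

Selecting 2 of 5 columns here cuts the scan from 1220 MB to 120 MB; wide text columns such as the hypothetical notes field are where pruning saves the most.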

Avoid Wide Transformations When Possible

Wide transformations (cause shuffles):
  groupBy(), join() (unless broadcast), distinct(), repartition(), orderBy()/sortBy()

Narrow transformations (no shuffle, fast):
  map(), filter(), flatMap(), select(), withColumn()

Strategy: Apply as many narrow transformations as possible
BEFORE wide transformations to reduce the data volume
entering the shuffle.

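GOOD:
(df.filter("region = 'North'")     # narrow: reduce rows first
   .select("region", "amount")     # narrow: reduce columns
   .groupBy("region")              # wide: now shuffles less data
   .sum("amount"))

BAD:
(df.groupBy("region")              # wide: shuffles rows for every region
   .sum("amount")
   .filter("region = 'North'"))    # filter comes after the shuffle

Both return the same North total. In the DataFrame API, the Catalyst optimizer can often push a filter like this below the aggregation on its own; RDD transformations run exactly as written, so filtering early matters most there.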
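The benefit of filtering before a wide transformation can be counted directly. This plain-Python model (`shuffled_sum` is a hypothetical helper and the rows are made up) treats every row entering the groupBy as one shuffled record and filters on the grouping key, so both orders return the same total.

```python
from collections import defaultdict


def shuffled_sum(rows, key, value):
    """Group-and-sum, returning (totals, records_shuffled).

    Every input row is counted as crossing the shuffle boundary; this ignores
    Spark's map-side partial aggregation, which would shrink both cases.
    """
    totals = defaultdict(float)
    for row in rows:
        totals[row[key]] += row[value]
    return dict(totals), len(rows)


rows = [  # hypothetical sales rows
    {"region": "North", "amount": 1200.0},
    {"region": "South", "amount": 300.0},
    {"region": "North", "amount": 800.0},
    {"region": "East", "amount": 1500.0},
    {"region": "South", "amount": 2500.0},
]

# Filter first (narrow), then group (wide): only North rows are shuffled.
north_rows = [r for r in rows if r["region"] == "North"]
early, early_shuffled = shuffled_sum(north_rows, "region", "amount")

# Group everything (wide), then filter the aggregated result.
late_all, late_shuffled = shuffled_sum(rows, "region", "amount")
late = {k: v for k, v in late_all.items() if k == "North"}

print(early, early_shuffled)  # {'North': 2000.0} 2
print(late, late_shuffled)    # {'North': 2000.0} 5
```

Real Spark shuffles less than this model suggests because it pre-aggregates on the map side, but the filtered version still moves fewer records.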

Spark UI — Your Tuning Dashboard

Tab / Page      What to Look For
Jobs            Which job is slowest
Stages          Which stage takes the most time
Stage detail    Skewed tasks (one task 10x slower than the others)
Storage         Cached data, memory usage
Executors       GC time, memory used per executor
SQL             Physical plan, filter pushdown confirmation

Quick Tuning Checklist

  • Enable AQE: spark.sql.adaptive.enabled = true
  • Set shuffle partitions to match data size (not default 200)
  • Use broadcast joins for small lookup tables
  • Use Parquet format for columnar reads and predicate pushdown
  • Cache DataFrames used more than once
  • Filter and select columns before joins and groupBy
  • Use Kryo serialization for RDD workloads
