Spark Joins and Shuffles
Joins combine two datasets based on a matching key. They are essential for real-world data work: connecting orders to customers, products to categories, and events to users. Joins are also one of the most common sources of performance problems in Spark because they often move data across the network.
Types of Joins
Table A (orders):
order_id | customer_id
---------|------------
1 | 101
2 | 102
3 | 999
Table B (customers):
customer_id | name
------------|-------
101 | Alice
102 | Bob
103 | Carol
INNER JOIN (only matching rows):
order_id | customer_id | name
1 | 101 | Alice
2 | 102 | Bob
LEFT JOIN (all of A, matching from B):
order_id | customer_id | name
1 | 101 | Alice
2 | 102 | Bob
3 | 999 | null
RIGHT JOIN (all of B, matching from A):
order_id | customer_id | name
1 | 101 | Alice
2 | 102 | Bob
null | 103 | Carol
FULL OUTER JOIN (all rows from both):
order_id | customer_id | name
1 | 101 | Alice
2 | 102 | Bob
3 | 999 | null
null | 103 | Carol
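The four result tables above can be reproduced with a small plain-Python model. This is only a sketch of the join semantics, not Spark code: the `join` helper is hypothetical, `None` stands in for null, and it assumes each customer_id appears at most once in the customers table.

```python
# Plain-Python model of the four join types (illustration only, not Spark).
orders = [(1, 101), (2, 102), (3, 999)]                      # (order_id, customer_id)
customers = [(101, "Alice"), (102, "Bob"), (103, "Carol")]   # (customer_id, name)


def join(orders, customers, how="inner"):
    """Return (order_id, customer_id, name) rows; None stands in for null."""
    names = dict(customers)      # assumes customer_id is unique in customers
    matched_ids = set()
    rows = []
    for order_id, customer_id in orders:
        if customer_id in names:
            rows.append((order_id, customer_id, names[customer_id]))
            matched_ids.add(customer_id)
        elif how in ("left", "full"):
            # Keep the unmatched order and fill the customer side with None
            rows.append((order_id, customer_id, None))
    if how in ("right", "full"):
        # Keep customers that no order referred to
        for customer_id, name in customers:
            if customer_id not in matched_ids:
                rows.append((None, customer_id, name))
    return rows


for how in ("inner", "left", "right", "full"):
    print(how, join(orders, customers, how))
```

Each printed list should match the corresponding table above, row for row.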
Performing Joins in PySpark
orders_df = spark.read.csv("orders.csv", header=True, inferSchema=True)
customers_df = spark.read.csv("customers.csv", header=True, inferSchema=True)
# Inner join
result = orders_df.join(customers_df, on="customer_id", how="inner")
# Left join
result = orders_df.join(customers_df, on="customer_id", how="left")
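# Join on multiple columns (both DataFrames must contain every listed column)
result = orders_df.join(customers_df,
                        on=["customer_id", "region"],
                        how="inner")
# Join with different column names in each table
# (here the key is assumed to be cust_id in orders and id in customers;
#  both columns are kept in the result)
result = orders_df.join(customers_df,
                        orders_df["cust_id"] == customers_df["id"],
                        how="inner")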
What Is a Shuffle?
A shuffle redistributes rows across executors so that rows with the same key end up in the same partition. Shuffles are necessary for most joins and for groupBy operations, but they are expensive because rows are serialized, written to disk, and sent over the network.
Before shuffle (data on original executors):
Executor 1: (Alice, 100), (Bob, 200), (Alice, 50)
Executor 2: (Carol, 300), (Bob, 150), (Alice, 75)
After shuffle (grouped by name):
Executor 1: (Alice, 100), (Alice, 50), (Alice, 75) <-- all Alice rows
Executor 2: (Bob, 200), (Bob, 150) <-- all Bob rows
Executor 3: (Carol, 300) <-- all Carol rows
Now each executor can compute the total for its own key.
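The routing step of a shuffle can be sketched in plain Python as hash partitioning. This is a simplified model, not Spark's implementation: `partition_for` and `shuffle` are hypothetical helpers, and `zlib.crc32` is used only as a stable stand-in for the partitioner's hash function. Unlike the diagram, several keys may land in the same partition; what matters is that one key never spans two partitions.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_partitions):
    # Stand-in hash: crc32 gives the same value on every run,
    # unlike Python's built-in hash() for strings.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def shuffle(executors, num_partitions):
    """Route every (key, value) row to the partition chosen by its key."""
    partitions = defaultdict(list)
    for rows in executors:
        for key, value in rows:
            partitions[partition_for(key, num_partitions)].append((key, value))
    return dict(partitions)


before = [
    [("Alice", 100), ("Bob", 200), ("Alice", 50)],   # executor 1
    [("Carol", 300), ("Bob", 150), ("Alice", 75)],   # executor 2
]
after = shuffle(before, num_partitions=3)

# Each partition can now total its own keys without seeing any other partition.
totals_by_partition = {}
for pid, rows in after.items():
    totals = defaultdict(int)
    for key, value in rows:
        totals[key] += value
    totals_by_partition[pid] = dict(totals)

print(totals_by_partition)
```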
Broadcast Join — Avoiding the Shuffle
When one table is small (typically under a few hundred MB), Spark can send a copy of it to every executor. Each executor then joins its local partitions of the large table with the full small table, so the large table is never shuffled. This is usually the fastest type of join.
from pyspark.sql.functions import broadcast
# products_df is a small lookup table (10,000 rows)
# orders_df is large (100 million rows)
result = orders_df.join(broadcast(products_df), on="product_id", how="inner")
# What happens:
# products_df (small) --> copied to EVERY executor (no shuffle)
# Each executor joins its partition of orders with the local copy
# No network traffic for orders_df at all
Without broadcast join:
orders (100M rows) ----shuffle----> join --> result
products (10K rows) ---shuffle----> join
With broadcast join:
products (10K rows) --broadcast--> all executors
orders (100M rows) --no shuffle--> join locally on each executor
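What each executor does during a broadcast join can be modelled in plain Python as a hash lookup. This is a rough sketch under simplifying assumptions, not Spark's code: `broadcast_hash_join` is a hypothetical helper, the partitions are ordinary lists, and the sample rows are made up for illustration.

```python
def broadcast_hash_join(large_partitions, small_rows):
    """Inner-join each partition of the large table against one shared lookup."""
    # "Broadcast": build the lookup once; every partition reads the same copy.
    lookup = {}
    for product_id, product_name in small_rows:
        lookup.setdefault(product_id, []).append(product_name)

    joined_partitions = []
    for partition in large_partitions:
        # Rows of the large table never leave their partition.
        joined = [
            (order_id, product_id, name)
            for order_id, product_id in partition
            for name in lookup.get(product_id, [])  # no match: row is dropped
        ]
        joined_partitions.append(joined)
    return joined_partitions


products = [(10, "pen"), (20, "book")]            # small side: (product_id, name)
order_partitions = [                              # large side: (order_id, product_id)
    [(1, 10), (2, 20)],
    [(3, 10), (4, 30)],
]
result = broadcast_hash_join(order_partitions, products)
print(result)
```

Order 4 refers to product 30, which is not in the lookup, so it drops out of the inner join.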
Automatic Broadcast Threshold
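Spark automatically broadcasts a table when its estimated size is below spark.sql.autoBroadcastJoinThreshold (default 10 MB). Increase this for larger lookup tables, keeping in mind that the broadcast copy must fit in memory on the driver and on every executor.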
# Increase auto-broadcast threshold to 100 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))
# Disable auto-broadcast (force shuffle join)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
Sort-Merge Join — The Default for Large Tables
When both tables are large, Spark uses sort-merge join: both sides shuffle by key, sort, then merge. This is efficient for large datasets but requires a full shuffle.
Sort-Merge Join steps:
1. Both DataFrames shuffle so rows with the same key go to the same partition.
2. Each partition sorts its rows by key.
3. Two sorted lists merge (like merging two sorted card decks).
[orders shuffled + sorted by customer_id] ----\
                                               >--[merge]--> result
[customers shuffled + sorted by customer_id] -/
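The merge in step 3 can be sketched in plain Python for a single partition. This is a minimal illustration of an inner merge join over two key-sorted lists, not Spark's implementation; `merge_join` and the sample rows are hypothetical.

```python
def merge_join(left, right):
    """Inner-join two lists of (key, value) pairs that are already sorted by key."""
    result = []
    i = j = 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][0], right[j][0]
        if left_key < right_key:
            i += 1                      # left key has no partner yet, advance left
        elif left_key > right_key:
            j += 1                      # right key has no partner yet, advance right
        else:
            # Find the run of rows on the right that share this key
            j_end = j
            while j_end < len(right) and right[j_end][0] == left_key:
                j_end += 1
            # Pair every left row with this key against the whole right-side run
            while i < len(left) and left[i][0] == left_key:
                for _, right_value in right[j:j_end]:
                    result.append((left_key, left[i][1], right_value))
                i += 1
            j = j_end
    return result


# Step 2 (sort) is done here with sorted(); step 1 (shuffle) is assumed done.
orders = sorted([(102, "order-2"), (101, "order-1"), (999, "order-3"), (101, "order-4")])
customers = sorted([(103, "Carol"), (101, "Alice"), (102, "Bob")])
joined = merge_join(orders, customers)
print(joined)
```

Because both inputs are sorted, each list is scanned once from front to back, which is why the sort step comes before the merge.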
Skew Joins — Handling Uneven Key Distribution
If one key value appears in millions of rows, the task that handles that key receives a massive amount of data while the others finish quickly. This is join skew. Fix it by splitting the hot key into sub-keys (a technique often called salting) or by using Spark's Adaptive Query Execution (AQE).
# Enable Adaptive Query Execution (handles skew automatically in Spark 3+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# AQE detects skewed partitions and splits them automatically.
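The manual alternative, splitting a hot key into sub-keys, can be sketched in plain Python. This is an illustration of the idea only, with made-up data: `NUM_SALTS` is a hypothetical tuning value, and the salt is assigned round-robin to keep the example deterministic (a random salt is a common choice in practice). In PySpark the same idea would be expressed with column expressions rather than Python lists.

```python
from collections import Counter

NUM_SALTS = 4  # hypothetical: number of sub-keys each key is split into

# Skewed large side: the key "hot" dominates
large = [("hot", n) for n in range(1000)] + [("cold", n) for n in range(10)]
small = [("hot", "H"), ("cold", "C")]

# Large side: extend each key with a salt, giving sub-keys like ("hot", 0..3)
salted_large = [
    ((key, n % NUM_SALTS), value) for n, (key, value) in enumerate(large)
]

# Small side: replicate each row once per salt so every sub-key finds its match
salted_small = {
    (key, salt): label for key, label in small for salt in range(NUM_SALTS)
}

# Join on the salted key, then drop the salt from the output
joined = [
    (key, value, salted_small[(key, salt)])
    for (key, salt), value in salted_large
    if (key, salt) in salted_small
]

rows_per_key_before = Counter(key for key, _ in large)
rows_per_key_after = Counter(salted_key for salted_key, _ in salted_large)
print(max(rows_per_key_before.values()))  # 1000 rows share one join key
print(max(rows_per_key_after.values()))   # 250 rows share one join key
```

The join result is unchanged, but the largest group of rows that must be processed together is four times smaller. The cost is that the small side is replicated `NUM_SALTS` times.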
Join Performance Tips
- Use broadcast() whenever one side is small
- Enable AQE (spark.sql.adaptive.enabled = true) for automatic skew handling
- Filter both DataFrames before joining: smaller datasets join faster
- Select only the columns you need before joining: this reduces shuffle data volume
- Avoid cross joins (no join key): they produce a Cartesian product and grow explosively
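To make the last tip concrete, the output size of a cross join is the product of the two input sizes. The snippet below is a plain-Python illustration using the small tables from earlier; `cross_join_rows` is a hypothetical helper that only does the arithmetic.

```python
from itertools import product

orders = [(1, 101), (2, 102), (3, 999)]
customers = [(101, "Alice"), (102, "Bob"), (103, "Carol")]

# Every order is paired with every customer, whether or not the keys match
cross = list(product(orders, customers))
print(len(cross))  # 3 * 3 = 9


def cross_join_rows(left_rows, right_rows):
    """Number of rows a cross join produces for the given input sizes."""
    return left_rows * right_rows


# 100 million orders crossed with a 10,000-row lookup table
print(cross_join_rows(100_000_000, 10_000))  # 1000000000000
```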
