Spark Joins and Shuffles

Joins combine two datasets based on a matching key. They are essential for real-world data work: connecting orders to customers, products to categories, and events to users. Joins are also among the most common sources of performance problems in Spark because they often move large amounts of data across the network.

Types of Joins

Table A (orders):          Table B (customers):
order_id | customer_id     customer_id | name
---------|------------     ------------|-------
    1    |    101               101    | Alice
    2    |    102               102    | Bob
    3    |    999               103    | Carol

INNER JOIN (only matching rows):
order_id | customer_id | name
    1    |    101      | Alice
    2    |    102      | Bob

LEFT JOIN (all of A, matching from B):
order_id | customer_id | name
    1    |    101      | Alice
    2    |    102      | Bob
    3    |    999      | null

RIGHT JOIN (all of B, matching from A):
order_id | customer_id | name
    1    |    101      | Alice
    2    |    102      | Bob
  null   |    103      | Carol

FULL OUTER JOIN (all rows from both):
order_id | customer_id | name
    1    |    101      | Alice
    2    |    102      | Bob
    3    |    999      | null
  null   |    103      | Carol
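
The four results above can be reproduced with a few lines of plain Python. This is only a sketch of the join semantics, not of how Spark executes a join; the helper name join_rows is made up for illustration, and it assumes customer_id is unique in the customers table.

```python
# Example tables from above.
orders = [(1, 101), (2, 102), (3, 999)]               # (order_id, customer_id)
customers = {101: "Alice", 102: "Bob", 103: "Carol"}  # customer_id -> name

def join_rows(how):
    """Return (order_id, customer_id, name) rows for the given join type."""
    rows = []
    matched = set()
    for order_id, cust_id in orders:
        if cust_id in customers:
            rows.append((order_id, cust_id, customers[cust_id]))
            matched.add(cust_id)
        elif how in ("left", "full"):
            # Keep the unmatched order and fill the customer side with None.
            rows.append((order_id, cust_id, None))
    if how in ("right", "full"):
        # Keep customers that no order referred to.
        for cust_id, name in customers.items():
            if cust_id not in matched:
                rows.append((None, cust_id, name))
    return rows

for how in ("inner", "left", "right", "full"):
    print(how, join_rows(how))
```

None plays the role of null here: it marks the side of the row that had no match.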

Performing Joins in PySpark

orders_df = spark.read.csv("orders.csv", header=True, inferSchema=True)
customers_df = spark.read.csv("customers.csv", header=True, inferSchema=True)

# Inner join
result = orders_df.join(customers_df, on="customer_id", how="inner")

# Left join
result = orders_df.join(customers_df, on="customer_id", how="left")

# Join on multiple columns
result = orders_df.join(customers_df,
    on=["customer_id", "region"],
    how="inner")

# Join with different column names in each table
result = orders_df.join(customers_df,
    orders_df["cust_id"] == customers_df["id"],
    how="inner")

What Is a Shuffle?

A shuffle redistributes rows across executors so that rows with the same key end up in the same partition. Shuffles are needed for groupBy operations and for most joins, but they are expensive: rows are serialized, written to local disk, and sent over the network.

Before shuffle (data on original executors):

Executor 1: (Alice, 100), (Bob, 200), (Alice, 50)
Executor 2: (Carol, 300), (Bob, 150), (Alice, 75)

After shuffle (grouped by name):

Executor 1: (Alice, 100), (Alice, 50), (Alice, 75)  <-- all Alice rows
Executor 2: (Bob, 200), (Bob, 150)                   <-- all Bob rows
Executor 3: (Carol, 300)                              <-- all Carol rows

Now each executor can compute the total for its own key.
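
The idea behind the shuffle can be sketched in plain Python by hashing each key to a target partition. This is a simplified model, not Spark's implementation: zlib.crc32 stands in for Spark's internal hash function, and the helper names are made up for illustration.

```python
import zlib
from collections import defaultdict

# Rows as they sit on the original executors (same data as above).
executors = [
    [("Alice", 100), ("Bob", 200), ("Alice", 50)],
    [("Carol", 300), ("Bob", 150), ("Alice", 75)],
]

def target_partition(key, num_partitions):
    # A stable hash, so the same key always maps to the same partition.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def shuffle(executors, num_partitions):
    partitions = defaultdict(list)
    for rows in executors:
        for key, value in rows:
            partitions[target_partition(key, num_partitions)].append((key, value))
    return partitions

def totals_per_key(partitions):
    # Each partition can be summed independently because no key spans two partitions.
    totals = {}
    for rows in partitions.values():
        for key, value in rows:
            totals[key] = totals.get(key, 0) + value
    return totals

partitions = shuffle(executors, num_partitions=3)
totals = totals_per_key(partitions)
print(totals)
```

Hashing only guarantees that all rows for one key share a partition. Several keys can still land in the same partition, so the one-key-per-executor picture above is an idealized case.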

Broadcast Join — Avoiding the Shuffle

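When one table is small enough to fit comfortably in memory on the driver and on each executor (typically no more than a few hundred MB), Spark can send a full copy of it to every executor. Each executor then joins its local partitions of the large table against that copy, so the large table is never shuffled. When it applies, this is usually the fastest join strategy.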

from pyspark.sql.functions import broadcast

# products_df is a small lookup table (10,000 rows)
# orders_df is large (100 million rows)

result = orders_df.join(broadcast(products_df), on="product_id", how="inner")

# What happens:
# products_df (small) --> copied to EVERY executor (no shuffle)
# Each executor joins its partition of orders with the local copy
# orders_df is not shuffled; only the small table crosses the network

Without broadcast join:
orders (100M rows) ----shuffle----> join --> result
products (10K rows) ---shuffle----> join

With broadcast join:
products (10K rows) --broadcast--> all executors
orders (100M rows) --no shuffle--> join locally on each executor
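
To make the mechanics concrete, here is a plain-Python sketch of a broadcast join: the small table becomes an in-memory lookup, and each partition of the large table probes its own copy. The data and the helper name broadcast_join are made up for illustration, and this is a model of the idea rather than Spark's actual code path.

```python
# Small lookup table: product_id -> name. This plays the role of the broadcast copy.
products = {1: "pen", 2: "notebook"}

# Large table, already split into partitions that stay where they are.
order_partitions = [
    [(100, 1), (101, 2)],   # (order_id, product_id) on executor 1
    [(102, 1), (103, 3)],   # executor 2; product 3 has no match
]

def broadcast_join(order_partitions, lookup):
    result = []
    for partition in order_partitions:
        # Each executor receives its own full copy of the small table.
        local = dict(lookup)
        # Inner join: probe the local copy; no order row leaves its partition.
        joined = [(oid, pid, local[pid]) for oid, pid in partition if pid in local]
        result.append(joined)
    return result

joined = broadcast_join(order_partitions, products)
print(joined)
```

The output keeps the same partitioning as the large input, which is the point: only the small table was copied.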

Automatic Broadcast Threshold

Spark automatically broadcasts a table when its estimated size is below spark.sql.autoBroadcastJoinThreshold (default 10 MB). Raise the threshold for larger lookup tables, provided they still fit in driver and executor memory.

# Increase auto-broadcast threshold to 100 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))

# Disable auto-broadcast (force shuffle join)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

Sort-Merge Join — The Default for Large Tables

When both tables are large, Spark uses sort-merge join: both sides shuffle by key, sort, then merge. This is efficient for large datasets but requires a full shuffle.

Sort-Merge Join steps:

1. Both DataFrames shuffle so rows with the same key go to the same partition.
2. Each partition sorts its rows by key.
3. Two sorted lists merge (like merging two sorted card decks).

[orders shuffled + sorted by customer_id] -\
                                             >--[merge]--> result
[customers shuffled + sorted by customer_id]-/
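
Steps 2 and 3 can be sketched in plain Python for a single partition, that is, after the shuffle has already brought matching keys together. This is a minimal illustration of the merge idea with made-up data, not Spark's implementation.

```python
def sort_merge_join(left, right):
    """Inner-join two lists of (key, value) pairs on key."""
    # Step 2: sort both sides by key.
    left = sorted(left, key=lambda row: row[0])
    right = sorted(right, key=lambda row: row[0])
    out = []
    i = j = 0
    # Step 3: walk both sorted lists in lockstep.
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1          # left key has no partner yet; advance left
        elif lk > rk:
            j += 1          # right key has no partner; advance right
        else:
            # Find the run of right rows that share this key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            # Pair every left row with this key against that run.
            while i < len(left) and left[i][0] == lk:
                for _, right_value in right[j:j_end]:
                    out.append((lk, left[i][1], right_value))
                i += 1
            j = j_end
    return out

orders = [(102, "order 2"), (101, "order 1"), (999, "order 3"), (101, "order 4")]
customers = [(103, "Carol"), (101, "Alice"), (102, "Bob")]
result = sort_merge_join(orders, customers)
print(result)
```

Because both inputs are sorted, each side is scanned once and neither needs to be held in a hash table, which is why this strategy scales to two large tables.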

Skew Joins — Handling Uneven Key Distribution

If one key value appears in millions of rows, the single task that handles that key receives a massive amount of data while the others finish quickly. This is join skew. Fix it by splitting the hot key into sub-keys (often called salting) or by using Spark's Adaptive Query Execution (AQE).

# Enable Adaptive Query Execution, which can handle skew automatically (Spark 3+; on by default since Spark 3.2)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# AQE detects skewed partitions and splits them automatically.
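
The manual alternative, splitting a hot key into sub-keys, can be sketched in plain Python. The data, the NUM_SALTS value, and the variable names are assumptions for illustration; in a real job the same steps would be expressed as DataFrame operations.

```python
from collections import Counter

NUM_SALTS = 4  # assumption: number of sub-keys per key; tune to the skew

# Large, skewed side: 1,000 rows for one hot key plus a few for another key.
big = [("hot", i) for i in range(1000)] + [("cold", i) for i in range(10)]
# Small side: one row per key.
small = [("hot", "H"), ("cold", "C")]

# Salt the big side: spread each key over NUM_SALTS sub-keys.
big_salted = [((key, i % NUM_SALTS), value) for i, (key, value) in enumerate(big)]

# Replicate the small side once per salt so every sub-key still finds its match.
small_salted = {(key, salt): label for key, label in small for salt in range(NUM_SALTS)}

# Join on the salted key, then drop the salt from the output.
joined = [(key, value, small_salted[(key, salt)]) for (key, salt), value in big_salted]

# The largest group of rows that must be processed together shrinks.
largest_group_before = max(Counter(key for key, _ in big).values())
largest_group_after = max(Counter(salted_key for salted_key, _ in big_salted).values())
print(largest_group_before, largest_group_after)
```

Here the largest group drops from 1,000 rows to 250. The cost is that the small side grows by a factor of NUM_SALTS, so salting pays off only when the other side is much smaller than the skewed one.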

Join Performance Tips

  • Use broadcast() whenever one side is small enough to fit in executor memory
  • Enable AQE (spark.sql.adaptive.enabled = true) for automatic skew handling
  • Filter both DataFrames before joining — smaller datasets join faster
  • Select only the columns you need before joining — reduces shuffle data volume
  • Avoid cross joins (no join key) — they produce a Cartesian product and grow explosively
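
As a rough illustration of the last point, the size of a cross join is the product of the two input sizes. The row counts below are made up, and plain Python stands in for Spark.

```python
from itertools import product

orders = range(1_000)     # 1,000 rows
customers = range(500)    # 500 rows

# A cross join pairs every row on one side with every row on the other.
cross_rows = sum(1 for _ in product(orders, customers))
print(cross_rows)
```

Two modest inputs already yield 500,000 rows; at the scale of the tables discussed earlier the output would be far larger than either input.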
