Spark Broadcast Variables and Accumulators
Spark provides two special types of shared variables that solve common distributed computing problems: broadcast variables for sharing read-only data efficiently, and accumulators for collecting counts or sums from across the cluster.
The Problem with Normal Variables in Spark
When you use a regular Python variable inside a Spark transformation, Spark serializes a copy of that variable and sends it with every task. If you run 1,000 tasks and the variable is 10 MB, Spark sends 10 GB of data total — a huge waste.
# BAD: lookup_table is sent to EVERY task separately
lookup_table = {"NY": "New York", "CA": "California", "TX": "Texas"}
rdd.map(lambda row: lookup_table.get(row.state, "Unknown"))
# lookup_table is copied with each of the 1000 tasks = 1000 copies
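A small cost model puts numbers on this. The helper `shipped_bytes` is hypothetical. It multiplies a value's size by the number of copies sent: once per task for a plain variable, and, for comparison, once per executor. To measure a real object it uses Python's `pickle` as a stand-in for PySpark's closure serializer, so that size is only an approximation.

```python
import pickle


def shipped_bytes(size_bytes, num_tasks, num_executors):
    """Hypothetical cost model for one shared value.

    Returns (as_closure, per_executor): a plain variable rides along with
    every task, while one copy per executor is what broadcasting achieves.
    """
    return size_bytes * num_tasks, size_bytes * num_executors


# 1) The 10 MB example from the text: 1,000 tasks on 10 executors
MB = 1_000_000
closure, per_executor = shipped_bytes(10 * MB, num_tasks=1000, num_executors=10)
print(closure / 1e9, "GB shipped with every task")        # 10.0 GB
print(per_executor / 1e6, "MB shipped once per executor")  # 100.0 MB

# 2) Measure a real object; pickle only approximates PySpark's serializer
lookup_table = {"NY": "New York", "CA": "California", "TX": "Texas"}
size = len(pickle.dumps(lookup_table))
closure_small, per_executor_small = shipped_bytes(size, 1000, 10)
print(f"{size} bytes per copy -> {closure_small} vs {per_executor_small} bytes")
```

The ratio between the two totals is always tasks divided by executors (100 here), whatever the size of the value.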
Broadcast Variables — One Copy Per Executor
A broadcast variable sends one copy of the data to each executor. All tasks on that executor share the same copy. With 1,000 tasks across 10 executors, only 10 copies of the variable exist instead of 1,000.
# Create a broadcast variable
lookup_table = {"NY": "New York", "CA": "California", "TX": "Texas"}
bc_lookup = sc.broadcast(lookup_table)
# Use it inside a transformation with .value
result = rdd.map(lambda row: bc_lookup.value.get(row.state, "Unknown"))
# Broadcast sends ONE copy to each executor:
# Executor 1: has bc_lookup.value --> used by 100 tasks
# Executor 2: has bc_lookup.value --> used by 100 tasks
# ...
# Total: 10 copies instead of 1000
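One way to picture "one copy per executor" is a cache on each executor that every task scheduled there shares. The toy model below is not Spark's implementation. Spark's default TorrentBroadcast splits the value into chunks and lets executors fetch pieces from the driver and from each other. The `Driver` and `Executor` classes and the round-robin task placement are assumptions made for illustration. The model only counts how often the value has to be fetched from the driver.

```python
class Driver:
    """Holds broadcast values and counts fetches made by executors."""

    def __init__(self):
        self.values = {}
        self.fetches = 0

    def broadcast(self, value):
        bid = len(self.values)          # simple id for this broadcast
        self.values[bid] = value
        return bid

    def fetch(self, bid):
        self.fetches += 1
        return self.values[bid]


class Executor:
    """Caches a broadcast value the first time one of its tasks needs it."""

    def __init__(self, driver):
        self.driver = driver
        self.cache = {}

    def get(self, bid):
        if bid not in self.cache:       # only the first task pays the fetch
            self.cache[bid] = self.driver.fetch(bid)
        return self.cache[bid]


driver = Driver()
bid = driver.broadcast({"NY": "New York", "CA": "California", "TX": "Texas"})
executors = [Executor(driver) for _ in range(10)]

# Place 1,000 tasks round-robin; every task reads the shared lookup table
results = []
for task_id in range(1000):
    executor = executors[task_id % len(executors)]
    results.append(executor.get(bid).get("CA", "Unknown"))

print(driver.fetches)  # 10: one fetch per executor, not one per task
```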
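Broadcast in DataFrames
For DataFrame joins, the broadcast() hint tells Spark to send the entire small table to every executor. Each partition of the large table can then be joined locally, without shuffling the large table across the network. Spark also broadcasts a join side automatically when its estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default).
from pyspark.sql.functions import broadcast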
# Small product lookup table
product_df = spark.read.csv("products.csv", header=True)
# Large sales table
sales_df = spark.read.parquet("sales.parquet")
# Broadcast the small table for an efficient join
result = sales_df.join(broadcast(product_df), on="product_id")
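Conceptually, this is a broadcast hash join. The small side is turned into a hash table that every executor holds. Each task then probes that table while scanning its own partition of the large side, so the large side never moves. The pure-Python sketch below models only that idea. Lists of dicts stand in for partitions, the sample rows are invented, and `broadcast_hash_join` is a hypothetical helper. It is not Spark's join operator.

```python
from collections import defaultdict


def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner-join every large-side partition against one hash table
    built from the small (broadcast) side."""
    # Build phase: happens once for the small side
    table = defaultdict(list)
    for row in small_rows:
        table[row[key]].append(row)

    # Probe phase: each large-side partition is scanned where it lives
    for partition in large_partitions:
        for row in partition:
            for match in table.get(row[key], []):
                yield {**match, **row}


products = [
    {"product_id": 1, "name": "Widget"},
    {"product_id": 2, "name": "Gadget"},
]
sales_partitions = [
    [{"product_id": 1, "qty": 3}, {"product_id": 3, "qty": 1}],
    [{"product_id": 2, "qty": 5}, {"product_id": 1, "qty": 2}],
]

joined = list(broadcast_hash_join(sales_partitions, products, "product_id"))
for row in joined:
    print(row)
# {'product_id': 1, 'name': 'Widget', 'qty': 3}
# {'product_id': 2, 'name': 'Gadget', 'qty': 5}
# {'product_id': 1, 'name': 'Widget', 'qty': 2}
```

The sale for product 3 has no match and is dropped, as in any inner join.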
Releasing a Broadcast Variable
# Free the broadcast variable from executor memory when done
bc_lookup.unpersist()  # deletes cached copies on executors; a later use re-sends the data
bc_lookup.destroy()    # removes all data and metadata on executors AND the driver; cannot be used again
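The difference between the two calls is a lifecycle question. unpersist() is reversible: the driver still holds the value. destroy() is final. `ToyBroadcast` below is a hypothetical stand-in that mimics those two behaviours in plain Python. It is not PySpark's `Broadcast` class.

```python
class ToyBroadcast:
    """Hypothetical stand-in that mimics a Spark broadcast's lifecycle."""

    def __init__(self, value):
        self._value = value          # the driver's copy
        self.cached_on = set()       # executors currently holding a copy
        self.sends = 0               # times the value was shipped to an executor
        self.destroyed = False

    def use_on(self, executor_id):
        """A task running on `executor_id` reads the value."""
        if self.destroyed:
            raise RuntimeError("broadcast was destroyed and cannot be used again")
        if executor_id not in self.cached_on:
            self.cached_on.add(executor_id)  # ship once, then reuse
            self.sends += 1
        return self._value

    def unpersist(self):
        # Drop executor copies only; the driver still has the value to re-send
        self.cached_on.clear()

    def destroy(self):
        # Drop every copy, the driver's included; later use is an error
        self.cached_on.clear()
        self._value = None
        self.destroyed = True


bc = ToyBroadcast({"NY": "New York"})
for executor_id in (0, 1, 0, 1):
    bc.use_on(executor_id)
print(bc.sends)   # 2: one shipment per executor

bc.unpersist()
bc.use_on(0)      # still works, but the value is shipped again
print(bc.sends)   # 3

bc.destroy()
try:
    bc.use_on(0)
except RuntimeError as err:
    print("after destroy:", err)
```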
Accumulators — Counting Across the Cluster
An accumulator is a variable that tasks on executors can only add to. The Driver reads the final total once the action that ran those tasks has completed. Think of it as a vote counter: each voting booth (task) tallies votes locally and reports its count when it closes. The central office (Driver) adds the reports together and announces the final total.
# Create an accumulator (starts at 0)
error_count = sc.accumulator(0)
# Update it inside an action: foreach() runs process_record on the executors
def process_record(row):
    # No `global` needed: add() updates the accumulator without rebinding the name
    if row["amount"] < 0:
        error_count.add(1)  # increment on executor
rdd.foreach(process_record)  # foreach ignores return values
# Read the total on the Driver after the action finishes
print(f"Total error records: {error_count.value}")
# Example output: Total error records: 284
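Under the vote-counter analogy, each task works on its own zero-initialised copy of the accumulator and reports its partial count when it finishes. The driver adds those partials together. The sketch below models that flow in plain Python. It is not PySpark code: a list of lists stands in for RDD partitions, the amounts are made up, and `count_errors_in_task` is a hypothetical name.

```python
def count_errors_in_task(partition):
    """One task: count negative amounts into a task-local partial total."""
    local_errors = 0                  # each task starts from the zero value
    for row in partition:
        if row["amount"] < 0:
            local_errors += 1         # plays the role of error_count.add(1)
    return local_errors               # reported back when the task finishes


partitions = [
    [{"amount": 10}, {"amount": -5}],
    [{"amount": -1}, {"amount": -2}, {"amount": 7}],
    [{"amount": 3}],
]

error_count = 0                       # driver-side value, like sc.accumulator(0)
for partition in partitions:          # Spark would run these tasks in parallel
    error_count += count_errors_in_task(partition)  # driver merges each update

print(f"Total error records: {error_count}")  # Total error records: 3
```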
Accumulator Rules
Rule 1: Executors can only ADD to accumulators.
Tasks cannot read the accumulator's value (PySpark raises an error if a task accesses .value).
Rule 2: Only the Driver can READ the accumulator's value.
Rule 3: The value is only guaranteed to be correct when read AFTER an action completes,
and only for updates made inside actions (e.g. foreach); updates inside transformations (e.g. map) may be applied more than once.
Driver creates:  sc.accumulator(0)     <-- OK
Executor adds:   error_count.add(1)    <-- OK
Driver reads:    error_count.value     <-- OK (after action)
Executor reads:  error_count.value     <-- not allowed (raises an error in PySpark)
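Updates made inside transformations are the usual way to break Rule 3. Spark evaluates transformations lazily, so nothing is counted until an action runs. An uncached RDD is also recomputed for every action, so the same update can run more than once. The plain-Python model below imitates this. `LazyDataset` and `ToyAccumulator` are hypothetical stand-ins, not PySpark classes, and "recomputing" here simply means re-running the pipeline for each action.

```python
class ToyAccumulator:
    """Stand-in for an accumulator: code can only add to it."""

    def __init__(self):
        self.value = 0

    def add(self, n):
        self.value += n


class LazyDataset:
    """Hypothetical RDD-like object: nothing is cached, so every action
    re-runs all of its transformations from the source data."""

    def __init__(self, source, fns=()):
        self.source = source
        self.fns = fns

    def map(self, fn):
        # Transformations only record work; nothing runs yet
        return LazyDataset(self.source, self.fns + (fn,))

    def _compute(self):
        items = iter(self.source)
        for fn in self.fns:
            items = map(fn, items)    # Python's built-in map is lazy too
        return items

    def count(self):                  # an "action"
        return sum(1 for _ in self._compute())

    def collect(self):                # another "action"
        return list(self._compute())


negatives = ToyAccumulator()


def flag_negative(x):
    if x < 0:
        negatives.add(1)              # accumulator update inside a transformation
    return x


data = LazyDataset([4, -1, -2, 9]).map(flag_negative)
print(negatives.value)   # 0: the map has not run yet
data.count()
print(negatives.value)   # 2: the first action ran the map
data.collect()
print(negatives.value)   # 4: the second action re-ran it, double counting
```

In Spark, the dependable fix is to do the counting inside an action such as foreach(). Caching the RDD avoids most recomputation, but it does not extend the exactly-once guarantee to transformations.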
Named Accumulators — Visible in Spark UI
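Named accumulators are shown in the Spark UI for the stages that update them, but this is a Scala/Java feature. The Spark programming guide notes that tracking accumulators in the UI is not yet supported in Python. In PySpark, the second argument of sc.accumulator() is an AccumulatorParam, not a name. sc.accumulator(0, "Null Record Counter") therefore does not create a named counter, and it breaks once Spark tries to use the string as an AccumulatorParam. In Scala:
// Give the accumulator a name to see it in the Spark UI
val nullCount = sc.longAccumulator("Null Record Counter")
val skipped = sc.longAccumulator("Skipped Records")
// Both appear in the Spark UI while the stages that update them run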
Custom Accumulator Types
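PySpark's built-in accumulators handle numbers (int, float and complex). For other types, such as a set of values, subclass AccumulatorParam and implement two methods: zero(), which returns the empty starting value, and addInPlace(), which merges two values.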
from pyspark import AccumulatorParam
class SetAccumulator(AccumulatorParam):
    def zero(self, value):
        return set()  # fresh empty set as the starting value
    def addInPlace(self, v1, v2):
        v1 |= v2  # merge v2 into v1 (set union)
        return v1
# Track unique error codes seen across all tasks
unique_errors = sc.accumulator(set(), SetAccumulator())
def check(row):
    if row["status"] == "error":
        unique_errors.add({row["error_code"]})  # add a one-element set
rdd.foreach(check)
print("Unique error codes:", unique_errors.value)
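zero() and addInPlace() are how partial results get combined. Each task starts from a zero value, and the driver folds task results in with addInPlace() in whatever order tasks finish. addInPlace() should therefore be associative and commutative. The plain-Python check below exercises the same two methods without Spark. `SetParam` copies SetAccumulator's logic but drops the AccumulatorParam base class so it runs without PySpark. `collect_codes_in_task` is a hypothetical name and the error codes are made up.

```python
import random


class SetParam:
    """Same zero/addInPlace logic as SetAccumulator, minus the PySpark base class."""

    def zero(self, value):
        return set()

    def addInPlace(self, v1, v2):
        v1 |= v2
        return v1


param = SetParam()
partitions = [
    [{"status": "error", "error_code": "E42"}, {"status": "ok"}],
    [{"status": "error", "error_code": "E7"}],
    [{"status": "error", "error_code": "E42"}, {"status": "error", "error_code": "E9"}],
]


def collect_codes_in_task(partition):
    # Each task gets its own fresh zero value, then adds into it
    local = param.zero(set())
    for row in partition:
        if row["status"] == "error":
            local = param.addInPlace(local, {row["error_code"]})
    return local


task_results = [collect_codes_in_task(p) for p in partitions]
random.shuffle(task_results)          # tasks can finish in any order

driver_value = param.zero(set())
for partial in task_results:
    driver_value = param.addInPlace(driver_value, partial)

print(sorted(driver_value))           # ['E42', 'E7', 'E9']
```

Because set union is associative and commutative, the shuffle never changes the result. A merge that depended on order, such as appending to a list, would give run-to-run differences.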
Broadcast vs Accumulator — Quick Reference
| Feature | Broadcast Variable | Accumulator |
|---|---|---|
| Direction | Driver → Executors | Executors → Driver |
| Purpose | Share read-only data | Collect counts/sums |
| Executor can read? | Yes (via .value) | No (raises an error in PySpark) |
| Executor can write? | No | Yes (add only) |
| Driver can read? | Yes (via .value) | Yes, after an action |
| Common use | Lookup tables, config | Error counts, metrics |
