Spark Broadcast Variables and Accumulators

Spark provides two special types of shared variables that solve common distributed computing problems: broadcast variables for sharing read-only data efficiently, and accumulators for collecting counts or sums from across the cluster.

The Problem with Normal Variables in Spark

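When you use a regular Python variable inside a Spark transformation, Spark serializes it into the function's closure and ships that closure with the tasks that run it. Modeled as one copy per task, 1,000 tasks with a 10 MB variable add up to about 10 GB of transferred data (1,000 × 10 MB), which is a huge waste. Spark does automatically broadcast the data that the tasks of a single stage have in common, but the closure is still shipped again for every stage that uses it and deserialized again by every task.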

# BAD: lookup_table is sent to EVERY task separately
lookup_table = {"NY": "New York", "CA": "California", "TX": "Texas"}

rdd.map(lambda row: lookup_table.get(row.state, "Unknown"))
# lookup_table is copied with each of the 1000 tasks = 1000 copies
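
To put a number on that cost, the sketch below estimates the bytes shipped when an object travels once with each task. It is a rough estimate under stated assumptions: `closure_cost_bytes` is a hypothetical helper, and the standard `pickle` module stands in for the serializer PySpark actually uses for closures, so real sizes will differ somewhat.

```python
import pickle

# Same small table as above; real lookup tables are usually far larger.
lookup_table = {"NY": "New York", "CA": "California", "TX": "Texas"}

def closure_cost_bytes(obj, num_tasks):
    """Estimate bytes shipped if `obj` travels once with each task (hypothetical helper)."""
    payload = len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))
    return payload * num_tasks

one_task = closure_cost_bytes(lookup_table, 1)
all_tasks = closure_cost_bytes(lookup_table, 1000)
print(f"{one_task} bytes per task, {all_tasks} bytes for 1000 tasks")

# The figures from the text: 10 MB per task across 1,000 tasks.
print(10 * 1000, "MB in total, i.e. about 10 GB")
```

The cost grows linearly with the number of tasks, which is why a table that looks harmless in a test run can dominate network traffic in a large job.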

Broadcast Variables — One Copy Per Executor

A broadcast variable sends one copy of the data to each executor. All tasks on that executor share the same copy. With 1,000 tasks across 10 executors, only 10 copies of the variable exist instead of 1,000.

# Create a broadcast variable
lookup_table = {"NY": "New York", "CA": "California", "TX": "Texas"}
bc_lookup = sc.broadcast(lookup_table)

# Use it inside a transformation with .value
result = rdd.map(lambda row: bc_lookup.value.get(row.state, "Unknown"))

# Broadcast sends ONE copy to each executor:
# Executor 1: has bc_lookup.value --> used by 100 tasks
# Executor 2: has bc_lookup.value --> used by 100 tasks
# ...
# Total: 10 copies instead of 1000
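
The snippet above assumes that `sc` and `rdd` already exist. A self-contained version might look like the sketch below. The local master setting, the app name, and the names `expand_state` and `run_broadcast_demo` are assumptions made for illustration. PySpark is imported inside the function so that the lookup helper can be tested without a Spark installation.

```python
LOOKUP = {"NY": "New York", "CA": "California", "TX": "Texas"}

def expand_state(code, table):
    """Translate a state code to its full name, or 'Unknown' if absent."""
    return table.get(code, "Unknown")

def run_broadcast_demo():
    """Run the lookup on a local Spark session (requires PySpark and Java)."""
    # Imported lazily so expand_state can be unit-tested without Spark.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .master("local[2]")          # assumption: local mode, 2 worker threads
             .appName("broadcast-demo")   # assumption: arbitrary app name
             .getOrCreate())
    sc = spark.sparkContext
    try:
        bc_lookup = sc.broadcast(LOOKUP)
        codes = sc.parallelize(["NY", "TX", "FL"], numSlices=2)
        # Each task reads the executor's shared copy through .value
        return codes.map(lambda c: expand_state(c, bc_lookup.value)).collect()
    finally:
        spark.stop()
```

Calling `run_broadcast_demo()` on a machine with PySpark installed should return the full names in input order, with the unmatched code `"FL"` mapped to `"Unknown"`.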

Broadcast in DataFrames

from pyspark.sql.functions import broadcast

# Small product lookup table
product_df = spark.read.csv("products.csv", header=True)

# Large sales table
sales_df = spark.read.parquet("sales.parquet")

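# broadcast() here is a join hint, not sc.broadcast(): it asks Spark to copy the
# small table to every executor so the large table can be joined without a shuffle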
result = sales_df.join(broadcast(product_df), on="product_id")

Releasing a Broadcast Variable

# Release the broadcast variable when it is no longer needed
bc_lookup.unpersist()   # removes cached copies from executors; re-sent if used again
bc_lookup.destroy()     # removes from executors AND driver; cannot be used afterwards

Accumulators — Counting Across the Cluster

An accumulator is a variable that tasks on executors can only add to. The Driver reads the final total after all tasks complete. Think of it as a vote counter: each voting booth (executor) adds votes locally, and the central office (Driver) announces the final total.

# Create an accumulator (starts at 0)
error_count = sc.accumulator(0)

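# Use it inside an action (foreach), so each task's updates are applied exactly once
def process_record(row):
    # No `global` needed: the function only calls a method on error_count.
    # foreach() discards return values, so nothing is returned here.
    if row["amount"] < 0:
        error_count.add(1)   # increment on executor

rdd.foreach(process_record)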

# Read the total on the Driver after all tasks finish
print(f"Total error records: {error_count.value}")
# Example output (the count depends on your data): Total error records: 284
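
The vote-counter analogy can be sketched without Spark at all. The code below is a plain-Python model, not the Spark API: `run_task` is a hypothetical stand-in for one task on an executor, and the loop at the end plays the role of the Driver merging each task's partial count.

```python
def run_task(partition):
    """One task: count negative amounts locally, as an executor would."""
    local_count = 0
    for row in partition:
        if row["amount"] < 0:
            local_count += 1
    return local_count

# Two partitions of hypothetical records, one per task
partitions = [
    [{"amount": 10}, {"amount": -5}],
    [{"amount": -1}, {"amount": -2}, {"amount": 7}],
]

driver_total = 0
for partition in partitions:
    driver_total += run_task(partition)   # Driver merges each partial count

print(f"Total error records: {driver_total}")
```

Because addition is associative and commutative, the order in which the partial counts arrive does not change the total, which is what makes add-only variables safe to update from many tasks at once.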

Accumulator Rules

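Rule 1: Tasks on executors can only ADD to accumulators.
        They CANNOT read the accumulator value.

Rule 2: Only the Driver can READ the accumulator value.

Rule 3: Updates are guaranteed to be applied exactly once only
        when they are made inside an action (such as foreach).
        Inside a transformation (such as map), a retried or
        recomputed task can apply its update more than once.
        Read the value on the Driver AFTER the action completes.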

Driver creates: sc.accumulator(0)     <-- OK
Executor adds: error_count.add(1)     <-- OK
Driver reads:  error_count.value      <-- OK (after action)
Executor reads: error_count.value     <-- raises an error in PySpark
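
Rule 3 matters most when an accumulator is updated inside a lazy transformation such as map(): nothing is counted until an action runs, and every recomputation of the uncached data counts again. The sketch below is a plain-Python analogy of that behavior, not Spark code. `lazy_map` is a hypothetical helper that returns a recipe, and calling the recipe plays the role of running an action.

```python
counter = {"n": 0}   # stand-in for an accumulator

def tag(x):
    counter["n"] += 1          # side effect inside a "transformation"
    return x * 2

def lazy_map(fn, data):
    """Return a recipe; nothing runs until the recipe is called (like an action)."""
    return lambda: [fn(x) for x in data]

doubled = lazy_map(tag, [1, 2, 3])
before = counter["n"]          # no action has run yet, so nothing was counted

doubled()                      # first "action" computes the data
doubled()                      # second "action" recomputes it from scratch
after = counter["n"]           # every element was counted twice

print(before, after)
```

In Spark the same pattern appears when two actions run on an uncached RDD whose map() function updates an accumulator. Updating inside foreach(), or caching the RDD before reuse, avoids the double count.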

Named Accumulators — Visible in Spark UI

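# Named accumulators are a Scala/Java feature, for example:
#   val nullCount = sc.longAccumulator("Null Record Counter")
# They appear on the stage detail page of the Spark UI while the job runs.
#
# PySpark's sc.accumulator(value, accum_param=None) takes no name: a string
# passed as the second argument is treated as an AccumulatorParam and fails
# when the accumulator is used. In PySpark, create accumulators without a name:
null_count = sc.accumulator(0)
skipped    = sc.accumulator(0)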

Custom Accumulator Types

The default accumulator handles numbers. For other types (like collecting a set of values), you create a custom AccumulatorParam.

from pyspark import AccumulatorParam

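class SetAccumulator(AccumulatorParam):
    def zero(self, value):
        # Empty starting value for each task's local copy
        return set()
    def addInPlace(self, v1, v2):
        # Merge two sets; used for add() calls and for combining task results
        v1 |= v2
        return v1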

# Track unique error codes seen across all tasks
unique_errors = sc.accumulator(set(), SetAccumulator())

def check(row):
    if row["status"] == "error":
        unique_errors.add({row["error_code"]})

rdd.foreach(check)
print("Unique error codes:", unique_errors.value)
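
The same two-method pattern extends to other mergeable types. As a sketch, the hypothetical `CounterParam` below tallies how often each error code occurs instead of only recording that it occurred. The `try`/`except` fallback is an assumption added so the merge logic can be exercised without Spark installed, and the final loop mimics by hand how partial results from tasks are folded together.

```python
from collections import Counter

try:
    from pyspark import AccumulatorParam
except ImportError:
    # Fallback so the merge logic can be exercised without Spark installed.
    AccumulatorParam = object

class CounterParam(AccumulatorParam):
    """Hypothetical accumulator type that tallies occurrences per key."""

    def zero(self, value):
        # Empty tally for each task's local copy
        return Counter()

    def addInPlace(self, v1, v2):
        # Counter.update adds the counts of v2 onto v1
        v1.update(v2)
        return v1

# Fold two hypothetical per-task tallies together, as a merge step would
param = CounterParam()
task_results = [Counter({"E42": 2}), Counter({"E42": 1, "E7": 1})]
totals = param.zero(Counter())
for partial in task_results:
    totals = param.addInPlace(totals, partial)

print(dict(totals))
```

With a live SparkContext, this would be created as `sc.accumulator(Counter(), CounterParam())` and updated from a task with `acc.add(Counter({row["error_code"]: 1}))`.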

Broadcast vs Accumulator — Quick Reference

Feature              | Broadcast Variable    | Accumulator
Direction            | Driver → Executors    | Executors → Driver
Purpose              | Share read-only data  | Collect counts/sums
Executor can read?   | Yes                   | No (PySpark raises an error)
Executor can write?  | No                    | Yes (add only)
Driver reads result? | N/A                   | Yes, after action
Common use           | Lookup tables, config | Error counts, metrics
