Spark RDD Actions

An action is the signal that tells Spark to actually run all the transformations you have defined. Without an action, Spark only builds a plan — nothing executes. Actions return a result to the Driver or write output to storage.

Transformations vs Actions — The On/Off Switch

Transformations (PLAN only, no execution):
rdd1 = sc.parallelize([1, 2, 3, 4, 5])  # sc: the SparkContext (predefined in the pyspark shell)
rdd2 = rdd1.map(lambda x: x * 10)       # nothing runs yet
rdd3 = rdd2.filter(lambda x: x > 20)    # nothing runs yet

Action (TRIGGERS execution of everything above):
result = rdd3.collect()                 # NOW the whole chain runs
print(result)  # [30, 40, 50]
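To make the plan-versus-execution split concrete without a cluster, here is a toy model in plain Python. It is a sketch, not how Spark is implemented: `LazyPipeline` is a hypothetical class whose map() and filter() only record steps, and whose collect() plays the role of an action.

```python
# Toy model of lazy evaluation (hypothetical class, NOT Spark's implementation):
# map() and filter() only record steps; collect() is the "action" that runs them.
class LazyPipeline:
    def __init__(self, data, steps=None):
        self.data = list(data)
        self.steps = steps or []  # recorded (kind, function) pairs: the "plan"

    def map(self, f):
        # Return a new pipeline with one more recorded step; no data is touched.
        return LazyPipeline(self.data, self.steps + [("map", f)])

    def filter(self, pred):
        return LazyPipeline(self.data, self.steps + [("filter", pred)])

    def collect(self):
        # Only now is every recorded step executed, record by record.
        out = []
        for x in self.data:
            keep = True
            for kind, f in self.steps:
                if kind == "map":
                    x = f(x)
                elif not f(x):
                    keep = False
                    break
            if keep:
                out.append(x)
        return out


calls = []

def times_ten(x):
    calls.append(x)  # log each time the function actually runs
    return x * 10

plan = LazyPipeline([1, 2, 3, 4, 5]).map(times_ten).filter(lambda x: x > 20)
print(calls)           # [] because only a plan exists so far
print(plan.collect())  # [30, 40, 50]
print(calls)           # [1, 2, 3, 4, 5]
```

Calling collect() a second time would run every step again, which mirrors how Spark recomputes an uncached RDD for each action.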

collect() — Bring All Data to Driver

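collect() sends every record in the RDD to the Driver as a single Python list. Use it only when the result is small enough to fit in the Driver's memory; collecting a large RDD (millions or billions of records) can exhaust that memory and crash the Driver. To inspect large data, use take(n) instead, or write it out with saveAsTextFile().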

rdd = sc.parallelize(["apple", "banana", "cherry"])
print(rdd.collect())
# Output: ['apple', 'banana', 'cherry']
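A simple way to picture what collect() does is to treat an RDD as a list of partitions. The sketch below is a plain-Python model under that assumption (the two-partition split is hypothetical): collect() concatenates every partition, in partition order, into one list that lives entirely on the Driver.

```python
from itertools import chain

# Toy model: an RDD as a list of partitions, one Python list per partition.
# collect() concatenates all partitions, in order, into one Driver-side list,
# which is why the whole result must fit in Driver memory.
def collect(partitions):
    return list(chain.from_iterable(partitions))

partitions = [["apple"], ["banana", "cherry"]]  # hypothetical 2-partition split
print(collect(partitions))  # ['apple', 'banana', 'cherry']
```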

count() — Count All Records

rdd = sc.parallelize([10, 20, 30, 40, 50])
print(rdd.count())
# Output: 5
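Unlike collect(), count() moves almost no data to the Driver. A plain-Python sketch of the idea, again modelling an RDD as a hypothetical list of partitions: each partition counts its own records (on the executors, in Spark), and only the small per-partition totals are summed on the Driver. PySpark's own count() is written along these lines, using mapPartitions() followed by sum().

```python
# Toy model of count(): per-partition counts, then one sum on the Driver.
def count(partitions):
    per_partition = [sum(1 for _ in part) for part in partitions]
    return sum(per_partition)

print(count([[10, 20], [30, 40, 50]]))  # 5
```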

first() and take() — Peek at Data

rdd = sc.parallelize([100, 200, 300, 400, 500])

print(rdd.first())    # 100
print(rdd.take(3))    # [100, 200, 300]

# top(n) returns the n largest values, in descending order
print(rdd.top(3))     # [500, 400, 300]
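Why are first(), take() and top() cheap compared with collect()? The plain-Python sketch below models the idea on a hypothetical list of partitions. take(n) scans partitions in order and stops once it has n records (first() is essentially take(1)); real Spark scans a growing number of partitions per round, but the early stop is the point. top(n) keeps only the n largest values per partition and merges those small lists, which is also how PySpark's top() is structured with heapq.nlargest.

```python
import heapq

def take(partitions, n):
    # Scan partitions in order; stop as soon as n records have been found.
    # Returns the records and how many partitions were actually read.
    out = []
    scanned = 0
    for part in partitions:
        if len(out) >= n:
            break
        scanned += 1
        out.extend(part[: n - len(out)])
    return out, scanned

def top(partitions, n):
    # Each partition keeps only its n largest values; the results are merged.
    per_partition = [heapq.nlargest(n, part) for part in partitions]
    merged = []
    for candidates in per_partition:
        merged = heapq.nlargest(n, merged + candidates)
    return merged

parts = [[100, 200], [300, 400], [500]]  # hypothetical 3-partition split
print(take(parts, 3))  # ([100, 200, 300], 2)
print(top(parts, 3))   # [500, 400, 300]
```

Note that take(parts, 3) never reads the third partition.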

reduce() — Combine All Records into One Value

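reduce() combines all records into one value using a function of two arguments. Think of it as folding a long list into a single answer. Spark first reduces the records inside each partition, then merges the per-partition results on the Driver, so the function must be commutative and associative (like + or max) for the result to be correct. Calling reduce() on an empty RDD raises an error.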

numbers = sc.parallelize([1, 2, 3, 4, 5])

# Sum all numbers
total = numbers.reduce(lambda a, b: a + b)
print(total)  # 15

# How it works step by step, if all five numbers sit in one partition:
# reduce( [1, 2, 3, 4, 5] )
# Step 1: f(1,2) = 3
# Step 2: f(3,3) = 6
# Step 3: f(6,4) = 10
# Step 4: f(10,5) = 15
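The step-by-step trace above is the single-partition case. The plain-Python sketch below models the two-stage behaviour described earlier (reduce inside each non-empty partition, then merge the partial results); the partition splits are hypothetical. Addition gives 15 no matter how the data is split, but subtraction, which is not associative, gives different answers for different splits.

```python
from functools import reduce

def spark_like_reduce(partitions, f):
    # 1) Reduce inside each non-empty partition (done on executors in Spark).
    partials = [reduce(f, part) for part in partitions if part]
    # 2) Merge the per-partition results on the Driver.
    #    With no data at all this raises TypeError (Spark raises an error too).
    return reduce(f, partials)

add = lambda a, b: a + b
sub = lambda a, b: a - b

one_part = [[1, 2, 3, 4, 5]]
two_parts = [[1, 2], [3, 4, 5]]

print(spark_like_reduce(one_part, add), spark_like_reduce(two_parts, add))  # 15 15
print(spark_like_reduce(one_part, sub), spark_like_reduce(two_parts, sub))  # -13 5
```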

fold() — reduce() with a Starting Value

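fold() works like reduce() but takes a zero value, which Spark uses as the starting accumulator in every partition and again when merging the partition results. The zero value should therefore be the identity element of the operation (0 for addition, 1 for multiplication). Unlike reduce(), fold() with an identity zero value works on an empty RDD and simply returns that value.

numbers = sc.parallelize([1, 2, 3, 4, 5])

# 0 is the identity for addition, so the result is just the sum
result = numbers.fold(0, lambda a, b: a + b)
print(result)  # 15

# A non-identity zero value is added (partitions + 1) times, not once:
print(sc.parallelize([1, 2, 3, 4, 5], 2).fold(10, lambda a, b: a + b))  # 45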
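The zero-value rule for fold() is easy to check with a plain-Python model that follows the same shape as PySpark's RDD.fold(): each partition starts from the zero value, and the Driver merges the partition results starting from the zero value again. The partition split below is hypothetical.

```python
from functools import reduce

def spark_like_fold(partitions, zero, op):
    # Every partition starts from `zero`, and the merge starts from `zero`
    # again, so `zero` is used (number of partitions + 1) times in total.
    partials = [reduce(op, part, zero) for part in partitions]
    return reduce(op, partials, zero)

add = lambda a, b: a + b
two_parts = [[1, 2], [3, 4, 5]]

print(spark_like_fold(two_parts, 0, add))   # 15: identity element, safe
print(spark_like_fold(two_parts, 10, add))  # 45: 10 was added three times
```

With a single partition the same call with 10 returns 35, so "start from 10 and add everything" never yields 25 in Spark.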

aggregate() — Flexible Aggregation

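aggregate() is the most flexible of these actions because its result can have a different type from the records. You supply a zero value, a function that folds one record into a partition's accumulator, and a function that merges the accumulators of two partitions. As with fold(), the zero value is reused in every partition, so it must be neutral. The example below computes the sum and the count in a single pass.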

numbers = sc.parallelize([1, 2, 3, 4, 5])

# zeroValue = (sum_so_far, count_so_far)
result = numbers.aggregate(
    (0, 0),                                    # zeroValue: neutral (sum, count)
    lambda acc, x: (acc[0] + x, acc[1] + 1),  # seqOp: add one record inside a partition
    lambda a, b: (a[0] + b[0], a[1] + b[1])   # combOp: merge two partition results
)
print(result)  # (15, 5)

average = result[0] / result[1]
print(average)  # 3.0
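To see why the two functions are separate, here is a plain-Python model of aggregate() over a hypothetical list of partitions: seq_op builds one accumulator per partition, and comb_op merges them. Because (0, 0) is neutral and the merge is associative, every partitioning, including one with an empty partition, gives the same (15, 5).

```python
from functools import reduce

def spark_like_aggregate(partitions, zero, seq_op, comb_op):
    # seq_op folds one record into a partition-local accumulator;
    # comb_op merges two accumulators. `zero` is reused as the starting
    # point in every partition and for the merge, so it must be neutral.
    partials = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, partials, zero)

seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

for parts in ([[1, 2, 3, 4, 5]], [[1, 2], [3, 4, 5]], [[1], [], [2, 3], [4, 5]]):
    total, n = spark_like_aggregate(parts, (0, 0), seq_op, comb_op)
    print(total, n, total / n)  # 15 5 3.0 for every partitioning
```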

foreach() — Apply a Function Without Returning Data

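foreach() runs a function on every record, on the executors, and returns nothing to the Driver. It is meant for side effects such as writing records to a database or another external system. On a cluster, anything it prints appears in the executor logs, not in the Driver's console.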

rdd = sc.parallelize([1, 2, 3, 4, 5])

# Print each element on its executor (not returned to Driver)
rdd.foreach(lambda x: print(x))

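# Common use: save each record to an external DB (`db` is a hypothetical client)
# rdd.foreach(lambda row: db.insert(row))
# To avoid opening a connection per record, rdd.foreachPartition(f) calls f
# once per partition with an iterator over that partition's records.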
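A common refinement of the database pattern is one connection per partition rather than per record, which is what foreachPartition() is for. The sketch below simulates that locally: `FakeConnection` and `save_partition` are hypothetical names standing in for a real database client, and the partition split is made up. In PySpark, save_partition would be passed to rdd.foreachPartition(save_partition).

```python
# Hypothetical stand-in for a database client: it counts how many
# connections were opened and keeps inserted rows in memory.
class FakeConnection:
    opened = 0

    def __init__(self):
        FakeConnection.opened += 1
        self.rows = []

    def insert(self, row):
        self.rows.append(row)

    def close(self):
        pass


stored = []  # only works in this local simulation (see note below)

def save_partition(rows):
    # One connection for the whole partition instead of one per record.
    conn = FakeConnection()
    try:
        for row in rows:
            conn.insert(row)
        stored.extend(conn.rows)
    finally:
        conn.close()

# Simulate foreachPartition over a hypothetical 2-partition split.
for partition in [[1, 2, 3], [4, 5]]:
    save_partition(iter(partition))

print(FakeConnection.opened, stored)  # 2 [1, 2, 3, 4, 5]
```

On a real cluster, changes that executor code makes to a Driver-side variable such as `stored` are not sent back, which is another reason foreach() and foreachPartition() should write to external systems instead.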

saveAsTextFile() — Write to Storage

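This action writes the RDD as text, one record per line, with one part file per partition inside the given directory. The path can be local, HDFS, or another Hadoop-supported file system such as S3. The output directory must not already exist; Spark raises an error instead of overwriting it.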

rdd = sc.parallelize(["line one", "line two", "line three"])
rdd.saveAsTextFile("/output/results")

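# Creates one part file per partition, for example:
# /output/results/part-00000
# /output/results/part-00001
# /output/results/_SUCCESS   (empty marker written when the job succeeds)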
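The output layout can be imitated locally with the standard library. This is a plain-Python model, not Spark code: `save_as_text_files` is a hypothetical helper that refuses to reuse an existing directory, writes one part-NNNNN file per partition with one record per line, and adds the empty _SUCCESS marker that Hadoop's output committer writes by default.

```python
import os
import tempfile

def save_as_text_files(partitions, out_dir):
    # Refuse to overwrite: raises FileExistsError if out_dir already exists.
    os.makedirs(out_dir)
    for i, part in enumerate(partitions):
        path = os.path.join(out_dir, f"part-{i:05d}")
        with open(path, "w", encoding="utf-8") as fh:
            for record in part:
                fh.write(f"{record}\n")  # one record per line, as text
    # Empty marker file signalling that every partition was written.
    with open(os.path.join(out_dir, "_SUCCESS"), "w", encoding="utf-8"):
        pass

out = os.path.join(tempfile.mkdtemp(), "results")
save_as_text_files([["line one"], ["line two", "line three"]], out)
print(sorted(os.listdir(out)))  # ['_SUCCESS', 'part-00000', 'part-00001']
```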

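countByValue() and countByKey()

countByValue() returns how many times each distinct value occurs; countByKey() does the same for the keys of (key, value) pairs. Both return a dictionary to the Driver, so use them only when the number of distinct values or keys is small.
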
colors = sc.parallelize(["red", "blue", "red", "green", "blue", "red"])

print(colors.countByValue())
# Output: defaultdict(<class 'int'>, {'red': 3, 'blue': 2, 'green': 1})

pairs = sc.parallelize([("A", 1), ("B", 2), ("A", 3)])
print(pairs.countByKey())
# Output: defaultdict(<class 'int'>, {'A': 2, 'B': 1})
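Both counting actions follow the same two-stage pattern as the others. In this plain-Python sketch (hypothetical partition splits), each partition builds a small frequency table and the tables are merged into one dictionary on the Driver. countByKey() is modelled as countByValue() over the keys, which matches how PySpark defines it.

```python
from collections import Counter

def count_by_value(partitions):
    # Per-partition frequency tables merged into one Driver-side dict.
    total = Counter()
    for part in partitions:
        total.update(Counter(part))
    return dict(total)

def count_by_key(partitions):
    # countByKey() = countByValue() applied to the keys of (key, value) pairs.
    return count_by_value([[k for k, _ in part] for part in partitions])

colors = [["red", "blue", "red"], ["green", "blue", "red"]]
pairs = [[("A", 1), ("B", 2)], [("A", 3)]]
print(count_by_value(colors))  # {'red': 3, 'blue': 2, 'green': 1}
print(count_by_key(pairs))     # {'A': 2, 'B': 1}
```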

Actions at a Glance

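Action | Returns | Use When
collect() | Python list | Data is small enough for Driver memory
count() | Integer | You need the row count
first() | Single element | You need one sample record
take(n) | Python list | You need a small sample
top(n) | Python list (descending) | You need the n largest values
reduce(f) | Single value | Aggregate all records to one result (f commutative and associative)
fold(zero, f) | Single value | Like reduce(), with a neutral zero value
aggregate(zero, seqOp, combOp) | Any type | The result type differs from the record type
foreach(f) | Nothing | Write to external systems
saveAsTextFile(path) | Nothing | Persist results to disk
countByValue() | Dictionary | Frequency count of each value
countByKey() | Dictionary | Number of records per key in a pair RDD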
