Spark RDD Actions
An action is the signal that tells Spark to actually run the transformations that produce the RDD you call it on. Without an action, Spark only records a plan (the lineage of transformations) and nothing executes. Actions either return a result to the Driver or write output to external storage.
Transformations vs Actions — The On/Off Switch
# Assumes a SparkContext named sc (the pyspark shell creates one for you)
# Transformations: build the PLAN only, nothing executes
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd1.map(lambda x: x * 10)     # nothing runs
rdd3 = rdd2.filter(lambda x: x > 20)  # nothing runs
# Action: TRIGGERS execution of everything above
result = rdd3.collect()               # NOW it runs
print(result)  # [30, 40, 50]
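To make "nothing runs until the action" concrete, here is a pure-Python model of lazy evaluation, not PySpark code. `ToyRDD`, `times_ten` and `calls` are hypothetical names: the class only records map/filter steps and replays them when collect() is called, loosely the way Spark records lineage and executes it on an action.

```python
class ToyRDD:
    """Hypothetical model of lazy evaluation; this is NOT Spark's API."""

    def __init__(self, data, ops=None):
        self._data = data       # source records
        self._ops = ops or []   # recorded plan: list of (kind, fn) steps

    def map(self, fn):
        # Transformation: record the step and return a new ToyRDD; run nothing
        return ToyRDD(self._data, self._ops + [("map", fn)])

    def filter(self, fn):
        # Transformation: record the step and return a new ToyRDD; run nothing
        return ToyRDD(self._data, self._ops + [("filter", fn)])

    def collect(self):
        # Action: only now is the recorded plan executed, record by record
        out = []
        for x in self._data:
            keep = True
            for kind, fn in self._ops:
                if kind == "map":
                    x = fn(x)
                elif not fn(x):
                    keep = False
                    break
            if keep:
                out.append(x)
        return out


calls = []  # logs every time the map function actually runs

def times_ten(x):
    calls.append(x)
    return x * 10

rdd1 = ToyRDD([1, 2, 3, 4, 5])
rdd2 = rdd1.map(times_ten)
rdd3 = rdd2.filter(lambda x: x > 20)
print(len(calls))        # 0: only a plan exists so far
result = rdd3.collect()
print(result)            # [30, 40, 50]
print(len(calls))        # 5: the map ran once per record during collect()
```

Real Spark additionally splits the work into partitions and stages, which this toy ignores.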
collect() — Bring All Data to Driver
collect() returns every record in the RDD as a Python list. Use it only when the dataset is small enough to fit in the Driver's memory. Collecting a large RDD can exhaust the Driver's memory and crash it; for a quick look, use take(n) instead.
rdd = sc.parallelize(["apple", "banana", "cherry"])
print(rdd.collect())
# Output: ['apple', 'banana', 'cherry']
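Why collect() is risky becomes clearer if you model an RDD as a list of partitions. The sketch below is plain Python, not PySpark: each inner list stands for one partition, `toy_collect` is a hypothetical helper that concatenates them all in partition order, and the `max_records` guard is an illustrative addition, not a Spark feature.

```python
from itertools import chain

def toy_collect(parts, max_records=None):
    """Model of collect(): every partition is concatenated, in partition order."""
    total = sum(len(p) for p in parts)  # plays the role of calling count() first
    if max_records is not None and total > max_records:
        # Hypothetical guard: refuse rather than risk the Driver's memory
        raise ValueError(f"refusing to collect {total} records")
    return list(chain.from_iterable(parts))

# Each inner list stands for one partition living on an executor
partitions = [["apple"], ["banana"], ["cherry"]]
print(toy_collect(partitions))  # ['apple', 'banana', 'cherry']
```

In real PySpark the same guard can be written with existing calls, for example `if rdd.count() <= limit: data = rdd.collect()`, and `rdd.toLocalIterator()` streams one partition at a time instead of materializing everything at once.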
count() — Count All Records
rdd = sc.parallelize([10, 20, 30, 40, 50])
print(rdd.count())
# Output: 5
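Unlike collect(), count() is cheap for the Driver because only one number per partition travels back. This pure-Python model (the helper `toy_count` and the partition layout are hypothetical) shows the two steps:

```python
def toy_count(parts):
    # Executors: each partition counts its own records locally
    per_partition = [sum(1 for _ in p) for p in parts]
    # Driver: only these small integers are sent back and summed
    return per_partition, sum(per_partition)

partitions = [[10, 20], [30, 40, 50]]
print(toy_count(partitions))  # ([2, 3], 5)
```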
first() and take() — Peek at Data
rdd = sc.parallelize([100, 200, 300, 400, 500])
print(rdd.first())   # 100
print(rdd.take(3))   # [100, 200, 300]
# top() returns the N largest values
print(rdd.top(3))    # [500, 400, 300]
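A simplified pure-Python model of these three actions, assuming the data sits in three hypothetical partitions: take() reads partitions in order and can stop early (real Spark scans in growing batches of partitions, so it may read somewhat more than strictly needed), first() behaves like take(1), and top() keeps only the n largest values per partition before merging. `toy_take`, `toy_first` and `toy_top` are illustrative names, not Spark functions.

```python
import heapq

# Each inner list stands for one partition
partitions = [[100, 200], [300, 400], [500]]

def toy_take(parts, n):
    """Scan partitions in order; stop as soon as n records are found."""
    out, scanned = [], 0
    if n <= 0:
        return out, scanned
    for p in parts:
        scanned += 1
        for x in p:
            out.append(x)
            if len(out) == n:
                return out, scanned
    return out, scanned

def toy_first(parts):
    # first() is essentially take(1), but it fails on an empty dataset
    out, _ = toy_take(parts, 1)
    if not out:
        raise ValueError("RDD is empty")
    return out[0]

def toy_top(parts, n):
    # Each partition keeps only its n largest values...
    candidates = [heapq.nlargest(n, p) for p in parts]
    # ...then the small candidate lists are merged into the overall n largest
    merged = []
    for c in candidates:
        merged = heapq.nlargest(n, merged + c)
    return merged

print(toy_take(partitions, 3))  # ([100, 200, 300], 2): the last partition is never read
print(toy_first(partitions))    # 100
print(toy_top(partitions, 3))   # [500, 400, 300]
```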
reduce() — Combine All Records into One Value
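reduce() combines records two at a time with a function f until one value remains. Think of it as folding a long list into a single answer. Spark first reduces each partition locally and then merges the per-partition results, so f must be commutative and associative (addition or max work; subtraction does not).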
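numbers = sc.parallelize([1, 2, 3, 4, 5])
# Sum all numbers
total = numbers.reduce(lambda a, b: a + b)
print(total)  # 15
# How it works, conceptually (as if all records sat in one partition):
# reduce( [1, 2, 3, 4, 5] )
# Step 1: f(1, 2) = 3
# Step 2: f(3, 3) = 6
# Step 3: f(6, 4) = 10
# Step 4: f(10, 5) = 15
# With several partitions, each partition is reduced first and the
# partial results are then combined, so the grouping can differ.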
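The associativity requirement is easy to demonstrate with a pure-Python model of the two-step reduce (per partition, then across partitions). `toy_reduce` and the two partition layouts are hypothetical; the point is that addition gives the same answer for both layouts while subtraction does not.

```python
from functools import reduce

def toy_reduce(parts, f):
    # Step 1 (executors): reduce each non-empty partition locally
    partials = [reduce(f, p) for p in parts if p]
    if not partials:
        raise ValueError("cannot reduce an empty dataset")
    # Step 2 (driver): combine the per-partition results
    return reduce(f, partials)

add = lambda a, b: a + b
sub = lambda a, b: a - b

one_part = [[1, 2, 3, 4, 5]]
two_parts = [[1, 2], [3, 4, 5]]

print(toy_reduce(one_part, add), toy_reduce(two_parts, add))  # 15 15
print(toy_reduce(one_part, sub), toy_reduce(two_parts, sub))  # -13 5
```

With subtraction the second layout computes (1 - 2) - ((3 - 4) - 5) = -1 - (-6) = 5, so the answer depends on partitioning, which is exactly why Spark asks for a commutative, associative f.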
fold() — reduce() with a Starting Value
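fold() works like reduce() but takes a zeroValue. Spark uses the zeroValue as the starting accumulator in every partition and once more when merging the partition results on the Driver, so it should be the identity for the operation (0 for addition, 1 for multiplication).
numbers = sc.parallelize([1, 2, 3, 4, 5], 2)  # 2 partitions
# Identity zeroValue: same answer as reduce()
print(numbers.fold(0, lambda a, b: a + b))   # 15
# Non-identity zeroValue: 10 is added once per partition plus once at the merge
print(numbers.fold(10, lambda a, b: a + b))  # 45 = 15 + 10 * (2 + 1)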
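The arithmetic behind that 45 can be checked with a pure-Python model of fold(). `toy_fold` is a hypothetical helper, and the split into [1, 2] and [3, 4, 5] is just one possible 2-partition layout; any 2-partition layout gives the same totals because the zeroValue is applied per partition regardless of its contents.

```python
from functools import reduce

def toy_fold(parts, zero, op):
    # zero is the starting accumulator in EVERY partition...
    partials = [reduce(op, p, zero) for p in parts]
    # ...and is used once more when the driver merges the partition results
    return reduce(op, partials, zero)

add = lambda a, b: a + b
two_parts = [[1, 2], [3, 4, 5]]

print(toy_fold(two_parts, 0, add))   # 15
print(toy_fold(two_parts, 10, add))  # 45: partials 13 and 22, plus 10 at the merge
```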
aggregate() — Flexible Aggregation
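aggregate() takes a zeroValue and two functions: seqOp folds each record into a per-partition accumulator, and combOp merges accumulators from different partitions. Because the accumulator can have a different type from the records, you can collect several pieces of information in one pass. The example below computes the sum and the count together, then derives the average.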
numbers = sc.parallelize([1, 2, 3, 4, 5])
# Accumulator = (sum_so_far, count_so_far)
result = numbers.aggregate(
    (0, 0),                                     # zeroValue: identity for this accumulator
    lambda acc, x: (acc[0] + x, acc[1] + 1),    # seqOp: add one record within a partition
    lambda a, b: (a[0] + b[0], a[1] + b[1])     # combOp: merge two partitions' accumulators
)
print(result) # (15, 5)
average = result[0] / result[1]
print(average) # 3.0
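Here is a pure-Python model of how aggregate() applies its three arguments, reused to compute the minimum and maximum in one pass. `toy_aggregate` and the partition layout are hypothetical; (inf, -inf) is chosen as the zeroValue because it is the identity for min/max, just as (0, 0) is for sum/count.

```python
import math
from functools import reduce

def toy_aggregate(parts, zero, seq_op, comb_op):
    # seq_op folds each record of a partition into that partition's accumulator
    partials = [reduce(seq_op, p, zero) for p in parts]
    # comb_op merges the per-partition accumulators, starting again from zero
    return reduce(comb_op, partials, zero)

parts = [[1, 2], [3, 4, 5]]

# Accumulator: (min_so_far, max_so_far); a tuple, while the records are ints
zero = (math.inf, -math.inf)
seq_op = lambda acc, x: (min(acc[0], x), max(acc[1], x))
comb_op = lambda a, b: (min(a[0], b[0]), max(a[1], b[1]))

print(toy_aggregate(parts, zero, seq_op, comb_op))  # (1, 5)
```

The same three arguments can be passed to the real action, e.g. `numbers.aggregate((float("inf"), float("-inf")), seq_op, comb_op)`.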
foreach() — Apply a Function Without Returning Data
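foreach() runs a function on each record on the executors and returns nothing to the Driver. Use it for side effects, such as writing records to a database or another external system.
rdd = sc.parallelize([1, 2, 3, 4, 5])
# print() runs on the executors, so the output lands in executor logs
# (it shows up in your terminal only in local mode), not on the Driver
rdd.foreach(lambda x: print(x))
# Common use: save to an external DB row by row (db is a placeholder)
# rdd.foreach(lambda row: db.insert(row))
# For databases, rdd.foreachPartition(f) lets you open one connection per partition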
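A sketch of the per-partition write pattern, assuming a hypothetical `FakeConnection` class in place of a real database client. In PySpark you would pass `write_partition` to `rdd.foreachPartition(write_partition)`, which calls it once per partition with an iterator of records; here a plain loop stands in for Spark so the example runs without a cluster.

```python
class FakeConnection:
    """Hypothetical stand-in for a database client (not a real library)."""
    opened = 0  # how many connections were created

    def __init__(self):
        FakeConnection.opened += 1
        self.rows = []

    def insert(self, row):
        self.rows.append(row)

    def close(self):
        pass

written = []  # for inspection in this local toy only

def write_partition(rows):
    # One connection per partition instead of one per record
    conn = FakeConnection()
    try:
        for row in rows:
            conn.insert(row)
        written.extend(conn.rows)
    finally:
        conn.close()

# Toy stand-in for Spark calling the function once per partition;
# in PySpark this would be rdd.foreachPartition(write_partition)
partitions = [[1, 2], [3, 4, 5]]
for part in partitions:
    write_partition(iter(part))

print(FakeConnection.opened)  # 2
print(written)                # [1, 2, 3, 4, 5]
```

One caveat: on a real cluster the function runs in executor processes, so appending to a Driver-side list like `written` would not be visible on the Driver; only the external system sees the writes.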
saveAsTextFile() — Write to Storage
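This action writes each partition as a separate file in a directory, one line per record (each element is converted to a string). It works with local paths, HDFS, and other Hadoop-compatible file systems. The output directory must not already exist; Spark raises an error rather than overwrite it.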
rdd = sc.parallelize(["line one", "line two", "line three"])
rdd.saveAsTextFile("/output/results")
# Creates one part file per partition, for example:
# /output/results/part-00000
# /output/results/part-00001
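A pure-Python model of the output layout, assuming two hypothetical partitions and a temporary directory. `toy_save_as_text_file` only mimics the part-file naming, the one-line-per-record format and the refusal to overwrite; real Spark may also write bookkeeping files (such as checksum or marker files) that this sketch does not produce.

```python
import os
import tempfile

def toy_save_as_text_file(parts, path):
    # Like saveAsTextFile, refuse to overwrite an existing output directory
    if os.path.exists(path):
        raise FileExistsError(path)
    os.makedirs(path)
    for i, part in enumerate(parts):
        # One file per partition, one line per record (str() of each element)
        with open(os.path.join(path, f"part-{i:05d}"), "w") as f:
            for record in part:
                f.write(f"{record}\n")
    return sorted(os.listdir(path))

out_dir = os.path.join(tempfile.mkdtemp(), "results")
files = toy_save_as_text_file([["line one"], ["line two", "line three"]], out_dir)
print(files)  # ['part-00000', 'part-00001']
```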
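countByValue() and countByKey()
countByValue() counts how many times each distinct element appears. countByKey() works on a pair RDD and counts how many records share each key; it does not sum the values. Both return the whole result to the Driver as a dictionary, so use them only when the number of distinct values or keys is small.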
colors = sc.parallelize(["red", "blue", "red", "green", "blue", "red"])
print(colors.countByValue())
# Output: defaultdict(<class 'int'>, {'red': 3, 'blue': 2, 'green': 1})
pairs = sc.parallelize([("A", 1), ("B", 2), ("A", 3)])
print(pairs.countByKey())
# Output: defaultdict(<class 'int'>, {'A': 2, 'B': 1})
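A pure-Python model of both counts, assuming the same data split into two hypothetical partitions. Each partition builds its own counts and the Driver merges them; countByKey is modelled as countByValue over the keys alone. The last part contrasts it with summing the values per key, which in Spark you would do with `pairs.reduceByKey(lambda a, b: a + b).collect()` instead.

```python
from collections import Counter

def toy_count_by_value(parts):
    # Each partition counts locally; the driver merges the partial counts
    merged = Counter()
    for p in parts:
        merged.update(Counter(p))
    return merged

def toy_count_by_key(parts):
    # Counts records per key: the values are ignored entirely
    return toy_count_by_value([[k for k, _ in p] for p in parts])

colors = [["red", "blue", "red"], ["green", "blue", "red"]]
pairs = [[("A", 1), ("B", 2)], [("A", 3)]]

print(dict(toy_count_by_value(colors)))  # {'red': 3, 'blue': 2, 'green': 1}
print(dict(toy_count_by_key(pairs)))     # {'A': 2, 'B': 1}

# Contrast: summing the values per key gives a different answer
sums = {}
for p in pairs:
    for k, v in p:
        sums[k] = sums.get(k, 0) + v
print(sums)  # {'A': 4, 'B': 2}
```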
Actions at a Glance
| Action | Returns | Use When |
|---|---|---|
| collect() | Python list | Data is small enough for Driver memory |
| count() | Integer | You need the row count |
| first() | Single element | You need one sample record |
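| take(n) | Python list | You need a small sample |
| top(n) | Python list | You need the n largest values |
| reduce(f) | Single value | Aggregate all records to one result (f commutative and associative) |
| fold(zero, f) | Single value | Like reduce(), with a zeroValue that is the identity for f |
| aggregate(zero, seqOp, combOp) | Single value of any type | The result type differs from the record type |
| foreach(f) | Nothing | Write to external systems |
| saveAsTextFile(path) | Nothing | Persist results to disk |
| countByValue() | Dictionary | Frequency count of each value |
| countByKey() | Dictionary | Number of records per key in a pair RDD |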
