Spark Structured Streaming
Structured Streaming processes live data streams using the same DataFrame API you already know from batch processing. Instead of processing a static file, Spark continuously processes new data as it arrives — from Kafka, files, sockets, or other sources.
The Infinite Table Mental Model
Think of a stream as a table that grows forever:

Time 0:
+-------+--------+--------+
| user  | action | amount |
+-------+--------+--------+

Time 1 (new data arrives):
+-------+--------+--------+
| user  | action | amount |
+-------+--------+--------+
| Alice | login  | --     |
+-------+--------+--------+

Time 2 (more data arrives):
+-------+--------+--------+
| user  | action | amount |
+-------+--------+--------+
| Alice | login  | --     |
| Bob   | buy    | 50.00  |
+-------+--------+--------+

Spark continuously runs your query on new rows as they appear. Your transformation code stays the same as for batch; only how you read the source and write the output changes.
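To make the growing-table idea concrete, here is a minimal pure-Python sketch (not Spark code). It checks that a query fed only the newly arrived rows gives the same answer as a batch query over everything seen so far. The function and class names, and the extra rows beyond Alice and Bob's first actions, are made up for illustration.

```python
from collections import defaultdict

def batch_total_per_user(rows):
    """Batch query: total purchase amount per user over a complete table."""
    totals = defaultdict(float)
    for user, action, amount in rows:
        if action == "buy":
            totals[user] += amount
    return dict(totals)

class IncrementalTotalPerUser:
    """Same query, but fed only the rows that arrived since the last call."""

    def __init__(self):
        self.state = defaultdict(float)  # running totals kept between calls

    def process(self, new_rows):
        for user, action, amount in new_rows:
            if action == "buy":
                self.state[user] += amount
        return dict(self.state)

# Hypothetical arrivals, loosely following the table above.
arrivals = [
    [("Alice", "login", None)],
    [("Bob", "buy", 50.0)],
    [("Bob", "buy", 25.0), ("Alice", "buy", 10.0)],
]

table_so_far = []
incremental = IncrementalTotalPerUser()
for new_rows in arrivals:
    table_so_far.extend(new_rows)
    streaming_result = incremental.process(new_rows)
    # The incremental result matches a batch query over the whole table.
    assert streaming_result == batch_total_per_user(table_so_far)

print(streaming_result)
```

Keeping a small running state instead of rescanning the whole table is what makes a query over an unbounded input feasible.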
Key Concepts
- Trigger — how often Spark checks for and processes new data
- Micro-batch — Spark collects incoming records into small batches and processes each batch (default mode)
- Continuous processing — Spark processes each record with very low latency (experimental)
- Watermark — a threshold telling Spark how late data can arrive and still be included in results
- Output mode — Append (only new rows), Complete (full result table), or Update (only changed rows)
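As a rough illustration of how a trigger interval carves a stream into micro-batches, the following pure-Python sketch groups records by arrival time. It is a simplified model, not Spark's scheduler: `micro_batches`, `trigger_seconds`, and the sample records are hypothetical, and intervals with no data simply produce no batch here.

```python
def micro_batches(records, trigger_seconds):
    """Group (arrival_second, payload) records into micro-batches.

    Batch k holds the records that arrived in the interval
    [k * trigger_seconds, (k + 1) * trigger_seconds).
    """
    batches = {}
    for arrival, payload in records:
        batch_id = int(arrival // trigger_seconds)
        batches.setdefault(batch_id, []).append(payload)
    # Return the non-empty batches in time order.
    return [batches[k] for k in sorted(batches)]

# Hypothetical records: (arrival time in seconds, payload)
records = [(0.5, "a"), (3.0, "b"), (9.9, "c"), (10.0, "d"), (25.0, "e")]
batches = micro_batches(records, trigger_seconds=10)
print(batches)
```

A shorter trigger interval lowers latency but produces more, smaller batches; a longer one does the opposite.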
Reading a Stream from a Socket (Learning Example)
# Start this in a terminal: nc -lk 9999
# Then type lines of text to send as a stream
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, col
spark = SparkSession.builder.appName("WordCount").getOrCreate()
# Read a streaming source
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
# Count words in the stream (same API as batch!)
words = lines.select(explode(split(col("value"), " ")).alias("word"))
word_counts = words.groupBy("word").count()
# Write results to console
query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query.awaitTermination()  # run until stopped
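To see what the console would show in complete mode, here is a pure-Python approximation of the same word count, run over two hypothetical micro-batches of input lines. It only mimics the logic of `explode(split(...))` followed by `groupBy("word").count()`; the helper names are invented for this sketch.

```python
from collections import Counter

def explode_words(lines):
    """Rough equivalent of explode(split(value, " ")): one row per word."""
    return [word for line in lines for word in line.split(" ")]

running_counts = Counter()  # plays the role of the aggregation state

def process_batch(lines):
    """Fold one micro-batch into the state and return the full result table."""
    running_counts.update(explode_words(lines))
    # Complete mode: emit the entire result table, not just the changes.
    return dict(running_counts)

first = process_batch(["hello world", "hello spark"])
second = process_batch(["spark streaming"])

print(first)
print(second)
```

Every trigger prints the whole table again, including words whose counts did not change, which is why complete mode suits small result tables.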
Reading from Apache Kafka
Kafka is the most common source for production Spark streams. Kafka stores events as messages in topics. Spark reads those messages continuously.
# Read from Kafka
# Requires the spark-sql-kafka-0-10 connector package on the Spark classpath
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
    .option("subscribe", "orders_topic") \
    .option("startingOffsets", "latest") \
    .load()
# Kafka delivers: key, value, topic, partition, offset, timestamp
# The 'value' column holds your actual message (as bytes)
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer", StringType(), True),
    StructField("amount", DoubleType(), True),
])
orders = kafka_df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Now process orders with normal DataFrame operations
result = orders.groupBy("customer").sum("amount")
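The decode-then-parse step can be pictured in plain Python as below. This is only a sketch of the idea behind casting `value` to a string and applying a schema. How Spark itself reports malformed records depends on version and options, so returning `None` or a missing field here is an assumption of this example, as are the `parse_order` helper and the sample messages.

```python
import json

# Field names and types mirror the schema above.
ORDER_FIELDS = {"order_id": str, "customer": str, "amount": float}

def coerce(raw, expected_type):
    """Return raw as expected_type, or None when it does not fit."""
    if expected_type is float:
        # Accept JSON integers and floats, but not booleans.
        if isinstance(raw, (int, float)) and not isinstance(raw, bool):
            return float(raw)
        return None
    return raw if isinstance(raw, expected_type) else None

def parse_order(value: bytes):
    """Decode one Kafka message value into a dict, or None if unusable."""
    try:
        payload = json.loads(value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(payload, dict):
        return None
    return {name: coerce(payload.get(name), expected)
            for name, expected in ORDER_FIELDS.items()}

good = parse_order(b'{"order_id": "o-1", "customer": "Alice", "amount": 50}')
partial = parse_order(b'{"order_id": "o-2", "amount": "oops"}')
bad = parse_order(b"not json")

print(good)
print(partial)
print(bad)
```

Deciding up front what happens to malformed messages matters in production, because a single bad record should not stall or silently skew a long-running stream.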
Reading a Streaming File Source
# Spark watches a folder and processes new files as they appear
streaming_df = spark.readStream \
    .format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("/data/incoming/")
# Any new CSV file dropped in /data/incoming/ gets processed automatically
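The "watch a folder" behaviour boils down to remembering which files have already been handled. The sketch below models that with an in-memory set and a temporary directory. It is an illustration only: the `discover_new_files` helper and the file names are hypothetical, and a real streaming job would keep this bookkeeping in durable storage rather than in memory.

```python
import os
import tempfile

def discover_new_files(directory, seen):
    """Return CSV files in `directory` not processed before; mark them seen."""
    new_files = sorted(
        name for name in os.listdir(directory)
        if name.endswith(".csv") and name not in seen
    )
    seen.update(new_files)
    return new_files

with tempfile.TemporaryDirectory() as incoming:
    seen = set()

    # A first file lands in the folder.
    open(os.path.join(incoming, "orders_001.csv"), "w").close()
    first_poll = discover_new_files(incoming, seen)

    # A second file lands later; only it is picked up.
    open(os.path.join(incoming, "orders_002.csv"), "w").close()
    second_poll = discover_new_files(incoming, seen)

    # Nothing new: the next poll returns an empty list.
    third_poll = discover_new_files(incoming, seen)

print(first_poll, second_poll, third_poll)
```

Because discovery is by file name, producers should write files elsewhere and move them into the watched folder once complete, so a half-written file is never picked up.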
Watermarks — Handling Late Data
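In real streams, data sometimes arrives late. A watermark sets a time boundary relative to the latest event time Spark has seen: records up to N minutes behind it are still included in results, and Spark stops waiting for anything older so it can discard the related state.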
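from pyspark.sql.functions import window, col
# Assumes `orders` has an `event_time` TimestampType column; the schema above
# would need an extra StructField("event_time", TimestampType(), True).
result = orders \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("customer")
    ) \
    .sum("amount")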
# Watermark diagram:
# Latest event_time seen so far = 12:00
# Watermark delay = 10 minutes
# Watermark = 12:00 - 10 minutes = 11:50
# Data with event_time >= 11:50 is guaranteed to be processed
# Data with event_time < 11:50 may be dropped (too late)
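The arithmetic in the diagram can be written out as a small pure-Python model. It is a simplification under stated assumptions: `WatermarkTracker` is a made-up class, the dates are arbitrary, and Spark itself advances the watermark between micro-batches and judges lateness per window for windowed aggregations, which this sketch does not model.

```python
from datetime import datetime, timedelta

class WatermarkTracker:
    """Tracks watermark = latest event time seen so far - allowed delay."""

    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = None

    def observe(self, event_time):
        # The watermark only moves forward: older events never pull it back.
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def is_too_late(self, event_time):
        wm = self.watermark
        return wm is not None and event_time < wm

tracker = WatermarkTracker(delay=timedelta(minutes=10))
tracker.observe(datetime(2024, 1, 1, 12, 0))   # latest event time = 12:00

print(tracker.watermark)                                    # 11:50
print(tracker.is_too_late(datetime(2024, 1, 1, 11, 55)))    # within the delay
print(tracker.is_too_late(datetime(2024, 1, 1, 11, 45)))    # behind the watermark

tracker.observe(datetime(2024, 1, 1, 11, 30))  # a late event arrives
print(tracker.watermark)                       # unchanged
```

A longer delay tolerates more lateness at the cost of holding aggregation state longer; a shorter one frees state sooner but risks discarding valid late records.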
Writing a Stream to Output Sinks
# Write to console (debugging)
query = result.writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()
# Write to Parquet files (production)
query = result.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/output/streaming_results") \
    .option("checkpointLocation", "/checkpoints/job1") \
    .trigger(processingTime="1 minute") \
    .start()
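# Write to Kafka topic
# The Kafka sink needs a string or binary `value` column (and optionally `key`)
query = result \
    .selectExpr("CAST(customer AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092") \
    .option("topic", "processed_orders") \
    .option("checkpointLocation", "/checkpoints/job2") \
    .start()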
Checkpointing — Fault Tolerance for Streams
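Checkpoints save the stream's progress to storage. If the job crashes and restarts, Spark reads the checkpoint and continues from where it stopped. With a replayable source such as Kafka and an idempotent sink such as the file sink, each record affects the output exactly once. Other sinks, the Kafka sink for example, give at-least-once delivery, so duplicates are possible after a restart.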
# Always set a checkpoint location for production streams
query = result.writeStream \
    .option("checkpointLocation", "s3://my-bucket/checkpoints/job1") \
    .format("parquet") \
    .start("/output/results")
# Checkpoint stores:
# - Current offset in each Kafka partition
# - Aggregation state (for stateful queries)
# - Progress metadata
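The resume-from-checkpoint idea can be modelled in a few lines of plain Python. This is a deliberately simplified sketch, not Spark's actual checkpoint format or commit protocol: the `run_once` function, the `offsets.json` file, and the simulated crash are all assumptions made for illustration.

```python
import json
import os
import tempfile

def run_once(source, checkpoint_path, sink, crash_after=None):
    """Process `source` from the checkpointed offset, committing per record."""
    offset = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            offset = json.load(f)["next_offset"]  # resume where we stopped
    processed = 0
    while offset < len(source):
        if crash_after is not None and processed == crash_after:
            raise RuntimeError("simulated crash")
        sink.append(source[offset])               # 1. write the output
        offset += 1
        with open(checkpoint_path, "w") as f:     # 2. then commit progress
            json.dump({"next_offset": offset}, f)
        processed += 1

source = ["e0", "e1", "e2", "e3", "e4"]
sink = []
with tempfile.TemporaryDirectory() as d:
    checkpoint = os.path.join(d, "offsets.json")
    try:
        run_once(source, checkpoint, sink, crash_after=2)
    except RuntimeError:
        pass
    after_crash = list(sink)            # output written before the crash
    run_once(source, checkpoint, sink)  # restart: resumes from the checkpoint

print(after_crash)
print(sink)
```

In this model a crash between step 1 and step 2 would cause the same record to be written again after a restart, which is why whether the sink tolerates repeated writes matters so much for end-to-end guarantees.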
Output Modes Explained
| Mode | Writes | Use When |
|---|---|---|
| Append | Only new rows added since last trigger | No aggregations, or aggregations with watermarks |
| Complete | Entire result table every trigger | Aggregations without watermarks (total counts) |
| Update | Only rows that changed since last trigger | Aggregations where only changed rows matter |
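
To compare the three modes side by side, the sketch below takes the result table before and after one trigger and computes what each mode would hand to the sink. It is a simplified pure-Python model with a hypothetical `emit` helper and made-up totals. In particular, it does not model how append mode with aggregations waits for the watermark before emitting a finalized row.

```python
def emit(mode, previous, current):
    """Rows a sink would receive for one trigger, with result tables as dicts."""
    if mode == "complete":
        # The entire result table, changed or not.
        return dict(current)
    if mode == "update":
        # Only rows that are new or whose value changed.
        return {k: v for k, v in current.items() if previous.get(k) != v}
    if mode == "append":
        # Only rows that did not exist before; assumes emitted rows never change.
        return {k: v for k, v in current.items() if k not in previous}
    raise ValueError(f"unknown output mode: {mode}")

# Hypothetical per-customer totals before and after one trigger.
previous = {"Alice": 10.0, "Bob": 50.0}
current = {"Alice": 10.0, "Bob": 75.0, "Carol": 5.0}

complete_rows = emit("complete", previous, current)
update_rows = emit("update", previous, current)
append_rows = emit("append", previous, current)

print(complete_rows)
print(update_rows)
print(append_rows)
```

Bob's changed total appears in update mode but not in append mode, which illustrates why append is only safe when rows are final once written.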
