Spark Structured Streaming

Structured Streaming processes live data streams using the same DataFrame API you already know from batch processing. Instead of processing a static file, Spark continuously processes new data as it arrives — from Kafka, files, sockets, or other sources.

The Infinite Table Mental Model

Think of a stream as a table that grows forever:

Time 0:
+-------+--------+--------+
|  user | action | amount |
+-------+--------+--------+

Time 1 (new data arrives):
+-------+--------+--------+
|  user | action | amount |
+-------+--------+--------+
| Alice | login  |   --   |
+-------+--------+--------+

Time 2 (more data arrives):
+-------+--------+--------+
|  user | action | amount |
+-------+--------+--------+
| Alice | login  |   --   |
| Bob   | buy    |  50.00 |
+-------+--------+--------+

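Spark runs your query incrementally: on each trigger it processes only the rows that arrived since the previous trigger and updates the result.
Your transformation code is the same as for batch. What changes is how you read and write: spark.readStream instead of spark.read, and writeStream instead of write.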
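To make the "infinite table" idea concrete, here is a plain-Python model (not Spark code) of a running sum of amount per user. It keeps a small state dictionary and updates it with each arriving batch of rows; the point is that the incrementally maintained result matches what a batch query over the whole table would return. The batches and helper names are hypothetical.

```python
from collections import defaultdict

# Hypothetical micro-batches of (user, action, amount) rows; None means "no amount".
batches = [
    [("Alice", "login", None)],
    [("Bob", "buy", 50.00)],
    [("Alice", "buy", 20.00), ("Bob", "buy", 5.00)],
]

def batch_total(rows):
    """Batch query: sum amount per user over the whole (finite) table."""
    totals = defaultdict(float)
    for user, _action, amount in rows:
        totals[user] += amount or 0.0
    return dict(totals)

def incremental_totals(batches):
    """Streaming-style query: update running state using only the new rows."""
    state = defaultdict(float)
    for new_rows in batches:
        for user, _action, amount in new_rows:
            state[user] += amount or 0.0
        yield dict(state)  # snapshot of the result table after this trigger

snapshots = list(incremental_totals(batches))
all_rows = [row for batch in batches for row in batch]
print(snapshots[-1])          # {'Alice': 20.0, 'Bob': 55.0}
print(batch_total(all_rows))  # {'Alice': 20.0, 'Bob': 55.0}
```

Keeping only per-user totals as state is what lets the stream run forever without rereading old rows.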

Key Concepts

  • Trigger — how often Spark checks for and processes new data
  • Micro-batch — Spark collects incoming records into small batches and processes each batch (default mode)
  • Continuous processing — Spark processes each record with very low latency (experimental)
  • Watermark — a threshold telling Spark how late data can arrive and still be included in results
  • Output mode — Append (only new rows), Complete (full result table), or Update (only changed rows)
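A rough plain-Python model of the trigger and micro-batch ideas: with a fixed processing-time trigger, records can be thought of as grouped by the trigger interval in which they arrive, and each group is processed as one batch. This simplification ignores how long each batch takes to run; the `assign_micro_batches` helper and the arrival times are hypothetical.

```python
from collections import defaultdict

def assign_micro_batches(arrivals, interval_s):
    """Group (arrival_second, record) pairs into batches of width interval_s.

    Simplified model: batch k holds records that arrived in
    [k * interval_s, (k + 1) * interval_s). Intervals with no
    arrivals get no entry.
    """
    batches = defaultdict(list)
    for arrival_s, record in arrivals:
        batches[int(arrival_s // interval_s)].append(record)
    return dict(sorted(batches.items()))

arrivals = [(0.5, "a"), (3.2, "b"), (9.9, "c"), (10.0, "d"), (27.1, "e")]
print(assign_micro_batches(arrivals, interval_s=10))
# {0: ['a', 'b', 'c'], 1: ['d'], 2: ['e']}
```

A shorter interval means smaller batches and lower latency, at the cost of more per-batch overhead.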

Reading a Stream from a Socket (Learning Example)

# Start this in a terminal: nc -lk 9999
# Then type lines of text to send as a stream

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, col

spark = SparkSession.builder.appName("WordCount").getOrCreate()

# Read a streaming source
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Count words in the stream (same API as batch!)
words = lines.select(explode(split(col("value"), " ")).alias("word"))
word_counts = words.groupBy("word").count()

# Write results to console
query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()  # run until stopped
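What the console prints on each trigger can be modeled in plain Python: every micro-batch splits the new lines into words (like `split` + `explode`), adds them to running counts kept as state, and, in `complete` mode, emits the whole counts table. This is an illustrative model with made-up input, not how Spark executes the query internally.

```python
from collections import Counter

def word_count_stream(micro_batches):
    """Yield the full result table after each micro-batch ("complete" mode)."""
    counts = Counter()                                         # state kept across triggers
    for lines in micro_batches:
        words = [w for line in lines for w in line.split(" ")]  # split + explode
        counts.update(words)                                   # groupBy("word").count()
        yield dict(counts)                                     # complete mode: every row

typed = [["hello spark"], ["hello again", "spark streaming"]]
for batch_id, table in enumerate(word_count_stream(typed)):
    print(batch_id, table)
# 0 {'hello': 1, 'spark': 1}
# 1 {'hello': 2, 'spark': 2, 'again': 1, 'streaming': 1}
```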

Reading from Apache Kafka

Kafka is one of the most common sources for production Spark streams. Kafka stores events as messages in topics, and Spark reads those messages continuously.

# Read from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
    .option("subscribe", "orders_topic") \
    .option("startingOffsets", "latest") \
    .load()

# Each Kafka row has the columns: key, value, topic, partition, offset, timestamp, timestampType
# 'key' and 'value' are binary; 'value' holds your actual message
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

schema = StructType([
    StructField("order_id",   StringType(),    True),
    StructField("customer",   StringType(),    True),
    StructField("amount",     DoubleType(),    True),
    StructField("event_time", TimestampType(), True),  # used by the watermark example
])

orders = kafka_df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Now process orders with normal DataFrame operations
result = orders.groupBy("customer").sum("amount")
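Conceptually, `from_json(col("value").cast("string"), schema)` followed by `select("data.*")` decodes each message's bytes as text, parses it as JSON, and keeps only the fields named in the schema, leaving missing fields as null. The plain-Python sketch below mimics that for well-formed messages, using the `order_id`, `customer` and `amount` fields from the schema above. The sample payloads and the `parse_order` helper are hypothetical, and the sketch does not reproduce Spark's type coercion or its handling of malformed JSON.

```python
import json

FIELDS = ["order_id", "customer", "amount"]  # field names taken from the schema above

def parse_order(value: bytes) -> dict:
    """Rough analogue of from_json(col("value").cast("string"), schema) + select("data.*")."""
    data = json.loads(value.decode("utf-8"))           # bytes -> str -> JSON object
    return {name: data.get(name) for name in FIELDS}   # missing field -> None (null)

message = b'{"order_id": "o-1", "customer": "Bob", "amount": 50.0, "extra": 1}'
print(parse_order(message))
# {'order_id': 'o-1', 'customer': 'Bob', 'amount': 50.0}
print(parse_order(b'{"order_id": "o-2", "customer": "Alice"}'))
# {'order_id': 'o-2', 'customer': 'Alice', 'amount': None}
```

Fields not in the schema (like `extra`) are simply ignored, which is why the schema acts as a contract for the message format.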

Reading a Streaming File Source

# Spark watches a folder and processes new files as they appear
streaming_df = spark.readStream \
    .format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("/data/incoming/")

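# Each new CSV file that appears in /data/incoming/ is picked up by a later micro-batch.
# Spark expects files to be placed in the folder atomically (e.g. written elsewhere, then moved in).
# Streaming file sources need an explicit schema unless streaming schema inference is enabled.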
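The file source remembers which files it has already processed (Spark tracks this in its checkpoint), so each trigger reads only files it has not seen before. A plain-Python model of that bookkeeping, with a hypothetical `poll_new_files` helper and a temporary directory standing in for /data/incoming/:

```python
import os
import tempfile

def poll_new_files(folder, seen):
    """Return CSV files in folder that were not processed before, and mark them as seen."""
    current = sorted(f for f in os.listdir(folder) if f.endswith(".csv"))
    new_files = [f for f in current if f not in seen]
    seen.update(new_files)
    return new_files

with tempfile.TemporaryDirectory() as incoming:   # stands in for /data/incoming/
    seen = set()                                   # Spark tracks this in its checkpoint
    open(os.path.join(incoming, "part1.csv"), "w").close()
    first = poll_new_files(incoming, seen)         # trigger 1
    open(os.path.join(incoming, "part2.csv"), "w").close()
    second = poll_new_files(incoming, seen)        # trigger 2
    third = poll_new_files(incoming, seen)         # trigger 3: nothing new

print(first, second, third)  # ['part1.csv'] ['part2.csv'] []
```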

Watermarks — Handling Late Data

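In real streams, data sometimes arrives late: an event stamped 11:58 can reach Spark after one stamped 12:00. A watermark sets a boundary in event time. Spark tracks the latest event time it has seen, subtracts the delay you specify (10 minutes below), and guarantees that records newer than that boundary are still aggregated. Records older than the boundary may be dropped, and once a window ends before the watermark, Spark can finalize it and free its state.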

from pyspark.sql.functions import window

result = orders \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("customer")
    ) \
    .sum("amount")

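# Watermark diagram:
# Max event_time seen so far = 12:00
# Watermark delay            = 10 minutes
# Watermark                  = 12:00 - 10 minutes = 11:50
# Records with event_time >= 11:50 are still aggregated
# Records with event_time <  11:50 are late and may be dropped
# The 5-minute window [11:40, 11:45) ends before 11:50, so it can be finalized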
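The arithmetic in the diagram can be checked with a small plain-Python model: the watermark is the maximum event time seen minus the delay, a record older than the watermark counts as late, and a window whose end is at or before the watermark is treated as final. This is a simplified sketch: Spark updates the watermark between micro-batches and only guarantees that on-time data is kept, not that late data is always dropped. The helper names and the sample date are hypothetical.

```python
from datetime import datetime, timedelta

DELAY = timedelta(minutes=10)   # mirrors withWatermark("event_time", "10 minutes")
WINDOW = timedelta(minutes=5)   # mirrors window(col("event_time"), "5 minutes")

def watermark(max_event_time):
    """Watermark = latest event time seen minus the allowed delay."""
    return max_event_time - DELAY

def is_late(event_time, wm):
    """Late records are older than the watermark (Spark may drop them)."""
    return event_time < wm

def window_bounds(event_time):
    """Tumbling window [start, end) containing event_time, aligned to midnight in this model."""
    midnight = event_time.replace(hour=0, minute=0, second=0, microsecond=0)
    start = midnight + (event_time - midnight) // WINDOW * WINDOW
    return start, start + WINDOW

def is_final(window_end, wm):
    """Simplified rule: a window is final once the watermark reaches its end."""
    return window_end <= wm

wm = watermark(datetime(2024, 1, 1, 12, 0))
print(wm.time())                                    # 11:50:00
print(is_late(datetime(2024, 1, 1, 11, 52), wm))    # False: still aggregated
print(is_late(datetime(2024, 1, 1, 11, 45), wm))    # True: may be dropped
start, end = window_bounds(datetime(2024, 1, 1, 11, 42, 30))
print(start.time(), end.time(), is_final(end, wm))  # 11:40:00 11:45:00 True
```

A longer delay tolerates later data but keeps more windows open in state; a shorter one frees state sooner but drops more stragglers.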

Writing a Stream to Output Sinks

# Write to console (debugging)
query = result.writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

# Write to Parquet files (production; the file sink supports only append mode)
query = result.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/output/streaming_results") \
    .option("checkpointLocation", "/checkpoints/job1") \
    .trigger(processingTime="1 minute") \
    .start()

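# Write to a Kafka topic: the Kafka sink requires a 'value' column (string or binary)
# and accepts an optional 'key' column, so serialize each result row to JSON first
from pyspark.sql.functions import to_json, struct

query = result \
    .select(
        col("customer").cast("string").alias("key"),
        to_json(struct("*")).alias("value")
    ) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092") \
    .option("topic", "processed_orders") \
    .option("checkpointLocation", "/checkpoints/job2") \
    .start()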

Checkpointing — Fault Tolerance for Streams

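Checkpoints save the stream's progress to storage. If the job crashes and restarts with the same checkpoint location, Spark reads the checkpoint and resumes from the last completed batch. A batch that was in progress during the crash is run again, so whether output is duplicated depends on the sink: a replayable source such as Kafka combined with an idempotent sink such as the file sink gives end-to-end exactly-once results, while the Kafka sink is at-least-once and may write duplicates after a restart.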

# Always set a checkpoint location for production streams
query = result.writeStream \
    .option("checkpointLocation", "s3://my-bucket/checkpoints/job1") \
    .format("parquet") \
    .start("/output/results")

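# The checkpoint directory stores:
# - Offsets planned for each micro-batch (for Kafka, per topic partition)
# - A commit log of the micro-batches that completed
# - State for stateful queries (aggregations, deduplication, stream joins)
# - Query metadata such as the query id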
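Why a restart does not duplicate output with an idempotent sink can be shown with a plain-Python model of an offset log and a commit log: the planned offsets for a batch are recorded before it runs, the batch id is committed after it finishes, and a restarted query re-runs any batch that was planned but not committed. The sink overwrites output by batch id, so re-running a batch replaces its rows instead of appending a second copy. All class and function names are hypothetical, and Spark's real checkpoint format is different.

```python
class Checkpoint:
    """Toy stand-in for a checkpoint's offset log and commit log."""
    def __init__(self):
        self.offsets = {}      # batch_id -> (start, end) range planned for that batch
        self.commits = set()   # batch ids that completed


class IdempotentSink:
    """Output keyed by batch id: replaying a batch overwrites it instead of appending."""
    def __init__(self):
        self.output = {}

    def write(self, batch_id, rows):
        self.output[batch_id] = list(rows)


def run(source, ckpt, sink, batch_size, crash_after_write=None):
    """Process source in micro-batches, resuming from whatever ckpt already holds."""
    last = max(ckpt.offsets, default=-1)
    # Re-run the last planned batch if it never committed; otherwise start a new one.
    batch_id = last if last >= 0 and last not in ckpt.commits else last + 1
    while True:
        if batch_id not in ckpt.offsets:
            start = ckpt.offsets[batch_id - 1][1] if batch_id > 0 else 0
            end = min(start + batch_size, len(source))
            if start == end:
                return                              # no new data
            ckpt.offsets[batch_id] = (start, end)   # 1. record planned offsets first
        start, end = ckpt.offsets[batch_id]
        sink.write(batch_id, source[start:end])     # 2. process and write output
        if batch_id == crash_after_write:
            raise RuntimeError("simulated crash before commit")
        ckpt.commits.add(batch_id)                  # 3. mark the batch as committed
        batch_id += 1


source = ["a", "b", "c", "d", "e"]    # stands in for a replayable source such as Kafka
ckpt, sink = Checkpoint(), IdempotentSink()
try:
    run(source, ckpt, sink, batch_size=2, crash_after_write=1)
except RuntimeError:
    pass                              # crash after batch 1 was written, before its commit
run(source, ckpt, sink, batch_size=2) # restart with the same checkpoint
rows = [r for b in sorted(sink.output) for r in sink.output[b]]
print(rows)  # ['a', 'b', 'c', 'd', 'e']
```

If `write` appended instead of overwriting, batch 1's rows would appear twice after the restart, which is the at-least-once behavior to expect from sinks that cannot deduplicate replayed batches.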

Output Modes Explained

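+----------+-------------------------------------------+--------------------------------------------------+
| Mode     | Writes                                    | Use When                                         |
+----------+-------------------------------------------+--------------------------------------------------+
| Append   | Only new rows added since last trigger    | No aggregations, or aggregations with watermarks |
| Complete | Entire result table every trigger         | Aggregations without watermarks (total counts)   |
| Update   | Only rows that changed since last trigger | Aggregations where only changed rows matter      |
+----------+-------------------------------------------+--------------------------------------------------+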
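The difference between the modes shows up clearly on a windowed count. This plain-Python model compares two consecutive result tables and returns what each mode would emit for the newer one. Append is modeled for the aggregation-with-watermark case, where the caller passes the set of windows the watermark has already finalized. The functions and sample counts are hypothetical.

```python
def complete_output(curr):
    """Complete: the entire result table, every trigger."""
    return dict(curr)

def update_output(prev, curr):
    """Update: only rows that are new or whose value changed since the last trigger."""
    return {k: v for k, v in curr.items() if prev.get(k) != v}

def append_output(curr, finalized, emitted):
    """Append (aggregation + watermark): each row exactly once, only after it is final."""
    out = {k: curr[k] for k in finalized if k in curr and k not in emitted}
    emitted.update(out)  # remember which rows were already written
    return out

prev = {"11:40-11:45": 3, "11:45-11:50": 1}
curr = {"11:40-11:45": 3, "11:45-11:50": 4, "11:50-11:55": 2}
print(complete_output(curr))      # {'11:40-11:45': 3, '11:45-11:50': 4, '11:50-11:55': 2}
print(update_output(prev, curr))  # {'11:45-11:50': 4, '11:50-11:55': 2}
emitted = set()
print(append_output(curr, {"11:40-11:45"}, emitted))  # {'11:40-11:45': 3}
print(append_output(curr, {"11:40-11:45"}, emitted))  # {} (already emitted)
```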
