Databricks Structured Streaming

Most data processing works like doing laundry. You collect clothes throughout the week, load them into the washing machine all at once, run a complete cycle, and get clean results at the end. This is batch processing — gather data, process it, produce results. The gap between when events happen and when you see results can be hours or days.

Some situations demand something different. A fraud detection system cannot wait until tomorrow morning to flag a fraudulent transaction that happened thirty seconds ago. A logistics platform cannot wait until end of day to reroute delivery drivers around an accident. A live sports app cannot delay score updates by an hour. These situations need streaming — processing data continuously as it arrives, producing results within seconds or milliseconds.

Databricks Structured Streaming is Apache Spark's engine for continuous data processing. It treats an ongoing data stream like an infinite table that keeps getting new rows. Queries run against this table produce continuous, updated results as new data arrives — automatically, reliably, and at scale.

The Core Mental Model: The Unbounded Table

Understanding Structured Streaming starts with one central idea. Imagine a spreadsheet that never stops growing. Every second, new rows appear at the bottom. You write a query — "sum the sales column" or "count rows where status equals error" — and the result updates automatically every time new rows arrive. You never manually refresh. You never run the query again. The result just stays current.

That is exactly what Structured Streaming does. The data stream is the infinite spreadsheet. Your Spark query runs continuously. The output updates as new data arrives.

The power of this model is that it uses the same API as batch Spark. If you know how to write a Spark DataFrame query for batch data, you can write a Structured Streaming query. The code looks nearly identical. Spark handles all the complexity of processing data continuously under the hood.
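The unbounded-table idea can be illustrated without Spark. The sketch below is plain Python, not the Structured Streaming API: the `RunningSum` class and the sample sales amounts are made up for illustration. It shows that a result maintained incrementally as rows arrive matches a batch query over everything received so far.

```python
from typing import Iterable, List


class RunningSum:
    """Keeps a 'sum the sales column' result current as new rows arrive."""

    def __init__(self) -> None:
        self.total = 0.0

    def on_new_rows(self, rows: Iterable[float]) -> float:
        # Only the newly arrived rows are touched; earlier rows are never re-read.
        self.total += sum(rows)
        return self.total


# Three micro-batches of newly arrived sales amounts (hypothetical values).
micro_batches: List[List[float]] = [[10.0, 5.0], [2.5], [7.5, 1.0, 4.0]]

query = RunningSum()
incremental_results = [query.on_new_rows(batch) for batch in micro_batches]

# A batch query over the whole table seen so far gives the same final answer.
all_rows = [amount for batch in micro_batches for amount in batch]
batch_result = sum(all_rows)
```

Spark does the equivalent bookkeeping internally, which is why the same DataFrame query can run in either batch or streaming mode.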

Sources: Where Streaming Data Comes From

Structured Streaming reads data from sources — systems that continuously produce new data. The most common source in production deployments is Apache Kafka, a distributed messaging system that accepts data from thousands of producers simultaneously and makes it available to consumers in order.

Apache Kafka

Kafka works like a postal sorting facility. Thousands of senders (producers) drop off letters (messages) continuously. The facility sorts letters by topic into separate bins (Kafka topics). Consumers pick up letters from the bins they care about at their own pace. Kafka keeps letters in the bins for a configurable retention period, so a consumer that falls behind can catch up.

In data terms, a website might publish user click events to a Kafka topic called page_clicks. A payment system publishes transaction events to payment_transactions. A sensor network publishes temperature readings to sensor_readings. Structured Streaming reads from these topics continuously.
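The sorting-facility analogy can be reduced to a toy model in plain Python. This is a deliberate simplification and not the Kafka client API: the `Topic` class and the consumer names are hypothetical, and real Kafka also splits topics into partitions and enforces retention limits. The sketch only shows that each consumer reads the same log at its own pace.

```python
from typing import Dict, List


class Topic:
    """An append-only log; each consumer tracks its own read position."""

    def __init__(self) -> None:
        self.log: List[str] = []
        self.offsets: Dict[str, int] = {}

    def publish(self, message: str) -> None:
        self.log.append(message)

    def poll(self, consumer: str, max_messages: int) -> List[str]:
        # Resume from where this particular consumer stopped last time.
        start = self.offsets.get(consumer, 0)
        batch = self.log[start:start + max_messages]
        self.offsets[consumer] = start + len(batch)
        return batch


page_clicks = Topic()
for click in ["home", "pricing", "docs", "signup"]:
    page_clicks.publish(click)

fast = page_clicks.poll("dashboard", max_messages=10)   # reads everything
slow_first = page_clicks.poll("archiver", max_messages=1)  # falls behind
slow_rest = page_clicks.poll("archiver", max_messages=10)  # catches up later
```

Because messages stay in the log after being read, the slower consumer loses nothing by falling behind.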

Other Sources

  • Delta Lake tables — Delta tables used as streaming sources emit new rows as they appear. A batch job writing to a Delta table becomes a source for downstream streaming jobs automatically.
  • Cloud storage — Files appearing in an S3 bucket, Azure Blob Storage container, or GCS folder trigger processing as they arrive.
  • Auto Loader — Databricks' enhanced cloud file source that efficiently discovers new files and processes them exactly once, even with millions of files in storage.
  • Kinesis and Event Hubs — AWS Kinesis and Azure Event Hubs are cloud-native streaming services that Structured Streaming reads natively.

Sinks: Where Streaming Results Go

Processing data is only useful if the results go somewhere actionable. Structured Streaming writes results to sinks — output destinations.

  • Delta Lake — The most common sink in Databricks. Streaming results land in a Delta table that other processes, dashboards, and users can query immediately.
  • Kafka — Enriched or transformed events get published back to a Kafka topic for downstream consumers.
  • Cloud storage — Results write as files (Parquet, JSON, CSV) to cloud storage for downstream batch processing.
  • Console — For development and debugging only; prints results to the Spark driver output.
  • Memory — For testing; stores results in an in-memory table queryable within the same Spark session.

Output Modes: Controlling What Gets Written

Structured Streaming supports three output modes that control what portion of the results gets written each time the streaming query produces new output.

Append Mode

Append mode writes only new rows that were added since the last output. No existing rows in the sink are modified. This is the default and most common mode for streaming use cases where each event is independent.

Example: Every transaction processed gets appended to a Delta table as a new row. Existing transaction rows never change.

Complete Mode

Complete mode rewrites the entire result table every time it updates. This is appropriate for aggregation queries where the complete current state of the aggregation is the meaningful output.

Example: A running count of errors by type gets rewritten in full every minute, showing the current total count for each error type. The sink always reflects the complete current state.

Update Mode

Update mode writes only the rows that changed since the last output. For aggregations, this means only the groups whose counts or sums changed get written. This is more efficient than complete mode when only a small fraction of aggregated results change with each batch of new data.
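The difference between complete and update mode is easiest to see on a running count. The following is a plain-Python simulation with made-up error codes, not Spark code. Append mode is left out because Spark does not allow it for an aggregation like this unless a watermark is defined.

```python
from collections import Counter
from typing import Dict, List

# Micro-batches of error codes arriving over time (hypothetical sample data).
micro_batches: List[List[str]] = [
    ["E404", "E500", "E404"],
    ["E500"],
    ["E503", "E404"],
]


def simulate(mode: str) -> List[Dict[str, int]]:
    """Return what a running count-by-error-code query would emit per batch."""
    totals: Counter = Counter()
    emitted = []
    for batch in micro_batches:
        changed_keys = set(batch)
        totals.update(batch)
        if mode == "complete":
            # The entire result table, every time.
            emitted.append(dict(totals))
        elif mode == "update":
            # Only the groups whose count changed in this batch.
            emitted.append({k: totals[k] for k in sorted(changed_keys)})
        else:
            raise ValueError(f"unsupported mode: {mode}")
    return emitted


complete_output = simulate("complete")
update_output = simulate("update")
```

In the second micro-batch only E500 changes, so update mode emits a single row while complete mode rewrites both groups.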

Triggers: Controlling How Often Streaming Runs

By default, Structured Streaming processes new data as quickly as possible — as soon as one micro-batch finishes processing, the next one starts. This provides minimum latency but uses maximum compute resources. Triggers let you control the processing frequency.

Processing Time Trigger

Process new data on a fixed interval — every 30 seconds, every minute, every hour. The query wakes up on the schedule, processes all data that arrived since the last run, writes the output, and sleeps until the next interval.

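# df is a streaming DataFrame. The Delta sink needs a checkpoint location
# (see Checkpointing below) unless a default one is configured for the session.
query = df.writeStream \
    .trigger(processingTime="30 seconds") \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/path/to/checkpoint/directory") \
    .start("/path/to/output/table")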

Available-Now Trigger (Batch Mode)

Process all currently available data, then stop. This converts a streaming query into a one-time batch run. It is useful for scheduled jobs that need the reliability and exactly-once semantics of streaming but should run on a cron schedule rather than continuously.

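# Processes everything currently available, then stops. The checkpoint lets
# the next scheduled run pick up only the data that arrived in between.
query = df.writeStream \
    .trigger(availableNow=True) \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/path/to/checkpoint/directory") \
    .start("/path/to/output/table")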

Continuous Trigger

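The continuous trigger processes data with very low latency, on the order of one millisecond end to end, by running continuously without micro-batch boundaries. Continuous mode is experimental in Apache Spark, supports only a limited set of operations (mainly map-like projections and filters), and provides at-least-once rather than exactly-once guarantees. Most production use cases stick with the default micro-batch approach.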

Checkpointing: Fault Tolerance Without Data Loss

Streaming jobs run for hours, days, or months. During that time, clusters restart, network interruptions occur, and unexpected failures happen. Without protection, a failure could mean lost data or duplicate processing.

Checkpointing solves this. Structured Streaming records its progress (which data it has already processed, where it is in each Kafka topic, what aggregation state it has accumulated) in a checkpoint location in cloud storage as each micro-batch completes. If the streaming job fails and restarts, it reads the checkpoint, determines exactly where it left off, and continues from that point. With a replayable source such as Kafka and an idempotent or transactional sink such as Delta Lake, no data is lost and no data is written twice.

Enabling checkpointing requires one configuration line:

query = df.writeStream \
    .option("checkpointLocation", "/path/to/checkpoint/directory") \
    .format("delta") \
    .start("/path/to/output/table")

The checkpoint directory must be on a persistent storage system — not the cluster's local disk, which disappears when the cluster terminates. Cloud storage (S3, ADLS, GCS) works correctly.
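The recovery behavior can be sketched in a few lines of plain Python. This is a toy model under stated assumptions, not how Spark stores checkpoints: the `run_once` function, the `offsets.json` file name, and the simulated failure are all invented for illustration.

```python
import json
import os
import tempfile
from typing import List


def run_once(events: List[str], checkpoint_path: str, sink: List[str],
             fail_after: int = -1) -> None:
    """Process events from the last committed offset; optionally crash midway."""
    # Recover the position of the next unprocessed event, or start from 0.
    offset = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            offset = json.load(f)["next_offset"]

    processed = 0
    while offset < len(events):
        if processed == fail_after:
            raise RuntimeError("simulated cluster failure")
        sink.append(events[offset].upper())
        offset += 1
        processed += 1
        # Commit progress only after the output has been written.
        with open(checkpoint_path, "w") as f:
            json.dump({"next_offset": offset}, f)


events = ["a", "b", "c", "d", "e"]
sink: List[str] = []
checkpoint = os.path.join(tempfile.mkdtemp(), "offsets.json")

try:
    run_once(events, checkpoint, sink, fail_after=2)  # crashes after 2 events
except RuntimeError:
    pass

run_once(events, checkpoint, sink)  # restart resumes at offset 2
```

In this toy version, a crash between writing the output and committing the offset would replay one event on restart. That gap is why end-to-end exactly-once results also depend on a sink that can absorb a replayed write.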

Stateful Operations: Aggregations Across Time

Simple streaming operations — filtering events, reformatting fields, enriching each event with a lookup — are stateless. Each event is processed independently with no memory of previous events.

More powerful operations require state: memory of what happened before the current event. Counting events per user over a 10-minute window requires remembering how many events each user generated in the last 10 minutes. Structured Streaming manages this state automatically in a state store on the executors (held in memory by default, or on local disk with the RocksDB state store provider) and backs it up to the checkpoint location so it survives restarts.

Window Aggregations

Window aggregations compute aggregated results (counts, sums, averages) over sliding time windows rather than all data since the stream began.

A streaming query counting website errors per minute over a rolling 5-minute window looks like this:

from pyspark.sql.functions import window

windowed_counts = events_df \
    .filter(events_df.event_type == "error") \
    .groupBy(
        window(events_df.timestamp, "5 minutes", "1 minute"),
        events_df.error_code
    ) \
    .count()

This query maintains a separate count for each error code within each 5-minute window. Because a new window starts every minute, the windows overlap, and each event is counted in five of them. The result has one row per window and error code, so a dashboard reading from the output Delta table can show the error counts for the most recent 5 minutes by selecting the latest window.
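To see how a single event maps onto overlapping windows, the assignment can be worked out by hand. The sketch below is plain Python rather than Spark internals, and it assumes windows aligned to the Unix epoch (Spark's default when no start offset is given). The `windows_for` helper and the sample timestamp are hypothetical.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

WINDOW = timedelta(minutes=5)
SLIDE = timedelta(minutes=1)
EPOCH = datetime(1970, 1, 1)


def windows_for(event_time: datetime) -> List[Tuple[datetime, datetime]]:
    """Return every [start, end) window that contains event_time."""
    # Latest window start at or before the event, aligned to the slide interval.
    elapsed = event_time - EPOCH
    start = event_time - (elapsed % SLIDE)
    result = []
    # Walk backwards one slide at a time while the window still covers the event.
    while start + WINDOW > event_time:
        result.append((start, start + WINDOW))
        start -= SLIDE
    return sorted(result)


event = datetime(2024, 1, 1, 10, 7, 30)  # hypothetical error event at 10:07:30
assigned = windows_for(event)
```

The event at 10:07:30 lands in five windows, the earliest covering 10:03 to 10:08 and the latest covering 10:07 to 10:12, so it contributes to five separate counts.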

Watermarks: Handling Late-Arriving Data

Real-world streaming data does not always arrive in order. A mobile app records an event at 10:00 AM but the user's phone is offline. The event finally arrives at the streaming system at 10:45 AM — 45 minutes late. How should the system handle this?

Without a rule, the system would need to wait indefinitely for potentially late events before finalizing any window result. That approach makes results unavailable for too long and accumulates unbounded state.

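Watermarks define the maximum lateness the system tolerates. Lateness is measured in event time: the watermark trails the latest event timestamp the query has seen by the configured threshold. A watermark of 10 minutes means that events with timestamps up to 10 minutes behind that latest timestamp are still accepted, while older events may be discarded.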

windowed_counts = events_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(events_df.timestamp, "5 minutes", "1 minute"),
        events_df.error_code
    ) \
    .count()

With this watermark, an event whose timestamp is 8 minutes behind the latest one seen still gets included in the correct window. An event 15 minutes behind is older than the watermark and is typically dropped, as is the 45-minute-late mobile event described earlier. The 10-minute threshold lets the system finalize window results after waiting a reasonable amount of time, freeing up the state memory used to track those windows.
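The accept-or-drop decision can be approximated in plain Python. This is a simplified model, not Spark's implementation: Spark advances the watermark between micro-batches and compares it against window end times, whereas this sketch applies it per event to raw timestamps. The `apply_watermark` helper and the arrival sequence are made up for illustration.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

THRESHOLD = timedelta(minutes=10)


def apply_watermark(event_times: List[datetime]) -> List[Tuple[datetime, bool]]:
    """Mark each event, in arrival order, as accepted (True) or too late (False)."""
    max_seen: Optional[datetime] = None
    decisions = []
    for ts in event_times:
        # The watermark trails the latest event time observed so far.
        watermark = max_seen - THRESHOLD if max_seen is not None else None
        accepted = watermark is None or ts >= watermark
        decisions.append((ts, accepted))
        if max_seen is None or ts > max_seen:
            max_seen = ts
    return decisions


day = datetime(2024, 1, 1)
arrivals = [
    day.replace(hour=10, minute=30),  # on time, sets the latest event time
    day.replace(hour=10, minute=22),  # 8 minutes behind
    day.replace(hour=10, minute=15),  # 15 minutes behind
    day.replace(hour=10, minute=31),  # on time
]
accepted_flags = [accepted for _, accepted in apply_watermark(arrivals)]
```

Only the event that is 15 minutes behind the latest timestamp is rejected; the one that is 8 minutes behind is still within the threshold.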

Delta Live Tables and Structured Streaming

Databricks Delta Live Tables (DLT) builds on Structured Streaming to provide a declarative pipeline framework. Instead of writing the imperative streaming code (explicitly telling Spark each step to take), DLT lets you declare what the output should be and Databricks manages execution, checkpointing, monitoring, and error recovery automatically.

A DLT streaming pipeline looks like:

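import dlt
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# Illustrative schema (hypothetical fields); replace with the real event layout
schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("amount", DoubleType()),
])

@dlt.table(comment="Cleaned transaction events")
def cleaned_transactions():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("subscribe", "payment_transactions")
        .load()
        # Kafka delivers the payload as bytes in the value column
        .select(from_json(col("value").cast("string"), schema).alias("data"))
        .select("data.*")
        # Keep only transactions with a positive amount
        .filter(col("amount") > 0)
    )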

DLT handles starting, stopping, and restarting the streaming query. It monitors for failures and retries automatically. It tracks data quality metrics through expectations, which can flag or drop invalid records or stop the pipeline. Organizations that want robust production streaming pipelines without managing the operational complexity of raw Structured Streaming use DLT.

Auto Loader: Efficient File Ingestion

Auto Loader is Databricks' solution for streaming new files from cloud storage into Delta tables. It solves a specific challenge: cloud storage buckets can contain millions of files. Standard methods for detecting new files (listing the entire bucket repeatedly) become extremely slow and expensive at scale.

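Auto Loader offers two ways to detect new files. In its default directory listing mode, it lists the input path and records in its checkpoint which files it has already ingested, so no file is processed twice. In file notification mode, it subscribes to cloud-native notification services (AWS SNS and SQS fed by S3 events, Azure Event Grid with Queue Storage, Google Cloud Pub/Sub). When a file appears in the monitored path, the cloud storage service sends an event notification, and Auto Loader processes only that new file, with no repeated full bucket listings.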

raw_data = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/path/to/schema")
    .load("/path/to/landing/zone/")
)

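Auto Loader also handles schema evolution. It stores the inferred schema in the schemaLocation path and compares incoming files against it. By default, when a new field appears in the JSON files, the stream stops with an error after recording the new column in the stored schema, and it picks up the updated schema when it restarts. A job configured to restart automatically therefore continues without manual intervention. This makes Auto Loader well suited to ingesting data from external systems whose schema may evolve over time.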

Real-World Scenario: Real-Time Fraud Detection

A financial institution processes 50,000 payment transactions per minute across millions of customers. Every transaction needs to be evaluated for fraud risk before authorization — a process that must complete within 200 milliseconds.

The streaming architecture works as follows:

Payment systems publish each transaction as a JSON event to a Kafka topic called transactions_raw. A Structured Streaming job reads from this topic continuously. For each transaction, it enriches the event with customer profile features retrieved from a Delta table (account age, historical average transaction amount, typical transaction geography). It calculates derived features — how far this transaction's amount deviates from the customer's 30-day average, whether the merchant category matches the customer's typical spending. It applies a pre-trained fraud detection model (loaded from the MLflow Model Registry) to compute a fraud risk score. It writes the enriched, scored transaction to two sinks: a Delta table for audit logging and downstream analysis, and a Kafka topic transaction_scores that the authorization system reads to make the approve/decline decision.

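The checkpoint location ensures that a cluster restart does not cause any transaction to be missed, and the Delta sink does not record any transaction twice. The Kafka sink provides at-least-once delivery, so the authorization system should tolerate an occasional duplicate score after a restart. Watermarks handle the occasional late-arriving event from mobile payment terminals with intermittent connectivity.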
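The derived-feature step in this scenario might look roughly like the following plain-Python sketch. Everything here is an assumption made for illustration: the field names (`avg_amount_30d`, `typical_categories`, `merchant_category`), the choice of relative deviation as the measure, and the sample values. A production job would express the same logic as DataFrame columns.

```python
from typing import Any, Dict


def derive_features(txn: Dict[str, Any], profile: Dict[str, Any]) -> Dict[str, Any]:
    """Compute the two derived features described above for one transaction."""
    avg = float(profile["avg_amount_30d"])
    amount = float(txn["amount"])
    # Relative deviation from the 30-day average; 0.0 when there is no history.
    deviation = (amount - avg) / avg if avg > 0 else 0.0
    return {
        "amount_deviation": deviation,
        "category_is_typical": txn["merchant_category"] in profile["typical_categories"],
    }


# Hypothetical transaction and customer profile.
txn = {"amount": 150.0, "merchant_category": "electronics"}
profile = {"avg_amount_30d": 100.0, "typical_categories": {"groceries", "fuel"}}

features = derive_features(txn, profile)
```

Here the amount is 50 percent above the customer's average and the merchant category is unusual for this customer, both of which would feed into the model's risk score.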

Monitoring Streaming Queries

Databricks provides built-in monitoring for streaming queries. The Structured Streaming UI (accessible through the Spark UI) shows for each active streaming query:

  • Input rate — How many events per second arrive from the source
  • Processing rate — How fast the query is processing events
  • Batch duration — How long each micro-batch takes to complete
  • State size — How much memory the stateful operations are using

When input rate consistently exceeds processing rate, the query falls behind. The lag between when events occur and when they are processed grows over time. This is the streaming equivalent of a checkout queue growing longer because the cashier cannot scan items as fast as customers arrive. The solution is adding more compute resources to the cluster or optimizing the query.
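The same check can be automated from the progress metrics a streaming query reports. The sketch below assumes dict-shaped progress records carrying the `inputRowsPerSecond` and `processedRowsPerSecond` fields, as found in the query's progress reports. The helper names, the three-batch rule, and the sample rates are hypothetical.

```python
from typing import Dict, List


def is_falling_behind(progress_history: List[Dict[str, float]],
                      min_batches: int = 3) -> bool:
    """True when input rate exceeded processing rate in each recent batch."""
    recent = progress_history[-min_batches:]
    if len(recent) < min_batches:
        return False  # not enough evidence yet
    return all(
        p["inputRowsPerSecond"] > p["processedRowsPerSecond"] for p in recent
    )


def backlog_growth(input_rate: float, processing_rate: float, seconds: float) -> float:
    """Rows added to the backlog over the given time at constant rates."""
    return max(0.0, input_rate - processing_rate) * seconds


# Made-up progress records for three consecutive micro-batches.
history = [
    {"inputRowsPerSecond": 1200.0, "processedRowsPerSecond": 900.0},
    {"inputRowsPerSecond": 1150.0, "processedRowsPerSecond": 880.0},
    {"inputRowsPerSecond": 1300.0, "processedRowsPerSecond": 910.0},
]

lagging = is_falling_behind(history)
growth_per_minute = backlog_growth(1200.0, 900.0, 60.0)
```

At 1,200 rows per second in and 900 out, the backlog grows by 18,000 rows every minute, which is the queue getting longer in the checkout analogy.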

Structured Streaming vs. Spark Batch: A Comparison

Understanding when to use streaming versus batch processing guides architecture decisions:

  • Use batch processing when results that arrive hours later are acceptable, the dataset has a known end, and compute cost efficiency matters more than latency.
  • Use streaming when results must be available within seconds or minutes, the data source produces data continuously without a defined end, and business decisions depend on the freshest possible data.
  • Use both together — This is the most common production pattern. Streaming handles fresh data ingestion and real-time aggregations. Batch processes produce historical reports, train machine learning models, and backfill historical data. Databricks makes combining both straightforward because the same Delta tables serve as sinks for streaming jobs and sources for batch queries.

Key Points Summary

  • Structured Streaming treats data streams as infinite tables that grow continuously, using the same Spark DataFrame API as batch processing.
  • Sources include Kafka, Delta Lake, cloud storage, Auto Loader, AWS Kinesis, and Azure Event Hubs.
  • Sinks include Delta Lake (most common), Kafka, cloud storage, console, and memory.
  • Three output modes — append, complete, and update — control what portion of results gets written each time.
  • Checkpointing saves processing state to cloud storage, enabling fault-tolerant recovery from failures without data loss or duplication.
  • Window aggregations compute metrics over sliding time windows rather than all historical data.
  • Watermarks define the maximum acceptable data lateness, allowing the system to finalize window results and free state memory.
  • Auto Loader efficiently detects and ingests new files from cloud storage at scale, using directory listing or cloud-native notifications.
  • Delta Live Tables provides a declarative, managed streaming pipeline framework on top of Structured Streaming.
