ADE Databricks
As your Databricks skills grow, two advanced capabilities expand what you can build significantly. Delta Live Tables (DLT) automates pipeline orchestration and data quality enforcement within Databricks. Structured Streaming extends the DataFrame API to process real-time data streams using the same code patterns you already know from batch processing.
Delta Live Tables — Declarative Pipeline Development
Traditionally, building a multi-step Databricks pipeline means writing notebooks for each transformation, creating jobs that chain them together, handling retry logic, and writing custom data quality checks. Delta Live Tables replaces all of this with a single declarative framework.
Instead of telling Databricks how to run your pipeline step by step, DLT lets you declare what each table should contain. Databricks figures out the execution order, manages dependencies, handles retries, and tracks data quality automatically.
Think of DLT as giving instructions to a robot chef: "I want the final dish to contain cleaned vegetables, cooked protein, and reduced sauce." The robot determines the order to prepare each component, handles timing, and checks quality. You do not write step-by-step cooking instructions.
Defining DLT Tables
import dlt
from pyspark.sql.functions import col, to_date
# Bronze layer — raw ingestion
@dlt.table(
name="bronze_sales",
comment="Raw sales data loaded from ADLS Gen2"
)
def bronze_sales():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.load("abfss://bronze@mystorage.dfs.core.windows.net/sales/")
)
# Silver layer — cleaned data with quality rules
@dlt.table(
name="silver_sales",
comment="Cleaned and validated sales data"
)
@dlt.expect_or_drop("valid_amount", "amount > 0")
@dlt.expect_or_drop("valid_date", "order_date IS NOT NULL")
def silver_sales():
return (
dlt.read_stream("bronze_sales")
.withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))
.withColumn("amount", col("amount").cast("double"))
.drop("_rescued_data")
)
# Gold layer — business aggregation
@dlt.table(name="gold_sales_by_region")
def gold_sales_by_region():
return (
dlt.read("silver_sales")
.groupBy("region")
.agg({"amount": "sum", "order_id": "count"})
.withColumnRenamed("sum(amount)", "total_revenue")
.withColumnRenamed("count(order_id)", "order_count")
)
Data Quality with Expectations
The @dlt.expect decorators define data quality rules directly on the table definition. Three actions are available when a rule is violated:
@dlt.expect— Record the violation in quality metrics but keep the row@dlt.expect_or_drop— Drop the violating row from the output@dlt.expect_or_fail— Fail the entire pipeline run if any row violates the rule
DLT tracks quality metrics for every expectation — the number of records that passed and failed each rule over time. This creates a built-in data quality audit trail with zero additional engineering effort.
Continuous vs Triggered Pipelines
DLT pipelines run in two modes:
- Triggered: The pipeline runs once, processes all available new data, and stops. Best for batch workloads that run on a schedule.
- Continuous: The pipeline runs permanently, processing data as it arrives with low latency. Best for near-real-time use cases.
Structured Streaming — Real-Time Processing in Spark
Structured Streaming is the Spark framework for processing continuous data streams. It uses the same DataFrame API as batch processing. The key difference is that a streaming DataFrame represents an infinite, continuously growing table — new rows arrive and are processed as they come.
Reading from a Stream
# Read a stream from Event Hubs
connection_string = dbutils.secrets.get("keyvault-scope", "eventhubs-connection-string")
df_stream = (spark.readStream
.format("eventhubs")
.options(**{"eventhubs.connectionString": connection_string})
.load()
)
# The body column is binary — decode to string
from pyspark.sql.functions import col
df_events = df_stream.select(col("body").cast("string").alias("event_json"))
Reading from Auto Loader — Files as a Stream
Auto Loader is Databricks' optimized way to incrementally ingest files from ADLS Gen2. It detects new files as they land and processes only those files — without scanning the entire directory every time.
# Auto Loader — incrementally process new files as they arrive
df_stream = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "abfss://checkpoints@mystorage.dfs.core.windows.net/sales_schema/")
.load("abfss://bronze@mystorage.dfs.core.windows.net/sales/")
)
Output Modes
Structured Streaming writes results in one of three output modes:
- Append: Only newly processed rows are written to the output. Used for immutable event data.
- Complete: The entire result is overwritten every time. Used for aggregations — when the total count changes, the whole result table is rewritten.
- Update: Only rows that changed since the last write are written. Supported only for aggregations with Delta Lake sinks.
Checkpointing — Fault Tolerance in Streams
A checkpoint stores the state of a streaming query — which data has been processed, the current watermark, and aggregation state. If the stream fails and restarts, it resumes from the checkpoint rather than reprocessing all data from the beginning.
# Write stream with checkpointing
(df_transformed.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "abfss://checkpoints@mystorage.dfs.core.windows.net/sales_stream/")
.trigger(processingTime="1 minute")
.start("abfss://silver@mystorage.dfs.core.windows.net/sales_stream/")
)
Watermarks — Handling Late-Arriving Data
In a real-time stream, events sometimes arrive late. A mobile app might generate a click event at 10:00 AM but network delays deliver it at 10:15 AM. A watermark tells Spark how long to wait for late events before closing a time window.
from pyspark.sql.functions import window
# Allow up to 10 minutes of late arrivals
df_windowed = (df_events
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"region"
)
.count()
)
Key Points
- Delta Live Tables replaces manual pipeline chaining with a declarative framework — define what data should contain, not how to compute it
- DLT expectations enforce data quality rules automatically and track pass/fail metrics over time
- Use Auto Loader to incrementally ingest files from ADLS Gen2 without full directory scans
- Always configure a checkpointLocation for every streaming query — it enables fault-tolerant restarts
- Use watermarks to define how long Spark waits for late-arriving events before finalizing time window aggregations
- Choose Triggered mode for scheduled batch DLT pipelines; Continuous mode for low-latency near-real-time processing
