Databricks Delta Live Tables

Delta Live Tables (DLT) is a declarative framework for building data pipelines in Databricks. The word "declarative" is important: instead of writing step-by-step code that tells Databricks exactly how to run your pipeline, you declare what the output tables should contain and DLT figures out the how — the execution order, dependency management, error handling, retries, and monitoring — automatically.

Before DLT existed, building a reliable production pipeline required writing a lot of orchestration code: check if upstream tables are ready, retry failed steps, handle partial failures, track lineage, set up monitoring. With DLT, you write only the transformation logic. The framework handles everything else.

Think of DLT like ordering food at a restaurant. You declare what you want — "grilled paneer with rice and salad" — and the kitchen figures out the order of cooking, which chef does what, and how to bring it together. You do not manage the kitchen operations. DLT is the kitchen; your transformation code is the order.

The Three Core DLT Concepts

DELTA LIVE TABLES BUILDING BLOCKS
──────────────────────────────────────────────────────────────
1. STREAMING TABLES (@dlt.table whose function returns a streaming
   DataFrame, e.g. from spark.readStream or dlt.read_stream())
   • Incremental processing of new data
   • Each run processes only new records
   • Best for append-only data sources (logs, events, IoT)

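2. MATERIALIZED VIEWS (@dlt.table whose function returns a batch
   DataFrame, e.g. from dlt.read())
   • Recomputed on each pipeline update (incrementally where possible),
     so results always match a full recompute of the query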
   • Always reflects current state of source tables
   • Best for aggregations and joins that need complete data

3. VIEWS (@dlt.view)
   • Intermediate transformations (not persisted or published as tables)
   • Reused within the same pipeline
   • Best for breaking complex logic into named steps
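The difference between the first two building blocks comes down to what each refresh reads. The plain-Python toy model below is not DLT code, and the class and function names are made up for illustration: the streaming table keeps a checkpoint, so a second refresh reads only the newly appended record, while the materialized view recomputes its aggregate from the whole source every time.

```python
# Toy model, not DLT code: how a streaming table and a materialized view
# react when new records are appended to the same source.

class StreamingTable:
    """Keeps a checkpoint so each refresh reads only unseen records."""

    def __init__(self):
        self.rows = []
        self.checkpoint = 0  # position of the next unread source record

    def refresh(self, source):
        new_records = source[self.checkpoint:]
        self.rows.extend(new_records)
        self.checkpoint = len(source)
        return len(new_records)


def materialized_view(source):
    """Recomputes revenue per city from the complete source every time."""
    totals = {}
    for city, amount in source:
        totals[city] = totals.get(city, 0) + amount
    return totals


source = [("PUNE", 100), ("DELHI", 250)]
orders = StreamingTable()
print(orders.refresh(source))     # 2 records processed
source.append(("PUNE", 50))
print(orders.refresh(source))     # 1 record processed, the first two are skipped
print(materialized_view(source))  # {'PUNE': 150, 'DELHI': 250}
```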

Writing Your First DLT Pipeline

A DLT pipeline's source code lives in one or more notebooks (or Python files). Each table is defined as a Python function decorated with @dlt.table. The function returns a DataFrame that becomes the table's content. By default DLT uses the function name as the table name; the name argument of @dlt.table overrides it.
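The naming rule can be illustrated with a toy decorator. This is a hypothetical stand-in written for illustration, not how the dlt module is implemented: it only records functions under a name, whereas DLT also analyzes the DataFrames those functions return to build the pipeline.

```python
# Toy registry, not the real dlt module: a decorator that records each
# function as a table definition and falls back to the function's own
# name when no name= argument is given.
TABLES = {}


def table(name=None, comment=""):
    def register(func):
        table_name = name or func.__name__
        TABLES[table_name] = {"query": func, "comment": comment}
        return func
    return register


@table(comment="Raw orders")
def orders_raw():
    return []


@table(name="orders_cleaned")
def clean_orders():
    return []


print(sorted(TABLES))  # ['orders_cleaned', 'orders_raw']
```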

DLT PIPELINE NOTEBOOK: retail_pipeline.py
──────────────────────────────────────────────────────────────
import dlt
from pyspark.sql.functions import (
    col, upper, trim, to_timestamp, date_trunc,
    count, sum, round  # note: these shadow Python's built-in sum() and round()
)

# ─── BRONZE LAYER ─────────────────────────────────────────

@dlt.table(
    name="orders_raw",
    comment="Raw orders loaded from CSV files in cloud storage.",
    table_properties={"quality": "bronze"}
)
def orders_raw():
    return (
        spark.readStream
             .format("cloudFiles")
             .option("cloudFiles.format", "csv")
             .option("cloudFiles.schemaLocation",
                     "/mnt/schemas/orders")
             .option("header", "true")
             .load("/mnt/incoming/orders/")
    )

# ─── SILVER LAYER ─────────────────────────────────────────

@dlt.table(
    name="orders_cleaned",
    comment="Cleaned and validated orders with correct data types.",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_order_id",
                    "order_id IS NOT NULL")
@dlt.expect_or_drop("positive_amount",
                    "order_amount > 0")
@dlt.expect("valid_date",
            "order_date <= current_timestamp()")
def orders_cleaned():
    return (
        dlt.read_stream("orders_raw")
           .withColumn("order_id",   col("OrderID").cast("integer"))
           .withColumn("order_amount", col("Amount").cast("double"))
           .withColumn("order_date",
                       to_timestamp(col("OrderDate"), "dd/MM/yyyy"))
           .withColumn("city", upper(trim(col("City"))))
           .select("order_id", "order_amount",
                   "order_date", "city", "customer_id")
    )

# ─── GOLD LAYER ───────────────────────────────────────────

@dlt.table(
    name="monthly_city_revenue",
    comment="Monthly total revenue and order count grouped by city.",
    table_properties={"quality": "gold"}
)
def monthly_city_revenue():
    return (
        dlt.read("orders_cleaned")
           .withColumn("month", date_trunc("month", col("order_date")))
           .groupBy("month", "city")
           .agg(
               count("order_id").alias("total_orders"),
               round(sum("order_amount"), 2).alias("total_revenue")
           )
    )
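To make the three layers concrete without a Databricks workspace, here is a rough plain-Python equivalent of what the notebook does to a handful of rows. The CSV contents are hypothetical sample data; only the two drop rules are modeled (the valid_date warning is left out), and the real pipeline runs these steps as Spark queries rather than Python loops.

```python
import csv
import io
from collections import defaultdict
from datetime import datetime

# Hypothetical sample file with the same columns the notebook reads.
RAW_CSV = """OrderID,Amount,OrderDate,City,customer_id
1,250.50,05/01/2025, pune ,C1
2,-10,06/01/2025,Delhi,C2
,99,07/01/2025,Delhi,C3
3,100.25,15/02/2025,pune,C1
4,49.50,20/01/2025,Pune,C4
"""

# Bronze: rows exactly as read; every value is still a string.
bronze = list(csv.DictReader(io.StringIO(RAW_CSV)))

# Silver: cast types, normalise the city, apply the two drop rules.
silver = []
for row in bronze:
    order_id = int(row["OrderID"]) if row["OrderID"] else None
    amount = float(row["Amount"])
    if order_id is None or amount <= 0:  # valid_order_id / positive_amount
        continue
    silver.append({
        "order_id": order_id,
        "order_amount": amount,
        "order_date": datetime.strptime(row["OrderDate"], "%d/%m/%Y"),
        "city": row["City"].strip().upper(),
        "customer_id": row["customer_id"],
    })

# Gold: truncate each date to its month, then count and sum per city.
gold = defaultdict(lambda: {"total_orders": 0, "total_revenue": 0.0})
for row in silver:
    key = (row["order_date"].strftime("%Y-%m"), row["city"])
    gold[key]["total_orders"] += 1
    gold[key]["total_revenue"] += row["order_amount"]

for key in sorted(gold):
    print(key, gold[key]["total_orders"], round(gold[key]["total_revenue"], 2))
# ('2025-01', 'PUNE') 2 300.0
# ('2025-02', 'PUNE') 1 100.25
```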

Data Quality with Expectations

DLT Expectations are data quality rules declared directly on your tables with the @dlt.expect family of decorators. They replace the manual quality-check functions you would write in a traditional pipeline. DLT evaluates the expectations on every update, records pass and fail counts in the pipeline event log, and displays them in the pipeline UI.

DLT EXPECTATION BEHAVIORS
──────────────────────────────────────────────────────────────
@dlt.expect("rule_name", "condition")
  → WARN: Records that fail the condition are still loaded,
    but the violation is counted and shown in the dashboard.
    Use when bad records are acceptable but worth monitoring.

@dlt.expect_or_drop("rule_name", "condition")
  → DROP: Records that fail the condition are removed.
    The pipeline continues with only valid records.
    Use to filter out obviously bad data.

@dlt.expect_or_fail("rule_name", "condition")
  → FAIL: If ANY record fails the condition, the entire
    pipeline stops with an error.
    Use for critical business rules where even one bad
    record is unacceptable (e.g., financial transactions
    must always have a positive amount).

Multiple Expectations on One Table

import dlt

@dlt.table(name="payments_validated")
@dlt.expect_or_fail("no_null_payment_id",
                    "payment_id IS NOT NULL")
@dlt.expect_or_fail("positive_payment",
                    "payment_amount > 0")
@dlt.expect_or_drop("valid_currency",
                    "currency IN ('INR', 'USD', 'EUR')")
@dlt.expect("future_date_warning",
            "payment_date <= current_date()")
def payments_validated():
    # payments_raw is assumed to be another table defined in this pipeline
    return dlt.read_stream("payments_raw")

HOW DLT PROCESSES THESE EXPECTATIONS:
──────────────────────────────────────────────────────────────
Incoming row: payment_id=NULL, payment_amount=500, currency="INR"
→ FAILS "no_null_payment_id" (expect_or_fail)
→ UPDATE FAILS: this critical rule is violated, so the pipeline stops

Incoming row: payment_id=1001, payment_amount=300, currency="GBP"
→ PASSES no_null_payment_id ✓
→ PASSES positive_payment ✓
→ FAILS valid_currency (GBP not in list) → ROW DROPPED
→ Row is not written; the drop is counted in the data quality metrics

Incoming row: payment_id=1002, payment_amount=700, currency="USD",
              payment_date="2099-12-31"
→ All hard rules pass ✓
→ future_date_warning fires (date is after today) → row kept, violation counted
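The walkthrough above can be reproduced with a small plain-Python model of the three expectation levels. The names ExpectationFailed, RULES and apply_expectations are hypothetical; DLT itself evaluates expectations inside Spark and records the counts in the event log, but the keep, drop and fail outcomes follow the same rules. Each call here treats its rows as one update, so a single fail-level violation aborts the whole call.

```python
from datetime import date


class ExpectationFailed(Exception):
    """Raised when an expect_or_fail rule is violated."""


# (name, action, check) mirrors the four decorators on payments_validated.
RULES = [
    ("no_null_payment_id", "fail", lambda r: r["payment_id"] is not None),
    ("positive_payment", "fail", lambda r: r["payment_amount"] > 0),
    ("valid_currency", "drop", lambda r: r["currency"] in ("INR", "USD", "EUR")),
    ("future_date_warning", "warn", lambda r: r["payment_date"] <= date.today()),
]


def apply_expectations(rows, rules=RULES):
    """Return (kept_rows, metrics) or raise on the first fail-level violation."""
    metrics = {name: {"passed": 0, "failed": 0} for name, _, _ in rules}
    kept = []
    for row in rows:
        keep = True
        for name, action, check in rules:
            if check(row):
                metrics[name]["passed"] += 1
                continue
            metrics[name]["failed"] += 1
            if action == "fail":
                raise ExpectationFailed(f"{name} violated by {row}")
            if action == "drop":
                keep = False
        if keep:  # "warn" violations are counted but the row is kept
            kept.append(row)
    return kept, metrics


gbp = {"payment_id": 1001, "payment_amount": 300, "currency": "GBP",
       "payment_date": date(2024, 3, 1)}
future = {"payment_id": 1002, "payment_amount": 700, "currency": "USD",
          "payment_date": date(2099, 12, 31)}
kept, metrics = apply_expectations([gbp, future])
print([r["payment_id"] for r in kept])  # [1002]
print(metrics["valid_currency"])        # {'passed': 1, 'failed': 1}

try:
    apply_expectations([{"payment_id": None, "payment_amount": 500,
                         "currency": "INR", "payment_date": date(2024, 3, 1)}])
except ExpectationFailed as err:
    print("update failed:", err)
```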

The DLT Pipeline Graph – Visualizing Dependencies

DLT automatically builds a dependency graph from your code. Because each table function uses dlt.read() or dlt.read_stream() to reference other tables, DLT knows the exact order in which tables must be created.

AUTOMATICALLY GENERATED PIPELINE GRAPH
──────────────────────────────────────────────────────────────
Cloud Storage (Source)
        │
        │ readStream (cloudFiles)
        ▼
  [orders_raw]             [customers_raw]
   Bronze table             Bronze table
        │                        │
        │ dlt.read_stream()      │ dlt.read_stream()
        ▼                        ▼
  [orders_cleaned]        [customers_cleaned]
   Silver table            Silver table
        │                        │
        └────────────┬───────────┘
                     │ join on customer_id
                     ▼
              [enriched_orders]
               Silver table (joined)
                     │
                     │ dlt.read()
                     ▼
        [monthly_city_revenue]   [customer_lifetime_value]
         Gold table               Gold table

DLT reads this graph and executes tables in the correct order:
Bronze first → Silver → Gold
No manual orchestration needed.
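The ordering DLT derives is a topological sort of the dependency graph. The sketch below uses Python's standard graphlib module (Python 3.9+) on the tables from the graph above. The dependency dictionary is typed in by hand here, whereas DLT infers it from the dlt.read() and dlt.read_stream() calls. The loop groups tables into stages whose inputs are all finished; tables in the same stage do not depend on each other, so a scheduler is free to run them in parallel.

```python
from graphlib import TopologicalSorter

# Each table maps to the tables it reads, copied from the graph above.
DEPENDENCIES = {
    "orders_raw": set(),
    "customers_raw": set(),
    "orders_cleaned": {"orders_raw"},
    "customers_cleaned": {"customers_raw"},
    "enriched_orders": {"orders_cleaned", "customers_cleaned"},
    "monthly_city_revenue": {"enriched_orders"},
    "customer_lifetime_value": {"enriched_orders"},
}

stages = []
sorter = TopologicalSorter(DEPENDENCIES)
sorter.prepare()
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # tables whose inputs are all finished
    stages.append(ready)
    sorter.done(*ready)

for number, tables in enumerate(stages, start=1):
    print(f"stage {number}: {tables}")
# stage 1: ['customers_raw', 'orders_raw']
# stage 2: ['customers_cleaned', 'orders_cleaned']
# stage 3: ['enriched_orders']
# stage 4: ['customer_lifetime_value', 'monthly_city_revenue']
```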

Creating DLT Pipelines in the Workspace

To create a DLT pipeline in the Databricks UI, go to Workflows > Delta Live Tables and click Create Pipeline (menu names differ between workspace versions). Fill in these settings:

DLT PIPELINE CONFIGURATION
──────────────────────────────────────────────────────────────
Name:         retail_data_pipeline
Product Edition: Core / Pro / Advanced
                 (Core: streaming tables and materialized views;
                  Pro: adds CDC with APPLY CHANGES;
                  Advanced: adds data quality expectations)

Source Code:  Select the notebook(s) containing
              your @dlt.table functions

Target Schema: main.retail_pipeline
               (All created tables land here)

Storage Location: /mnt/dlt/retail_pipeline
                  (DLT stores event logs and checkpoints)

Cluster Policy: Select appropriate policy
                (DLT manages its own cluster automatically)

Pipeline Mode:
  Triggered  → Runs once when manually started or scheduled
               Best for batch pipelines
  Continuous → Runs permanently, processing new data as it arrives
               Best for near-real-time pipelines
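The same settings can also be expressed as the JSON document behind the pipeline settings editor and the Databricks Pipelines API. The sketch below builds it in Python; the field names reflect that settings format as documented at the time of writing and should be checked against the current API reference, and the notebook path is hypothetical.

```python
import json

# Field names follow the pipeline settings JSON; values mirror the form above.
settings = {
    "name": "retail_data_pipeline",
    "edition": "ADVANCED",          # CORE, PRO or ADVANCED
    "catalog": "main",
    "target": "retail_pipeline",    # schema where the tables are published
    "continuous": False,            # False = triggered, True = continuous
    "development": True,
    "libraries": [
        # hypothetical notebook path
        {"notebook": {"path": "/Workspace/Shared/retail_pipeline"}},
    ],
    "configuration": {"pipeline.environment": "dev"},
}

print(json.dumps(settings, indent=2))
```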

Running and Monitoring a DLT Pipeline

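After creating the pipeline, click Start. DLT shows a live graph of the pipeline in which each table's status updates as the run progresses, so you can see which tables have completed, which are still running, and which have failed, along with row counts and expectation results for each table.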

DLT PIPELINE MONITORING UI
──────────────────────────────────────────────────────────────
┌──────────────────────────────────────────────────────────┐
│ PIPELINE: retail_data_pipeline         Status: RUNNING   │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  [orders_raw] ──────────→ [orders_cleaned] ──→           │
│   ✅ Complete               ✅ Complete                 │
│   1,234,567 rows            1,230,102 rows               │
│   (0 warnings)              (4,465 rows dropped)         │
│                                  │                       │
│  [customers_raw] ───────→ [customers_cleaned]            │
│   ✅ Complete               ✅ Complete                 │
│   485,230 rows              485,218 rows                 │
│                                  │                       │
│                                  ↓                       │
│                        [monthly_city_revenue]            │
│                         ✅ Complete                      │
│                         2,184 rows                       │
│                                                          │
│ DATA QUALITY SUMMARY (orders_cleaned):                   │
│ Expectation       │ Pass      │ Fail  │ Action           │
│ valid_order_id    │ 1,234,567 │     0 │ drop             │
│ positive_amount   │ 1,230,102 │ 4,465 │ drop             │
│ valid_date        │ 1,234,567 │     0 │ warn             │
└──────────────────────────────────────────────────────────┘

The Event Log section below the pipeline graph shows a timestamped history of every event: pipeline started, table started, expectation results, table completed, pipeline completed. This log is itself stored as a Delta table, so you can query it with SQL to build custom monitoring dashboards.
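Expectation results appear in flow_progress events, whose details column holds JSON with a flow_progress.data_quality.expectations array of entries carrying name, dataset, passed_records and failed_records. In Databricks you would normally query this with SQL over the event log table; the plain-Python sketch below uses hypothetical event rows shaped that way and totals pass and fail counts per expectation, which is the core of a custom data quality dashboard.

```python
import json
from collections import defaultdict


def flow_progress(*expectations):
    """Build a hypothetical flow_progress event with a JSON details string."""
    details = {"flow_progress": {"data_quality": {"expectations": list(expectations)}}}
    return {"event_type": "flow_progress", "details": json.dumps(details)}


def exp(name, passed, failed, dataset="orders_cleaned"):
    return {"name": name, "dataset": dataset,
            "passed_records": passed, "failed_records": failed}


# Hypothetical event-log rows from two micro-batches plus one unrelated event.
events = [
    flow_progress(exp("valid_order_id", 600, 0), exp("positive_amount", 590, 10)),
    flow_progress(exp("valid_order_id", 405, 0), exp("positive_amount", 400, 5)),
    {"event_type": "update_progress",
     "details": json.dumps({"update_progress": {"state": "COMPLETED"}})},
]

totals = defaultdict(lambda: [0, 0])  # (dataset, expectation) -> [passed, failed]
for event in events:
    if event["event_type"] != "flow_progress":
        continue
    quality = json.loads(event["details"])["flow_progress"].get("data_quality", {})
    for e in quality.get("expectations", []):
        key = (e["dataset"], e["name"])
        totals[key][0] += e["passed_records"]
        totals[key][1] += e["failed_records"]

for (dataset, name), (passed, failed) in sorted(totals.items()):
    rate = 100 * failed / (passed + failed)
    print(f"{dataset}.{name}: {passed} passed, {failed} failed ({rate:.2f}%)")
# orders_cleaned.positive_amount: 990 passed, 15 failed (1.49%)
# orders_cleaned.valid_order_id: 1005 passed, 0 failed (0.00%)
```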

DLT Pipeline Modes: Triggered vs Continuous

Triggered Mode

Triggered mode runs the pipeline once, processes all available data, and then stops the cluster. Use this for batch pipelines that run on a schedule — nightly at midnight, hourly at the top of the hour, or on demand. It is cost-effective because the cluster shuts down when the pipeline finishes.

TRIGGERED MODE TIMELINE
──────────────────────────────────────────────────────────────
Midnight: Workflow triggers DLT pipeline
  00:00 → Cluster starts (2 min)
  00:02 → Bronze tables processed
  00:08 → Silver tables processed
  00:15 → Gold tables processed
  00:15 → Cluster terminates automatically
  
Cost: Pay for 15 minutes of compute.

Continuous Mode

Continuous mode keeps the cluster running permanently and processes new data the moment it arrives. Use this when your business needs near-real-time data — fraud detection, live inventory tracking, real-time logistics updates. The cluster runs 24/7 and costs accordingly.

CONTINUOUS MODE TIMELINE
──────────────────────────────────────────────────────────────
9:00 AM: New order files land in cloud storage
  → Auto Loader detects new files within seconds
  → Bronze table updated: +500 rows
  → Silver table updated: +498 rows (2 dropped by expectations)
  → Gold table updated: recalculated for current month

9:05 AM: Next batch of files arrives
  → Same process repeats automatically

Cost: Pay for 24/7 compute on the pipeline cluster.
Benefit: New data typically reaches the tables within seconds to minutes of arriving, depending on data volume and cluster size.
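The cost difference between the two timelines is easy to quantify in cluster-hours. The sketch below assumes a 30-day month and the same cluster size in both modes; actual billing also depends on DBU rates, product edition and cluster configuration, which it ignores.

```python
# Compare monthly cluster-hours for the triggered and continuous timelines.
DAYS_PER_MONTH = 30                # simplifying assumption

triggered_minutes_per_run = 15     # 00:00 start -> 00:15 terminate, once a day
triggered_hours = triggered_minutes_per_run * DAYS_PER_MONTH / 60
continuous_hours = 24 * DAYS_PER_MONTH

print(f"triggered:  {triggered_hours:.1f} cluster-hours/month")    # 7.5
print(f"continuous: {continuous_hours:.1f} cluster-hours/month")   # 720.0
print(f"ratio: {continuous_hours / triggered_hours:.0f}x")         # 96x
```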

Advanced DLT Features

Change Data Capture (CDC) with APPLY CHANGES INTO

Many source systems publish only the changes (inserts, updates, deletes) rather than the full table on every extract. DLT's APPLY CHANGES INTO statement (dlt.apply_changes() in Python) processes these change events and applies them to a Delta table, using a sequence column to put them in order so the table reflects the correct current state.

# CDC stream: each row is a change event with an operation type
# Source: customer_changes_stream (a streaming table or view defined
#         in the same pipeline)
# Columns: customer_id, name, city, phone, operation, sequence_num
import dlt
from pyspark.sql.functions import expr

dlt.create_streaming_table("customers_current")

dlt.apply_changes(
    target="customers_current",
    source="customer_changes_stream",
    keys=["customer_id"],               # ← The unique identifier
    sequence_by="sequence_num",         # ← Order of operations
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "sequence_num"],
    stored_as_scd_type=1,               # keep only the latest row per key
)

CDC PROCESSING EXAMPLE:
──────────────────────────────────────────────────────────────
Incoming change events:
  {customer_id: 101, name: "Priya", city: "Mumbai", op: "INSERT", seq: 1}
  {customer_id: 101, name: "Priya", city: "Pune",   op: "UPDATE", seq: 2}
  {customer_id: 102, name: "Rahul", city: "Delhi",  op: "INSERT", seq: 3}
  {customer_id: 101, op: "DELETE", seq: 4}

Resulting customers_current table:
  customer_id=101 → no row (its latest event, seq 4, is a DELETE)
  customer_id=102 → name: Rahul, city: Delhi
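The example above follows SCD type 1 semantics: only the latest state per key is kept. The plain-Python sketch below models that logic; the apply_changes function here is a hypothetical model, not the DLT engine. It sorts events by the sequence column before applying them, which is why the out-of-order arrival in the sample still ends with customer 101 deleted.

```python
def apply_changes(events, keys, sequence_by, is_delete, except_columns):
    """Apply change events in sequence order, keeping one row per key (SCD type 1)."""
    current = {}
    for event in sorted(events, key=lambda e: e[sequence_by]):
        key = tuple(event[k] for k in keys)
        if is_delete(event):
            current.pop(key, None)  # a DELETE removes the key entirely
        else:
            # INSERT or UPDATE: the newest event replaces the whole row
            current[key] = {c: v for c, v in event.items() if c not in except_columns}
    return list(current.values())


# The four change events from the example, deliberately arriving out of order.
events = [
    {"customer_id": 101, "name": "Priya", "city": "Mumbai", "operation": "INSERT", "sequence_num": 1},
    {"customer_id": 101, "name": None, "city": None, "operation": "DELETE", "sequence_num": 4},
    {"customer_id": 102, "name": "Rahul", "city": "Delhi", "operation": "INSERT", "sequence_num": 3},
    {"customer_id": 101, "name": "Priya", "city": "Pune", "operation": "UPDATE", "sequence_num": 2},
]

result = apply_changes(
    events,
    keys=["customer_id"],
    sequence_by="sequence_num",
    is_delete=lambda e: e["operation"] == "DELETE",
    except_columns={"operation", "sequence_num"},
)
print(result)  # [{'customer_id': 102, 'name': 'Rahul', 'city': 'Delhi'}]
```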

Pipeline Parameters

DLT supports pipeline-level parameters that let you reuse the same pipeline code for different environments (development, staging, production) by passing different values at runtime.

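# Access a pipeline parameter in your notebook. "pipeline.environment" is a
# custom key you define in the pipeline's Configuration settings.
import dlt

env = spark.conf.get("pipeline.environment", "dev")

# prod writes to the "main" catalog; other environments write to a catalog
# named after the environment (e.g. "dev" or "staging").
target_catalog = "main" if env == "prod" else env

# A fully qualified catalog.schema.table name assumes a Unity Catalog
# pipeline that is allowed to publish to multiple catalogs and schemas.
@dlt.table(name=f"{target_catalog}.retail_pipeline.orders_raw")
def orders_raw():
    ...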
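Because the parameter lookup is plain Python, the environment logic can be tested outside Databricks. This sketch passes an ordinary dict in place of spark.conf (the two-argument get call has the same key/default shape) and assumes a naming convention where prod maps to the main catalog and any other environment name is used as the catalog name; resolve_table_name and the retail_pipeline schema default are hypothetical.

```python
def resolve_table_name(conf, table, schema="retail_pipeline"):
    """Build catalog.schema.table from a pipeline configuration mapping.

    conf stands in for spark.conf: "pipeline.environment" defaults to "dev",
    "prod" maps to the main catalog and any other value is used as the
    catalog name (an assumed naming convention).
    """
    env = conf.get("pipeline.environment", "dev")
    catalog = "main" if env == "prod" else env
    return f"{catalog}.{schema}.{table}"


print(resolve_table_name({}, "orders_raw"))
# dev.retail_pipeline.orders_raw
print(resolve_table_name({"pipeline.environment": "prod"}, "orders_raw"))
# main.retail_pipeline.orders_raw
```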

Key Points

  • Delta Live Tables is a declarative framework — you define what tables should contain, and DLT manages execution order, retries, and monitoring automatically.
  • Tables are defined as Python functions decorated with @dlt.table, using dlt.read() and dlt.read_stream() to reference dependencies.
  • DLT Expectations enforce data quality rules at three levels: warn only, drop failing rows, or fail the entire pipeline.
  • DLT automatically builds a dependency graph and executes tables in the correct order without manual orchestration code.
  • Triggered mode processes the available data once and shuts down (batch use case); Continuous mode keeps running and processes new data as it arrives (near-real-time use case).
  • APPLY CHANGES INTO handles Change Data Capture (CDC) scenarios, automatically applying inserts, updates, and deletes from a change stream to a target table.
  • The DLT monitoring UI shows real-time pipeline progress, per-table row counts, and data quality expectation results in a visual graph.
