Databricks ETL Pipelines

ETL stands for Extract, Transform, Load. It is one of the most fundamental patterns in data engineering. You extract raw data from its source, transform it by cleaning, reshaping, and enriching it, and then load it into a destination where analysts and applications can use it. In Databricks, ETL pipelines are typically built with Apache Spark code in notebooks, and a widely used convention for organizing them is the Medallion Architecture: Bronze, Silver, and Gold layers.

Think of the Medallion Architecture like refining crude oil. Crude oil (Bronze layer) is extracted raw and unprocessed. It goes through a refinery (Silver layer) where impurities are removed and the oil becomes usable. The final products — gasoline, jet fuel, lubricants (Gold layer) — are purpose-built for specific consumers.

The Medallion Architecture

MEDALLION ARCHITECTURE OVERVIEW
──────────────────────────────────────────────────────────────
SOURCE SYSTEMS
  CRM (Salesforce)
  ERP (SAP)
  E-commerce platform
  IoT sensors
         │
         │ Raw data arrives
         ▼
┌─────────────────────────────────────────────────────────┐
│ BRONZE LAYER (Raw)                                      │
│ • Data loaded exactly as received — no transformations  │
│ • All columns preserved, including bad/null data        │
│ • Ingestion metadata added (timestamp, source file)     │
│ • Never deleted — permanent archive of raw data         │
│ Example tables:                                         │
│   bronze.crm_contacts_raw                               │
│   bronze.orders_raw                                     │
│   bronze.iot_events_raw                                 │
└────────────────────────┬────────────────────────────────┘
                         │ Clean and standardize
                         ▼
┌─────────────────────────────────────────────────────────┐
│ SILVER LAYER (Cleaned)                                  │
│ • Null values handled (removed or filled)               │
│ • Data types corrected (strings to dates, etc.)         │
│ • Duplicates removed                                    │
│ • Columns renamed to consistent naming conventions      │
│ • Business rules applied (e.g., filter invalid amounts) │
│ • Tables joined (customers merged with orders)          │
│ Example tables:                                         │
│   silver.customers                                      │
│   silver.orders                                         │
│   silver.order_items                                    │
└────────────────────────┬────────────────────────────────┘
                         │ Aggregate and enrich
                         ▼
┌─────────────────────────────────────────────────────────┐
│ GOLD LAYER (Business-Ready)                             │
│ • Pre-aggregated for specific business questions        │
│ • Optimized for fast query performance                  │
│ • Named for business concepts, not technical ones       │
│ • Consumed by dashboards, reports, ML models            │
│ Example tables:                                         │
│   gold.monthly_revenue_by_region                        │
│   gold.customer_lifetime_value                          │
│   gold.product_performance_dashboard                    │
└─────────────────────────────────────────────────────────┘

Building the Bronze Layer

The Bronze layer ingests raw data with minimal processing. The goal is speed and completeness — capture everything, change nothing. This preserves the original data for debugging, auditing, and future reprocessing if business rules change.

BRONZE LAYER NOTEBOOK: ingest_orders_to_bronze.py
──────────────────────────────────────────────────────────────
from pyspark.sql.functions import current_timestamp, input_file_name, lit

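# Read raw CSV from cloud storage.
# inferSchema is left off so every source column lands as a string,
# exactly as received; the Silver layer applies the real data types.
df_raw = spark.read \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .csv("/mnt/source/orders/2024/*.csv")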

# Add metadata columns (do NOT transform any source columns)
df_bronze = df_raw \
    .withColumn("_ingestion_timestamp", current_timestamp()) \
    .withColumn("_source_file", input_file_name()) \
    .withColumn("_data_source", lit("erp_system"))

# Write to Bronze Delta table
df_bronze.write \
    .format("delta") \
    .mode("append") \
    .partitionBy("_data_source") \
    .saveAsTable("bronze.orders_raw")

print(f"Loaded {df_bronze.count()} rows to bronze.orders_raw")

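Notice that the Bronze layer code adds three metadata columns (prefixed with an underscore) and does nothing else. Wrong data types and misspellings go into Bronze exactly as they arrived. Column names that contain spaces are the one practical exception: by default a Delta table rejects them unless column mapping is enabled on the table, so either enable it or rename only those columns at ingestion. This makes Bronze a reliable audit trail.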
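
To make the "capture everything, change nothing" rule concrete without a cluster, here is a minimal plain-Python sketch (no Spark involved). The helper name to_bronze_rows, the sample CSV, and the file name are hypothetical; only the three underscore-prefixed metadata column names come from the notebook above.

```python
import csv
import io
from datetime import datetime, timezone

def to_bronze_rows(csv_text, source_file, data_source):
    """Tag each raw CSV row with ingestion metadata, leaving source fields untouched."""
    ingested_at = datetime.now(timezone.utc).isoformat()
    rows = []
    for raw in csv.DictReader(io.StringIO(csv_text)):
        row = dict(raw)  # every source value stays a string, as received
        row["_ingestion_timestamp"] = ingested_at
        row["_source_file"] = source_file
        row["_data_source"] = data_source
        rows.append(row)
    return rows

# Hypothetical sample: padded values, a missing OrderID, a non-numeric Amount
sample_csv = "OrderID,Amount,City\n1001, 250.00 , mumbai\n,abc,Delhi\n"
bronze_rows = to_bronze_rows(sample_csv, "orders_2024_01.csv", "erp_system")
for row in bronze_rows:
    print(row)
```

The bad rows are kept on purpose: the padded amount, the empty OrderID, and the non-numeric "abc" all survive so that a later layer can decide what to do with them.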

Building the Silver Layer

The Silver layer transforms raw Bronze data into clean, trustworthy data. This is where most data engineering complexity lives: fixing data quality issues, standardizing formats, deduplicating, and joining tables.

SILVER LAYER NOTEBOOK: bronze_to_silver_orders.py
──────────────────────────────────────────────────────────────
from pyspark.sql.functions import (
    col, to_timestamp, upper, trim, regexp_replace,
    when, coalesce, lit, current_timestamp, row_number
)
from pyspark.sql.window import Window

# Read from Bronze
df_bronze = spark.table("bronze.orders_raw")

# STEP 1: Fix data types
df_typed = df_bronze \
    .withColumn("order_id",
                col("OrderID").cast("integer")) \
    .withColumn("customer_id",
                col("CustomerID").cast("integer")) \
    .withColumn("order_amount",
                col("Amount").cast("double")) \
    .withColumn("order_date",
                to_timestamp(col("OrderDate"),
                             "dd/MM/yyyy HH:mm:ss"))

# STEP 2: Clean string columns
df_cleaned = df_typed \
    .withColumn("city",
                upper(trim(col("City")))) \
    .withColumn("product_name",
                trim(regexp_replace(col("ProductName"),
                                    "[^a-zA-Z0-9 ]", "")))

# STEP 3: Filter invalid records
df_valid = df_cleaned \
    .filter(col("order_id").isNotNull()) \
    .filter(col("order_amount") > 0) \
    .filter(col("order_date").isNotNull()) \
    .filter(col("order_date") <= current_timestamp())

# STEP 4: Remove duplicates (keep latest record per order_id)
window_spec = Window \
    .partitionBy("order_id") \
    .orderBy(col("_ingestion_timestamp").desc())

df_deduped = df_valid \
    .withColumn("row_num", row_number().over(window_spec)) \
    .filter(col("row_num") == 1) \
    .drop("row_num")

# STEP 5: Select and rename final columns
df_silver = df_deduped.select(
    col("order_id"),
    col("customer_id"),
    col("product_name"),
    col("order_amount"),
    col("city"),
    col("order_date"),
    col("_ingestion_timestamp").alias("bronze_ingestion_ts"),
    current_timestamp().alias("silver_processed_ts")
)

# STEP 6: Write to Silver Delta table using MERGE
# (update existing, insert new)
from delta.tables import DeltaTable

if spark.catalog.tableExists("silver.orders"):
    silver_table = DeltaTable.forName(spark, "silver.orders")
    silver_table.alias("target").merge(
        df_silver.alias("source"),
        "target.order_id = source.order_id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()
else:
    df_silver.write \
        .format("delta") \
        .saveAsTable("silver.orders")

print(f"Silver orders table updated: {df_silver.count()} records processed")
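
The casting, filtering, and deduplication rules from Steps 1, 3, and 4 can be tried out on a handful of rows in plain Python before running them on a cluster. This is only a sketch of the logic, not a replacement for the Spark code: the helper names (parse_order, is_valid, keep_latest) and the sample rows are hypothetical, and ingestion timestamps are plain ISO strings for simplicity.

```python
from datetime import datetime

def parse_order(raw):
    """Cast raw string fields; a value that cannot be parsed becomes None."""
    def safe(cast, value):
        try:
            return cast(value)
        except (TypeError, ValueError):
            return None
    return {
        "order_id": safe(int, raw.get("OrderID")),
        "order_amount": safe(float, raw.get("Amount")),
        # "%d/%m/%Y %H:%M:%S" is the strptime counterpart of "dd/MM/yyyy HH:mm:ss"
        "order_date": safe(lambda v: datetime.strptime(v, "%d/%m/%Y %H:%M:%S"),
                           raw.get("OrderDate")),
        "_ingestion_timestamp": raw["_ingestion_timestamp"],
    }

def is_valid(order, now):
    """Same rules as Step 3: id present, amount positive, date present and not in the future."""
    return (order["order_id"] is not None
            and order["order_amount"] is not None and order["order_amount"] > 0
            and order["order_date"] is not None and order["order_date"] <= now)

def keep_latest(orders):
    """Keep the row with the newest _ingestion_timestamp per order_id."""
    latest = {}
    for order in orders:
        current = latest.get(order["order_id"])
        if current is None or order["_ingestion_timestamp"] > current["_ingestion_timestamp"]:
            latest[order["order_id"]] = order
    return list(latest.values())

raw_rows = [
    {"OrderID": "1", "Amount": "100.5", "OrderDate": "05/01/2024 10:00:00",
     "_ingestion_timestamp": "2024-01-06T00:00:00"},
    {"OrderID": "1", "Amount": "120.0", "OrderDate": "05/01/2024 10:00:00",
     "_ingestion_timestamp": "2024-01-07T00:00:00"},   # later version of order 1
    {"OrderID": "2", "Amount": "-5", "OrderDate": "05/01/2024 11:00:00",
     "_ingestion_timestamp": "2024-01-06T00:00:00"},   # negative amount
    {"OrderID": "x", "Amount": "10", "OrderDate": "not a date",
     "_ingestion_timestamp": "2024-01-06T00:00:00"},   # unparseable id and date
]
now = datetime(2024, 1, 8)  # fixed "current time" so the example is repeatable
parsed = [parse_order(r) for r in raw_rows]
silver_rows = keep_latest([o for o in parsed if is_valid(o, now)])
print(silver_rows)
```

Only the later version of order 1 survives. Note the order of operations: because filtering runs before deduplication, an invalid latest version would cause an older valid version to be kept, which may or may not be what the business wants.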

Building the Gold Layer

The Gold layer aggregates and enriches Silver data for specific business use cases. Each Gold table answers a specific business question and is optimized for the consumption pattern (dashboard queries, ML features, executive reports).

GOLD LAYER NOTEBOOK: silver_to_gold_monthly_revenue.py
──────────────────────────────────────────────────────────────
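# Note: importing sum and round by name shadows Python's built-in
# functions of the same name for the rest of the notebook.
from pyspark.sql.functions import (
    date_trunc, sum, count, countDistinct, avg,
    round, current_timestamp, lag, col
)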
from pyspark.sql.window import Window

# Read cleaned Silver tables
orders = spark.table("silver.orders")
customers = spark.table("silver.customers")
products = spark.table("silver.products")

# Join to enrich data
# (assumes silver.orders also carries a product_id column)
df_enriched = orders \
    .join(customers, "customer_id", "left") \
    .join(products, "product_id", "left") \
    .select(
        "order_id",
        "customer_id",   # required by countDistinct below
        "order_date",
        "order_amount",
        "city",
        "region",
        "category",
        "customer_tier"
    )

# Aggregate to monthly level
df_monthly = df_enriched \
    .withColumn("month",
                date_trunc("month", col("order_date"))) \
    .groupBy("month", "region", "category", "customer_tier") \
    .agg(
        count("order_id").alias("num_orders"),
        countDistinct("customer_id").alias("unique_customers"),
        round(sum("order_amount"), 2).alias("total_revenue"),
        round(avg("order_amount"), 2).alias("avg_order_value")
    )

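# Add month-over-month growth rate.
# Partition by every grouping key except month so that lag()
# compares each series with its own previous row.
window_mom = Window \
    .partitionBy("region", "category", "customer_tier") \
    .orderBy("month")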

df_gold = df_monthly \
    .withColumn("prev_month_revenue",
                lag("total_revenue", 1).over(window_mom)) \
    .withColumn("mom_growth_pct",
        round(
            (col("total_revenue") - col("prev_month_revenue"))
            / col("prev_month_revenue") * 100,
        2)) \
    .withColumn("last_updated", current_timestamp())

# Write to Gold Delta table
df_gold.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold.monthly_revenue_by_region_category")

print("Gold table refreshed successfully")
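
The month-over-month calculation is easy to get subtly wrong, so it can help to check the arithmetic on a few rows in plain Python. In this sketch the function add_mom_growth and the sample figures are hypothetical. The series_keys argument plays the role of partitionBy: it should list every grouping column other than the month, otherwise rows from different customer tiers in the same month end up compared with each other.

```python
from collections import defaultdict

def add_mom_growth(rows, series_keys):
    """Add prev_month_revenue and mom_growth_pct to each row, computed per series."""
    by_series = defaultdict(list)
    for row in rows:
        by_series[tuple(row[k] for k in series_keys)].append(row)

    result = []
    for series_rows in by_series.values():
        prev = None
        for row in sorted(series_rows, key=lambda r: r["month"]):
            growth = None
            if prev:  # no previous month, or previous revenue of 0: growth is undefined
                growth = round((row["total_revenue"] - prev) / prev * 100, 2)
            result.append({**row, "prev_month_revenue": prev, "mom_growth_pct": growth})
            prev = row["total_revenue"]
    return result

monthly = [
    {"month": "2024-01", "region": "West", "customer_tier": "Gold", "total_revenue": 1000.0},
    {"month": "2024-01", "region": "West", "customer_tier": "Basic", "total_revenue": 200.0},
    {"month": "2024-02", "region": "West", "customer_tier": "Gold", "total_revenue": 1100.0},
    {"month": "2024-02", "region": "West", "customer_tier": "Basic", "total_revenue": 150.0},
]
with_growth = add_mom_growth(monthly, ["region", "customer_tier"])
growth_by_key = {(r["month"], r["customer_tier"]): r["mom_growth_pct"] for r in with_growth}
print(growth_by_key)
```

The first month of each series has no previous value, so its growth stays None rather than being reported as zero. Like lag() in the notebook, this compares each row with the previous row present in the series, which is only the previous calendar month when no months are missing.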

Data Quality Checks – Protecting Your Pipeline

A pipeline that silently produces wrong results is worse than a pipeline that fails loudly. Build data quality checks at each layer transition.

DATA QUALITY FRAMEWORK
──────────────────────────────────────────────────────────────

def check_data_quality(df, table_name, checks):
    """
    Run quality checks on a DataFrame.
    Raise exception if any check fails.
    """
    failures = []

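    for check_name, condition, description in checks:
        # A condition that evaluates to NULL (for example a null
        # order_amount compared with 0) counts as a failure; a plain
        # ~condition filter would silently skip those rows.
        failing_count = df.filter(
            condition.isNull() | ~condition
        ).count()
        if failing_count > 0:
            failures.append(
                f"FAIL [{check_name}]: {failing_count} rows "
                f"violate '{description}'"
            )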

    if failures:
        raise ValueError(
            f"Data quality failed for {table_name}:\n" +
            "\n".join(failures)
        )
    else:
        print(f"All quality checks passed for {table_name}")

# USAGE EXAMPLE:
from pyspark.sql.functions import col, current_timestamp

quality_checks = [
    ("null_order_id",
     col("order_id").isNotNull(),
     "order_id must not be null"),

    ("positive_amount",
     col("order_amount") > 0,
     "order_amount must be greater than zero"),

    ("valid_date",
     col("order_date") <= current_timestamp(),
     "order_date cannot be in the future"),

    ("valid_city",
     col("city").isin(["MUMBAI","DELHI","BENGALURU",
                        "PUNE","CHENNAI","HYDERABAD"]),
     "city must be a known Indian metro")
]

check_data_quality(df_silver, "silver.orders", quality_checks)
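
One subtlety with checks like these is that Spark SQL uses three-valued logic: comparing a NULL with anything yields NULL, and a filter keeps only rows where the expression is true. A row with a missing amount therefore neither passes nor visibly fails a "greater than zero" test unless NULL results are counted explicitly. The plain-Python sketch below imitates that behaviour with None; the function names and sample rows are hypothetical.

```python
def positive_amount(row):
    """Mimic SQL semantics: comparing a missing value yields None, not False."""
    amount = row.get("order_amount")
    return None if amount is None else amount > 0

def count_failures(rows, predicate):
    """Count rows where the predicate is False or could not be evaluated (None)."""
    return sum(1 for row in rows if predicate(row) is not True)

rows = [
    {"order_amount": 10.0},   # passes
    {"order_amount": -1.0},   # fails
    {"order_amount": None},   # cannot be evaluated
]

# Counting only explicit False results misses the row with the missing amount
naive_failures = sum(1 for row in rows if positive_amount(row) is False)
strict_failures = count_failures(rows, positive_amount)
print(naive_failures, strict_failures)
```

Treating "unknown" as a failure is the conservative choice for a gate between layers; a separate null-rate check can then report how much of the problem is missing data rather than bad values.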

Handling Schema Changes Gracefully

Source systems change. New columns appear. Old columns disappear. Column data types change. Your pipeline must handle these changes without breaking.

SCHEMA CHANGE SCENARIOS AND RESPONSES
──────────────────────────────────────────────────────────────
Scenario 1: Source adds a new column "loyalty_points"
  → Add .option("mergeSchema", "true") to the Bronze write
  → Silver notebook selects explicit columns, ignores new ones
     until business team decides if it's needed

Scenario 2: Source renames "OrderID" to "order_id"
  → Bronze stores both (old rows have OrderID, new rows have order_id)
  → Silver uses COALESCE:
     .withColumn("order_id_new",
                 coalesce(col("order_id"), col("OrderID")))

Scenario 3: Source changes amount from INTEGER to DOUBLE
  → Bronze records the raw string value, provided the column is
     read as a string (no schema inference)
  → Silver casts to DOUBLE, which handles both old integer
     and new decimal values

Scenario 4: Source removes a previously required column
  → Bronze captures what arrives (missing column → all nulls)
  → Silver detects high null rate via quality check
  → Alerts data engineering team to investigate
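
Scenarios 2, 3, and 4 can be sketched in a few lines of plain Python to show the intent of each response. Everything named here is hypothetical (resolve_order_id, null_rate, the sample rows, and the 10% alert threshold); in a real pipeline the same ideas would be expressed with coalesce, cast, and a quality check on the Silver DataFrame.

```python
def resolve_order_id(row):
    """Scenario 2: prefer the new column name, fall back to the old one."""
    new_value = row.get("order_id")
    return new_value if new_value is not None else row.get("OrderID")

def null_rate(rows, column):
    """Scenario 4: fraction of rows where the column is missing or None."""
    if not rows:
        return 0.0
    missing = sum(1 for row in rows if row.get(column) is None)
    return missing / len(rows)

bronze_rows = [
    {"OrderID": "1", "Amount": "10"},      # before the rename, integer amount
    {"order_id": "2", "Amount": "12.5"},   # after the rename, decimal amount
    {"order_id": "3"},                     # Amount no longer sent by the source
    {"order_id": "4"},
]

order_ids = [resolve_order_id(r) for r in bronze_rows]

# Scenario 3: one cast handles both the old integer and the new decimal strings
amounts = [float(r["Amount"]) for r in bronze_rows if r.get("Amount") is not None]

MAX_NULL_RATE = 0.10  # hypothetical alert threshold
amount_null_rate = null_rate(bronze_rows, "Amount")
needs_alert = amount_null_rate > MAX_NULL_RATE
print(order_ids, amounts, amount_null_rate, needs_alert)
```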

Incremental Processing with Watermarks

Processing all data from scratch every run is inefficient. Incremental processing reads only records that changed since the last run. Use a high-water mark column (typically a timestamp or an auto-incrementing ID) to track progress.

INCREMENTAL LOAD PATTERN
──────────────────────────────────────────────────────────────
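from pyspark.sql.functions import col, max as spark_max

# Read last processed timestamp from a control table
last_run = spark.sql("""
    SELECT COALESCE(MAX(last_processed_ts),
                    CAST('1900-01-01' AS TIMESTAMP))
    AS watermark
    FROM pipeline_control.etl_checkpoints
    WHERE table_name = 'silver.orders'
""").first()["watermark"]

# Fix the upper bound of this batch before processing, so rows that
# land in Bronze during the run are left for the next run
bronze = spark.table("bronze.orders_raw")
new_watermark = bronze \
    .agg(spark_max("_ingestion_timestamp")) \
    .first()[0]
if new_watermark is None:        # Bronze is empty
    new_watermark = last_run

# Read only new/changed records from Bronze
df_incremental = bronze \
    .filter(col("_ingestion_timestamp") > last_run) \
    .filter(col("_ingestion_timestamp") <= new_watermark)

record_count = df_incremental.count()
print(f"Processing {record_count} new records since {last_run}")

# Process and write to Silver (using MERGE to handle updates)
# ... transformation code ...

# Advance the watermark to the batch's upper bound rather than
# current_timestamp(), which could skip rows ingested mid-run
spark.createDataFrame(
    [("silver.orders", new_watermark)],
    "table_name string, last_processed_ts timestamp"
).createOrReplaceTempView("watermark_update")

spark.sql("""
    MERGE INTO pipeline_control.etl_checkpoints AS target
    USING watermark_update AS source
    ON target.table_name = source.table_name
    WHEN MATCHED THEN UPDATE SET
        target.last_processed_ts = source.last_processed_ts
    WHEN NOT MATCHED THEN INSERT *
""")
print("Watermark updated")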
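
The watermark pattern can also be simulated in plain Python to see how repeated runs behave. This is a toy model under stated assumptions: integers stand in for ingestion timestamps, a list stands in for the Bronze table, a dict keyed by order_id stands in for Silver (so assignment acts like an upsert), and run_incremental is a hypothetical name. The sketch advances the checkpoint to the newest ingestion value it actually processed.

```python
def run_incremental(bronze, silver, checkpoint):
    """Process Bronze rows newer than the checkpoint and upsert them into Silver."""
    last_run = checkpoint.get("silver.orders", 0)
    # Upper bound of this batch: the newest ingestion value currently in Bronze
    upper = max((r["_ingestion_ts"] for r in bronze), default=last_run)
    batch = [r for r in bronze if last_run < r["_ingestion_ts"] <= upper]
    for row in sorted(batch, key=lambda r: r["_ingestion_ts"]):
        silver[row["order_id"]] = row["amount"]  # insert new key or overwrite existing
    checkpoint["silver.orders"] = upper
    return len(batch)

bronze, silver, checkpoint = [], {}, {}

# Run 1: two new orders arrive
bronze += [{"order_id": 1, "amount": 100, "_ingestion_ts": 1},
           {"order_id": 2, "amount": 50, "_ingestion_ts": 1}]
first = run_incremental(bronze, silver, checkpoint)

# Run 2: order 1 is corrected and order 3 is new
bronze += [{"order_id": 1, "amount": 120, "_ingestion_ts": 2},
           {"order_id": 3, "amount": 75, "_ingestion_ts": 2}]
second = run_incremental(bronze, silver, checkpoint)

# Run 3: nothing new, so nothing is reprocessed
third = run_incremental(bronze, silver, checkpoint)
print(first, second, third, silver, checkpoint)
```

Each run touches only the rows added since the previous one, and the corrected amount for order 1 replaces the old value instead of creating a duplicate.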

Key Points

  • ETL (Extract, Transform, Load) is the core pattern for moving and preparing data in Databricks.
  • The Medallion Architecture organizes data into three layers: Bronze (raw), Silver (clean), and Gold (business-ready).
  • Bronze stores data exactly as received — no transformations — making it a permanent audit trail.
  • Silver applies cleaning, type corrections, deduplication, and joining to produce trustworthy data assets.
  • Gold aggregates Silver data for specific business questions, optimized for dashboard queries and reporting.
  • Data quality checks at each layer boundary catch problems early and prevent bad data from reaching consumers.
  • Incremental processing uses watermark timestamps to read only new data, dramatically reducing compute costs compared to full re-processing.
  • MERGE (upsert) operations, shown here for the Silver table and the checkpoint table, handle both new records and updates to existing records in one atomic operation; the Gold example uses a full overwrite instead.
