Databricks ETL Pipelines
ETL stands for Extract, Transform, Load, and it is the most fundamental pattern in data engineering. You extract raw data from its source, transform it by cleaning, reshaping, and enriching it, and then load it into a destination where analysts and applications can use it. In Databricks, ETL pipelines are typically written as Apache Spark code (Python or SQL) in notebooks, and a widely adopted way to organize them is the Medallion Architecture: Bronze, Silver, and Gold layers.
Think of the Medallion Architecture like refining crude oil. Crude oil (Bronze layer) is extracted raw and unprocessed. It goes through a refinery (Silver layer) where impurities are removed and the oil becomes usable. The final products — gasoline, jet fuel, lubricants (Gold layer) — are purpose-built for specific consumers.
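To make the three ETL steps concrete outside Databricks, here is a minimal plain-Python sketch using only the standard library. It is an illustration under simplifying assumptions: the "source" is a hypothetical in-memory CSV string and the "destination" is an in-memory SQLite table. In Databricks, each step would instead be Spark code that reads from and writes to Delta tables.

```python
import csv
import io
import sqlite3

# Hypothetical source extract: three orders, one with a malformed amount.
SOURCE = "order_id,amount\n1,100.50\n2,abc\n3,42\n"


def extract(text):
    """Extract: read the raw rows from the source unchanged (all strings)."""
    return list(csv.DictReader(io.StringIO(text)))


def transform(rows):
    """Transform: cast to proper types and drop rows that fail the cast."""
    clean = []
    for row in rows:
        try:
            clean.append((int(row["order_id"]), float(row["amount"])))
        except ValueError:
            continue  # a real pipeline would quarantine or log these rows
    return clean


def load(rows, conn):
    """Load: upsert the cleaned rows into a destination table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS orders "
        "(order_id INTEGER PRIMARY KEY, amount REAL)"
    )
    conn.executemany("INSERT OR REPLACE INTO orders VALUES (?, ?)", rows)
    conn.commit()


conn = sqlite3.connect(":memory:")
load(transform(extract(SOURCE)), conn)
print(conn.execute("SELECT order_id, amount FROM orders ORDER BY order_id").fetchall())
# [(1, 100.5), (3, 42.0)]
```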
The Medallion Architecture
MEDALLION ARCHITECTURE OVERVIEW
──────────────────────────────────────────────────────────────
SOURCE SYSTEMS
CRM (Salesforce)
ERP (SAP)
E-commerce platform
IoT sensors
│
│ Raw data arrives
▼
┌─────────────────────────────────────────────────────────┐
│ BRONZE LAYER (Raw) │
│ • Data loaded exactly as received — no transformations │
│ • All columns preserved, including bad/null data │
│ • Ingestion metadata added (timestamp, source file) │
│ • Never deleted — permanent archive of raw data │
│ Example tables: │
│ bronze.crm_contacts_raw │
│ bronze.orders_raw │
│ bronze.iot_events_raw │
└────────────────────────┬────────────────────────────────┘
│ Clean and standardize
▼
┌─────────────────────────────────────────────────────────┐
│ SILVER LAYER (Cleaned) │
│ • Null values handled (removed or filled) │
│ • Data types corrected (strings to dates, etc.) │
│ • Duplicates removed │
│ • Columns renamed to consistent naming conventions │
│ • Business rules applied (e.g., filter invalid amounts) │
│ • Tables joined (customers merged with orders) │
│ Example tables: │
│ silver.customers │
│ silver.orders │
│ silver.order_items │
└────────────────────────┬────────────────────────────────┘
│ Aggregate and enrich
▼
┌─────────────────────────────────────────────────────────┐
│ GOLD LAYER (Business-Ready) │
│ • Pre-aggregated for specific business questions │
│ • Optimized for fast query performance │
│ • Named for business concepts, not technical ones │
│ • Consumed by dashboards, reports, ML models │
│ Example tables: │
│ gold.monthly_revenue_by_region │
│ gold.customer_lifetime_value │
│ gold.product_performance_dashboard │
└─────────────────────────────────────────────────────────┘
Building the Bronze Layer
The Bronze layer ingests raw data with minimal processing. The goal is speed and completeness — capture everything, change nothing. This preserves the original data for debugging, auditing, and future reprocessing if business rules change.
BRONZE LAYER NOTEBOOK: ingest_orders_to_bronze.py
──────────────────────────────────────────────────────────────
from pyspark.sql.functions import col, current_timestamp, lit
# Read raw CSV from cloud storage. Schema inference stays off so every
# source column is stored as the original string, exactly as received.
df_raw = spark.read \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .csv("/mnt/source/orders/2024/*.csv")
# Add metadata columns (do NOT transform any source columns).
# _metadata.file_path is Spark's file-metadata column (Spark 3.3+); Databricks
# recommends it over input_file_name(), which Unity Catalog shared clusters
# do not support.
df_bronze = df_raw \
    .withColumn("_ingestion_timestamp", current_timestamp()) \
    .withColumn("_source_file", col("_metadata.file_path")) \
    .withColumn("_data_source", lit("erp_system"))
# Write to Bronze Delta table
df_bronze.write \
    .format("delta") \
    .mode("append") \
    .partitionBy("_data_source") \
    .saveAsTable("bronze.orders_raw")
print(f"Loaded {df_bronze.count()} rows to bronze.orders_raw")
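Notice that the Bronze code adds three metadata columns (prefixed with an underscore) and does nothing else. Misspellings, malformed values, and values of the wrong type all go into Bronze exactly as they arrived, which makes Bronze a reliable audit trail. Column names are the one exception to watch: Delta rejects names containing spaces or characters such as , ; { } ( ) = unless column mapping is enabled on the table (delta.columnMapping.mode = 'name').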
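Values only stay "exactly as they arrived" if they are kept as strings. Type inference at ingestion is itself a transformation, and it can lose information. The plain-Python sketch below models Bronze ingestion of a hypothetical CSV row. It contrasts that with a deliberately crude inference function (an illustrative stand-in, not Spark's inferSchema algorithm), which turns the ID 00123 into the integer 123.

```python
import csv
import io
from datetime import datetime, timezone

# Hypothetical raw file: an ID with leading zeros, an amount with a
# thousands separator, and a day-first date string.
RAW_CSV = 'OrderID,Amount,OrderDate\n00123,"1,250.50",05/03/2024 10:15:00\n'


def ingest_raw(text, source_file, data_source):
    """Bronze-style ingestion: keep every source value as the original
    string and only add underscore-prefixed metadata fields."""
    ingested_at = datetime.now(timezone.utc).isoformat()
    rows = []
    for record in csv.DictReader(io.StringIO(text)):
        row = dict(record)  # source fields are copied untouched
        row["_ingestion_timestamp"] = ingested_at
        row["_source_file"] = source_file
        row["_data_source"] = data_source
        rows.append(row)
    return rows


def naive_infer(value):
    """Crude stand-in for schema inference: try int, then float, else str."""
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            pass
    return value


bronze = ingest_raw(RAW_CSV, "orders_2024_01.csv", "erp_system")
print(bronze[0]["OrderID"], bronze[0]["Amount"])  # 00123 1,250.50
print(naive_infer(bronze[0]["OrderID"]))          # 123 (leading zeros gone)
```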
Building the Silver Layer
The Silver layer transforms raw Bronze data into clean, trustworthy data. This is where most data engineering complexity lives: fixing data quality issues, standardizing formats, deduplicating, and joining tables.
SILVER LAYER NOTEBOOK: bronze_to_silver_orders.py
──────────────────────────────────────────────────────────────
from pyspark.sql.functions import (
col, to_timestamp, upper, trim, regexp_replace,
when, coalesce, lit, current_timestamp, row_number
)
from pyspark.sql.window import Window
# Read from Bronze
df_bronze = spark.table("bronze.orders_raw")
# STEP 1: Fix data types
df_typed = df_bronze \
.withColumn("order_id",
col("OrderID").cast("integer")) \
.withColumn("customer_id",
col("CustomerID").cast("integer")) \
.withColumn("order_amount",
col("Amount").cast("double")) \
.withColumn("order_date",
to_timestamp(col("OrderDate"),
"dd/MM/yyyy HH:mm:ss"))
# STEP 2: Clean string columns
df_cleaned = df_typed \
.withColumn("city",
upper(trim(col("City")))) \
.withColumn("product_name",
trim(regexp_replace(col("ProductName"),
"[^a-zA-Z0-9 ]", "")))
# STEP 3: Filter invalid records
df_valid = df_cleaned \
.filter(col("order_id").isNotNull()) \
.filter(col("order_amount") > 0) \
.filter(col("order_date").isNotNull()) \
.filter(col("order_date") <= current_timestamp())
# STEP 4: Remove duplicates (keep latest record per order_id)
window_spec = Window \
.partitionBy("order_id") \
.orderBy(col("_ingestion_timestamp").desc())
df_deduped = df_valid \
.withColumn("row_num", row_number().over(window_spec)) \
.filter(col("row_num") == 1) \
.drop("row_num")
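# STEP 5: Select and rename final columns
df_silver = df_deduped.select(
    col("order_id"),
    col("customer_id"),
    # product_id is required by the Gold join on silver.products; this
    # assumes the source file carries a ProductID column (not shown above)
    col("ProductID").cast("integer").alias("product_id"),
    col("product_name"),
    col("order_amount"),
    col("city"),
    col("order_date"),
    col("_ingestion_timestamp").alias("bronze_ingestion_ts"),
    current_timestamp().alias("silver_processed_ts")
)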
# STEP 6: Write to Silver Delta table using MERGE
# (update existing, insert new)
from delta.tables import DeltaTable
if spark.catalog.tableExists("silver.orders"):
    silver_table = DeltaTable.forName(spark, "silver.orders")
    silver_table.alias("target").merge(
        df_silver.alias("source"),
        "target.order_id = source.order_id"
    ).whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
else:
    df_silver.write \
        .format("delta") \
        .saveAsTable("silver.orders")
print(f"Silver orders table updated: {df_silver.count()} records processed")
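Steps 4 and 6 work together. Delta's MERGE fails when more than one source row matches the same target row, so the source must first be deduplicated on the merge key. The plain-Python model below shows the net effect of both steps. Lists and dicts stand in for the DataFrames and the Delta table, and the sample rows are hypothetical. The latest version of order 1 replaces the existing Silver row, and order 2 is inserted.

```python
from datetime import datetime


def dedupe_latest(rows, key, order_by):
    """Keep one row per key: the one with the greatest order_by value.
    Models row_number() over (partitionBy(key).orderBy(order_by desc)) == 1;
    on exact ties this keeps the first row seen, whereas Spark's choice
    among tied rows is not guaranteed."""
    latest = {}
    for row in rows:
        k = row[key]
        if k not in latest or row[order_by] > latest[k][order_by]:
            latest[k] = row
    return list(latest.values())


def merge_upsert(target, source_rows, key):
    """Models MERGE ... whenMatchedUpdateAll().whenNotMatchedInsertAll(),
    with the target table held as a dict keyed by the merge key."""
    for row in source_rows:
        target[row[key]] = dict(row)  # update if the key exists, else insert
    return target


t1, t2 = datetime(2024, 1, 1), datetime(2024, 1, 2)
bronze_rows = [
    {"order_id": 1, "order_amount": 100.0, "_ingestion_timestamp": t1},
    {"order_id": 1, "order_amount": 120.0, "_ingestion_timestamp": t2},
    {"order_id": 2, "order_amount": 50.0, "_ingestion_timestamp": t1},
]
silver = {1: {"order_id": 1, "order_amount": 90.0, "_ingestion_timestamp": t1}}

batch = dedupe_latest(bronze_rows, "order_id", "_ingestion_timestamp")
merge_upsert(silver, batch, "order_id")
print(sorted((k, v["order_amount"]) for k, v in silver.items()))
# [(1, 120.0), (2, 50.0)]
```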
Building the Gold Layer
The Gold layer aggregates and enriches Silver data for specific business use cases. Each Gold table answers a specific business question and is optimized for the consumption pattern (dashboard queries, ML features, executive reports).
GOLD LAYER NOTEBOOK: silver_to_gold_monthly_revenue.py
──────────────────────────────────────────────────────────────
from pyspark.sql.functions import (
    date_trunc, count, countDistinct, avg, current_timestamp, lag, col,
    # aliased so Python's built-in sum() and round() are not shadowed
    sum as spark_sum, round as spark_round
)
from pyspark.sql.window import Window
# Read cleaned Silver tables
orders = spark.table("silver.orders")
customers = spark.table("silver.customers")
products = spark.table("silver.products")
# Join to enrich data (customer_id is kept for the unique-customer count)
df_enriched = orders \
    .join(customers, "customer_id", "left") \
    .join(products, "product_id", "left") \
    .select(
        "order_id",
        "customer_id",
        "order_date",
        "order_amount",
        "city",
        "region",
        "category",
        "customer_tier"
    )
# Aggregate to monthly level
df_monthly = df_enriched \
    .withColumn("month", date_trunc("month", col("order_date"))) \
    .groupBy("month", "region", "category", "customer_tier") \
    .agg(
        count("order_id").alias("num_orders"),
        countDistinct("customer_id").alias("unique_customers"),
        spark_round(spark_sum("order_amount"), 2).alias("total_revenue"),
        spark_round(avg("order_amount"), 2).alias("avg_order_value")
    )
# Add month-over-month growth rate. The window partitions by every
# grouping column except month, so lag() compares each group only with
# its own previous month (the previous month that has data, if any are missing).
window_mom = Window \
    .partitionBy("region", "category", "customer_tier") \
    .orderBy("month")
df_gold = df_monthly \
    .withColumn("prev_month_revenue",
                lag("total_revenue", 1).over(window_mom)) \
    .withColumn("mom_growth_pct",
                spark_round(
                    (col("total_revenue") - col("prev_month_revenue"))
                    / col("prev_month_revenue") * 100,
                    2)) \
    .withColumn("last_updated", current_timestamp())
# Write to Gold Delta table
df_gold.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold.monthly_revenue_by_region_category")
print("Gold table refreshed successfully")
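A plain-Python model of what lag() computes shows why the window's partition columns matter. If the partition omits a grouping column such as customer_tier, the "previous row" for one tier can be another tier's row. The sketch below uses hypothetical monthly rows and compares both partitionings for February's GOLD tier. It guards against a missing or zero base, which the Spark expression itself does not do.

```python
from collections import defaultdict


def add_mom_growth(rows, partition_keys):
    """Model lag(total_revenue, 1) over (partitionBy(partition_keys)
    .orderBy(month)) plus the growth formula from the Gold notebook.
    Note: Python's round() rounds half to even, Spark's round() half up."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in partition_keys)].append(row)
    result = []
    for group in groups.values():
        prev = None  # lag() yields NULL for the first row of a partition
        for row in sorted(group, key=lambda r: r["month"]):
            growth = None
            if prev:  # skip NULL (first month) and a zero base
                growth = round((row["total_revenue"] - prev) / prev * 100, 2)
            result.append({**row, "prev_month_revenue": prev,
                           "mom_growth_pct": growth})
            prev = row["total_revenue"]
    return result


def growth_for(result, month, tier):
    return next(r["mom_growth_pct"] for r in result
                if r["month"] == month and r["customer_tier"] == tier)


# Hypothetical monthly rows for one region/category and two customer tiers
base = {"region": "WEST", "category": "Books"}
rows = [
    {**base, "month": "2024-01", "customer_tier": "GOLD", "total_revenue": 100.0},
    {**base, "month": "2024-01", "customer_tier": "SILVER", "total_revenue": 40.0},
    {**base, "month": "2024-02", "customer_tier": "GOLD", "total_revenue": 150.0},
    {**base, "month": "2024-02", "customer_tier": "SILVER", "total_revenue": 30.0},
]

good = add_mom_growth(rows, ["region", "category", "customer_tier"])
bad = add_mom_growth(rows, ["region", "category"])
print(growth_for(good, "2024-02", "GOLD"))  # 50.0 (150 vs 100)
print(growth_for(bad, "2024-02", "GOLD"))   # 275.0 (compared with SILVER's 40)
```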
Data Quality Checks – Protecting Your Pipeline
A pipeline that silently produces wrong results is worse than a pipeline that fails loudly. Build data quality checks at each layer transition.
DATA QUALITY FRAMEWORK
──────────────────────────────────────────────────────────────
from pyspark.sql.functions import coalesce, lit
def check_data_quality(df, table_name, checks):
    """
    Run quality checks on a DataFrame.
    Raise an exception if any check fails.
    """
    failures = []
    for check_name, condition, description in checks:
        # filter() drops rows whose condition is NULL (e.g. a null
        # order_amount in "order_amount > 0"), so ~condition alone would
        # never count them. Treat NULL as a failure instead.
        failing_count = df.filter(~coalesce(condition, lit(False))).count()
        if failing_count > 0:
            failures.append(
                f"FAIL [{check_name}]: {failing_count} rows "
                f"violate '{description}'"
            )
    if failures:
        raise ValueError(
            f"Data quality failed for {table_name}:\n" +
            "\n".join(failures)
        )
    print(f"All quality checks passed for {table_name}")
# USAGE EXAMPLE:
from pyspark.sql.functions import col, current_timestamp
quality_checks = [
    ("null_order_id",
     col("order_id").isNotNull(),
     "order_id must not be null"),
    ("positive_amount",
     col("order_amount") > 0,
     "order_amount must be greater than zero"),
    ("valid_date",
     col("order_date") <= current_timestamp(),
     "order_date cannot be in the future"),
    ("valid_city",
     col("city").isin(["MUMBAI", "DELHI", "BENGALURU",
                       "PUNE", "CHENNAI", "HYDERABAD"]),
     "city must be a known Indian metro")
]
check_data_quality(df_silver, "silver.orders", quality_checks)
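Spark SQL uses three-valued logic. A comparison involving NULL yields NULL, negating NULL still yields NULL, and filter() keeps only rows whose predicate is true. As a result, a check written as df.filter(~condition).count() never counts a row with a null in the checked column as a failure. The plain-Python model below uses None for NULL and hypothetical rows. It shows the naive negation missing the null amount and the coalesce(condition, lit(False)) form catching it.

```python
# None stands in for SQL NULL throughout.


def sql_gt(a, b):
    """Comparison with NULL yields NULL, as in Spark SQL."""
    return None if a is None or b is None else a > b


def sql_not(v):
    """NOT NULL is still NULL."""
    return None if v is None else not v


def coalesce_false(v):
    """coalesce(condition, lit(False)): treat NULL as False."""
    return False if v is None else v


def sql_filter(rows, predicate):
    """Like DataFrame.filter: keep rows where the predicate is True;
    rows where it is False or NULL are dropped."""
    return [r for r in rows if predicate(r) is True]


rows = [{"order_amount": 10.0}, {"order_amount": -5.0}, {"order_amount": None}]


def positive(r):
    return sql_gt(r["order_amount"], 0)


naive = sql_filter(rows, lambda r: sql_not(positive(r)))
safe = sql_filter(rows, lambda r: sql_not(coalesce_false(positive(r))))
print(len(naive), len(safe))  # 1 2 -> the naive check misses the NULL amount
```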
Handling Schema Changes Gracefully
Source systems change. New columns appear. Old columns disappear. Column data types change. Your pipeline must handle these changes without breaking.
SCHEMA CHANGE SCENARIOS AND RESPONSES
──────────────────────────────────────────────────────────────
Scenario 1: Source adds a new column "loyalty_points"
→ Write to Bronze with .option("mergeSchema", "true") so Delta adds the new column instead of rejecting the append
→ Silver notebook selects explicit columns, ignores new ones
until business team decides if it's needed
Scenario 2: Source renames "OrderID" to "order_id"
→ Bronze stores both (old rows have OrderID, new rows have order_id)
→ Silver uses COALESCE to take whichever column is populated:
df.withColumn("order_id",
    coalesce(col("order_id"), col("OrderID")).cast("integer"))
Scenario 3: Source changes amount from INTEGER to DOUBLE
→ Bronze records the raw string value
→ Silver casts to DOUBLE — handles both old integer
and new decimal values seamlessly
Scenario 4: Source removes a previously required column
→ Bronze captures what arrives (missing column → all nulls)
→ Silver detects high null rate via quality check
→ Alerts data engineering team to investigate
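Scenarios 2 and 4 come down to two small row-level rules. The plain-Python sketch below models them on hypothetical rows. The first rule is a coalesce-style fallback from the new column name to the old one. The second is a null-rate check that flags columns whose share of missing values exceeds a threshold. The 0.5 threshold is an assumption for illustration; in practice it would be tuned per column.

```python
def unify_renamed(row, new_name, old_name):
    """Scenario 2: prefer the new column, fall back to the old one
    (the plain-Python equivalent of coalesce(new, old))."""
    value = row.get(new_name)
    return value if value is not None else row.get(old_name)


def high_null_columns(rows, columns, threshold):
    """Scenario 4: return columns whose share of missing values exceeds
    threshold, sorted by name."""
    if not rows:
        return []
    flagged = []
    for column in columns:
        missing = sum(1 for r in rows if r.get(column) is None)
        if missing / len(rows) > threshold:
            flagged.append(column)
    return sorted(flagged)


# Hypothetical Bronze rows: the first uses the old column name, and the
# source stopped sending customer_id partway through.
rows = [
    {"OrderID": "1", "customer_id": "7"},
    {"order_id": "2", "customer_id": None},
    {"order_id": "3"},
]
print([unify_renamed(r, "order_id", "OrderID") for r in rows])  # ['1', '2', '3']
print(high_null_columns(rows, ["customer_id"], threshold=0.5))  # ['customer_id']
```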
Incremental Processing with Watermarks
Processing all data from scratch every run is inefficient. Incremental processing reads only records that changed since the last run. Use a high-water mark column (typically a timestamp or an auto-incrementing ID) to track progress.
INCREMENTAL LOAD PATTERN
──────────────────────────────────────────────────────────────
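from pyspark.sql.functions import col, max as spark_max
# Read last processed timestamp from a control table
last_run = spark.sql("""
    SELECT COALESCE(MAX(last_processed_ts), TIMESTAMP '1900-01-01')
           AS watermark
    FROM pipeline_control.etl_checkpoints
    WHERE table_name = 'silver.orders'
""").first()["watermark"]
bronze = spark.table("bronze.orders_raw")
new_rows = bronze.filter(col("_ingestion_timestamp") > last_run)
# Fix this run's upper bound before processing, so rows that land in
# Bronze while the run is executing are left for the next run
upper_bound = new_rows.agg(spark_max("_ingestion_timestamp")).first()[0]
if upper_bound is None:
    print(f"No new records since {last_run}")
else:
    # Read only the records in the (last_run, upper_bound] window
    df_incremental = new_rows.filter(
        col("_ingestion_timestamp") <= upper_bound)
    record_count = df_incremental.count()
    print(f"Processing {record_count} new records since {last_run}")
    # Process and write to Silver (using MERGE to handle updates)
    # ... transformation code ...
    # Advance the watermark to the newest timestamp actually processed.
    # Using current_timestamp() here instead would skip any row ingested
    # after the read but before this update.
    spark.createDataFrame(
        [("silver.orders", upper_bound)],
        "table_name STRING, last_processed_ts TIMESTAMP"
    ).createOrReplaceTempView("new_checkpoint")
    spark.sql("""
        MERGE INTO pipeline_control.etl_checkpoints AS target
        USING new_checkpoint AS source
        ON target.table_name = source.table_name
        WHEN MATCHED THEN UPDATE SET
            target.last_processed_ts = source.last_processed_ts
        WHEN NOT MATCHED THEN INSERT *
    """)
    print(f"Watermark advanced to {upper_bound}")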
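What gets stored as the new watermark matters. The plain-Python model below treats Bronze as a list of dicts, with hypothetical timestamps and order IDs. It shows why advancing the watermark to the newest _ingestion_timestamp actually processed is safer than advancing it to the time the run finished. A row stamped during the run but committed after the read would otherwise fall behind the watermark and never be processed.

```python
from datetime import datetime, timedelta


def run_incremental(bronze, watermark, process):
    """One incremental run: take rows newer than the watermark, process
    them, and return the new watermark (the newest timestamp processed)."""
    batch = [r for r in bronze if r["_ingestion_timestamp"] > watermark]
    if not batch:
        return watermark  # nothing new; keep the old watermark
    process(batch)
    return max(r["_ingestion_timestamp"] for r in batch)


t0 = datetime(2024, 1, 1, 9, 0)
bronze = [{"order_id": 1, "_ingestion_timestamp": t0 + timedelta(minutes=1)}]
processed = []

# Run 1 reads order 1 and finishes at t0 + 3 minutes (hypothetical).
watermark = run_incremental(bronze, datetime(1900, 1, 1), processed.extend)
run1_finished = t0 + timedelta(minutes=3)

# Order 2 was stamped at t0 + 2 minutes but committed to Bronze only after
# run 1 had already read its batch.
bronze.append({"order_id": 2, "_ingestion_timestamp": t0 + timedelta(minutes=2)})

# Run 2 with the max-processed watermark picks order 2 up...
watermark = run_incremental(bronze, watermark, processed.extend)
print([r["order_id"] for r in processed])  # [1, 2]

# ...whereas a watermark set to run 1's finish time would have skipped it.
print([r["order_id"] for r in bronze if r["_ingestion_timestamp"] > run1_finished])  # []
```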
Key Points
- ETL (Extract, Transform, Load) is the core pattern for moving and preparing data in Databricks.
- The Medallion Architecture organizes data into three layers: Bronze (raw), Silver (clean), and Gold (business-ready).
- Bronze stores data exactly as received — no transformations — making it a permanent audit trail.
- Silver applies cleaning, type corrections, deduplication, and joining to produce trustworthy data assets.
- Gold aggregates Silver data for specific business questions, optimized for dashboard queries and reporting.
- Data quality checks at each layer boundary catch problems early and prevent bad data from reaching consumers.
- Incremental processing uses a watermark (high-water mark) timestamp to read only new data, so transformations run on the new slice instead of the full history, cutting compute cost compared to full re-processing.
- MERGE (upsert) operations, used here for the Silver table and the pipeline control table, handle both new records and updates to existing records in one atomic operation.
