Databricks Performance Optimization
A query that takes three hours to run at the end of each business day costs money, delays decisions, and frustrates users. A pipeline that processes data slowly extends the gap between when events happen and when anyone can act on them. Performance optimization is not just a technical concern — it directly affects how quickly an organization can respond to its data.
Databricks provides a layered set of performance features, from storage-level optimizations built into Delta Lake to cluster configuration choices that affect every job. Understanding these features and when to apply them separates competent Databricks users from experts who reliably deliver fast, efficient pipelines.
Understanding Where Time Goes: The Spark Execution Model
Before optimizing Spark jobs, understanding how Spark executes queries reveals where time is actually spent.
When you submit a Spark query, the Spark engine creates an execution plan. This plan divides the computation into stages. Each stage performs operations that can run in parallel without exchanging data between machines. When data must be exchanged — such as during a join or a sort — a shuffle occurs. The shuffle boundary between stages is the most expensive operation in Spark.
Think of a distributed team writing a group report. Each person researches their assigned sections independently (parallel within a stage). When they need to combine chapters, they email each other their sections, wait for everyone's emails to arrive, and then assemble the complete document (shuffle between stages). The assembling step takes much longer than the individual research because it requires coordination across everyone involved.
The Spark UI visualizes this execution plan as a DAG (Directed Acyclic Graph) — a flowchart showing stages, tasks, and where shuffles occur. Examining the Spark UI is the first step in any serious performance investigation. It reveals exactly which stages take the longest and whether shuffles are the bottleneck.
Photon: The High-Performance Native Execution Engine
Photon is Databricks' proprietary query execution engine, written in C++ and replacing the default Java-based Spark executor. It is the single most impactful performance improvement available in Databricks without any query changes.
The default Spark executor runs on the Java Virtual Machine (JVM). The JVM adds overhead — garbage collection pauses, memory management complexity, and the inherent performance gap between managed and native code. Photon bypasses the JVM entirely, executing vectorized operations directly in optimized C++ code.
Vectorized execution processes data column by column rather than row by row. Instead of applying an operation to one row at a time, Photon applies it to a batch of 1,000 or more values simultaneously using modern CPU SIMD instructions. Modern CPUs are designed to perform this kind of parallel operation at the hardware level.
Think of harvesting a field manually. Row-by-row processing is like picking each apple individually — pick one, walk to the basket, put it down, walk back. Vectorized processing is like using a mechanical harvester that picks an entire row simultaneously. Same destination, dramatically less travel.
Photon delivers 2x to 8x speed improvements on SQL-heavy workloads compared to the standard Spark executor. Enabling Photon requires only selecting a Photon-enabled cluster type — no code changes, no query modifications.
Delta Lake Optimization: Z-Ordering
Delta Lake is Databricks' table storage format. It stores data as Parquet files in cloud storage. When a query filters data (WHERE clause), Spark must read the relevant files. If the query filters on a column that has data spread randomly across all files, Spark reads many files to find matching rows — this is called full table scan, and it is slow.
Z-Ordering reorganizes data files so that rows with similar values in the specified column are stored close together in the same files. When a query filters on that column, Spark reads far fewer files to find matching rows.
Imagine a library with 100,000 books shelved randomly. Finding all books by Stephen King means checking every single shelf. After Z-Ordering by author, all King books are grouped together. Finding them requires checking only a few shelves.
OPTIMIZE customers_table ZORDER BY (region, customer_tier)This command reorganizes the customers_table files so that customers from the same region and tier are stored together. A subsequent query filtering WHERE region = 'North' AND customer_tier = 'Premium' reads far fewer files and completes much faster.
Z-Ordering works best for columns frequently used in filter conditions, columns with high cardinality (many distinct values), and columns used in JOIN conditions. Avoid Z-Ordering low-cardinality columns (like a boolean true/false flag) — the grouping provides minimal benefit.
Delta Lake Optimization: OPTIMIZE and Compaction
Delta tables accumulate small files over time. Streaming jobs that write thousands of micro-batches each create a separate file. Frequent small writes from multiple concurrent processes create thousands of tiny files. Reading data from thousands of small files is slower than reading the same data from a few large files because each file read has overhead regardless of file size.
The OPTIMIZE command compacts small files into larger ones:
OPTIMIZE transactions_tableDatabricks recommends running OPTIMIZE regularly on actively written tables — daily for high-frequency tables, weekly for lower-frequency ones. This is like periodically reorganizing a filing cabinet. Over time, documents accumulate in many small folders. Consolidating them into fewer, larger folders (by topic or time period) makes finding and retrieving them faster.
Data Skipping and File Statistics
Delta Lake stores statistics about each data file — specifically, the minimum and maximum values of each column within that file. These statistics enable data skipping: when a query filters on a column, Spark checks each file's min/max statistics before reading it. Files whose entire range falls outside the filter condition get skipped entirely.
Example: A table has 1,000 data files. A query filters for transactions where amount > 10,000. Each file's statistics show the minimum and maximum amount in that file. Files whose maximum amount is below 10,000 get skipped automatically. If 800 of the 1,000 files contain no transactions above 10,000, Spark reads only 200 files instead of all 1,000. The query finishes 5x faster with no changes to the query itself.
Data skipping works automatically in Delta Lake. No configuration is needed. Its effectiveness increases when Z-Ordering is applied because Z-Ordering groups similar values together, making file-level min/max statistics more selective.
Liquid Clustering: The Modern Alternative to Partitioning
Traditional Hive-style partitioning divides table data into separate directories based on a column's values. A table partitioned by date creates one directory per date — date=2024-01-01/, date=2024-01-02/, etc. Queries filtering by date read only the relevant directory.
Partitioning works well when the partition column is known upfront, has manageable cardinality, and query patterns are stable. But partitioning creates problems when partitions become too small (thousands of tiny date partitions for a small table) or when query patterns change (suddenly querying by region instead of date means partitioning by date helps nothing).
Liquid Clustering is Databricks' newer, more flexible approach. Instead of organizing data into rigid directory hierarchies, Liquid Clustering dynamically reorganizes data as it is written to optimize for current query patterns. It handles changing query patterns gracefully, avoids small partition problems, and works incrementally on newly written data without reorganizing the entire table.
CREATE TABLE transactions_table
CLUSTER BY (transaction_date, merchant_category);Liquid Clustering automatically improves data layout over time as new data arrives. Run the OPTIMIZE command periodically to apply clustering to recently written data. Databricks recommends Liquid Clustering over traditional partitioning for most new tables.
Caching: Avoiding Repeated Computation
If the same DataFrame is used multiple times in a job — for example, a large filtered dataset that feeds into three different aggregation queries — Spark recomputes it from scratch each time by default. Caching stores the result in cluster memory so subsequent uses are instant.
# Without caching: expensive_df is computed three times
result_a = expensive_df.groupBy("region").sum("amount")
result_b = expensive_df.groupBy("product").count()
result_c = expensive_df.filter(col("amount") > 1000)
# With caching: expensive_df is computed once
expensive_df.cache()
result_a = expensive_df.groupBy("region").sum("amount")
result_b = expensive_df.groupBy("product").count()
result_c = expensive_df.filter(col("amount") > 1000)
expensive_df.unpersist() # Release memory when done
Use caching when the same DataFrame is accessed multiple times within one job. Do not cache data that is used only once — it wastes memory. Always unpersist cached DataFrames when finished to free cluster memory for other operations.
Databricks Delta Cache (distinct from Spark's DataFrame cache) is an automatic optimization that caches recently accessed data from cloud storage on the cluster's local NVMe SSDs. Subsequent reads of the same data skip the network call to cloud storage and read from the fast local disk instead. This is transparent — no code changes required.
Broadcast Joins: Eliminating Shuffle for Small Tables
A standard Spark join between two large tables requires a shuffle — both tables are redistributed across the cluster so that matching rows end up on the same machine. Shuffles are expensive because they involve network data transfer.
When one table in a join is small enough to fit in memory on each executor, a broadcast join eliminates the shuffle entirely. Spark sends (broadcasts) the small table to every executor. Each executor then performs the join locally against its portion of the large table — no network exchange required.
from pyspark.sql.functions import broadcast
# Standard join (shuffle required)
result = large_transactions.join(region_lookup, "region_code")
# Broadcast join (no shuffle, much faster)
result = large_transactions.join(broadcast(region_lookup), "region_code")
Broadcast joins work when the smaller table fits in memory. Databricks automatically broadcasts tables under a configurable threshold (10 MB by default, but adjustable). For tables between 10 MB and a few hundred MB, explicitly using the broadcast hint provides significant speedups.
Adaptive Query Execution (AQE)
Traditional Spark creates a fixed execution plan before running a query. This plan is based on statistics that may be out of date or inaccurate. If the statistics say a table has 1 million rows but the actual data has 100 million rows, the plan optimizes for the wrong scale and performs poorly.
Adaptive Query Execution (AQE), enabled by default in Databricks, adjusts the execution plan at runtime based on actual data statistics collected during query execution. AQE makes three key optimizations automatically:
Dynamic Coalescing of Shuffle Partitions
After a shuffle, Spark divides data into a fixed number of partitions (200 by default). If the actual shuffled data is small, 200 partitions means 200 tiny tasks with high overhead relative to their work. AQE detects when shuffled data is small and merges adjacent small partitions, reducing the number of tasks to a more appropriate count.
Dynamic Join Strategies
AQE can switch from a shuffle join to a broadcast join mid-query if it discovers one table is much smaller than expected after filtering. This happens automatically without any query hints from the developer.
Dynamic Skew Optimization
Data skew occurs when one key value has far more rows than others. If 90% of all transactions belong to a single customer ID, the task processing that customer's data takes 10x longer than other tasks, stalling the entire stage.
AQE detects skewed partitions and automatically splits them into smaller tasks that can run in parallel. The overall stage completes in the time of a balanced task, not the time of the overloaded one.
Cluster Configuration: Choosing the Right Size and Type
No amount of query optimization compensates for a fundamentally misconfigured cluster. The right cluster configuration depends on the workload type.
Driver vs. Worker Sizing
The driver node coordinates the cluster — it plans queries, tracks task progress, and collects final results. Worker nodes execute the actual computation. A driver that is too small becomes a bottleneck for collecting results from many workers. Workers that are too small cannot hold data in memory and spill to disk constantly.
General guidance: For interactive analytics, the driver can be smaller than workers. For large collect() operations that bring data back to the driver, use a driver with substantial memory.
Memory-Optimized vs. Compute-Optimized Instances
- Memory-optimized instances (more RAM per CPU) work best for workloads that process large datasets, especially joins and aggregations where data needs to fit in memory.
- Compute-optimized instances (more CPU per RAM) work best for Photon-enabled SQL workloads that perform intensive computation on data that fits in memory.
- Storage-optimized instances (fast NVMe SSDs) benefit workloads that read and write large volumes of data from cloud storage, leveraging the Delta Cache on local SSDs.
Autoscaling
Cluster autoscaling adds workers when the queue of pending tasks grows (cluster is overloaded) and removes workers when they are idle. This matches compute resources to actual demand, reducing cost during light workloads and maintaining performance during peak loads.
Enhanced Autoscaling in Databricks is smarter than standard Spark autoscaling. It scales down idle workers more aggressively without impacting running queries, and scales up proactively based on job history patterns.
Partitioning Query Data: Reading Less
The fastest data to process is data you never read. When tables are partitioned by a relevant column, queries that filter on that column skip entire partitions.
A transactions table partitioned by year and month might hold five years of data across 60 partitions. A query asking for last month's transactions reads only 1 of 60 partitions — 1/60th of the total data. Query time drops proportionally.
Partitioning requires selecting partition columns carefully. Good partition columns have moderate cardinality (not too few unique values like a boolean, not too many like a UUID), are frequently used in filter conditions, and produce roughly equal amounts of data per partition value.
For new tables, Liquid Clustering often provides better results than traditional partitioning. For existing large tables already using traditional partitioning, the switch to Liquid Clustering requires careful planning.
VACUUM: Removing Old Files
Delta Lake's time travel capability keeps older versions of data by retaining files that were replaced or deleted. Over time, these old files accumulate and occupy storage space. The VACUUM command removes old files beyond the retention threshold.
VACUUM transactions_table RETAIN 7 DAYS;VACUUM removes files older than 7 days that are no longer part of the current table version. Smaller storage footprints mean faster file listing operations and lower cloud storage costs. Run VACUUM periodically on large, actively updated tables.
The default retention is 7 days. Lowering this below 7 days risks breaking concurrent readers accessing older table versions and is generally not recommended without specific justification.
Query Profiling: Finding the Real Bottleneck
Guessing where a query spends its time leads to optimizing the wrong thing. The Databricks Query Profile (available for SQL queries in Databricks SQL) provides a visual breakdown of query execution time, showing exactly which operations consume the most time and resources.
The Query Profile displays a tree of operators — scans, filters, joins, aggregations, sorts — with timing and row count information for each. Common patterns to look for:
- Large row count explosion at a join — A join is producing many more rows than expected, suggesting a many-to-many join where a one-to-many was intended, or missing filter conditions.
- Slow scan despite filters — Data skipping is not working effectively; consider Z-Ordering or Liquid Clustering on the filter column.
- Huge shuffle size — Too much data is crossing the network during shuffles; consider broadcast joins or reducing data size before the join.
- Skewed task duration — One task takes 10x longer than others; apply AQE's skew handling or manually salt the skewed key.
Predictive I/O and Deletion Vectors
Databricks has added intelligent read optimizations in recent runtime versions. Predictive I/O uses machine learning to predict which data a query will need based on query patterns and prefetches that data from cloud storage before the query requests it. This hides the latency of cloud storage reads behind computation time.
Deletion Vectors change how DELETE and UPDATE operations work on Delta tables. Traditional DELETE operations rewrite entire data files even if only a few rows change. Deletion Vectors instead maintain a separate small file that records which rows are deleted, without rewriting the large data files. Subsequent reads combine the data file with the deletion vector to produce the correct results. This makes DELETE and UPDATE operations much faster for typical workloads where a small fraction of rows change.
Real-World Optimization Case Study
A retail analytics team runs a daily report joining three large tables: customer records (500 million rows), transactions (2 billion rows), and product catalog (2 million rows). The report takes 4 hours to run and costs the company significant compute spend each day.
The optimization process starts with the Spark UI. The DAG shows one shuffle stage taking 3.5 of the 4 hours. The shuffle involves the transactions and customer tables — both large, requiring a full shuffle join.
Step 1: Enable Photon on the cluster. The shuffle stage drops from 3.5 hours to 2.1 hours. No code changes.
Step 2: The product catalog (2 million rows, approximately 50 MB) is joined against both other tables. Broadcast hints convert these to broadcast joins, eliminating two smaller shuffles. Total time drops to 1.8 hours.
Step 3: The transactions table has no Z-Ordering. The query filters transactions to the last 90 days. Running OPTIMIZE on transactions with ZORDER BY (transaction_date) allows data skipping to eliminate 75% of file reads for the date filter. Time drops to 1.2 hours.
Step 4: The transactions table is not partitioned by date. Adding Liquid Clustering by date means the date filter now reads approximately 30% of files instead of relying solely on data skipping. Time drops to 45 minutes.
Step 5: The cluster uses general-purpose instances. Switching to memory-optimized instances (more RAM) allows more data to fit in memory, reducing disk spill during the remaining shuffle. Final time: 28 minutes.
The 4-hour query now runs in 28 minutes — a 91% reduction — through a combination of storage optimization, query optimization, and cluster configuration. Each step was guided by profiling data, not guesswork.
Key Points Summary
- The Spark UI and Databricks Query Profile are the essential starting points — measure before optimizing.
- Photon provides 2x to 8x speedups on SQL workloads with zero code changes by replacing the JVM executor with a native C++ vectorized engine.
- Z-Ordering collocates similar values in the same files, enabling data skipping that dramatically reduces files read during filtered queries.
- OPTIMIZE compacts small files into larger ones, reducing file read overhead.
- Liquid Clustering is the modern alternative to traditional partitioning, handling changing query patterns flexibly.
- Broadcast joins eliminate shuffle for queries joining a large table with a small one.
- Adaptive Query Execution (AQE) automatically optimizes join strategies, partition counts, and skew handling at runtime.
- Delta Cache on local SSDs speeds repeated access to cloud storage data transparently.
- Cluster sizing — instance type and autoscaling configuration — matches compute resources to workload characteristics.
- VACUUM removes old Delta Lake files, keeping storage footprints manageable and listing operations fast.
