Databricks Data Ingestion
Data ingestion is the first step in every data engineering workflow. Before you can analyze data, train models, or build dashboards, you need to bring data into the Databricks environment. That data might come from files sitting in cloud storage, from operational databases, from APIs, or from real-time event streams. This topic covers the most common ingestion patterns and the specific Databricks tools that handle each one.
Think of data ingestion like receiving goods at a warehouse. The goods arrive in different vehicles — trucks (bulk file transfers), vans (database exports), motorcycles (API calls), and conveyor belts (real-time streams). Each delivery method requires a different receiving dock with different equipment. Databricks provides all these docks in one integrated system.
Reading Files from Cloud Storage
The simplest form of ingestion reads files that already exist in cloud storage: Amazon S3, Azure Data Lake Storage (ADLS), or Google Cloud Storage (GCS). These files might be in CSV, JSON, Parquet, Avro, or ORC format.
Mounting Cloud Storage to DBFS
DBFS (Databricks File System) is a virtual file system that maps cloud storage paths to simple, easy-to-use paths inside Databricks. After mounting, you access your S3 bucket as /mnt/mydata/ instead of typing the full s3://bucket-name/prefix/ path everywhere.
MOUNTING AZURE DATA LAKE STORAGE
──────────────────────────────────────────────────────────────
configs = {
"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type":
"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": dbutils.secrets.get(
scope="kv-scope", key="service-principal-id"),
"fs.azure.account.oauth2.client.secret": dbutils.secrets.get(
scope="kv-scope", key="service-principal-secret"),
"fs.azure.account.oauth2.client.endpoint":
"https://login.microsoftonline.com/[tenant-id]/oauth2/token"
}
dbutils.fs.mount(
source = "abfss://mycontainer@mystorageaccount.dfs.core.windows.net/",
mount_point = "/mnt/retail_data",
extra_configs = configs
)
# Now list files at the mount point:
display(dbutils.fs.ls("/mnt/retail_data/"))
After mounting, all read and write operations use the /mnt/retail_data/ path. The actual files remain in Azure, but Databricks accesses them as if they were local.
Reading CSV Files
# Read a CSV file with a header row and automatic schema detection.
# multiLine handles quoted fields that contain line breaks.
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("encoding", "UTF-8") \
    .option("multiLine", "true") \
    .csv("/mnt/retail_data/sales/2024/january_sales.csv")
df.printSchema()
df.show(5)
Reading Multiple Files at Once
FOLDER STRUCTURE:
──────────────────────────────────────────────────────────────
/mnt/retail_data/sales/
├── 2024/
│ ├── january_sales.csv (500,000 rows)
│ ├── february_sales.csv (480,000 rows)
│ └── march_sales.csv (520,000 rows)
# Read ALL CSV files in a folder with a wildcard:
df_q1 = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("/mnt/retail_data/sales/2024/*.csv")
print(f"Total Q1 rows: {df_q1.count():,}")
# Output: Total Q1 rows: 1,500,000
Reading Parquet Files
Parquet is the preferred format for big data because it is columnar (reads only columns you need), compressed (stores data efficiently), and includes embedded schema information. Reading Parquet is faster than CSV and requires no schema inference.
# Parquet read is simpler — schema is embedded in the file
df = spark.read.parquet("/mnt/retail_data/products/product_catalog.parquet")
# Read a folder of partitioned Parquet files
# (data partitioned by year and month)
df = spark.read.parquet("/mnt/retail_data/transactions/")
# Spark discovers the partition columns (year, month) automatically
# when the folders use Hive-style names such as year=2024/month=01,
# and adds them to the DataFrame:
# | txn_id | amount | city | year | month |
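Partition discovery relies on a Hive-style directory layout, where each folder is named column=value. The plain-Python sketch below needs no Spark and only illustrates how such a path maps to partition columns. The example file path and the helper name partition_values are hypothetical, and Spark's own discovery also infers data types for the values.

```python
from pathlib import PurePosixPath


def partition_values(file_path: str) -> dict:
    """Return the column=value pairs encoded in a Hive-style path."""
    values = {}
    for segment in PurePosixPath(file_path).parts:
        # Only path segments of the form "name=value" carry partition data.
        if "=" in segment:
            name, _, value = segment.partition("=")
            values[name] = value
    return values


# Hypothetical file inside the partitioned transactions folder
example = "/mnt/retail_data/transactions/year=2024/month=01/part-0000.parquet"
print(partition_values(example))  # {'year': '2024', 'month': '01'}
```

If the folders are not named this way (for example /2024/01/), no partition columns are derived from the path.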
Auto Loader – Incremental File Ingestion
Auto Loader is Databricks' solution for continuously ingesting new files as they arrive in cloud storage. It is like an automated receiving dock that never sleeps — whenever a new file drops into a folder, Auto Loader picks it up and processes it, without re-reading files it already processed.
AUTO LOADER VS REGULAR READ
──────────────────────────────────────────────────────────────
Regular spark.read:
• Reads ALL files in the folder EVERY time you run the code
• If 1000 files exist and 10 are new, it reads all 1000
• Slow, wasteful, prone to reprocessing duplicates
Auto Loader (spark.readStream with cloudFiles source):
• Tracks which files it has already processed
• On each run, reads ONLY new files since last checkpoint
• If 1000 files exist and 10 are new, reads only the 10 new ones
• Works even if new files arrive while processing is running
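The difference comes down to remembering which files earlier runs have already handled. The toy model below shows that idea in plain Python. It is a conceptual sketch only: Auto Loader keeps this state in its checkpoint location, not in a Python set, and the function name select_new_files and the file names are made up for illustration.

```python
def select_new_files(all_files, processed):
    """Return files not yet recorded as processed, in a stable order."""
    return sorted(f for f in all_files if f not in processed)


# Toy "checkpoint": the set of file names handled by earlier runs.
checkpoint = {"jan.csv", "feb.csv"}

# Current folder listing: two old files plus one new arrival.
listing = ["jan.csv", "feb.csv", "mar.csv"]

new_files = select_new_files(listing, checkpoint)
print(new_files)  # ['mar.csv']

# After a successful write, record the files so the next run skips them.
checkpoint.update(new_files)
print(select_new_files(listing, checkpoint))  # []
```

A regular spark.read has no such record, which is why every run starts again from the full listing.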
Auto Loader Code Example
# Set up Auto Loader to process new CSV files as they arrive
df_stream = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("cloudFiles.schemaLocation",
"/mnt/retail_data/_schemas/sales") \
.option("header", "true") \
.load("/mnt/retail_data/incoming/sales/")
# Write new records to a Delta table as they arrive
query = df_stream.writeStream \
.format("delta") \
.option("checkpointLocation",
"/mnt/retail_data/_checkpoints/sales") \
.outputMode("append") \
    .toTable("main.retail_data.sales_raw")
query.awaitTermination()
AUTO LOADER FLOW DIAGRAM
──────────────────────────────────────────────────────────────
Cloud Storage Bucket (/mnt/retail_data/incoming/sales/)
│
│ New file arrives: march_week1.csv
▼
Auto Loader detects new file (using cloud event notifications
or directory listing, depending on configuration)
│
▼
Reads ONLY the new file (previous files already processed
and recorded in checkpoint)
│
▼
Applies schema, parses CSV, creates Spark DataFrame
│
▼
Appends rows to Delta table: main.retail_data.sales_raw
│
▼
Records file as "processed" in checkpoint location
│
▼
Waits for next new file (continuous processing mode)
Reading from Databases with JDBC
JDBC (Java Database Connectivity) is the standard Java API that Spark uses to connect to relational databases such as MySQL, PostgreSQL, SQL Server, and Oracle. You provide the connection URL and credentials, the matching JDBC driver must be available on the cluster, and Spark reads a table or query result as a DataFrame.
READING FROM POSTGRESQL
──────────────────────────────────────────────────────────────
jdbc_url = "jdbc:postgresql://db-server.example.com:5432/retail_db"
connection_properties = {
"user": dbutils.secrets.get(scope="db-secrets", key="pg-user"),
"password": dbutils.secrets.get(scope="db-secrets",
key="pg-password"),
"driver": "org.postgresql.Driver"
}
# Read the entire customers table
df_customers = spark.read.jdbc(
url=jdbc_url,
table="public.customers",
properties=connection_properties
)
# Read using a custom query (filters at the database level — faster)
df_recent = spark.read.jdbc(
url=jdbc_url,
table="(SELECT * FROM orders WHERE order_date >= '2024-01-01') AS recent",
properties=connection_properties
)
Parallel JDBC Reads
By default, JDBC reads use a single connection to the database. For large tables, this is slow. You can parallelize by telling Spark to split the read across multiple partitions using a numeric column.
# Split the read across 10 parallel database connections
# based on order_id ranges
df_parallel = spark.read.jdbc(
url=jdbc_url,
table="public.orders",
    column="order_id",       # ← Numeric column to partition on
    lowerBound=1,            # ← Used only to compute the partition ranges
    upperBound=10000000,     # ← Rows outside the bounds are NOT filtered out
    numPartitions=10,        # ← Up to 10 parallel readers
    properties=connection_properties
)
# Each partition reads roughly:
# Partition 1: order_id 1 – 1,000,000 (plus NULLs and anything below the lower bound)
# Partition 2: order_id 1,000,001 – 2,000,000
# ...
# Partition 10: order_id 9,000,001 and above
# The partitions run concurrently, so throughput can approach 10x a single
# connection if the database and network can sustain the load
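To see why the bounds shape the ranges without filtering any rows, it helps to look at the kind of WHERE clause each partition ends up with. The sketch below is a simplified approximation in plain Python, not Spark's actual code: the function name jdbc_partition_predicates is hypothetical, and the exact boundary arithmetic differs between Spark versions.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Approximate the WHERE clause used for each JDBC partition."""
    if num_partitions < 2:
        return [None]  # A single partition reads the whole table, no filter.
    # Simplified integer stride; Spark's exact arithmetic varies by version.
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if lower is None:
            # First partition is open-ended below and also collects NULLs.
            predicates.append(f"{upper} OR {column} IS NULL")
        elif upper is None:
            # Last partition is open-ended above.
            predicates.append(lower)
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


for predicate in jdbc_partition_predicates("order_id", 1, 10_000_000, 10):
    print(predicate)
```

Because the first and last ranges are open-ended, a stale upperBound does not lose data. It only makes the last partition larger than the others, which skews the parallel read.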
Using Databricks Partner Connectors
Databricks integrates with many popular data sources through dedicated connectors. Where one exists, it typically offers better performance and more source-specific features than a generic JDBC read.
POPULAR DATABRICKS CONNECTORS
──────────────────────────────────────────────────────────────
Source System         │ Connector Type       │ Use Case
──────────────────────┼──────────────────────┼────────────────
Salesforce            │ Databricks Connector │ CRM data
Google BigQuery       │ Spark Connector      │ GCP analytics
Snowflake             │ Spark Connector      │ Cloud DW data
MongoDB               │ Spark Connector      │ NoSQL data
Apache Kafka          │ Spark Streaming      │ Real-time events
Amazon Kinesis        │ Spark Streaming      │ AWS streaming
Azure Event Hubs      │ Spark Streaming      │ Azure streaming
REST APIs             │ Custom Python code   │ Web services
Reading from REST APIs
Many data sources expose data through REST APIs — weather services, payment gateways, logistics platforms, social media platforms. You call these APIs using Python's requests library and convert the JSON response into a Spark DataFrame.
# Example: Reading from a simple REST API
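import requests
from pyspark.sql.types import (
    StructType, StructField, IntegerType, DoubleType, StringType
)
# Fetch the API token from Databricks Secrets
# (the scope and key names here are placeholders)
api_token = dbutils.secrets.get(scope="api-secrets", key="orders-api-token")
# Call the API
response = requests.get(
    url="https://api.example.com/orders",
    headers={"Authorization": f"Bearer {api_token}"},
    params={"start_date": "2024-01-01", "page_size": 1000},
    timeout=30
)
response.raise_for_status()  # Fail fast on HTTP errors
orders = response.json()["data"]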
# Convert to Spark DataFrame
schema = StructType([
StructField("order_id", IntegerType(), True),
StructField("customer_id", IntegerType(), True),
StructField("amount", DoubleType(), True),
StructField("status", StringType(), True),
StructField("created_at", StringType(), True)
])
df_orders = spark.createDataFrame(orders, schema)
df_orders.write.format("delta").mode("append") \
    .saveAsTable("main.retail_data.api_orders")
For APIs that paginate results (returning 1000 records at a time over many pages), loop through pages and union all results into a single DataFrame before writing to the Delta table.
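The pagination loop can be sketched as follows. This is a minimal illustration under stated assumptions: the API is assumed to take a page number and to signal the end with a short or empty page, whereas many real APIs use cursors or next-page links instead. The names fetch_all_pages and fake_fetch_page are hypothetical, and the fake function stands in for a real requests.get call so the example runs anywhere.

```python
def fetch_all_pages(fetch_page, page_size=1000):
    """Collect records from a paginated API into one list."""
    records = []
    page = 1
    while True:
        batch = fetch_page(page, page_size)
        records.extend(batch)
        # A short (or empty) page means there is nothing left to fetch.
        if len(batch) < page_size:
            break
        page += 1
    return records


# Stand-in for the real HTTP call: serves 2,500 fake orders in pages.
FAKE_ORDERS = [{"order_id": i} for i in range(1, 2501)]


def fake_fetch_page(page, page_size):
    start = (page - 1) * page_size
    return FAKE_ORDERS[start:start + page_size]


all_orders = fetch_all_pages(fake_fetch_page)
print(len(all_orders))  # 2500
```

Collecting the pages into one Python list and calling spark.createDataFrame once is usually simpler than creating and unioning a DataFrame per page. It does hold every record in driver memory, so it suits modest volumes.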
Ingesting Real-Time Data from Kafka
Apache Kafka is a distributed event streaming platform. Applications publish messages to Kafka topics (like channels), and other applications subscribe to read those messages. Databricks Structured Streaming reads from Kafka topics as a continuous stream.
KAFKA INGESTION FLOW
──────────────────────────────────────────────────────────────
PRODUCERS (Applications sending data)
E-commerce checkout system
Mobile app click events
IoT sensors
│
│ Publish messages
▼
KAFKA CLUSTER
Topic: "checkout-events"
Partitions: [P0] [P1] [P2] [P3] [P4] ← Data split across partitions
Retention: 7 days (messages stored even after consumed)
│
│ Read messages
▼
DATABRICKS STRUCTURED STREAMING
Reads new messages continuously
Parses JSON payload
Writes to Delta table
│
▼
DELTA TABLE: raw_checkout_events
Every completed purchase appears here within seconds
# Read from Kafka topic
df_kafka = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers",
"kafka-broker1:9092,kafka-broker2:9092") \
.option("subscribe", "checkout-events") \
.option("startingOffsets", "latest") \
.load()
# Kafka returns binary key and value — decode and parse JSON
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import *
checkout_schema = StructType([
StructField("order_id", StringType()),
StructField("customer_id", IntegerType()),
StructField("total_amount", DoubleType()),
StructField("payment_method", StringType()),
StructField("timestamp", LongType())
])
df_parsed = df_kafka \
.withColumn("value_str",
col("value").cast("string")) \
.withColumn("data", from_json("value_str", checkout_schema)) \
.select("data.*")
# Write to Delta table
df_parsed.writeStream \
.format("delta") \
.option("checkpointLocation",
"/mnt/checkpoints/kafka_checkout") \
.outputMode("append") \
    .toTable("main.streaming.raw_checkout_events")
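The decode-and-parse step is easier to reason about for a single message. The plain-Python sketch below roughly mirrors what the cast to string followed by from_json does for one Kafka value, using the field names from checkout_schema. It is an illustration only: the helper name parse_checkout_message and the sample payload are made up, and unlike from_json it does not check field types.

```python
import json

# Field names taken from checkout_schema above
FIELDS = ["order_id", "customer_id", "total_amount", "payment_method", "timestamp"]


def parse_checkout_message(value: bytes):
    """Decode one Kafka message value and keep only the expected fields."""
    try:
        payload = json.loads(value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # Malformed message: nothing usable to extract.
    if not isinstance(payload, dict):
        return None
    # Missing fields become None; fields outside the schema are dropped.
    return {name: payload.get(name) for name in FIELDS}


good = (b'{"order_id": "A-1001", "customer_id": 42, "total_amount": 59.9, '
        b'"payment_method": "card", "timestamp": 1704067200000, "extra": "x"}')
print(parse_checkout_message(good))
print(parse_checkout_message(b"not json"))  # None
```

In the streaming job, a malformed payload likewise yields nulls rather than an error, so counting null order_id values downstream is a cheap way to spot bad messages.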
Data Ingestion Best Practices
INGESTION BEST PRACTICES
──────────────────────────────────────────────────────────────
1. NEVER hardcode credentials in notebooks
Use: dbutils.secrets.get(scope="...", key="...")
Never: password = "mypassword123"
2. Land raw data first, transform later
Load raw data to a "bronze" Delta table first.
Transform in a separate step to "silver".
This preserves original data for debugging.
3. Add metadata columns during ingestion
   df = (df.withColumn("ingestion_timestamp", current_timestamp())
           .withColumn("source_file", input_file_name()))
→ Tracks when and where each record came from.
4. Validate data after loading
Check: df.count() > 0 (not empty)
Check: df.where(col("amount").isNull()).count() == 0
Check: df.where(col("date") > current_date()).count() == 0
→ Catch data quality issues before downstream consumers see them.
5. Use Parquet or Delta as your target format
Never write raw ingested data back to CSV.
Parquet: good for cold archives.
Delta: best for tables that need updates or queries.
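The three checks in practice 4 can be collected into one routine that reports every problem it finds. The sketch below models that in plain Python on a list of dictionaries so it runs without a cluster. In a notebook the same checks would be the DataFrame filters listed above. The function name validate_rows and the sample rows are hypothetical.

```python
from datetime import date


def validate_rows(rows, today=None):
    """Return a list of problems; an empty list means the batch passed."""
    today = today or date.today()
    problems = []
    if not rows:
        problems.append("batch is empty")
    null_amounts = sum(1 for r in rows if r.get("amount") is None)
    if null_amounts:
        problems.append(f"{null_amounts} row(s) with null amount")
    future_dates = sum(
        1 for r in rows if r.get("date") is not None and r["date"] > today
    )
    if future_dates:
        problems.append(f"{future_dates} row(s) dated in the future")
    return problems


sample = [
    {"amount": 10.0, "date": date(2024, 1, 5)},
    {"amount": None, "date": date(2024, 1, 6)},   # null amount
    {"amount": 7.5, "date": date(2999, 1, 1)},    # future date
]
print(validate_rows(sample, today=date(2024, 2, 1)))
```

Returning all problems at once, instead of stopping at the first failed check, gives a fuller picture of a bad batch in a single run.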
Key Points
- Databricks reads files from cloud storage (S3, ADLS, GCS) in formats like CSV, JSON, and Parquet using spark.read.
- Auto Loader ingests new files incrementally using checkpointing: it processes only new files, never re-reading files already loaded.
- JDBC connects Databricks to relational databases (MySQL, PostgreSQL, SQL Server) with optional parallel partitioning for large tables.
- REST API data is read with Python's requests library and converted to a Spark DataFrame before writing to Delta Lake.
- Structured Streaming reads from Kafka topics continuously, processing new messages in small micro-batches as they arrive.
- Always store credentials in Databricks Secrets, never as plain text in notebooks.
- Land raw data first in a bronze layer before any transformation — preserving the original data makes debugging much easier.
