Databricks Data Ingestion

Data ingestion is the first step in every data engineering workflow. Before you can analyze data, train models, or build dashboards, you need to bring data into the Databricks environment. That data might come from files sitting in cloud storage, from operational databases, from APIs, or from real-time event streams. This topic covers the most common ingestion patterns and the specific Databricks tools that handle each one.

Think of data ingestion like receiving goods at a warehouse. The goods arrive in different vehicles — trucks (bulk file transfers), vans (database exports), motorcycles (API calls), and conveyor belts (real-time streams). Each delivery method requires a different receiving dock with different equipment. Databricks provides all these docks in one integrated system.

Reading Files from Cloud Storage

The simplest form of ingestion reads files that already exist in cloud storage: Amazon S3, Azure Data Lake Storage (ADLS), or Google Cloud Storage (GCS). These files might be in CSV, JSON, Parquet, Avro, or ORC format.

Mounting Cloud Storage to DBFS

DBFS (Databricks File System) is a virtual file system that maps cloud storage paths to simple, easy-to-use paths inside Databricks. After mounting, you access your S3 bucket as /mnt/mydata/ instead of typing the full s3://bucket-name/prefix/ path everywhere.

MOUNTING AZURE DATA LAKE STORAGE
──────────────────────────────────────────────────────────────
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type":
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": dbutils.secrets.get(
        scope="kv-scope", key="service-principal-id"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(
        scope="kv-scope", key="service-principal-secret"),
    "fs.azure.account.oauth2.client.endpoint":
        "https://login.microsoftonline.com/[tenant-id]/oauth2/token"
}

dbutils.fs.mount(
    source = "abfss://mycontainer@mystorageaccount.dfs.core.windows.net/",
    mount_point = "/mnt/retail_data",
    extra_configs = configs
)

# Now list files at the mount point:
display(dbutils.fs.ls("/mnt/retail_data/"))

After mounting, all read and write operations use the /mnt/retail_data/ path. The actual files remain in Azure, but Databricks accesses them as if they were local.
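
Calling dbutils.fs.mount on a path that is already mounted raises an error, so notebooks that run more than once often guard the call. The following is a minimal sketch of such a guard. It assumes the notebook's dbutils object is passed in explicitly, and the helper name mount_if_absent is hypothetical.

```python
def mount_if_absent(dbutils, source, mount_point, extra_configs):
    """Mount `source` at `mount_point` unless that mount point already exists.

    Returns True if a new mount was created, False if it was already present.
    """
    # Compare paths without trailing slashes so "/mnt/x" and "/mnt/x/" match.
    target = mount_point.rstrip("/")
    # dbutils.fs.mounts() lists current mounts; each entry exposes mountPoint.
    existing = {m.mountPoint.rstrip("/") for m in dbutils.fs.mounts()}
    if target in existing:
        return False
    dbutils.fs.mount(
        source=source,
        mount_point=mount_point,
        extra_configs=extra_configs,
    )
    return True
```

In a notebook this could be called as mount_if_absent(dbutils, "abfss://mycontainer@mystorageaccount.dfs.core.windows.net/", "/mnt/retail_data", configs).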

Reading CSV Files

# Read a CSV file with a header row and automatic schema detection.
# multiLine lets quoted fields contain line breaks.
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("encoding", "UTF-8") \
    .option("multiLine", "true") \
    .csv("/mnt/retail_data/sales/2024/january_sales.csv")

df.printSchema()
df.show(5)
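
With inferSchema enabled, Spark makes an extra pass over the data to guess column types. When the layout is known in advance, supplying a schema avoids that pass and keeps types stable between runs. The sketch below assumes a hypothetical four-column layout (the real columns of january_sales.csv are not given here) and takes the spark session as an argument. The function name is illustrative.

```python
# Hypothetical column layout, written as a Spark DDL string.
SALES_SCHEMA = "order_id INT, customer_id INT, amount DOUBLE, order_date DATE"


def read_sales_csv(spark, path, schema=SALES_SCHEMA):
    """Read a headered CSV with an explicit schema instead of inferSchema."""
    return (
        spark.read
        .schema(schema)                # explicit types, no inference pass
        .option("header", "true")
        .option("mode", "FAILFAST")    # raise on rows that do not fit the schema
        .csv(path)
    )
```

FAILFAST is one possible choice. The default PERMISSIVE mode keeps malformed rows and sets unparseable fields to null, which may suit a raw landing table better.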

Reading Multiple Files at Once

FOLDER STRUCTURE:
──────────────────────────────────────────────────────────────
/mnt/retail_data/sales/
├── 2024/
│    ├── january_sales.csv   (500,000 rows)
│    ├── february_sales.csv  (480,000 rows)
│    └── march_sales.csv     (520,000 rows)

# Read ALL CSV files in a folder with a wildcard:
df_q1 = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/mnt/retail_data/sales/2024/*.csv")

print(f"Total Q1 rows: {df_q1.count():,}")
# Output: Total Q1 rows: 1,500,000

Reading Parquet Files

Parquet is the preferred format for big data because it is columnar (reads only columns you need), compressed (stores data efficiently), and includes embedded schema information. Reading Parquet is faster than CSV and requires no schema inference.

# Parquet read is simpler — schema is embedded in the file
df = spark.read.parquet("/mnt/retail_data/products/product_catalog.parquet")

# Read a folder of partitioned Parquet files
# (data partitioned by year and month)
df = spark.read.parquet("/mnt/retail_data/transactions/")

# Spark discovers the partition columns (year, month) automatically
# and adds them to the DataFrame:
# | txn_id | amount | city | year | month |
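
Partition discovery relies on Hive-style directory names such as year=2024/month=1 under the table folder. The pure-Python sketch below only illustrates how column values can be recovered from such a path. It is not Spark's implementation, and the example path is hypothetical.

```python
def partition_values(path):
    """Extract key=value directory segments from a file path as a dict of strings."""
    values = {}
    for segment in path.strip("/").split("/"):
        if "=" in segment:
            key, _, value = segment.partition("=")
            values[key] = value
    return values


example = "/mnt/retail_data/transactions/year=2024/month=1/part-0000.parquet"
print(partition_values(example))  # {'year': '2024', 'month': '1'}
```

Spark additionally infers a type for each partition column, which this sketch leaves as plain strings.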

Auto Loader – Incremental File Ingestion

Auto Loader is Databricks' solution for continuously ingesting new files as they arrive in cloud storage. It is like an automated receiving dock that never sleeps — whenever a new file drops into a folder, Auto Loader picks it up and processes it, without re-reading files it already processed.

AUTO LOADER VS REGULAR READ
──────────────────────────────────────────────────────────────
Regular spark.read:
  • Reads ALL files in the folder EVERY time you run the code
  • If 1000 files exist and 10 are new, it reads all 1000
  • Slow, wasteful, prone to reprocessing duplicates

Auto Loader (spark.readStream with cloudFiles source):
  • Tracks which files it has already processed
  • On each run, reads ONLY new files since last checkpoint
  • If 1000 files exist and 10 are new, reads only the 10 new ones
  • Works even if new files arrive while processing is running
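
The difference can be pictured with a toy model in plain Python. This is only an illustration of the bookkeeping idea. Auto Loader keeps its state in the checkpoint location and is built to handle very large numbers of files, which this sketch does not attempt.

```python
def select_new_files(listed_files, processed):
    """Return the files not seen before and record them as processed."""
    new_files = [f for f in listed_files if f not in processed]
    processed.update(new_files)
    return new_files


processed = set()  # stands in for the checkpoint state
first_run = select_new_files(["a.csv", "b.csv"], processed)
second_run = select_new_files(["a.csv", "b.csv", "c.csv"], processed)

print(first_run)   # ['a.csv', 'b.csv']
print(second_run)  # ['c.csv']
```

A plain spark.read corresponds to starting with an empty processed set on every run, so every listed file is read again.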

Auto Loader Code Example

# Set up Auto Loader to process new CSV files as they arrive
df_stream = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.schemaLocation",
            "/mnt/retail_data/_schemas/sales") \
    .option("header", "true") \
    .load("/mnt/retail_data/incoming/sales/")

# Write new records to a Delta table as they arrive
query = df_stream.writeStream \
    .format("delta") \
    .option("checkpointLocation",
            "/mnt/retail_data/_checkpoints/sales") \
    .outputMode("append") \
    .toTable("main.retail_data.sales_raw")

query.awaitTermination()

AUTO LOADER FLOW DIAGRAM
──────────────────────────────────────────────────────────────
Cloud Storage Bucket (/mnt/retail_data/incoming/sales/)
     │
     │  New file arrives: march_week1.csv
     ▼
Auto Loader detects new file (using cloud event notifications
or directory listing, depending on configuration)
     │
     ▼
Reads ONLY the new file (previous files already processed
and recorded in checkpoint)
     │
     ▼
Applies schema, parses CSV, creates Spark DataFrame
     │
     ▼
Appends rows to Delta table: main.retail_data.sales_raw
     │
     ▼
Records file as "processed" in checkpoint location
     │
     ▼
Waits for next new file (continuous processing mode)
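
The flow above keeps running until the stream is stopped. For scheduled jobs, a common alternative is to process whatever files have accumulated and then stop. The sketch below assumes a runtime that supports the availableNow trigger (available in PySpark from version 3.3). The function name and its parameters are hypothetical.

```python
def ingest_available_files(spark, source_path, schema_path,
                           checkpoint_path, target_table):
    """Run Auto Loader over all files not yet processed, then stop."""
    stream = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", schema_path)
        .option("header", "true")
        .load(source_path)
    )
    query = (
        stream.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)    # drain the backlog, then terminate
        .toTable(target_table)
    )
    query.awaitTermination()           # returns once the backlog is processed
    return query
```

Because the checkpoint still records which files were loaded, the next scheduled run picks up only files that arrived in the meantime.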

Reading from Databases with JDBC

JDBC (Java Database Connectivity) is a standard Java API that Databricks uses to connect to relational databases such as MySQL, PostgreSQL, SQL Server, and Oracle. You provide the database connection string and credentials, and Spark reads a table or query result as a DataFrame.

READING FROM POSTGRESQL
──────────────────────────────────────────────────────────────
jdbc_url = "jdbc:postgresql://db-server.example.com:5432/retail_db"

connection_properties = {
    "user": dbutils.secrets.get(scope="db-secrets", key="pg-user"),
    "password": dbutils.secrets.get(scope="db-secrets",
                                     key="pg-password"),
    "driver": "org.postgresql.Driver"
}

# Read the entire customers table
df_customers = spark.read.jdbc(
    url=jdbc_url,
    table="public.customers",
    properties=connection_properties
)

# Read using a custom query (filters at the database level — faster)
df_recent = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT * FROM orders WHERE order_date >= '2024-01-01') AS recent",
    properties=connection_properties
)

Parallel JDBC Reads

By default, JDBC reads use a single connection to the database. For large tables, this is slow. You can parallelize by telling Spark to split the read across multiple partitions using a numeric column.

# Split the read across up to 10 parallel database connections
# based on order_id ranges
df_parallel = spark.read.jdbc(
    url=jdbc_url,
    table="public.orders",
    column="order_id",      # ← Numeric column to partition on
    lowerBound=1,           # ← Used with upperBound to size the ranges;
    upperBound=10000000,    #   rows outside the bounds are still read
    numPartitions=10,       # ← 10 parallel readers
    properties=connection_properties
)

# Each partition reads roughly one tenth of the order_id range:
# Partition 1: order_id below about 1,000,000 (plus NULLs)
# Partition 2: order_id from about 1,000,000 to 2,000,000
# ...
# Partition 10: order_id from about 9,000,000 upward
# The 10 queries run concurrently if the cluster has enough cores.
# The actual speedup depends on the database and on how evenly
# order_id values are spread across the range.
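
Spark's documentation notes that lowerBound and upperBound only decide the partition stride and do not filter rows. The pure-Python sketch below approximates the WHERE clauses that result, to show why the first and last ranges are open-ended. It is a simplified model: the exact boundary values Spark computes can differ slightly between versions, and the function name is hypothetical.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Approximate the per-partition WHERE clauses of a partitioned JDBC read."""
    if num_partitions < 2:
        raise ValueError("this sketch expects at least 2 partitions")
    stride = (upper_bound - lower_bound) // num_partitions
    predicates = []
    boundary = lower_bound
    for i in range(num_partitions):
        # The first partition has no lower limit; the last has no upper limit.
        lower = f"{column} >= {boundary}" if i > 0 else None
        boundary += stride
        upper = f"{column} < {boundary}" if i < num_partitions - 1 else None
        if lower is None:
            predicates.append(f"{upper} OR {column} IS NULL")
        elif upper is None:
            predicates.append(lower)
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


preds = jdbc_partition_predicates("order_id", 1, 10_000_000, 10)
print(preds[0])
print(preds[-1])
```

If most order_id values fall outside the chosen bounds, the first or last partition ends up doing most of the work, so the bounds are best taken from the column's real minimum and maximum.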

Using Databricks Partner Connectors

Databricks integrates with popular data sources through dedicated connectors. For many sources these offer better performance and more features than a generic JDBC connection.

POPULAR DATABRICKS CONNECTORS
──────────────────────────────────────────────────────────────
Source System         │ Connector Type       │ Use Case
──────────────────────┼──────────────────────┼────────────────
Salesforce            │ Databricks Connector │ CRM data
Google BigQuery       │ Spark Connector      │ GCP analytics
Snowflake             │ Spark Connector      │ Cloud DW data
MongoDB               │ Spark Connector      │ NoSQL data
Apache Kafka          │ Spark Streaming      │ Real-time events
Amazon Kinesis        │ Spark Streaming      │ AWS streaming
Azure Event Hubs      │ Spark Streaming      │ Azure streaming
REST APIs             │ Custom Python code   │ Web services

Reading from REST APIs

Many data sources expose data through REST APIs — weather services, payment gateways, logistics platforms, social media platforms. You call these APIs using Python's requests library and convert the JSON response into a Spark DataFrame.

# Example: Reading from a simple REST API
import requests
from pyspark.sql.types import (
    StructType, StructField, IntegerType, DoubleType, StringType
)

# Fetch the API token from a secret scope
# (the scope and key names here are placeholders)
api_token = dbutils.secrets.get(scope="api-secrets", key="orders-api-token")

# Call the API
response = requests.get(
    url="https://api.example.com/orders",
    headers={"Authorization": f"Bearer {api_token}"},
    params={"start_date": "2024-01-01", "page_size": 1000},
    timeout=30
)
response.raise_for_status()  # Fail on HTTP errors instead of parsing an error body

orders = response.json()["data"]

# Convert to Spark DataFrame
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("amount", DoubleType(), True),
    StructField("status", StringType(), True),
    StructField("created_at", StringType(), True)
])

df_orders = spark.createDataFrame(orders, schema)
df_orders.write.format("delta").mode("append") \
         .saveAsTable("main.retail_data.api_orders")

For APIs that paginate results (for example, returning 1000 records per page over many pages), loop through the pages, collect the records from each one, and build a single DataFrame before writing to the Delta table.
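
One way to structure that loop is sketched below. It assumes the API signals the last page by returning fewer records than the page size. That convention, the fetch_page callable, and the function name are assumptions for illustration, not part of any specific API.

```python
def fetch_all_pages(fetch_page, page_size=1000, max_pages=10_000):
    """Collect records from a paginated API into one list.

    fetch_page(page, page_size) must return the list of records for that
    page. A page shorter than page_size is treated as the last one.
    max_pages is a safety limit against endless loops.
    """
    records = []
    for page in range(1, max_pages + 1):
        batch = fetch_page(page, page_size)
        records.extend(batch)
        if len(batch) < page_size:
            break
    return records


# In a notebook, fetch_page could wrap requests.get. The "page" parameter
# name is hypothetical and depends on the API:
#
# def fetch_page(page, page_size):
#     r = requests.get(url, headers=headers, timeout=30,
#                      params={"page": page, "page_size": page_size})
#     r.raise_for_status()
#     return r.json()["data"]
```

Accumulating plain Python records and calling spark.createDataFrame once keeps the query plan small, whereas a union per page builds a longer plan with every iteration. For very large result sets, writing each batch of pages to the Delta table separately avoids holding everything in driver memory.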

Ingesting Real-Time Data from Kafka

Apache Kafka is a distributed event streaming platform. Applications publish messages to Kafka topics (like channels), and other applications subscribe to read those messages. Databricks Structured Streaming reads from Kafka topics as a continuous stream.

KAFKA INGESTION FLOW
──────────────────────────────────────────────────────────────
PRODUCERS (Applications sending data)
  E-commerce checkout system
  Mobile app click events
  IoT sensors
       │
       │ Publish messages
       ▼
KAFKA CLUSTER
  Topic: "checkout-events"
  Partitions: [P0] [P1] [P2] [P3] [P4]  ← Data split across partitions
  Retention: 7 days (messages stored even after consumed)
       │
       │ Read messages
       ▼
DATABRICKS STRUCTURED STREAMING
  Reads new messages continuously
  Parses JSON payload
  Writes to Delta table
       │
       ▼
DELTA TABLE: raw_checkout_events
  Every completed purchase appears here within seconds

# Read from Kafka topic
df_kafka = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers",
            "kafka-broker1:9092,kafka-broker2:9092") \
    .option("subscribe", "checkout-events") \
    .option("startingOffsets", "latest") \
    .load()

# Kafka returns binary key and value — decode and parse JSON
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import *

checkout_schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", IntegerType()),
    StructField("total_amount", DoubleType()),
    StructField("payment_method", StringType()),
    StructField("timestamp", LongType())
])

df_parsed = df_kafka \
    .withColumn("value_str",
                col("value").cast("string")) \
    .withColumn("data", from_json("value_str", checkout_schema)) \
    .select("data.*")

# Write to Delta table
df_parsed.writeStream \
    .format("delta") \
    .option("checkpointLocation",
            "/mnt/checkpoints/kafka_checkout") \
    .outputMode("append") \
    .toTable("main.streaming.raw_checkout_events")

Data Ingestion Best Practices

INGESTION BEST PRACTICES
──────────────────────────────────────────────────────────────
1. NEVER hardcode credentials in notebooks
   Use: dbutils.secrets.get(scope="...", key="...")
   Never: password = "mypassword123"

2. Land raw data first, transform later
   Load raw data to a "bronze" Delta table first.
   Transform in a separate step to "silver".
   This preserves original data for debugging.

3. Add metadata columns during ingestion
   from pyspark.sql.functions import current_timestamp, input_file_name
   df = (df.withColumn("ingestion_timestamp", current_timestamp())
           .withColumn("source_file", input_file_name()))
   → Tracks when and where each record came from.

4. Validate data after loading
   Check: df.count() > 0 (not empty)
   Check: df.where(col("amount").isNull()).count() == 0
   Check: df.where(col("date") > current_date()).count() == 0
   → Catch data quality issues before downstream consumers see them.

5. Use Parquet or Delta as your target format
   Never write raw ingested data back to CSV.
   Parquet: good for cold archives.
   Delta: best for tables that need updates or queries.
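
The checks in item 4 can be wrapped in a small function that fails the job when a check does not pass. The sketch below assumes the DataFrame has amount and date columns as in those checks. The function name and error messages are hypothetical.

```python
def validate_ingested(df, amount_col="amount", date_col="date"):
    """Raise ValueError if a basic post-load check fails; return the row count."""
    total = df.count()
    if total == 0:
        raise ValueError("no rows were ingested")

    # DataFrame.where accepts a SQL expression string.
    null_amounts = df.where(f"`{amount_col}` IS NULL").count()
    if null_amounts:
        raise ValueError(f"{null_amounts} rows have a null {amount_col}")

    future_dates = df.where(f"`{date_col}` > current_date()").count()
    if future_dates:
        raise ValueError(f"{future_dates} rows have a {date_col} in the future")

    return total
```

Each count triggers a separate Spark job, so on large tables it can be cheaper to compute all three figures in a single aggregation.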

Key Points

  • Databricks reads files from cloud storage (S3, ADLS, GCS) in formats like CSV, JSON, and Parquet using spark.read.
  • Auto Loader ingests new files incrementally using checkpointing — it processes only new files, never re-reading files already loaded.
  • JDBC connects Databricks to relational databases (MySQL, PostgreSQL, SQL Server) with optional parallel partitioning for large tables.
  • REST API data is read with Python's requests library and converted to a Spark DataFrame before writing to Delta Lake.
  • Structured Streaming reads from Kafka topics continuously, picking up new messages in small micro-batches shortly after they arrive.
  • Always store credentials in Databricks Secrets, never as plain text in notebooks.
  • Land raw data first in a bronze layer before any transformation — preserving the original data makes debugging much easier.
