Spark File Formats
Spark reads from and writes to many types of storage: local file systems, HDFS, Amazon S3, Azure Blob Storage, databases, and more. The unified DataFrameReader and DataFrameWriter APIs handle all of these with consistent syntax.
The Read-Process-Write Pattern
[ Storage Source ] --read--> [ Spark Processing ] --write--> [ Storage Sink ]

Examples:
  CSV on S3    --> filter, join, aggregate --> Parquet on S3
  MySQL table  --> transform columns       --> Delta Lake table
  JSON stream  --> enrich with lookup      --> Kafka topic
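To make the three stages concrete without a cluster, here is a plain-Python model of one read-process-write pass using only the standard library. It is not Spark: the sample data and the function name read_process_write are hypothetical. It reads CSV text, filters and aggregates, and writes JSON Lines, which in Spark would map to spark.read, DataFrame transformations, and df.write.

```python
import csv
import io
import json
from collections import defaultdict

# Hypothetical sample data standing in for a CSV source such as "CSV on S3".
SALES_CSV = """region,amount
North,100.0
South,250.5
North,-5.0
South,49.5
"""


def read_process_write(csv_text: str) -> str:
    # Read: parse CSV text; the header row supplies the field names.
    rows = csv.DictReader(io.StringIO(csv_text))
    # Process: drop non-positive amounts, then total the rest per region.
    totals = defaultdict(float)
    for row in rows:
        amount = float(row["amount"])
        if amount > 0:
            totals[row["region"]] += amount
    # Write: serialize one JSON object per line (JSON Lines), sorted by region.
    return "\n".join(
        json.dumps({"region": region, "total": total})
        for region, total in sorted(totals.items())
    )


print(read_process_write(SALES_CSV))
```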
Reading Data
CSV
df = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.option("sep", ",") \
.csv("data/sales.csv")
# Read multiple files at once
df = spark.read.csv("data/sales_*.csv", header=True, inferSchema=True)
JSON
# Each line is one JSON object (default)
df = spark.read.json("data/events.json")
# Multiline JSON (one object spanning multiple lines)
df = spark.read.option("multiline", "true").json("data/config.json")
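The difference between the two JSON layouts can be shown with Python's standard json module. This is a sketch of the file shapes only, not of Spark's reader; the sample records are hypothetical. In JSON Lines every line is a complete document, which is what lets a reader split one large file across tasks. In multiline JSON no single line parses on its own, so each file has to be read as a whole (a top-level array yields one record per element).

```python
import json

# JSON Lines: one complete JSON object per line (Spark's default JSON layout).
JSON_LINES = '{"id": 1, "event": "click"}\n{"id": 2, "event": "view"}\n'

# Multiline JSON: a single document (here an array) spread over several lines.
MULTILINE = """[
  {"id": 1, "event": "click"},
  {"id": 2, "event": "view"}
]"""


def parse_json_lines(text):
    # Each non-empty line is parsed independently of the others.
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def parse_multiline(text):
    # The whole text must be parsed as one document.
    doc = json.loads(text)
    return doc if isinstance(doc, list) else [doc]


print(parse_json_lines(JSON_LINES) == parse_multiline(MULTILINE))
```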
Parquet
# Parquet stores its schema in the file metadata, so no inferSchema is needed
df = spark.read.parquet("data/transactions.parquet")
From a Database (JDBC)
# Needs the MySQL JDBC driver JAR on Spark's classpath (for example via spark-submit --jars)
df = spark.read.format("jdbc") \
.option("url", "jdbc:mysql://host:3306/mydb") \
.option("dbtable", "orders") \
.option("user", "admin") \
.option("password", "secret") \
.load()
Defining a Schema Manually
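Define a schema explicitly for production jobs. inferSchema is convenient, but it makes an extra pass over the data just to guess column types before loading it. A manual schema skips that pass and guarantees the column types you expect, even when a stray value would have pushed inference toward a weaker type such as string.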
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
schema = StructType([
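# nullable=False documents intent; Spark generally reads file-based sources as nullable and does not enforce it
StructField("order_id", IntegerType(), nullable=False),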
StructField("customer", StringType(), nullable=True),
StructField("amount", DoubleType(), nullable=True),
StructField("order_date", StringType(), nullable=True),
])
df = spark.read.schema(schema).csv("orders.csv", header=True)
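The cost of inference and the benefit of a declared schema can be modeled in plain Python. This is a deliberately simplified sketch, not Spark's algorithm: the Source class, guess_type, and the sample data are hypothetical, and real inference considers more types. It counts full passes over the input and shows how one bad value ("n/a") drags an inferred column to string, while a declared schema keeps the column numeric and turns the bad value into None.

```python
import csv
import io


class Source:
    """Stands in for a file on disk and counts full passes over it."""

    def __init__(self, text):
        self.text = text
        self.passes = 0

    def rows(self):
        self.passes += 1
        return csv.DictReader(io.StringIO(self.text))


def guess_type(values):
    # Simplified inference: int, else float, else str.
    for cast in (int, float):
        try:
            for value in values:
                cast(value)
            return cast
        except ValueError:
            continue
    return str


def read_with_inference(source):
    columns = {}
    for row in source.rows():  # pass 1: scan everything just to guess types
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    types = {name: guess_type(values) for name, values in columns.items()}
    # pass 2: read again and convert with the guessed types
    rows = [{k: types[k](v) for k, v in row.items()} for row in source.rows()]
    return types, rows


def read_with_schema(source, schema):
    def convert(cast, value):
        # A value that does not fit the declared type becomes None,
        # loosely like Spark's default PERMISSIVE parse mode.
        try:
            return cast(value)
        except ValueError:
            return None

    # Single pass: the types are known up front.
    return [{k: convert(schema[k], v) for k, v in row.items()} for row in source.rows()]


ORDERS_CSV = "order_id,customer,amount\n1,alice,19.99\n2,bob,5\n3,carol,n/a\n"
```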
Writing Data
Write Modes
| Mode | Behavior |
|---|---|
| overwrite | Replace any existing data at the destination with the new output |
| append | Add the new records alongside the existing data |
| ignore | Silently skip the write if the destination already exists |
| error / errorifexists (default) | Fail with an error if the destination already exists |
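The four modes can be modeled against a local directory with pathlib. This is a plain-Python sketch of the semantics in the table, not Spark's implementation: the save function and the part-file naming are hypothetical simplifications, though like Spark it writes a directory of part files rather than a single file.

```python
import shutil
from pathlib import Path


def save(path: Path, lines, mode="errorifexists"):
    """Plain-Python model of Spark's save modes for a directory output."""
    exists = path.exists()
    if mode in ("error", "errorifexists"):
        if exists:
            raise FileExistsError(f"path {path} already exists")
    elif mode == "ignore":
        if exists:
            return  # silently skip the write
    elif mode == "overwrite":
        if exists:
            shutil.rmtree(path)  # delete the old output first
    elif mode != "append":
        raise ValueError(f"unknown save mode: {mode}")
    path.mkdir(parents=True, exist_ok=True)
    # Each write adds a new part file, so append keeps earlier files in place.
    part = path / f"part-{len(list(path.iterdir())):05d}.txt"
    part.write_text("\n".join(lines))
```

Running the modes in sequence on one directory shows the differences: the default mode refuses a second write, append adds a part file, ignore leaves everything alone, and overwrite leaves only the newest file.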
Write to CSV
df.write \
.mode("overwrite") \
.option("header", "true") \
.csv("/output/results")
Write to Parquet
# Like the CSV writer, this creates a directory of part files at the given path, not a single file
df.write.mode("overwrite").parquet("/output/results.parquet")
Write to a Database (JDBC)
df.write.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/mydb") \
.option("dbtable", "processed_orders") \
.option("user", "admin") \
.option("password", "secret") \
.mode("append") \
.save()
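Both JDBC snippets hard-code the password for readability. One common alternative is to read credentials from the environment and pass them in with the writer's options(**kwargs) method. The helper below is a hypothetical sketch: the jdbc_options name and the DB_USER / DB_PASSWORD variable names are assumptions, so substitute whatever your secret store or scheduler provides.

```python
import os


def jdbc_options(url: str, table: str) -> dict:
    """Build JDBC options, taking credentials from environment variables.

    DB_USER and DB_PASSWORD are hypothetical names used for illustration.
    """
    user = os.environ.get("DB_USER")
    password = os.environ.get("DB_PASSWORD")
    if not user or not password:
        raise RuntimeError("DB_USER and DB_PASSWORD must be set")
    return {"url": url, "dbtable": table, "user": user, "password": password}


# Usage with a real DataFrame:
# df.write.format("jdbc") \
#     .options(**jdbc_options("jdbc:postgresql://host:5432/mydb", "processed_orders")) \
#     .mode("append") \
#     .save()
```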
Partitioned Writes — Organizing Output Files
Partitioned writes split the output into subdirectories by the values of one or more columns. Later reads that filter on those columns become much faster, because Spark scans only the matching partition folders (partition pruning) instead of the entire dataset.
# Write sales data partitioned by year and region
df.write \
.partitionBy("year", "region") \
.mode("overwrite") \
.parquet("/output/sales_partitioned")
# Resulting folder structure (part-file names simplified):
# /output/sales_partitioned/
#   year=2023/
#     region=North/part-00000.parquet
#     region=South/part-00001.parquet
#   year=2024/
#     region=North/part-00002.parquet
#     region=South/part-00003.parquet
# A later query filtering on the partition columns reads only year=2024/region=North and skips the other folders
df_2024_north = spark.read.parquet("/output/sales_partitioned") \
.filter("year = 2024 AND region = 'North'")
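The folder layout and the pruning step can be modeled in plain Python. This sketch is illustrative only: partition_paths and prune are hypothetical helpers, and the sample rows are made up. It groups rows into Hive-style key=value folders the way partitionBy does, drops the partition columns from the stored rows (their values live in the folder names), and keeps only the folders that match a filter.

```python
from collections import defaultdict


def partition_paths(rows, partition_cols):
    """Group rows into Hive-style key=value folders, as partitionBy does."""
    layout = defaultdict(list)
    for row in rows:
        folder = "/".join(f"{c}={row[c]}" for c in partition_cols)
        # Partition columns are encoded in the folder name, not in the data file.
        layout[folder].append({k: v for k, v in row.items() if k not in partition_cols})
    return dict(layout)


def prune(layout, **predicates):
    """Keep only folders whose key=value segments match every predicate."""
    wanted = {f"{k}={v}" for k, v in predicates.items()}
    return {f: rows for f, rows in layout.items() if wanted <= set(f.split("/"))}


sales = [
    {"year": 2023, "region": "North", "amount": 10},
    {"year": 2023, "region": "South", "amount": 20},
    {"year": 2024, "region": "North", "amount": 30},
    {"year": 2024, "region": "South", "amount": 40},
]
layout = partition_paths(sales, ["year", "region"])
print(sorted(layout))
print(prune(layout, year=2024, region="North"))
```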
Reading from Amazon S3
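# Set AWS credentials for the s3a connector (needs the hadoop-aws package on the classpath).
# In production, prefer an IAM role over hard-coded keys. _jsc is an internal handle;
# setting spark.hadoop.fs.s3a.* options when building the SparkSession also works.
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "YOUR_KEY")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET")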
df = spark.read.parquet("s3a://my-bucket/data/sales/")
df.write.mode("overwrite").parquet("s3a://my-bucket/output/results/")
Best Practices
- Use Parquet as the default storage format: it is columnar, compressed, and fast to scan
- Define schemas explicitly in production instead of relying on inferSchema
- Use partitionBy on columns you filter on often (date, region, category), preferably ones with a modest number of distinct values
- Avoid writing thousands of small files: reduce the number of partitions with coalesce() (or repartition() on the partition columns) before writing
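Why small files pile up can be seen with a simplified model: in a partitioned write, each task writes roughly one file for every partition folder its rows belong to. The sketch below is plain Python, not Spark; files_written, coalesce, and repartition_by are hypothetical helpers, and the data is made up. It starts from 200 in-memory partitions (the default value of spark.sql.shuffle.partitions), each holding rows for all four regions.

```python
import math

REGIONS = ["North", "South", "East", "West"]


def files_written(tasks, key):
    """Each task writes one file per distinct partition folder it holds (simplified)."""
    return sum(len({key(row) for row in task}) for task in tasks)


def coalesce(tasks, n):
    """Merge neighboring tasks into at most n groups, without a shuffle (simplified)."""
    size = math.ceil(len(tasks) / n)
    return [sum(tasks[i:i + size], []) for i in range(0, len(tasks), size)]


def repartition_by(tasks, key):
    """Shuffle so every row with the same partition value lands in one task."""
    groups = {}
    for task in tasks:
        for row in task:
            groups.setdefault(key(row), []).append(row)
    return list(groups.values())


def by_region(row):
    return row["region"]


# 200 tasks, each holding one row for every region.
tasks = [[{"region": r, "amount": i} for r in REGIONS] for i in range(200)]

print(files_written(tasks, by_region))                             # 800
print(files_written(coalesce(tasks, 10), by_region))               # 40
print(files_written(repartition_by(tasks, by_region), by_region))  # 4
```

coalesce() avoids a full shuffle but lowers write parallelism; repartitioning on the partition columns yields about one file per folder, at the cost of a shuffle and possibly very large files for skewed values.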
