Spark File Formats

Spark reads from and writes to many types of storage: local file systems, HDFS, Amazon S3, Azure Blob Storage, databases, and more. The unified DataFrameReader and DataFrameWriter APIs handle all of these with consistent syntax.

The Read-Process-Write Pattern

[ Storage Source ]  --read-->  [ Spark Processing ]  --write-->  [ Storage Sink ]

Examples:
CSV on S3       -->  filter, join, aggregate  -->  Parquet on S3
MySQL table     -->  transform columns        -->  Delta Lake table
JSON stream     -->  enrich with lookup       -->  Kafka topic

Reading Data

CSV

df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", ",") \
    .csv("data/sales.csv")

# Read multiple files at once
df = spark.read.csv("data/sales_*.csv", header=True, inferSchema=True)

JSON

# Each line is one JSON object (default)
df = spark.read.json("data/events.json")

# Multiline JSON: a file holding one object (or an array of objects) spread across many lines
df = spark.read.option("multiline", "true").json("data/config.json")
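The difference between the two layouts is easy to see outside Spark. The minimal sketch below uses Python's standard json module as a stand-in for Spark's reader. The sample strings and the helper names parse_json_lines and parse_whole_document are made up for illustration. In the default (JSON Lines) layout every line is a complete document and can be parsed on its own. A pretty-printed document only parses as a whole, and that whole-file parsing is what the multiline option asks Spark to do.

```python
import json

# JSON Lines (Spark's default layout): one complete JSON object per line.
json_lines = '{"id": 1, "event": "click"}\n{"id": 2, "event": "view"}\n'

# Multiline JSON: a single document spread across several lines.
multiline_doc = """{
  "app": "shop",
  "retries": 3
}"""

def parse_json_lines(text):
    """Hypothetical helper: parse each non-empty line independently, like the default reader."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]

def parse_whole_document(text):
    """Hypothetical helper: parse the entire text as one document, like multiline mode."""
    doc = json.loads(text)
    # A top-level array yields one record per element; a single object yields one record.
    return doc if isinstance(doc, list) else [doc]

records = parse_json_lines(json_lines)
config = parse_whole_document(multiline_doc)

# Reading the pretty-printed document line by line fails: "{" alone is not valid JSON.
try:
    parse_json_lines(multiline_doc)
    line_by_line_ok = True
except json.JSONDecodeError:
    line_by_line_ok = False
```

Spark itself does not raise in the second case by default. In its permissive mode it typically reports unparseable lines as corrupt records, so a multiline file read without the option often shows up as a DataFrame with only a _corrupt_record column.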

Parquet

# Parquet stores its schema in the file metadata, so no inferSchema is needed
df = spark.read.parquet("data/transactions.parquet")

From a Database (JDBC)

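# The MySQL JDBC driver JAR must be on Spark's classpath (for example via --jars).
# Credentials are inline for illustration only; load them from a secret store in real jobs.
df = spark.read.format("jdbc") \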
    .option("url", "jdbc:mysql://host:3306/mydb") \
    .option("dbtable", "orders") \
    .option("user", "admin") \
    .option("password", "secret") \
    .load()

Defining a Schema Manually

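Always define a schema for production jobs that read CSV or JSON. Schema inference (the inferSchema option for CSV, and the default behavior for JSON) is convenient but costs an extra pass over the data: one pass to guess column types, another to load. An explicit schema skips that pass and keeps column types stable from run to run, as long as it matches the actual data.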

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

schema = StructType([
    StructField("order_id",   IntegerType(), nullable=False),  # not enforced: file sources read every column as nullable
    StructField("customer",   StringType(),  nullable=True),
    StructField("amount",     DoubleType(),  nullable=True),
    StructField("order_date", StringType(),  nullable=True),
])

df = spark.read.schema(schema).csv("orders.csv", header=True)
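Why inference costs an extra pass can be sketched in plain Python. The toy example below uses only the standard csv module and hypothetical sample data standing in for orders.csv. infer_schema first scans every row to pick the widest type seen per column, and load then reads the data again to cast it. With an explicit schema, only the load pass is needed. The type rules (int, then double, else string) are a deliberate simplification, not Spark's actual inference logic, which recognizes more types such as booleans and timestamps.

```python
import csv
import io

# Hypothetical sample data standing in for orders.csv.
SAMPLE = "order_id,customer,amount\n1,alice,19.99\n2,bob,5\n3,,12.50\n"

def guess_type(value):
    """Toy type guesser: int, then double, else string (not Spark's exact rules)."""
    for caster, name in ((int, "int"), (float, "double")):
        try:
            caster(value)
            return name
        except ValueError:
            pass
    return "string"

WIDTH = {"int": 0, "double": 1, "string": 2}  # a wider type wins when values disagree

def infer_schema(text):
    """Pass 1: scan every row and keep the widest type seen per column."""
    reader = csv.DictReader(io.StringIO(text))
    types = {name: None for name in reader.fieldnames}
    for row in reader:
        for name, value in row.items():
            if value == "":  # empty cells are treated as nulls, not type evidence
                continue
            t = guess_type(value)
            if types[name] is None or WIDTH[t] > WIDTH[types[name]]:
                types[name] = t
    return {name: t or "string" for name, t in types.items()}

def load(text, schema):
    """Pass 2: read the data again, casting each value with the given schema."""
    casts = {"int": int, "double": float, "string": str}
    rows = []
    for row in csv.DictReader(io.StringIO(text)):
        rows.append({k: (casts[schema[k]](v) if v != "" else None) for k, v in row.items()})
    return rows

inferred = infer_schema(SAMPLE)  # the extra pass
rows = load(SAMPLE, inferred)    # the actual load

# With an explicit schema, only the load pass runs.
explicit = {"order_id": "int", "customer": "string", "amount": "double"}
rows_explicit = load(SAMPLE, explicit)
```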

Writing Data

Write Modes

Mode                             Behavior
overwrite                        Delete existing data and write fresh
append                           Add new records to existing data
ignore                           Do nothing if the destination already exists
error / errorifexists (default)  Throw an error if the destination exists
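The four modes differ only in what happens when the destination already holds data. The toy model below makes that explicit. storage is a hypothetical dict mapping paths to rows, standing in for a file system or table. save is an illustrative helper, not part of the PySpark API.

```python
class DestinationExistsError(Exception):
    """Raised by the toy save() in error/errorifexists mode."""

def save(storage, path, rows, mode="error"):
    """Hypothetical helper mimicking the four save modes on a dict of path -> rows."""
    exists = path in storage
    if mode in ("error", "errorifexists"):
        if exists:
            raise DestinationExistsError(path)
        storage[path] = list(rows)
    elif mode == "ignore":
        if not exists:
            storage[path] = list(rows)  # silently skipped when data is already there
    elif mode == "append":
        storage.setdefault(path, []).extend(rows)
    elif mode == "overwrite":
        storage[path] = list(rows)  # previous contents are discarded
    else:
        raise ValueError(f"unknown mode: {mode}")
    return storage

store = {}
save(store, "/output/results", [1, 2])                 # first write creates the path
save(store, "/output/results", [3], mode="append")     # now [1, 2, 3]
after_append = list(store["/output/results"])
save(store, "/output/results", [9], mode="ignore")     # existing data kept, [9] dropped
after_ignore = list(store["/output/results"])
save(store, "/output/results", [7], mode="overwrite")  # replaced with [7]
try:
    save(store, "/output/results", [0])                # default mode: destination exists
    raised = False
except DestinationExistsError:
    raised = True
```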

Write to CSV

df.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("/output/results")

Write to Parquet

# The path names an output directory; Spark writes its part files inside it
df.write.mode("overwrite").parquet("/output/results.parquet")

Write to a Database (JDBC)

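# The PostgreSQL JDBC driver JAR must be on Spark's classpath; avoid inline passwords in real jobs.
df.write.format("jdbc") \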
    .option("url", "jdbc:postgresql://host:5432/mydb") \
    .option("dbtable", "processed_orders") \
    .option("user", "admin") \
    .option("password", "secret") \
    .mode("append") \
    .save()

Partitioned Writes — Organizing Output Files

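Partitioned writes split the output into subdirectories named after the values of one or more columns (for example year=2024/region=North). When a later query filters on those columns, Spark reads only the matching partition folders instead of scanning the entire dataset, an optimization known as partition pruning.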

# Write sales data partitioned by year and region
df.write \
    .partitionBy("year", "region") \
    .mode("overwrite") \
    .parquet("/output/sales_partitioned")

# Resulting folder structure (file names simplified; real part files have longer, unique names):
# /output/sales_partitioned/
#     year=2023/
#         region=North/part-00000.parquet
#         region=South/part-00001.parquet
#     year=2024/
#         region=North/part-00002.parquet
#         region=South/part-00003.parquet

# Partition pruning: this query reads only year=2024/region=North and skips every other folder
df_2024_north = spark.read.parquet("/output/sales_partitioned") \
    .filter("year = 2024 AND region = 'North'")
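The layout and the pruning step can be imitated with ordinary files. The sketch below is an illustration under stated assumptions, not how Spark is implemented: it uses the standard library only, hypothetical sales rows, CSV instead of Parquet, and made-up helper names. write_partitioned creates one year=.../region=... folder per distinct pair and keeps the partition values only in the path, as Spark also leaves partition columns out of the data files. read_with_pruning opens only the folders whose path values satisfy the filter and counts how many files it actually opened.

```python
import csv
import os
import tempfile

# Hypothetical sales rows; "year" and "region" are the partition columns.
rows = [
    {"year": 2023, "region": "North", "amount": 100},
    {"year": 2023, "region": "South", "amount": 80},
    {"year": 2024, "region": "North", "amount": 120},
    {"year": 2024, "region": "North", "amount": 30},
    {"year": 2024, "region": "South", "amount": 95},
]
PARTITION_COLS = ["year", "region"]

def write_partitioned(rows, root, cols):
    """Write each distinct partition-value combination to root/col=value/.../part-00000.csv."""
    groups = {}
    for row in rows:
        groups.setdefault(tuple(row[c] for c in cols), []).append(row)
    for key, group in groups.items():
        subdir = os.path.join(root, *(f"{c}={v}" for c, v in zip(cols, key)))
        os.makedirs(subdir, exist_ok=True)
        data_cols = [c for c in group[0] if c not in cols]  # partition values live in the path
        with open(os.path.join(subdir, "part-00000.csv"), "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=data_cols, extrasaction="ignore")
            writer.writeheader()
            writer.writerows(group)

def read_with_pruning(root, predicate):
    """Open only the partition folders whose col=value segments satisfy `predicate`."""
    result, files_opened = [], 0
    for dirpath, _dirnames, filenames in os.walk(root):
        if not filenames:
            continue  # folders without data files: the root and the year=... level
        rel = os.path.relpath(dirpath, root)
        values = dict(part.split("=", 1) for part in rel.split(os.sep))
        if not predicate(values):
            continue  # pruned: files in this folder are never opened
        for name in filenames:
            files_opened += 1
            with open(os.path.join(dirpath, name), newline="") as f:
                for rec in csv.DictReader(f):
                    rec.update(values)  # re-attach partition columns from the path
                    result.append(rec)
    return result, files_opened

with tempfile.TemporaryDirectory() as root:
    write_partitioned(rows, root, PARTITION_COLS)
    layout = sorted(os.path.relpath(d, root) for d, _, files in os.walk(root) if files)
    north_2024, opened = read_with_pruning(
        root, lambda p: p["year"] == "2024" and p["region"] == "North"
    )
```

Values recovered from folder names are plain strings in this sketch. Spark goes further and, by default, infers partition column types, so year comes back as an integer.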

Reading from and Writing to Amazon S3

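# Requires the hadoop-aws (S3A) connector on the classpath.
# Inline keys are for local testing only; in production prefer an IAM role or another
# credentials provider. spark._jsc is an internal handle to the underlying JVM context.
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "YOUR_KEY")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET")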

df = spark.read.parquet("s3a://my-bucket/data/sales/")
df.write.mode("overwrite").parquet("s3a://my-bucket/output/results/")

Best Practices

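  • Use Parquet as the default storage format: it is columnar, compressed, stores its own schema, and is fast to scan
  • Define schemas explicitly for CSV and JSON sources in production instead of relying on schema inference
  • Use partitionBy on columns you filter on often and that have relatively few distinct values (date, region, category); high-cardinality columns produce huge numbers of tiny folders and files
  • Avoid writing thousands of small files: call coalesce(n) before writing to merge partitions without a full shuffle, or repartition(n) when skewed data needs rebalancing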
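The small-files advice follows from how writes work: each partition of a DataFrame is written by its own task, producing at least one file per partition (more when partitionBy spreads a task's rows across several folders). The toy model below uses plain Python lists and a hypothetical coalesce_partitions helper to show the idea behind coalesce(n). Neighboring partitions are merged into n groups without moving individual rows between them, so a write would produce n files instead of hundreds. Spark's real implementation may also take data locality into account when grouping.

```python
def coalesce_partitions(partitions, n):
    """Hypothetical helper: merge contiguous partitions into n groups, with no shuffle."""
    if n <= 0:
        raise ValueError("n must be positive")
    if n >= len(partitions):
        # Like DataFrame.coalesce, asking for more partitions keeps the current count.
        return [list(p) for p in partitions]
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i * n // len(partitions)].extend(part)  # contiguous runs of inputs per group
    return merged

# 200 small partitions of 5 rows each (200 is the default of spark.sql.shuffle.partitions).
partitions = [list(range(i * 5, i * 5 + 5)) for i in range(200)]
files_before = len(partitions)             # one output file per partition
after = coalesce_partitions(partitions, 8)
files_after = len(after)
```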
