Spark SQL and Queries

Spark SQL lets you query distributed data using standard SQL syntax. If you know SQL, you already know most of what you need. SQL queries and the DataFrame API run on the same engine, and every SQL query returns a DataFrame, so you can mix DataFrame code and SQL in the same program.

How Spark SQL Works

Your SQL query (text)
        |
        v
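  [ Spark SQL Parser ]    -- turns the text into an unresolved logical plan
        |
        v
  [ Analyzer ]            -- resolves table and column names against the catalog
        |
        v
  [ Catalyst Optimizer ]  -- rewrites the logical plan for efficiency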
        |
        v
  [ Physical Plan ]       -- actual execution steps
        |
        v
  [ Executors run it ]    -- distributed across the cluster
        |
        v
  Result DataFrame
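The optimizer step is easiest to picture with a toy example. The sketch below is plain Python, not Catalyst: it assumes a made-up tuple representation of expressions and applies one classic rewrite, constant folding, which evaluates literal-only subexpressions once at planning time instead of once per row. Catalyst's real rules are written in Scala against its own tree classes.

```python
import operator

# Hypothetical expression tree (not Catalyst's classes):
#   ("lit", value) | ("col", name) | (op, left, right) with op in OPS
OPS = {"+": operator.add, "*": operator.mul, ">": operator.gt}


def fold_constants(expr):
    """Bottom-up rewrite: replace any operator whose operands are both literals
    with a single literal holding the computed value."""
    if expr[0] in ("lit", "col"):
        return expr
    op, left, right = expr
    left, right = fold_constants(left), fold_constants(right)
    if left[0] == "lit" and right[0] == "lit":
        return ("lit", OPS[op](left[1], right[1]))
    return (op, left, right)


# WHERE revenue > 100 * 100  is rewritten to  WHERE revenue > 10000
predicate = (">", ("col", "revenue"), ("*", ("lit", 100), ("lit", 100)))
print(fold_constants(predicate))
# ('>', ('col', 'revenue'), ('lit', 10000))
```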

Registering a DataFrame as a Temporary View

Before running SQL, you register a DataFrame as a named temporary view. This view exists only for the current Spark session — it disappears when the session ends.

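from pyspark.sql import SparkSession

# Create (or reuse) the session; the examples in this article call it `spark`
spark = SparkSession.builder.appName("spark-sql-examples").getOrCreate()

# Load some data
df = spark.read.csv("sales.csv", header=True, inferSchema=True)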

# Register as a temp view (like giving it a table name)
df.createOrReplaceTempView("sales")

# Now query it with SQL
result = spark.sql("SELECT * FROM sales LIMIT 5")
result.show()

Basic SQL Queries

# SELECT and WHERE
spark.sql("""
    SELECT product, revenue
    FROM sales
    WHERE revenue > 10000
""").show()

# GROUP BY and ORDER BY
spark.sql("""
    SELECT region, SUM(revenue) AS total_revenue
    FROM sales
    GROUP BY region
    ORDER BY total_revenue DESC
""").show()

# COUNT with GROUP BY
spark.sql("""
    SELECT category, COUNT(*) AS num_orders
    FROM sales
    GROUP BY category
""").show()

Joins in Spark SQL

# Load two DataFrames and register each as a temp view
orders = spark.read.csv("orders.csv", header=True, inferSchema=True)
customers = spark.read.csv("customers.csv", header=True, inferSchema=True)

orders.createOrReplaceTempView("orders")
customers.createOrReplaceTempView("customers")

# Inner join (plain JOIN means INNER JOIN): only orders with a matching customer
spark.sql("""
    SELECT o.order_id, c.name, o.amount
    FROM orders o
    JOIN customers c ON o.customer_id = c.id
""").show()

# Left join: every customer, with a NULL amount for customers who have no orders
spark.sql("""
    SELECT c.name, o.amount
    FROM customers c
    LEFT JOIN orders o ON c.id = o.customer_id
""").show()

SQL vs DataFrame API — Same Result

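SQL and the DataFrame API are both translated into logical plans that Catalyst optimizes the same way, so equivalent queries return the same results with generally the same performance. Choose whichever reads better for the task. Many teams use SQL for queries and the DataFrame API for programmatic transformations.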

# SQL approach
result_sql = spark.sql("SELECT region, SUM(revenue) FROM sales GROUP BY region")

# DataFrame approach (same result)
# The alias avoids shadowing Python's built-in sum
from pyspark.sql.functions import sum as spark_sum
result_df = df.groupBy("region").agg(spark_sum("revenue"))

# Both return the same rows; without ORDER BY, row order is not guaranteed and may differ
result_sql.show()
result_df.show()

Subqueries and Window Functions

Subquery

spark.sql("""
    SELECT *
    FROM sales
    WHERE revenue > (SELECT AVG(revenue) FROM sales)
""").show()

Window Function — Rank Within Each Group

spark.sql("""
    SELECT
        product,
        region,
        revenue,
        RANK() OVER (PARTITION BY region ORDER BY revenue DESC) AS rank
    FROM sales
""").show()

# Output shows rank of each product within its region:
# +---------+------+-------+----+
# |  product|region|revenue|rank|
# +---------+------+-------+----+
# |Product A| North|  50000|   1|
# |Product B| North|  30000|   2|
# |Product C| South|  70000|   1|
# +---------+------+-------+----+
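RANK() gives tied rows the same rank and then skips ahead, so two products tied for second are followed by fourth; DENSE_RANK() would continue with third. The plain-Python sketch below models that behavior for the query above, using hypothetical rows that include a tie.

```python
from itertools import groupby

# Hypothetical rows; B and D tie for second place in North
rows = [
    {"product": "A", "region": "North", "revenue": 50000},
    {"product": "B", "region": "North", "revenue": 30000},
    {"product": "C", "region": "South", "revenue": 70000},
    {"product": "D", "region": "North", "revenue": 30000},
    {"product": "E", "region": "North", "revenue": 10000},
]


def rank_within_groups(rows, partition_key, order_key):
    """Model RANK() OVER (PARTITION BY partition_key ORDER BY order_key DESC)."""
    ranked = []
    # groupby only merges adjacent rows, so sort by the partition key first
    by_partition = sorted(rows, key=lambda r: r[partition_key])
    for _, group in groupby(by_partition, key=lambda r: r[partition_key]):
        ordered = sorted(group, key=lambda r: r[order_key], reverse=True)
        prev_value, prev_rank = None, 0
        for position, row in enumerate(ordered, start=1):
            # A tie keeps the previous rank; otherwise the rank is the 1-based
            # position, which is what creates the gap after a tie
            if position > 1 and row[order_key] == prev_value:
                rank = prev_rank
            else:
                rank = position
            ranked.append({**row, "rank": rank})
            prev_value, prev_rank = row[order_key], rank
    return ranked


print([(r["product"], r["rank"]) for r in rank_within_groups(rows, "region", "revenue")])
# [('A', 1), ('B', 2), ('D', 2), ('E', 4), ('C', 1)]
```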

Global Temporary Views

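Regular temp views are scoped to the session that created them. A global temp view is shared by all sessions in the same Spark application and lasts until the application stops. Spark registers it in the system database global_temp, so queries must use the qualified name.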

# Create global temp view
df.createOrReplaceGlobalTempView("global_sales")

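# Query it; note the global_temp prefix
spark.sql("SELECT * FROM global_temp.global_sales").show()

# A different session in the same application can see it too
other_session = spark.newSession()
other_session.sql("SELECT * FROM global_temp.global_sales").show()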

Built-In SQL Functions

spark.sql("""
    SELECT
        UPPER(product)          AS product_upper,
        ROUND(revenue, 2)       AS revenue_rounded,
        DATE_FORMAT(order_date, 'yyyy-MM') AS month,
        COALESCE(discount, 0)   AS discount_filled
    FROM sales
""").show()

Saving SQL Results

result = spark.sql("SELECT region, SUM(revenue) AS total FROM sales GROUP BY region")

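# Save as Parquet (recommended: columnar, compressed, and stores the schema)
result.write.mode("overwrite").parquet("/output/region_totals")

# Save as CSV with a header row
# Each write produces a directory of part files, not a single file
result.write.mode("overwrite").csv("/output/region_totals_csv", header=True)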
