Spark SQL and Queries
Spark SQL lets you query distributed data using standard SQL syntax. If you know SQL, you already know most of what you need. Spark SQL and the DataFrame API share one engine, and every spark.sql() call returns a DataFrame, so you can mix DataFrame code and SQL in the same program.
How Spark SQL Works
Your SQL query (text)
|
v
[ Spark SQL Parser ] -- turns the text into a logical plan
|
v
[ Catalyst Optimizer ] -- resolves tables and columns, then rewrites the plan for efficiency
|
v
[ Physical Plan ] -- actual execution steps
|
v
[ Executors run it ] -- distributed across the cluster
|
v
Result DataFrame
Registering a DataFrame as a Temporary View
To query a DataFrame with SQL, first register it as a named temporary view. The view is scoped to the current SparkSession and disappears when that session ends.
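from pyspark.sql import SparkSession
# Reuse the active session (e.g., in a notebook) or start a new one
spark = SparkSession.builder.appName("spark-sql-demo").getOrCreate()
# Load some data
df = spark.read.csv("sales.csv", header=True, inferSchema=True)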
# Register as a temp view (like giving it a table name)
df.createOrReplaceTempView("sales")
# Now query it with SQL
result = spark.sql("SELECT * FROM sales LIMIT 5")
result.show()
Basic SQL Queries
# SELECT and WHERE
spark.sql("""
SELECT product, revenue
FROM sales
WHERE revenue > 10000
""").show()
# GROUP BY and ORDER BY
spark.sql("""
SELECT region, SUM(revenue) AS total_revenue
FROM sales
GROUP BY region
ORDER BY total_revenue DESC
""").show()
# COUNT with GROUP BY
spark.sql("""
SELECT category, COUNT(*) AS num_orders
FROM sales
GROUP BY category
""").show()
Joins in Spark SQL
# Load two datasets and register each as a temporary view
orders = spark.read.csv("orders.csv", header=True, inferSchema=True)
customers = spark.read.csv("customers.csv", header=True, inferSchema=True)
orders.createOrReplaceTempView("orders")
customers.createOrReplaceTempView("customers")
# Inner join
spark.sql("""
SELECT o.order_id, c.name, o.amount
FROM orders o
JOIN customers c ON o.customer_id = c.id
""").show()
# Left join
spark.sql("""
SELECT c.name, o.amount
FROM customers c
LEFT JOIN orders o ON c.id = o.customer_id
""").show()
SQL vs DataFrame API — Same Result
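SQL queries and DataFrame API calls are both compiled by the Catalyst optimizer into execution plans, so equivalent queries return identical results and generally perform the same. Choose whichever reads better for the task. Many teams use SQL for queries and the DataFrame API for programmatic transformations.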
# SQL approach
result_sql = spark.sql("SELECT region, SUM(revenue) FROM sales GROUP BY region")
# DataFrame approach (same result)
from pyspark.sql.functions import sum as spark_sum
result_df = df.groupBy("region").agg(spark_sum("revenue"))
# Both produce the same rows (row order may differ: GROUP BY does not sort)
result_sql.show()
result_df.show()
Subqueries and Window Functions
Subquery
spark.sql("""
SELECT *
FROM sales
WHERE revenue > (SELECT AVG(revenue) FROM sales)
""").show()
Window Function — Rank Within Each Group
spark.sql("""
SELECT
product,
region,
revenue,
RANK() OVER (PARTITION BY region ORDER BY revenue DESC) AS rank
FROM sales
""").show()
# Output shows the rank of each product within its region:
# +---------+------+-------+----+
# |  product|region|revenue|rank|
# +---------+------+-------+----+
# |Product A| North|  50000|   1|
# |Product B| North|  30000|   2|
# |Product C| South|  70000|   1|
# +---------+------+-------+----+
Global Temporary Views
Regular temp views are scoped to one SparkSession. A global temp view is registered in the system database global_temp and is visible to every session in the same Spark application until the application stops.
# Create global temp view
df.createOrReplaceGlobalTempView("global_sales")
# Query it — note the global_temp prefix
spark.sql("SELECT * FROM global_temp.global_sales").show()
Built-In SQL Functions
spark.sql("""
SELECT
UPPER(product) AS product_upper,
ROUND(revenue, 2) AS revenue_rounded,
DATE_FORMAT(order_date, 'yyyy-MM') AS month,
COALESCE(discount, 0) AS discount_filled
FROM sales
""").show()
Saving SQL Results
result = spark.sql("SELECT region, SUM(revenue) AS total FROM sales GROUP BY region")
# Save as Parquet (columnar, compressed, and keeps the schema; usually the best choice)
result.write.mode("overwrite").parquet("/output/region_totals")
# Save as CSV
result.write.mode("overwrite").csv("/output/region_totals_csv", header=True)
