Snowflake Streams and Tasks for Data Pipelines
Streams and Tasks are Snowflake's native tools for building automated data pipelines. A Stream tracks row-level changes made to a table (INSERTs, UPDATEs, and DELETEs) and exposes them as change records. A Task executes a SQL statement or stored procedure on a schedule or in response to a trigger. Together, they let you build continuous data pipelines inside Snowflake, often without external orchestration tools like Airflow or Prefect.
What Is a Stream?
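A Stream sits on top of a table and exposes the row-level changes made to that table since the stream's current offset, which starts at the moment the stream is created. Think of it as a CCTV camera pointed at a table: it watches INSERTs, UPDATEs, and DELETEs. The analogy is loose in two ways. First, a stream does not store a copy of the data; it stores an offset and relies on the table's change-tracking metadata. Second, a standard stream returns the net change for each row between the offset and now, so a row that is inserted and then deleted before you read the stream does not appear at all. Simply querying a stream with SELECT does not move the offset. The offset advances only when the stream is used as the source of a DML statement (for example INSERT ... SELECT or MERGE) and that transaction commits; from then on the stream shows only changes made after that point.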
STREAM CONCEPT DIAGRAM
========================
SOURCE TABLE: ORDERS_RAW
(rows arrive continuously from application)
STREAM: orders_stream (watching ORDERS_RAW)
Records all changes since last consumption:
+------------------------------------------------+
| METADATA$ACTION | METADATA$ISUPDATE | row data |
|-----------------|-------------------|----------|
| INSERT | FALSE | order 101|
| INSERT | FALSE | order 102|
| DELETE | TRUE | order 099| (update: old values)
| INSERT | TRUE | order 099| (update: new values)
| INSERT | FALSE | order 103|
+------------------------------------------------+
TASK runs every 5 minutes:
Reads stream records → transforms → writes to ORDERS_CLEAN
Stream offset advances when the task's DML commits → ready to capture next batch of changes
This pattern is called Change Data Capture (CDC)
Creating Streams
USE DATABASE RETAIL_DB;
USE SCHEMA RAW_DATA;
-- Standard stream: tracks INSERT, UPDATE, DELETE
CREATE STREAM orders_stream
ON TABLE ORDERS_RAW
COMMENT = 'Captures all changes to ORDERS_RAW for incremental processing';
-- Append-only stream (more efficient for insert-only tables)
-- Only tracks INSERTs, ignores UPDATE and DELETE
CREATE STREAM events_stream
ON TABLE WEB_EVENTS
APPEND_ONLY = TRUE
COMMENT = 'Tracks new web events only — no updates or deletes expected';
-- Stream on a view (change tracking must be enabled on the view's underlying tables)
CREATE STREAM orders_view_stream
ON VIEW VW_ORDERS_ENRICHED;
-- Check stream status and offset
SHOW STREAMS;
-- Check whether a stream has unconsumed change records (returns a boolean, not a row count)
SELECT SYSTEM$STREAM_HAS_DATA('orders_stream');
-- Returns TRUE if there are unconsumed changes, FALSE if stream is empty
Reading from a Stream
You query a stream just like a table. Stream records include three metadata columns that describe what happened to each row.
-- View the stream's current change records
SELECT * FROM orders_stream;
-- Output columns:
-- ORDER_ID | CUSTOMER_ID | AMOUNT | ... | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID
-- 101 | 5001 | 349.0 | ... | INSERT | FALSE | abc123...
-- 102 | 5002 | 189.0 | ... | INSERT | FALSE | def456...
-- 099 | 4998 | 220.0 | ... | DELETE | TRUE | xyz789...
-- 099 | 4998 | 225.0 | ... | INSERT | TRUE | xyz789...
-- ^ updated row: the DELETE record holds the old values, the INSERT record the new values; both have ISUPDATE = TRUE
-- METADATA$ACTION: 'INSERT' or 'DELETE'
-- METADATA$ISUPDATE: TRUE means the record is one half of an UPDATE (a DELETE/INSERT pair)
-- METADATA$ROW_ID: Unique identifier for the row being changed
-- Filter only new inserts (not updates or deletes)
SELECT * FROM orders_stream
WHERE METADATA$ACTION = 'INSERT' AND METADATA$ISUPDATE = FALSE;
-- Filter only updates (new values of updated rows)
SELECT * FROM orders_stream
WHERE METADATA$ACTION = 'INSERT' AND METADATA$ISUPDATE = TRUE;
-- Filter only true deletes (excluding the old-value half of updates)
SELECT * FROM orders_stream
WHERE METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = FALSE;
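The following is a minimal sketch of how a standard stream reports net changes rather than a full history. The table `demo_orders` and the stream `demo_orders_stream` are hypothetical scratch objects introduced only for this illustration; the expected result in the comments follows Snowflake's documented net-delta behavior for standard streams and is worth confirming in your own account.

```sql
-- Hypothetical scratch objects, used only for this illustration.
CREATE OR REPLACE TABLE demo_orders (order_id INT, amount NUMBER(10,2));
CREATE OR REPLACE STREAM demo_orders_stream ON TABLE demo_orders;

-- Three changes happen before anything reads the stream.
INSERT INTO demo_orders VALUES (1, 100.00), (2, 50.00);
UPDATE demo_orders SET amount = 120.00 WHERE order_id = 1;
DELETE FROM demo_orders WHERE order_id = 2;

-- Read the change records.
SELECT order_id, amount, METADATA$ACTION, METADATA$ISUPDATE
FROM demo_orders_stream;
-- Expected under net-delta semantics: a single record for order 1
-- with amount 120.00, METADATA$ACTION = 'INSERT', METADATA$ISUPDATE = FALSE.
-- Order 2 was inserted and deleted between offsets, so it should not appear.

-- Running the same SELECT again should return the same record:
-- a plain SELECT does not advance the stream offset.
```

If the pipeline needs every intermediate version of a row, a standard stream alone is not enough; the stream has to be consumed often enough that each change falls into its own batch.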
Consuming a Stream with MERGE
The standard pattern for consuming stream data is to use a MERGE statement — it handles inserts, updates, and deletes in one atomic operation. The MERGE reads from the stream and applies changes to a downstream target table.
-- Target clean table
CREATE TABLE ORDERS_CLEAN (
order_id INT PRIMARY KEY,
customer_id INT,
amount DECIMAL(10,2),
status VARCHAR(30),
order_date DATE,
last_updated TIMESTAMP_NTZ
);
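-- MERGE from stream to target (handles insert + update + delete)
-- An UPDATE appears in the stream as a DELETE + INSERT pair for the same key.
-- The WHERE clause drops the DELETE half of each pair so that every target row
-- matches at most one source row and updated rows are not removed by the
-- first WHEN MATCHED clause.
MERGE INTO ORDERS_CLEAN AS target
USING (
SELECT
order_id,
customer_id,
amount,
status,
order_date,
CURRENT_TIMESTAMP() AS last_updated,
METADATA$ACTION AS action,
METADATA$ISUPDATE AS is_update
FROM orders_stream
WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = TRUE)
) AS src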
ON target.order_id = src.order_id
WHEN MATCHED AND src.action = 'DELETE'
THEN DELETE
WHEN MATCHED AND src.action = 'INSERT' AND src.is_update = TRUE
THEN UPDATE SET
target.amount = src.amount,
target.status = src.status,
target.last_updated = src.last_updated
WHEN NOT MATCHED AND src.action = 'INSERT'
THEN INSERT (order_id, customer_id, amount, status, order_date, last_updated)
VALUES (src.order_id, src.customer_id, src.amount, src.status, src.order_date, src.last_updated);
-- Once the MERGE's transaction commits, the stream offset advances and those records count as consumed
-- Next MERGE will process only changes that occurred after this run
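When one batch of change records has to feed more than one statement, wrapping the statements in an explicit transaction is the usual approach: every statement inside the transaction sees the same stream contents, and the offset moves only at COMMIT. The sketch below assumes a hypothetical `ORDERS_AUDIT` table that is not part of the pipeline above.

```sql
-- ORDERS_AUDIT is a hypothetical table used only for this sketch.
CREATE TABLE IF NOT EXISTS ORDERS_AUDIT (
    order_id  INT,
    action    VARCHAR(10),
    is_update BOOLEAN,
    logged_at TIMESTAMP_NTZ
);

BEGIN;

-- Statement 1: log every change record in the current batch.
INSERT INTO ORDERS_AUDIT (order_id, action, is_update, logged_at)
SELECT order_id,
       METADATA$ACTION,
       METADATA$ISUPDATE,
       CURRENT_TIMESTAMP()::TIMESTAMP_NTZ
FROM orders_stream;

-- Statement 2: reads the same batch as statement 1, because the
-- offset has not moved yet. Removes rows that were truly deleted.
DELETE FROM ORDERS_CLEAN
WHERE order_id IN (
    SELECT order_id
    FROM orders_stream
    WHERE METADATA$ACTION = 'DELETE'
      AND METADATA$ISUPDATE = FALSE
);

COMMIT;  -- the stream offset advances here, once, for both statements
```

If the transaction is rolled back instead, the offset stays where it was and the same change records are available on the next attempt.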
What Is a Task?
A Task runs a SQL statement (or a stored procedure call) on a schedule you define: a fixed interval such as every minute or every hour, or a cron expression for calendar-based timing. Tasks are Snowflake's built-in scheduler. Instead of running a Python script in Airflow to trigger Snowflake queries, a Task executes the SQL directly inside Snowflake at the scheduled time, using a warehouse you specify.
-- Create a task that runs the MERGE every 5 minutes
CREATE TASK load_orders_clean_task
WAREHOUSE = ETL_WH
SCHEDULE = '5 MINUTE' -- run every 5 minutes
WHEN SYSTEM$STREAM_HAS_DATA('orders_stream') -- only run if stream has data
AS
MERGE INTO ORDERS_CLEAN AS target
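USING (
SELECT order_id, customer_id, amount, status, order_date,
CURRENT_TIMESTAMP() AS last_updated,
METADATA$ACTION AS action, METADATA$ISUPDATE AS is_update
FROM orders_stream
-- Drop the DELETE half of each update so a target row matches at most one source row
WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = TRUE)
) AS src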
ON target.order_id = src.order_id
WHEN MATCHED AND src.action = 'DELETE' THEN DELETE
WHEN MATCHED AND src.action = 'INSERT' AND src.is_update = TRUE
THEN UPDATE SET target.amount = src.amount, target.status = src.status,
target.last_updated = src.last_updated
WHEN NOT MATCHED AND src.action = 'INSERT'
THEN INSERT VALUES (src.order_id, src.customer_id, src.amount,
src.status, src.order_date, src.last_updated);
-- Task created but starts SUSPENDED — must resume it explicitly
ALTER TASK load_orders_clean_task RESUME;
-- Suspend a task
ALTER TASK load_orders_clean_task SUSPEND;
-- Run a task manually (for testing)
EXECUTE TASK load_orders_clean_task;
Task Scheduling Options
SCHEDULE SYNTAX                        MEANING
---------------                        -------
'1 MINUTE'                             Every 1 minute
'5 MINUTE'                             Every 5 minutes
'60 MINUTE'                            Every 60 minutes (= every hour)
'1440 MINUTE'                          Once every 24 hours (interval schedules are written in minutes here)
CRON SYNTAX (more precise):
'USING CRON 0 6 * * * UTC'             Daily at 6:00 AM UTC
'USING CRON 0 */4 * * * UTC'           Every 4 hours
'USING CRON 30 8 * * MON-FRI UTC'      Weekdays at 8:30 AM UTC
'USING CRON 0 0 1 * * UTC'             First day of each month at midnight
FORMAT: USING CRON [minute] [hour] [day of month] [month] [day of week] [timezone]
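As a short sketch of applying one of these schedules to an existing task, the statements below switch `load_orders_clean_task` from its 5-minute interval to a weekday cron schedule. The `America/Los_Angeles` time zone is an assumption chosen only for illustration; substitute the zone your pipeline actually runs in.

```sql
-- Suspending first is the safe order for changing task properties.
ALTER TASK load_orders_clean_task SUSPEND;

-- Weekdays at 8:30 AM in the (assumed) Pacific time zone.
ALTER TASK load_orders_clean_task
  SET SCHEDULE = 'USING CRON 30 8 * * MON-FRI America/Los_Angeles';

-- The task does not run again until it is resumed.
ALTER TASK load_orders_clean_task RESUME;

-- Confirm the new schedule and state.
SHOW TASKS LIKE 'load_orders_clean_task';
```

A cron schedule fires at fixed clock times in the named time zone, whereas an interval schedule simply repeats every N minutes.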
Task DAGs: Chaining Tasks into Pipelines
Tasks can be chained into a Directed Acyclic Graph (DAG). A root task runs on a schedule. When it completes, it triggers one or more child tasks. Child tasks trigger their own children, and a child with several parents waits for all of them to finish, so the dependent steps form a complete pipeline.
TASK DAG EXAMPLE: Nightly ETL Pipeline
========================================
PIPELINE STEPS:
1. Load raw data from stage into ORDERS_RAW
2. Clean and validate into ORDERS_CLEAN
3. Build dimension tables (DIM_CUSTOMER, DIM_PRODUCT)
4. Build fact table FACT_SALES
5. Refresh reporting aggregates
DAG STRUCTURE:
[ROOT TASK] nightly_root_task -- runs at 2:00 AM via CRON
|
v
[Task 2] load_raw_task -- triggered after root completes
|
v
[Task 3a] build_dim_customer_task -- triggered after load_raw completes
[Task 3b] build_dim_product_task -- triggered simultaneously with 3a
| |
v v
[Task 4] build_fact_sales_task -- waits for BOTH 3a AND 3b to complete
|
v
[Task 5] refresh_reports_task -- final step
-- SQL to build this DAG:
CREATE TASK nightly_root_task
WAREHOUSE = ETL_WH
SCHEDULE = 'USING CRON 0 2 * * * UTC'
AS SELECT 1; -- root task just triggers the chain; real work in children
CREATE TASK load_raw_task
WAREHOUSE = ETL_WH
AFTER nightly_root_task -- triggers after root
AS CALL sp_load_raw_orders();
CREATE TASK build_dim_customer_task
WAREHOUSE = ETL_WH
AFTER load_raw_task
AS CALL sp_build_dim_customer();
CREATE TASK build_dim_product_task
WAREHOUSE = ETL_WH
AFTER load_raw_task
AS CALL sp_build_dim_product();
CREATE TASK build_fact_sales_task
WAREHOUSE = ETL_WH
AFTER build_dim_customer_task, build_dim_product_task -- waits for both
AS CALL sp_build_fact_sales();
CREATE TASK refresh_reports_task
WAREHOUSE = ETL_WH
AFTER build_fact_sales_task
AS CALL sp_refresh_reports();
-- Resume all tasks in the DAG (resume root last)
ALTER TASK refresh_reports_task RESUME;
ALTER TASK build_fact_sales_task RESUME;
ALTER TASK build_dim_product_task RESUME;
ALTER TASK build_dim_customer_task RESUME;
ALTER TASK load_raw_task RESUME;
ALTER TASK nightly_root_task RESUME; -- resume root last to activate the DAG
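As an alternative to resuming each task by hand, Snowflake provides the system function `SYSTEM$TASK_DEPENDENTS_ENABLE`, which resumes a root task together with all of its dependents. The sketch below assumes the tasks were created in `RETAIL_DB.RAW_DATA`, matching the USE statements earlier; adjust the qualified name if your tasks live elsewhere.

```sql
-- Resume the root task and every task that depends on it in one call.
-- The fully qualified name is an assumption based on the earlier USE statements.
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('RETAIL_DB.RAW_DATA.nightly_root_task');

-- List the tasks in the graph to check state and dependency wiring.
SELECT name, state, predecessors
FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(
    TASK_NAME => 'RETAIL_DB.RAW_DATA.nightly_root_task',
    RECURSIVE => TRUE
));
```

The second query is a quick way to verify that `build_fact_sales_task` lists both dimension tasks as predecessors before the first scheduled run.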
Monitoring Streams and Tasks
-- View all streams and their status
SHOW STREAMS;
-- Check task status and schedule
SHOW TASKS;
-- View task run history (success, failure, duration)
SELECT *
FROM TABLE(SNOWFLAKE.INFORMATION_SCHEMA.TASK_HISTORY(
SCHEDULED_TIME_RANGE_START => DATEADD('hour', -24, CURRENT_TIMESTAMP()),
RESULT_LIMIT => 100
))
ORDER BY scheduled_time DESC;
-- Output columns:
-- NAME | STATE | SCHEDULED_TIME | QUERY_START_TIME | COMPLETED_TIME | ERROR_MESSAGE
-- Find failed tasks in the last 7 days (ACCOUNT_USAGE views lag behind real time, so very recent runs may be missing)
SELECT *
FROM SNOWFLAKE.ACCOUNT_USAGE.TASK_HISTORY
WHERE state = 'FAILED'
AND scheduled_time >= DATEADD('day', -7, CURRENT_TIMESTAMP())
ORDER BY scheduled_time DESC;
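For a quick health summary, the run history can be aggregated per task. This is a sketch rather than a standard report: the column aliases are made up for illustration, and it uses the `INFORMATION_SCHEMA.TASK_HISTORY` table function of the current database so that recent runs are included.

```sql
-- Per-task run counts, failures, and average duration over the last 24 hours.
SELECT
    name,
    COUNT(*)                                                  AS runs,
    COUNT_IF(state = 'FAILED')                                AS failed_runs,
    AVG(DATEDIFF('second', query_start_time, completed_time)) AS avg_duration_seconds,
    MAX(completed_time)                                       AS last_completed
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD('hour', -24, CURRENT_TIMESTAMP()),
    RESULT_LIMIT => 1000   -- raise this if the account runs many tasks
))
WHERE state IN ('SUCCEEDED', 'FAILED')   -- ignore scheduled, skipped, and running entries
GROUP BY name
ORDER BY failed_runs DESC, name;
```

Tasks guarded by `WHEN SYSTEM$STREAM_HAS_DATA(...)` may show many skipped runs, which is why the filter keeps only runs that actually executed.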
Key Points
- Streams expose INSERTs, UPDATEs, and DELETEs on a source table as change data capture (CDC) records, with metadata columns describing the action; a standard stream returns the net change per row since its offset
- Append-only streams are more efficient for insert-only tables (event logs, click streams) because they skip update and delete tracking overhead
- Consuming a stream with MERGE handles all three change types (insert, update, delete) in a single atomic SQL statement
- After a stream is consumed by a DML statement and the transaction commits, its offset advances and it returns only newer changes, so the next batch contains no duplicate records
- Tasks run SQL on a fixed schedule or cron expression; use WHEN SYSTEM$STREAM_HAS_DATA() to skip runs when no new data arrived
- Task DAGs chain multiple tasks into a dependency graph: child tasks trigger automatically once all of their parent tasks complete successfully
- Monitor task run history with TASK_HISTORY in SNOWFLAKE.ACCOUNT_USAGE to detect failures and measure pipeline latency
