Snowflake Streams and Tasks for Data Pipelines

Streams and Tasks are Snowflake's native tools for building automated data pipelines. A Stream tracks the row-level changes made to a table (INSERTs, UPDATEs, and DELETEs) and exposes them as change records that you can query. A Task executes a SQL statement or stored procedure call on a schedule or in response to a trigger, such as a predecessor task finishing. Together, they let you build continuous data pipelines entirely inside Snowflake, without external orchestration tools such as Airflow or Prefect.

What Is a Stream?

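A Stream sits on top of a table and tracks the row-level changes made to that table after the stream was created. A stream does not copy the changed rows anywhere. Instead, it stores an offset, which is a point in the table's version history. When you query the stream, Snowflake returns the net changes between that offset and the table's current version. For example, if a row is inserted and then updated before you read the stream, you see a single INSERT carrying the latest values. Think of the stream as a bookmark in the table's change history. When you consume its records in a DML statement (for example, an INSERT or MERGE into a downstream table) and the transaction commits, the offset advances. From then on, the stream shows only changes made after that point.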

STREAM CONCEPT DIAGRAM
========================

SOURCE TABLE: ORDERS_RAW
  (rows arrive continuously from application)

STREAM: orders_stream (watching ORDERS_RAW)
  Returns the net changes since the last consumption:
    +------------------------------------------------+
    | METADATA$ACTION | METADATA$ISUPDATE | row data |
    |-----------------|-------------------|----------|
    | INSERT          | FALSE             | order 101|  (new row)
    | INSERT          | FALSE             | order 102|  (new row)
    | DELETE          | TRUE              | order 099|  (old values of updated row)
    | INSERT          | TRUE              | order 099|  (new values of updated row)
    | INSERT          | FALSE             | order 103|  (new row)
    +------------------------------------------------+
    An UPDATE appears as a DELETE + INSERT pair, both with ISUPDATE = TRUE.
    A plain DELETE appears as one DELETE record with ISUPDATE = FALSE.

TASK runs every 5 minutes:
  Reads stream records → transforms → writes to ORDERS_CLEAN
  Transaction commits → stream offset advances → next run sees only newer changes

This pattern is called Change Data Capture (CDC)
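The offset behaviour is easier to see in a small model. The sketch below is a toy Python model, not Snowflake's implementation. `ToyTable` keeps a full snapshot per committed version, which is an assumption standing in for Snowflake's internal table versioning. `ToyStream` stores only a version number. Querying it compares two snapshots, so it returns net changes, and only `consume()` (standing in for a committed DML statement) moves the offset.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTable:
    """Toy model: one full snapshot per committed version (stand-in for table versioning)."""
    versions: list = field(default_factory=lambda: [{}])  # version 0: empty table

    def commit(self, rows: dict) -> None:
        """Record a new version; rows maps row_id -> row values."""
        self.versions.append(dict(rows))

    @property
    def current(self) -> dict:
        return self.versions[-1]


@dataclass
class ToyStream:
    """Toy model: holds only an offset (a version number), never copies of rows."""
    table: ToyTable
    offset: int = 0

    def changes(self) -> list:
        """Net changes between the offset version and the current one (like a SELECT)."""
        old, new = self.table.versions[self.offset], self.table.current
        records = []  # (ACTION, ISUPDATE, row_id, values)
        for row_id in sorted(old.keys() | new.keys()):
            if row_id not in new:
                records.append(("DELETE", False, row_id, old[row_id]))
            elif row_id not in old:
                records.append(("INSERT", False, row_id, new[row_id]))
            elif old[row_id] != new[row_id]:
                records.append(("DELETE", True, row_id, old[row_id]))  # old values
                records.append(("INSERT", True, row_id, new[row_id]))  # new values
        return records

    def consume(self) -> list:
        """Like a committed DML statement reading the stream: return records, advance offset."""
        records = self.changes()
        self.offset = len(self.table.versions) - 1
        return records


orders = ToyTable()
stream = ToyStream(orders)
orders.commit({99: 220.0})
first = stream.consume()                    # [("INSERT", False, 99, 220.0)]
orders.commit({99: 225.0, 101: 349.0})      # update order 99, insert order 101
orders.commit({99: 225.0, 101: 350.0})      # update order 101 before anyone consumes
pending = stream.changes()                  # SELECT only: the offset does not move
```

Order 101 was inserted and then updated between reads, yet it shows up as one INSERT with its latest value. That is the "net changes" behaviour described above.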

Creating Streams

USE DATABASE RETAIL_DB;
USE SCHEMA RAW_DATA;

-- Standard stream: tracks INSERT, UPDATE, DELETE
CREATE STREAM orders_stream
  ON TABLE ORDERS_RAW
  COMMENT = 'Captures all changes to ORDERS_RAW for incremental processing';

-- Append-only stream (more efficient for insert-only tables)
-- Only tracks INSERTs, ignores UPDATE and DELETE
CREATE STREAM events_stream
  ON TABLE WEB_EVENTS
  APPEND_ONLY = TRUE
  COMMENT = 'Tracks new web events only — no updates or deletes expected';

-- Stream on a view (Enterprise edition)
CREATE STREAM orders_view_stream
  ON VIEW VW_ORDERS_ENRICHED;

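-- Check stream status, including whether it is stale (STALE and STALE_AFTER columns).
-- A stream goes stale if it is not consumed within the source table's data retention
-- period (extended, if needed, up to MAX_DATA_EXTENSION_TIME_IN_DAYS, default 14 days).
SHOW STREAMS;

-- Check whether a stream has unconsumed changes
SELECT SYSTEM$STREAM_HAS_DATA('orders_stream');
-- Returns TRUE if there are unconsumed changes, FALSE if the stream is empty

-- Count the pending change records (a plain SELECT does not consume the stream)
SELECT COUNT(*) FROM orders_stream;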
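The difference between a standard and an append-only stream shows up clearly when rows are deleted before the stream is read. This is a toy Python model, not Snowflake code. `apply_ops` and the `(op, row_id, value)` operation tuples are hypothetical helpers. The model rests on two assumptions: a standard stream reports the net difference between two snapshots, and an append-only stream reports every insert and ignores updates and deletes.

```python
def apply_ops(snapshot: dict, ops: list) -> dict:
    """Replay hypothetical (op, row_id, value) operations on a copy of a snapshot."""
    rows = dict(snapshot)
    for op, row_id, value in ops:
        if op == "DELETE":
            rows.pop(row_id, None)
        else:  # "INSERT" or "UPDATE"
            rows[row_id] = value
    return rows


def standard_stream(snapshot: dict, ops: list) -> list:
    """Toy standard stream: net changes between the offset snapshot and the result."""
    new = apply_ops(snapshot, ops)
    records = []
    for row_id in sorted(snapshot.keys() | new.keys()):
        if row_id not in new:
            records.append(("DELETE", row_id))
        elif row_id not in snapshot:
            records.append(("INSERT", row_id))
        elif snapshot[row_id] != new[row_id]:
            records += [("DELETE", row_id), ("INSERT", row_id)]  # update pair
    return records


def append_only_stream(ops: list) -> list:
    """Toy append-only stream: every inserted row; updates and deletes are ignored."""
    return [("INSERT", row_id) for op, row_id, _ in ops if op == "INSERT"]


# 10 events inserted, then 5 of them deleted before the stream is consumed
ops = [("INSERT", i, f"event {i}") for i in range(10)]
ops += [("DELETE", i, None) for i in range(5)]
standard = standard_stream({}, ops)      # 5 INSERT records (rows 5-9)
append_only = append_only_stream(ops)    # 10 INSERT records (rows 0-9)
```

The append-only side never has to diff row versions. That is why it is cheaper for insert-only tables, and also why it only suits tables where updates and deletes do not matter downstream.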

Reading from a Stream

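You query a stream just like a table. Each change record contains the source table's columns plus three metadata columns that describe what happened to the row. A plain SELECT does not consume the stream. The offset advances only when the stream is read by a DML statement (such as INSERT or MERGE) whose transaction commits.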

-- View the stream's current change records
SELECT *
FROM orders_stream;

-- Output columns:
-- ORDER_ID | CUSTOMER_ID | AMOUNT | ... | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID
--      101 |        5001 |  349.0 | ... | INSERT          | FALSE             | abc123...
--      102 |        5002 |  189.0 | ... | INSERT          | FALSE             | def456...
--      099 |        4998 |  220.0 | ... | DELETE          | TRUE              | xyz789...
--      099 |        4998 |  225.0 | ... | INSERT          | TRUE              | xyz789...
--                                         ^ updated row: the DELETE carries the old values,
--                                           the INSERT carries the new values; both ISUPDATE=TRUE

-- METADATA$ACTION:    'INSERT' or 'DELETE'
-- METADATA$ISUPDATE:  TRUE if the record is one half of an UPDATE
--                     (old values on the DELETE record, new values on the INSERT record)
-- METADATA$ROW_ID:    Unique, immutable ID of the source row; use it to follow a row's changes

-- Filter only new inserts (not updates or deletes)
SELECT *
FROM orders_stream
WHERE METADATA$ACTION = 'INSERT'
  AND METADATA$ISUPDATE = FALSE;

-- Filter only updates (new values of updated rows)
SELECT *
FROM orders_stream
WHERE METADATA$ACTION = 'INSERT'
  AND METADATA$ISUPDATE = TRUE;

-- Filter only real deletes (excludes the DELETE half of each update)
SELECT *
FROM orders_stream
WHERE METADATA$ACTION = 'DELETE'
  AND METADATA$ISUPDATE = FALSE;
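The three filters can be mirrored in a few lines of Python, which also makes the DELETE half of an update easy to spot. This is a sketch over hypothetical in-memory records shaped like the stream output above. Order 100 and its plain delete are made up for the example. `pair_updates` shows one way to use METADATA$ROW_ID to line up an update's old and new values.

```python
def classify(records: list) -> dict:
    """Split stream records the same way as the three WHERE filters above."""
    groups = {"inserts": [], "updates": [], "deletes": []}
    for rec in records:
        action, is_update = rec["METADATA$ACTION"], rec["METADATA$ISUPDATE"]
        if action == "INSERT":
            groups["updates" if is_update else "inserts"].append(rec)
        elif not is_update:
            groups["deletes"].append(rec)
        # DELETE with ISUPDATE = TRUE is the old half of an update, not a real delete
    return groups


def pair_updates(records: list) -> list:
    """Match each update's old (DELETE) and new (INSERT) halves on METADATA$ROW_ID."""
    old = {r["METADATA$ROW_ID"]: r for r in records
           if r["METADATA$ACTION"] == "DELETE" and r["METADATA$ISUPDATE"]}
    return [(old[r["METADATA$ROW_ID"]], r) for r in records
            if r["METADATA$ACTION"] == "INSERT" and r["METADATA$ISUPDATE"]]


def rec(order_id, amount, action, is_update, row_id):
    """Build one hypothetical stream record."""
    return {"ORDER_ID": order_id, "AMOUNT": amount, "METADATA$ACTION": action,
            "METADATA$ISUPDATE": is_update, "METADATA$ROW_ID": row_id}


records = [
    rec(101, 349.0, "INSERT", False, "abc123"),
    rec(102, 189.0, "INSERT", False, "def456"),
    rec(99, 220.0, "DELETE", True, "xyz789"),
    rec(99, 225.0, "INSERT", True, "xyz789"),
    rec(100, 50.0, "DELETE", False, "ghi000"),   # hypothetical plain delete
]
groups = classify(records)
changes = [(old["AMOUNT"], new["AMOUNT"]) for old, new in pair_updates(records)]
```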

Consuming a Stream with MERGE

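The standard pattern for consuming stream data is a MERGE statement. It handles inserts, updates, and deletes in one atomic operation, reading from the stream and applying the changes to a downstream target table. Every UPDATE appears in the stream as a DELETE/INSERT pair with METADATA$ISUPDATE = TRUE, so the source query below drops the DELETE half of each pair. Without that filter, two source rows would match the same target row, which makes the MERGE nondeterministic and can delete an order that was only updated.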

-- Target clean table
CREATE TABLE ORDERS_CLEAN (
    order_id      INT PRIMARY KEY,   -- informational only: not enforced on standard tables
    customer_id   INT,
    amount        DECIMAL(10,2),
    status        VARCHAR(30),
    order_date    DATE,
    last_updated  TIMESTAMP_NTZ
);

-- MERGE from stream to target (handles insert + update + delete)
MERGE INTO ORDERS_CLEAN AS target
USING (
    SELECT
        order_id,
        customer_id,
        amount,
        status,
        order_date,
        CURRENT_TIMESTAMP()   AS last_updated,
        METADATA$ACTION       AS action
    FROM orders_stream
    -- Drop the DELETE half of each UPDATE pair so every order_id appears at most once
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = TRUE)
) AS src
ON target.order_id = src.order_id
WHEN MATCHED AND src.action = 'DELETE'
    THEN DELETE
WHEN MATCHED AND src.action = 'INSERT'
    THEN UPDATE SET
        target.customer_id  = src.customer_id,
        target.amount       = src.amount,
        target.status       = src.status,
        target.order_date   = src.order_date,
        target.last_updated = src.last_updated
WHEN NOT MATCHED AND src.action = 'INSERT'
    THEN INSERT (order_id, customer_id, amount, status, order_date, last_updated)
    VALUES (src.order_id, src.customer_id, src.amount, src.status, src.order_date, src.last_updated);

-- When this MERGE commits, the stream's offset advances and these records are consumed.
-- The next MERGE processes only changes made after this run.
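To check the MERGE logic without a Snowflake account, here is a toy Python version of the same branching applied to a dict of `{order_id: amount}`. The record layout and the `drop_update_deletes` flag are hypothetical. The flag exists only to show what goes wrong when the WHERE filter is left out. This toy simply raises an error on duplicate keys. Snowflake's own handling of nondeterministic merges is configurable and is not modelled here.

```python
def merge_stream_into(target: dict, records: list, drop_update_deletes: bool = True) -> dict:
    """Toy MERGE: apply stream records to {order_id: amount} like the SQL above."""
    src = [r for r in records
           if not (drop_update_deletes and r["action"] == "DELETE" and r["is_update"])]
    keys = [r["order_id"] for r in src]
    if len(keys) != len(set(keys)):
        # Two source rows for one target row make a MERGE nondeterministic; reject it here
        raise ValueError("duplicate source rows for the same order_id")
    result = dict(target)
    for r in src:
        matched = r["order_id"] in result
        if r["action"] == "DELETE" and matched:
            del result[r["order_id"]]              # WHEN MATCHED AND action = 'DELETE'
        elif r["action"] == "INSERT":
            result[r["order_id"]] = r["amount"]    # UPDATE if matched, INSERT if not
    return result


def rec(order_id, amount, action, is_update):
    """Build one hypothetical stream record."""
    return {"order_id": order_id, "amount": amount, "action": action, "is_update": is_update}


target = {99: 220.0, 100: 50.0}
records = [
    rec(101, 349.0, "INSERT", False),
    rec(102, 189.0, "INSERT", False),
    rec(99, 220.0, "DELETE", True),    # old half of the update to order 99
    rec(99, 225.0, "INSERT", True),    # new half of the update to order 99
    rec(100, 50.0, "DELETE", False),   # real delete
]
merged = merge_stream_into(target, records)

try:
    merge_stream_into(target, records, drop_update_deletes=False)
    unfiltered_error = None
except ValueError as exc:
    unfiltered_error = str(exc)
```

With the filter, order 99 ends up with its new amount and order 100 is removed. Without it, order 99 has two competing source rows.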

What Is a Task?

A Task runs a single SQL statement (or a stored procedure call) on a schedule you define: every minute, every hour, every day, or using a cron expression. Tasks are Snowflake's built-in scheduler. Instead of running a Python script in Airflow to trigger Snowflake queries, a Task executes the SQL directly inside Snowflake at the scheduled time. It uses a warehouse you specify, or Snowflake-managed serverless compute if you omit WAREHOUSE. An optional WHEN condition is evaluated before each scheduled run, and if it is FALSE, that run is skipped.

-- Create a task that runs the MERGE every 5 minutes
CREATE TASK load_orders_clean_task
  WAREHOUSE     = ETL_WH
  SCHEDULE      = '5 MINUTE'      -- run every 5 minutes
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')  -- only run if stream has data
AS
MERGE INTO ORDERS_CLEAN AS target
USING (
    SELECT order_id, customer_id, amount, status, order_date,
           CURRENT_TIMESTAMP() AS last_updated,
           METADATA$ACTION AS action
    FROM orders_stream
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = TRUE)  -- drop old halves of updates
) AS src
ON target.order_id = src.order_id
WHEN MATCHED AND src.action = 'DELETE' THEN DELETE
WHEN MATCHED AND src.action = 'INSERT'
    THEN UPDATE SET target.customer_id  = src.customer_id,
                    target.amount       = src.amount,
                    target.status       = src.status,
                    target.order_date   = src.order_date,
                    target.last_updated = src.last_updated
WHEN NOT MATCHED AND src.action = 'INSERT'
    THEN INSERT (order_id, customer_id, amount, status, order_date, last_updated)
         VALUES (src.order_id, src.customer_id, src.amount,
                 src.status, src.order_date, src.last_updated);

-- Task created but starts SUSPENDED — must resume it explicitly
ALTER TASK load_orders_clean_task RESUME;

-- Suspend a task
ALTER TASK load_orders_clean_task SUSPEND;

-- Run a task manually (for testing)
EXECUTE TASK load_orders_clean_task;
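The effect of the WHEN clause on a 5-minute schedule can be sketched as a simple loop. This is a toy Python model with hypothetical inputs. Each tick stands for one scheduled time, and `arrivals` says how many change records land in the stream before that tick. A run that finds the stream empty is skipped, and a run that processes data consumes everything pending.

```python
def simulate_task(ticks: int, arrivals: dict) -> tuple:
    """Toy model of a scheduled task guarded by WHEN SYSTEM$STREAM_HAS_DATA(...).

    arrivals maps a tick number to the number of change records that arrive before it.
    Returns (runs, skipped, records_processed_per_run).
    """
    pending = 0
    runs, skipped, processed = 0, 0, []
    for tick in range(ticks):
        pending += arrivals.get(tick, 0)
        if pending == 0:          # WHEN condition is FALSE: this run is skipped
            skipped += 1
            continue
        runs += 1                 # task body runs the MERGE ...
        processed.append(pending)
        pending = 0               # ... and the commit advances the stream offset
    return runs, skipped, processed


# Six scheduled times (30 minutes at a 5-minute interval); data arrives twice
runs, skipped, processed = simulate_task(6, {0: 3, 3: 2})
```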

Task Scheduling Options

SCHEDULE VALUE            MEANING
--------------            -------
'1 MINUTE'                Every 1 minute
'5 MINUTE'                Every 5 minutes
'60 MINUTE'               Every 60 minutes (= every hour)
'1440 MINUTE'             Every 1440 minutes (= every 24 hours)

CRON SYNTAX (runs at fixed clock times in a given time zone):
'USING CRON 0 6 * * * UTC'          Daily at 6:00 AM UTC
'USING CRON 0 */4 * * * UTC'        Every 4 hours (00:00, 04:00, 08:00, ... UTC)
'USING CRON 30 8 * * MON-FRI UTC'   Weekdays at 8:30 AM UTC
'USING CRON 0 0 1 * * UTC'          First day of each month at midnight UTC

FORMAT: USING CRON [minute] [hour] [day-of-month] [month] [day-of-week] [time zone]
        The time zone is required (for example UTC or America/New_York).
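To sanity-check which clock times a cron schedule selects, you can use a small field matcher. This is a simplified Python sketch, not Snowflake's cron parser. It handles only the forms in the table above: `*`, `*/n`, ranges, lists, and day names. It ignores the time zone field, so it assumes the datetime you pass is already in that zone. It also ANDs the day-of-month and day-of-week fields, which matches the examples above but not full cron semantics when both day fields are restricted.

```python
from datetime import datetime

DAY_NAMES = {"SUN": 0, "MON": 1, "TUE": 2, "WED": 3, "THU": 4, "FRI": 5, "SAT": 6}


def _num(token: str) -> int:
    """Convert a number or a day name (MON, FRI, ...) to an int."""
    token = token.upper()
    return DAY_NAMES[token] if token in DAY_NAMES else int(token)


def _expand(expr: str, lo: int, hi: int) -> set:
    """Expand one cron field: '*', '*/n', 'a-b', 'a,b', or a single value."""
    values = set()
    for part in expr.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":
            start, end = lo, hi
        elif "-" in part:
            first, last = part.split("-")
            start, end = _num(first), _num(last)
        else:
            start = end = _num(part)
        values.update(range(start, end + 1, step))
    return values


def cron_matches(schedule: str, when: datetime) -> bool:
    """True if `when` (already in the schedule's time zone) matches the five cron fields."""
    fields = schedule.replace("USING CRON", "").split()
    minute, hour, day, month, weekday = fields[:5]   # fields[5] is the time zone, ignored
    cron_weekday = (when.weekday() + 1) % 7          # Python: Monday=0; cron: Sunday=0
    return (when.minute in _expand(minute, 0, 59)
            and when.hour in _expand(hour, 0, 23)
            and when.day in _expand(day, 1, 31)
            and when.month in _expand(month, 1, 12)
            and cron_weekday in _expand(weekday, 0, 6))


weekday_run = cron_matches("USING CRON 30 8 * * MON-FRI UTC", datetime(2024, 1, 1, 8, 30))
saturday_run = cron_matches("USING CRON 30 8 * * MON-FRI UTC", datetime(2024, 1, 6, 8, 30))
```

January 1, 2024 was a Monday and January 6, 2024 a Saturday, so only the first call matches.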

Task DAGs: Chaining Tasks into Pipelines

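Tasks can be chained into a Directed Acyclic Graph (DAG), which Snowflake also calls a task graph. A root task runs on a schedule. When it completes successfully, its child tasks (those declared with AFTER the root) run. Each child can have children of its own. A task with several predecessors waits until all of them have completed successfully, so the steps form a dependency graph rather than a simple chain.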

TASK DAG EXAMPLE: Nightly ETL Pipeline
========================================

PIPELINE STEPS:
  1. Load raw data from stage into ORDERS_RAW
  2. Clean and validate into ORDERS_CLEAN
  3. Build dimension tables (DIM_CUSTOMER, DIM_PRODUCT)
  4. Build fact table FACT_SALES
  5. Refresh reporting aggregates

DAG STRUCTURE:
  [ROOT TASK] nightly_root_task          -- runs at 2:00 AM UTC via CRON
        |
        v
  [Task 2] load_raw_task                 -- triggered after root completes
        |
        +----------------------------+
        |                            |
        v                            v
  [Task 3a]                    [Task 3b]              -- 3a and 3b run in parallel,
  build_dim_customer_task      build_dim_product_task    both after load_raw completes
        |                            |
        +-------------+--------------+
                      |
                      v
  [Task 4] build_fact_sales_task         -- waits for BOTH 3a AND 3b to complete
        |
        v
  [Task 5] refresh_reports_task          -- final step
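The rule "a task runs once all of its predecessors have finished" is a topological ordering. As a Python sketch, the standard library's `graphlib.TopologicalSorter` can group the DAG above into waves of tasks that could run in parallel. The dict mirrors the AFTER clauses below. Two assumptions simplify the model: every task succeeds, and every step takes about the same time. In practice, Snowflake starts each child as soon as its own predecessors finish rather than in lock-step waves.

```python
from graphlib import TopologicalSorter

# Predecessors of each task, mirroring the AFTER clauses in the SQL below
PREDECESSORS = {
    "nightly_root_task": set(),
    "load_raw_task": {"nightly_root_task"},
    "build_dim_customer_task": {"load_raw_task"},
    "build_dim_product_task": {"load_raw_task"},
    "build_fact_sales_task": {"build_dim_customer_task", "build_dim_product_task"},
    "refresh_reports_task": {"build_fact_sales_task"},
}


def run_stages(graph: dict) -> list:
    """Group tasks into waves whose predecessors have all finished."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())   # tasks that could run in parallel now
        stages.append(ready)
        sorter.done(*ready)                  # assume every task succeeds
    return stages


stages = run_stages(PREDECESSORS)
```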

-- SQL to build this DAG:
CREATE TASK nightly_root_task
  WAREHOUSE = ETL_WH
  SCHEDULE  = 'USING CRON 0 2 * * * UTC'
AS SELECT 1;   -- root task just triggers the chain; real work in children

CREATE TASK load_raw_task
  WAREHOUSE = ETL_WH
  AFTER nightly_root_task     -- triggers after root
AS CALL sp_load_raw_orders();

CREATE TASK build_dim_customer_task
  WAREHOUSE = ETL_WH
  AFTER load_raw_task
AS CALL sp_build_dim_customer();

CREATE TASK build_dim_product_task
  WAREHOUSE = ETL_WH
  AFTER load_raw_task
AS CALL sp_build_dim_product();

CREATE TASK build_fact_sales_task
  WAREHOUSE = ETL_WH
  AFTER build_dim_customer_task, build_dim_product_task  -- waits for both
AS CALL sp_build_fact_sales();

CREATE TASK refresh_reports_task
  WAREHOUSE = ETL_WH
  AFTER build_fact_sales_task
AS CALL sp_refresh_reports();

-- Resume all tasks in the DAG (resume root last)
ALTER TASK refresh_reports_task RESUME;
ALTER TASK build_fact_sales_task RESUME;
ALTER TASK build_dim_product_task RESUME;
ALTER TASK build_dim_customer_task RESUME;
ALTER TASK load_raw_task RESUME;
ALTER TASK nightly_root_task RESUME;  -- resume root last to activate the DAG
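The "children first, root last" resume order is the reverse of a topological order of the graph. A short Python sketch can generate the ALTER TASK statements from the same predecessor map. The map is copied from the AFTER clauses above, and `resume_order` is a hypothetical helper, not a Snowflake feature. Tasks at the same depth, such as the two dimension builds, may come out in either order.

```python
from graphlib import TopologicalSorter

# Predecessors of each task, mirroring the AFTER clauses above
PREDECESSORS = {
    "nightly_root_task": set(),
    "load_raw_task": {"nightly_root_task"},
    "build_dim_customer_task": {"load_raw_task"},
    "build_dim_product_task": {"load_raw_task"},
    "build_fact_sales_task": {"build_dim_customer_task", "build_dim_product_task"},
    "refresh_reports_task": {"build_fact_sales_task"},
}


def resume_order(graph: dict) -> list:
    """Reverse topological order: each task precedes its predecessors, root last."""
    return list(reversed(list(TopologicalSorter(graph).static_order())))


order = resume_order(PREDECESSORS)
statements = [f"ALTER TASK {name} RESUME;" for name in order]
```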

Monitoring Streams and Tasks

-- View all streams and their status
SHOW STREAMS;

-- Check task status and schedule
SHOW TASKS;

-- View recent task run history (state, timing, errors) from the current
-- database's INFORMATION_SCHEMA; this table function covers the last 7 days
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD('hour', -24, CURRENT_TIMESTAMP()),
    RESULT_LIMIT               => 100
))
ORDER BY scheduled_time DESC;

-- Output columns:
-- NAME | STATE | SCHEDULED_TIME | QUERY_START_TIME | COMPLETED_TIME | ERROR_MESSAGE

-- Find failed tasks in the last 7 days
-- (ACCOUNT_USAGE keeps a year of history but can lag real time by up to 45 minutes)
SELECT *
FROM SNOWFLAKE.ACCOUNT_USAGE.TASK_HISTORY
WHERE state = 'FAILED'
  AND scheduled_time >= DATEADD('day', -7, CURRENT_TIMESTAMP())
ORDER BY scheduled_time DESC;
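Once TASK_HISTORY rows are in hand, a per-task summary of failures and run times takes only a few lines. This Python sketch assumes the rows have already been fetched into dicts keyed by the column names listed above. The fetching step is not shown, and the history rows are made up for illustration.

```python
from datetime import datetime
from statistics import mean


def summarize(history: list) -> dict:
    """Failure count and mean run time (seconds) per task, from TASK_HISTORY-style rows."""
    per_task = {}
    for row in history:
        stats = per_task.setdefault(row["NAME"], {"failed": 0, "seconds": []})
        if row["STATE"] == "FAILED":
            stats["failed"] += 1
        if row["QUERY_START_TIME"] and row["COMPLETED_TIME"]:  # rows missing a timestamp are left out
            elapsed = row["COMPLETED_TIME"] - row["QUERY_START_TIME"]
            stats["seconds"].append(elapsed.total_seconds())
    return {name: {"failed": s["failed"],
                   "avg_seconds": mean(s["seconds"]) if s["seconds"] else None}
            for name, s in per_task.items()}


def run(name, state, start, end):
    """Build one made-up TASK_HISTORY row."""
    return {"NAME": name, "STATE": state, "QUERY_START_TIME": start, "COMPLETED_TIME": end}


t0 = datetime(2024, 1, 1, 2, 0, 0)
history = [  # made-up rows for illustration
    run("LOAD_ORDERS_CLEAN_TASK", "SUCCEEDED", t0, t0.replace(second=30)),
    run("LOAD_ORDERS_CLEAN_TASK", "FAILED", t0.replace(minute=5), t0.replace(minute=5, second=10)),
    run("LOAD_ORDERS_CLEAN_TASK", "SKIPPED", None, None),
]
report = summarize(history)
```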

Key Points

  • Streams expose the net INSERT, UPDATE, and DELETE changes on a source table as change data capture (CDC) records, with metadata columns describing each action; an UPDATE appears as a DELETE/INSERT pair with METADATA$ISUPDATE = TRUE
  • Append-only streams are more efficient for insert-only tables (event logs, click streams) because they skip update and delete tracking overhead
  • Consuming a stream with MERGE handles all three change types (insert, update, delete) in a single atomic SQL statement
  • A stream's offset advances only when the stream is read by a DML statement whose transaction commits; a plain SELECT does not consume it, and the next batch contains only changes made after that commit, with no duplicates
  • Tasks run SQL on a fixed schedule or cron expression; use WHEN SYSTEM$STREAM_HAS_DATA() to skip runs when no new data arrived
  • Task DAGs chain multiple tasks into a dependency graph: a child task runs automatically once all of its predecessor tasks complete successfully
  • Monitor task runs with INFORMATION_SCHEMA.TASK_HISTORY (last 7 days) or SNOWFLAKE.ACCOUNT_USAGE.TASK_HISTORY (longer history, with some latency) to detect failures and measure pipeline latency
