GCP Cloud Dataflow

Cloud Dataflow is GCP's fully managed data processing service for running Apache Beam pipelines. It handles both real-time stream processing (data arriving continuously, like clicks or sensor readings) and batch processing (large historical datasets processed all at once) using the same programming model. Dataflow automatically provisions, scales, and manages the underlying compute resources.

Imagine a water treatment plant. Water (data) flows in continuously from rivers (Pub/Sub streams) and stored tanks (Cloud Storage files). The treatment plant (Dataflow pipeline) filters, cleans, and transforms the water, then delivers it to homes (BigQuery, Bigtable). The plant adjusts its capacity automatically when more water arrives.

Stream vs Batch Processing

Concept              | Batch Processing                       | Stream Processing
Data arrival         | All at once (files, database exports)  | Continuously (events, messages, logs)
Latency              | Minutes to hours                       | Milliseconds to seconds
Use case             | Nightly sales reports, ETL jobs        | Real-time fraud detection, live dashboards
Typical source       | Cloud Storage, BigQuery                | Pub/Sub, Kafka
Typical destination  | BigQuery, Cloud Storage                | BigQuery, Bigtable, Cloud Storage

Apache Beam and Dataflow

Dataflow runs Apache Beam pipelines. Apache Beam is an open-source SDK for defining data processing pipelines. The same Beam code can run on different execution engines, called runners: Dataflow, Apache Spark, Apache Flink, or a local runner for development and testing. Beam provides SDKs for Python, Java, and Go.
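One way to picture the swappable runner is as a set of command-line flags that changes while the pipeline code stays the same. The sketch below is plain Python and does not import Beam. The helper name build_flags is hypothetical. The flags themselves (--runner, --project, --region, --temp_location) are standard Beam pipeline options, and the project and bucket values are the placeholder names used elsewhere in this article.

```python
def build_flags(target, project=None, region=None, temp_location=None):
    """Return Beam pipeline flags for a local run or a Dataflow run.

    Hypothetical helper for illustration; it only assembles strings.
    """
    if target == "local":
        # DirectRunner executes the pipeline on the local machine.
        return ["--runner=DirectRunner"]
    if target == "dataflow":
        required = {"project": project, "region": region, "temp_location": temp_location}
        missing = [name for name, value in required.items() if not value]
        if missing:
            raise ValueError("missing Dataflow settings: " + ", ".join(missing))
        return [
            "--runner=DataflowRunner",
            f"--project={project}",
            f"--region={region}",
            f"--temp_location={temp_location}",
        ]
    raise ValueError(f"unknown target: {target}")


local_flags = build_flags("local")
cloud_flags = build_flags(
    "dataflow",
    project="my-project",
    region="us-central1",
    temp_location="gs://my-bucket/temp",
)
print(local_flags)
print(cloud_flags)
```

In a real script, a list like this would typically be passed to PipelineOptions, so that only the flags differ between a local test run and a Dataflow job.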

Apache Beam Pipeline Concepts:
┌───────────────────────────────────────────────────────────────────┐
│                                                                   │
│  PCollection (distributed dataset — like a stream or batch)       │
│                                                                   │
│   Source ──▶ PCollection ──▶ Transform ──▶ PCollection ──▶ Sink   │
│                                                                   │
│  Source:     Read from Cloud Storage / Pub/Sub / BigQuery         │
│  Transform:  Map, Filter, GroupBy, Combine, Window                │
│  Sink:       Write to BigQuery / Cloud Storage / Bigtable         │
│                                                                   │
└───────────────────────────────────────────────────────────────────┘

Building a Simple Batch Pipeline

This pipeline reads a text file from Cloud Storage, counts word frequencies, and writes results to BigQuery.

# requirements.txt
apache-beam[gcp]==2.53.0

# word_count_pipeline.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="DataflowRunner",
    project="my-project",
    region="us-central1",
    temp_location="gs://my-bucket/temp",
    staging_location="gs://my-bucket/staging",
    job_name="word-count-job"
)

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        # Read text file from Cloud Storage
        | "Read File"     >> beam.io.ReadFromText("gs://my-bucket/input/text.txt")

        # Split each line into individual words
        | "Split Words"   >> beam.FlatMap(lambda line: line.lower().split())

        # Filter out empty strings
        | "Filter Empty"  >> beam.Filter(lambda word: len(word) > 0)

        # Count each word
        | "Count Words"   >> beam.combiners.Count.PerElement()

        # Format as dictionary for BigQuery
        | "Format Row"    >> beam.Map(lambda item: {"word": item[0], "count": item[1]})

        # Write to BigQuery
        | "Write BQ"      >> beam.io.WriteToBigQuery(
            "my-project:word_counts.results",
            schema="word:STRING, count:INTEGER",
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
        )
    )

# Submit the pipeline to Dataflow (run from a shell)
python word_count_pipeline.py
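Before submitting a job, it can help to check what the transforms should produce on a few sample lines. The sketch below mimics the split, count, and format steps in plain Python with collections.Counter. It is not Beam code and runs entirely in memory. The function name count_words and the sample lines are made up for illustration.

```python
from collections import Counter


def count_words(lines):
    """Mimic the Split Words, Count Words, and Format Row steps on a list of lines."""
    counts = Counter()
    for line in lines:
        # str.split() with no arguments never yields empty strings,
        # so blank lines simply contribute nothing.
        counts.update(line.lower().split())
    # Sort by word so the output order is stable.
    return [{"word": word, "count": count} for word, count in sorted(counts.items())]


sample_lines = ["The quick brown fox", "the lazy dog", ""]
rows = count_words(sample_lines)
for row in rows:
    print(row)
```

Each printed dictionary has the same shape as the rows the pipeline sends to BigQuery, which makes it easy to compare against the schema "word:STRING, count:INTEGER".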

Building a Streaming Pipeline (Pub/Sub to BigQuery)

This pipeline reads real-time events from a Pub/Sub topic, parses JSON messages, filters invalid ones, and writes to BigQuery.

# streaming_pipeline.py
import apache_beam as beam
import json
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions(
    runner="DataflowRunner",
    project="my-project",
    region="us-central1",
    temp_location="gs://my-bucket/temp",
    job_name="events-streaming-job"
)
options.view_as(StandardOptions).streaming = True  # Enable streaming mode

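def parse_message(message):
    """Parse a Pub/Sub message (bytes) into a Python object, or None if it is not valid JSON."""
    try:
        return json.loads(message.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Malformed bytes or malformed JSON: return None so the filter step drops it
        return None

def is_valid(event):
    """Keep only JSON objects (dicts) that contain the required fields."""
    # The isinstance check rejects None as well as valid JSON that is not an object, such as a list or a number
    return isinstance(event, dict) and "user_id" in event and "event_type" in event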

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        # Read from Pub/Sub topic
        | "Read Pub/Sub"    >> beam.io.ReadFromPubSub(topic="projects/my-project/topics/events")

        # Parse JSON bytes into dict
        | "Parse JSON"      >> beam.Map(parse_message)

        # Filter invalid events
        | "Filter Valid"    >> beam.Filter(is_valid)

        # Write to BigQuery (streaming insert)
        | "Write BigQuery"  >> beam.io.WriteToBigQuery(
            "my-project:analytics.events",
            schema="user_id:STRING, event_type:STRING, timestamp:TIMESTAMP",
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
        )
    )
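The parsing and filtering helpers are ordinary Python functions, so they can be exercised without Pub/Sub or Dataflow. The sketch below restates the two helpers (including a dict type check) so that it runs on its own, then feeds them a handful of made-up payloads. The sample messages are assumptions for illustration, not real event data.

```python
import json


def parse_message(message):
    """Parse message bytes into a Python object, or None on bad input."""
    try:
        return json.loads(message.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


def is_valid(event):
    """Keep only dicts that carry both required fields."""
    return isinstance(event, dict) and "user_id" in event and "event_type" in event


# Made-up payloads covering the main cases.
samples = [
    b'{"user_id": "u1", "event_type": "click", "timestamp": "2024-01-01T00:00:00Z"}',
    b'{"user_id": "u2"}',   # missing event_type
    b"not json",            # malformed JSON
    b"[1, 2, 3]",           # valid JSON, but not an object
    b"\xff\xfe",            # not valid UTF-8
]

kept = [event for event in map(parse_message, samples) if is_valid(event)]
print(len(kept), "of", len(samples), "messages kept")
```

Only the first payload survives both steps, which matches what the "Parse JSON" and "Filter Valid" transforms would let through.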

Windowing in Stream Processing

In stream processing, data arrives continuously. Windowing divides the infinite stream into finite time buckets for aggregation.

Stream of events:  --e1--e2--e3--e4--e5--e6--e7--e8--e9-->

Fixed Window (every 1 minute):
Window 1: [e1,e2,e3]  →  count=3
Window 2: [e4,e5,e6]  →  count=3
Window 3: [e7,e8,e9]  →  count=3

Sliding Window (1-minute window, slides every 30 seconds, so consecutive windows overlap by half):
Window 1: [e1,e2,e3,e4]              →  count=4
Window 2:       [e3,e4,e5,e6]        →  count=4
Window 3:             [e5,e6,e7,e8]  →  count=4

Session Window (a session ends after a gap of 5 minutes with no events):
User A: [e1,e2,e3]──5min gap──[e7,e8] → Two sessions
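
# Apply a fixed 1-minute window to a streaming PCollection of event dicts
windowed = (
    events
    | "Window" >> beam.WindowInto(beam.window.FixedWindows(60))  # 60 seconds
    # Count by event type; counting whole event dicts would only tally exact duplicates
    | "Event type" >> beam.Map(lambda event: event["event_type"])
    | "Count per window" >> beam.combiners.Count.PerElement()
)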
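The window types above come down to simple arithmetic on event timestamps. The following sketch shows that arithmetic in plain Python, with integer timestamps in seconds. It does not use the Beam API, the function names are hypothetical, and it ignores details such as late data and triggers.

```python
def fixed_window(timestamp, size):
    """Return the (start, end) of the single fixed window containing timestamp."""
    start = timestamp - timestamp % size
    return (start, start + size)


def sliding_windows(timestamp, size, period):
    """Return every (start, end) sliding window that contains timestamp."""
    # Latest window start at or before the timestamp.
    last_start = timestamp - timestamp % period
    # Step backwards by one period while the window still covers the timestamp.
    starts = range(last_start, last_start - size, -period)
    return sorted((start, start + size) for start in starts)


def session_windows(timestamps, gap):
    """Group timestamps into sessions; a gap of `gap` seconds or more starts a new one."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


# An event at t=125s falls into one fixed window and two sliding windows.
print(fixed_window(125, size=60))
print(sliding_windows(125, size=60, period=30))
# Events for one user, with a long pause between the third and fourth.
print(session_windows([0, 60, 120, 600, 660], gap=300))
```

With a 60-second window that slides every 30 seconds, each event belongs to two windows, which is why sliding windows count the same event more than once across overlapping buckets.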

Dataflow Monitoring

The Dataflow console provides a visual pipeline graph where each transform is shown as a node. Metrics like elements processed, throughput, and system lag are visible in real time.

Dataflow Job Graph (visual view in Console):
Read Pub/Sub
     │  250,000 elements/sec
     ▼
Parse JSON
     │  250,000 elements/sec (Map emits one output per input; unparseable messages become None)
     ▼
Filter Valid
     │  244,500 elements/sec (5,500 invalid events dropped)
     ▼
Write BigQuery
     │  244,500 rows/sec inserted

Dataflow Templates

GCP provides pre-built Dataflow templates for common use cases — no coding required.

Template                             | Description
Pub/Sub to BigQuery                  | Stream Pub/Sub messages into a BigQuery table
Cloud Storage to BigQuery            | Load CSV/JSON files from GCS into BigQuery
BigQuery to Cloud Storage            | Export BigQuery query results to GCS files
Cloud Storage Text to Cloud Pub/Sub  | Read text files and publish to a Pub/Sub topic
Datastream to BigQuery               | Replicate database changes to BigQuery in real time

# Launch a Pub/Sub to BigQuery template (no code needed)
# --parameters takes one comma-separated value with no spaces, so it stays on a single line
gcloud dataflow jobs run pubsub-to-bq \
  --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
  --region us-central1 \
  --parameters inputTopic=projects/my-project/topics/events,outputTableSpec=my-project:analytics.events
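When a template launch is scripted, building the argument list in code avoids shell quoting and line-continuation mistakes. The sketch below assembles the same gcloud command as a Python list and prints it without running anything. The helper name build_template_command is hypothetical, and the sketch assumes parameter values contain no commas.

```python
import shlex


def build_template_command(job_name, template_path, region, parameters):
    """Assemble a `gcloud dataflow jobs run` argument list (does not execute it)."""
    # All template parameters go into one comma-separated argument.
    joined = ",".join(f"{key}={value}" for key, value in parameters.items())
    return [
        "gcloud", "dataflow", "jobs", "run", job_name,
        "--gcs-location", template_path,
        "--region", region,
        "--parameters", joined,
    ]


command = build_template_command(
    job_name="pubsub-to-bq",
    template_path="gs://dataflow-templates/latest/PubSub_to_BigQuery",
    region="us-central1",
    parameters={
        "inputTopic": "projects/my-project/topics/events",
        "outputTableSpec": "my-project:analytics.events",
    },
)
print(shlex.join(command))
```

The resulting list could be handed to subprocess.run on a machine where gcloud is installed and authenticated; printing it first is a cheap way to review the exact command.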

Key Takeaways

  • Cloud Dataflow runs Apache Beam pipelines for both batch and stream data processing.
  • The same Beam code handles both bounded (batch) and unbounded (streaming) data.
  • Dataflow auto-scales workers up and down based on data volume.
  • Windowing divides continuous streams into time buckets for aggregation.
  • Pre-built Dataflow templates enable common ETL pipelines without writing any code.
  • Typical pipeline: Source (Pub/Sub / GCS) → Transform (filter, parse, enrich) → Sink (BigQuery / Bigtable).
