Event Hubs: Sending and Receiving Events

Sending events to Azure Event Hubs (producing) and receiving events from it (consuming) are the two primary operations in any Event Hubs solution. The Azure SDK provides libraries for both operations in .NET, Java, Python, JavaScript, and Go. This topic covers the producer pattern, the consumer pattern, batching, the event processor, and checkpointing, with practical code examples.

SDK Overview

The Azure Event Hubs SDK package is Azure.Messaging.EventHubs for .NET, with equivalents in the other languages (azure-eventhub for Python). The SDK has three primary client classes:

EventHubProducerClient: sends (publishes) events to an Event Hub.
EventHubConsumerClient: reads events directly from partitions (low-level; best for testing).
EventProcessorClient: reads events from all partitions with automatic load balancing and checkpointing (recommended for production).

Note that the Python SDK has no separate EventProcessorClient class; the same processor behavior is provided by EventHubConsumerClient when it is configured with a checkpoint store.

Sending Events – Producer Pattern

Step 1: Connection Setup

A producer connects to an Event Hub using either a connection string (from Shared Access Policy) or an Azure AD credential. Azure AD managed identity is recommended for production.

Step 2: Create EventData Objects

Each event is represented as an EventData object. The body is a byte array or a string that gets encoded. Optional properties can be added as key-value pairs.

Step 3: Send as a Batch (Recommended)

Batching multiple events into a single send call improves throughput and reduces network overhead. The EventDataBatch class enforces the maximum batch size limit: its add method refuses events that would not fit rather than silently exceeding the limit.

Producer Code Example (Python)

import asyncio
import json
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

CONNECTION_STRING = "Endpoint=sb://prod-events-ns.servicebus.windows.net/;SharedAccessKeyName=producers-policy;SharedAccessKey=abc123=="
EVENT_HUB_NAME    = "telemetry"

async def send_events():
    producer = EventHubProducerClient.from_connection_string(
        conn_str=CONNECTION_STRING,
        eventhub_name=EVENT_HUB_NAME
    )

    async with producer:
        # Create a batch
        event_batch = await producer.create_batch()

        # Add events to the batch
        for i in range(10):
            sensor_reading = {
                "deviceId": f"sensor-{i:02d}",
                "temperature": 72.4 + i,
                "humidity": 55.2,
                "timestamp": "2024-06-15T10:00:00Z"
            }
            # Add event to batch (add() raises ValueError if the batch size limit would be exceeded)
            event_batch.add(EventData(json.dumps(sensor_reading)))

        # Send the entire batch in one call
        await producer.send_batch(event_batch)
        print(f"Sent batch of {len(event_batch)} events")

asyncio.run(send_events())

Sending with Partition Key

To ensure that events for the same device always go to the same partition, set a partition key when creating the batch. Event Hubs hashes the key to select the partition.

# All events from "sensor-05" always go to the same partition
batch_options = {"partition_key": "sensor-05"}
event_batch = await producer.create_batch(**batch_options)
event_batch.add(EventData(json.dumps(sensor_reading)))
await producer.send_batch(event_batch)

Producer Throughput Guidance

Use batching (EventDataBatch): reduces network calls and increases throughput significantly.
Use the async/await pattern: enables concurrent sends without blocking threads.
Reuse producer client instances: creating a new client per event is expensive; create once and reuse.
Handle ServerBusy throttling with retries: implement exponential backoff on throttling errors.
Use a partition key for ordering needs: guarantees order only within a partition for same-key events.

Receiving Events – Consumer Pattern

Option 1: EventHubConsumerClient (Testing and Simple Scenarios)

EventHubConsumerClient reads events directly, either from a single partition or from all partitions. Without a checkpoint store it does not coordinate load balancing across multiple consumer instances, so it is useful for testing and simple scenarios but not recommended for production at scale.

Consumer Code Example – Single Partition (Python)

import asyncio
from azure.eventhub.aio import EventHubConsumerClient

CONNECTION_STRING = "Endpoint=sb://prod-events-ns.servicebus.windows.net/;SharedAccessKeyName=consumers-policy;SharedAccessKey=xyz789=="
EVENT_HUB_NAME    = "telemetry"
CONSUMER_GROUP    = "$Default"

async def receive_events():
    consumer = EventHubConsumerClient.from_connection_string(
        conn_str=CONNECTION_STRING,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENT_HUB_NAME
    )

    async def on_event(partition_context, event):
        print(f"Partition: {partition_context.partition_id}")
        print(f"Offset: {event.offset}")
        print(f"Body: {event.body_as_str()}")
        # Shown for completeness: without a checkpoint store this call
        # only logs a warning and persists nothing
        await partition_context.update_checkpoint(event)

    async with consumer:
        # Receive from all partitions, starting from the latest events
        await consumer.receive(
            on_event=on_event,
            starting_position="@latest"
        )

asyncio.run(receive_events())

Option 2: EventProcessorClient (Production Recommended)

EventProcessorClient (in .NET and Java; the Python SDK provides the same behavior through EventHubConsumerClient configured with a checkpoint store) is the production-grade consumer. It distributes partitions automatically across multiple consumer instances, checkpoints to Azure Blob Storage, recovers from failures, and rebalances when instances are added or removed.

EventProcessorClient Architecture

Event Hub: "telemetry" (4 partitions)
Consumer Group: "analytics"
Blob Storage: "checkpoints-container"

Running consumer instances: 2

Instance A (host1):
  Manages Partition 0 and Partition 1
  Saves checkpoints for both to Blob Storage

Instance B (host2):
  Manages Partition 2 and Partition 3
  Saves checkpoints for both to Blob Storage

New Instance C (host3) joins:
  EventProcessorClient rebalances automatically:
    Instance A: Partition 0
    Instance B: Partition 2
    Instance C: Partition 1 and Partition 3 (or other distribution)

Event Processor Code Example (Python)

import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

EVENT_HUB_CONN_STR   = "Endpoint=sb://prod-events-ns..."
EVENT_HUB_NAME       = "telemetry"
CONSUMER_GROUP       = "analytics"
STORAGE_CONN_STR     = "DefaultEndpointsProtocol=https;AccountName=mystorage..."
CHECKPOINT_CONTAINER = "event-hub-checkpoints"

async def on_event(partition_context, event):
    """Called for each received event."""
    body = event.body_as_str()
    print(f"Partition {partition_context.partition_id} | Offset {event.offset}")
    print(f"Event body: {body}")

    # Save checkpoint after successful processing
    await partition_context.update_checkpoint(event)

async def on_error(partition_context, error):
    """Called when an error occurs during receiving."""
    # partition_context is None for errors raised during load balancing
    pid = partition_context.partition_id if partition_context else "N/A"
    print(f"Error on partition {pid}: {error}")

async def main():
    # Checkpoint store uses Azure Blob Storage
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        STORAGE_CONN_STR,
        CHECKPOINT_CONTAINER
    )

    # Passing a checkpoint_store turns EventHubConsumerClient into the
    # load-balancing, checkpointing processor (the Python equivalent of
    # EventProcessorClient in the .NET and Java SDKs)
    client = EventHubConsumerClient.from_connection_string(
        conn_str=EVENT_HUB_CONN_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENT_HUB_NAME,
        checkpoint_store=checkpoint_store
    )

    async with client:
        # receive() runs until cancelled; cap it at one hour here
        try:
            await asyncio.wait_for(
                client.receive(on_event=on_event, on_error=on_error),
                timeout=3600
            )
        except asyncio.TimeoutError:
            pass

asyncio.run(main())

Checkpointing Strategy

Checkpointing saves the consumer's current position (offset) to Azure Blob Storage. Checkpointing too frequently adds storage write overhead. Checkpointing too infrequently risks reprocessing large numbers of events after a consumer restart.

After every event: checkpoint updated after each event is processed. Minimal reprocessing risk; highest storage cost.
After every N events: checkpoint updated after processing N events. Balanced; up to N events may be reprocessed on restart.
On a time schedule: checkpoint updated every T seconds. Predictable overhead; variable amount of reprocessing.
After a successful batch: checkpoint updated after completing a full processing batch. Good for batch-oriented consumers.

Reading from a Specific Offset or Timestamp

# Start from the beginning of the partition (replay all retained events)
starting_position = "-1"

# Start from the latest event (only new events)
starting_position = "@latest"

# Start from a specific offset (offsets are opaque strings)
starting_position = "150"

# Start from a specific timestamp
from datetime import datetime, timezone
starting_position = datetime(2024, 6, 15, 10, 0, 0, tzinfo=timezone.utc)

End-to-End Flow Summary

PRODUCER SIDE:
  1. IoT device or application creates EventData objects
  2. Adds events to an EventDataBatch (respects size limits)
  3. Sends batch to Event Hub using EventHubProducerClient
  4. Event Hub assigns offset and stores event in a partition

CONSUMER SIDE:
  1. EventProcessorClient starts on one or more host machines
  2. Client claims partitions from the checkpoint store
  3. For each event received, on_event callback fires
  4. Application processes the event (parse JSON, write to DB, trigger alert)
  5. update_checkpoint() saves the current offset to Blob Storage
  6. On restart, processor reads checkpoint and resumes from last saved offset

Summary

The EventHubProducerClient handles event publishing with batching support for maximum throughput. The event processor (EventProcessorClient in .NET and Java; EventHubConsumerClient with a checkpoint store in Python) is the production-grade consumer that balances partitions across multiple instances and manages checkpointing automatically. Combining both with Azure AD authentication and Blob Storage checkpointing creates a resilient, scalable event pipeline.
