Event Hub Sending and Receiving Events
Sending events to Azure Event Hubs (producing) and receiving events from them (consuming) are the two primary operations in every Event Hubs solution. The Azure SDKs for .NET, Java, Python, JavaScript, and Go all provide libraries for both operations. This topic covers the producer pattern, the consumer pattern, batching, the event processor, and checkpointing, with practical code examples.
SDK Overview
For .NET, the Azure Event Hubs SDK package is Azure.Messaging.EventHubs; the Python equivalent used in the examples below is azure-eventhub. The SDK has three primary classes:
| Class | Purpose |
|---|---|
| EventHubProducerClient | Sends (publishes) events to an Event Hub |
| EventHubConsumerClient | Reads events directly from one or all partitions (low-level; best for testing and simple tools) |
| EventProcessorClient | Reads events from all partitions with automatic load balancing and checkpointing (production recommended) |
Sending Events – Producer Pattern
Step 1: Connection Setup
A producer connects to an Event Hub using either a connection string (from Shared Access Policy) or an Azure AD credential. Azure AD managed identity is recommended for production.
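As a rough sketch of what the client parses out of a Shared Access Policy connection string (the namespace and key below are illustrative placeholders, not real credentials):

```python
# Sketch: the pieces the SDK reads from a Shared Access Policy connection
# string. The namespace and key values here are illustrative placeholders.
conn_str = (
    "Endpoint=sb://prod-events-ns.servicebus.windows.net/;"
    "SharedAccessKeyName=producers-policy;"
    "SharedAccessKey=abc123=="
)

# Split "Name=Value" pairs on the first "=" only, since keys end in "=" padding
parts = dict(pair.split("=", 1) for pair in conn_str.split(";"))

endpoint = parts["Endpoint"]            # namespace address
policy = parts["SharedAccessKeyName"]   # Shared Access Policy name
key = parts["SharedAccessKey"]          # the policy's signing key

print(endpoint)  # sb://prod-events-ns.servicebus.windows.net/
```

With Azure AD instead, none of this appears in code: the client is constructed from the fully qualified namespace (the `Endpoint` host name) plus a credential object.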
Step 2: Create EventData Objects
Each event is represented as an EventData object. The body is a byte array or a string that gets encoded. Optional properties can be added as key-value pairs.
Step 3: Send as a Batch (Recommended)
Batching multiple events into a single send call improves throughput and reduces network overhead. The EventDataBatch class automatically respects the maximum batch size limit.
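When a batch fills up, `EventDataBatch.add` raises `ValueError`; the usual pattern is to send the full batch and start a new one. A minimal sketch of that overflow loop, using a tiny stand-in class (`FakeBatch`, with a made-up 64-byte limit) in place of the real `EventDataBatch`:

```python
class FakeBatch:
    """Stand-in for EventDataBatch: accepts events until a byte limit is hit."""
    def __init__(self, max_size_in_bytes):
        self.max_size_in_bytes = max_size_in_bytes
        self.events = []
        self._size = 0

    def add(self, event: bytes):
        # Mirrors EventDataBatch.add, which raises ValueError when full
        if self._size + len(event) > self.max_size_in_bytes:
            raise ValueError("batch is full")
        self.events.append(event)
        self._size += len(event)

def batch_events(events, max_size=64):
    """Overflow loop: on ValueError, 'send' the full batch and start a new one."""
    batches, current = [], FakeBatch(max_size)
    for event in events:
        try:
            current.add(event)
        except ValueError:
            batches.append(current)        # real code: await producer.send_batch(current)
            current = FakeBatch(max_size)  # real code: await producer.create_batch()
            current.add(event)
    if current.events:
        batches.append(current)
    return batches

payloads = [b"x" * 20 for _ in range(10)]  # ten 20-byte events, 64-byte limit
batches = batch_events(payloads)
print(len(batches))  # 4 batches: only three 20-byte events fit per batch
```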
Producer Code Example (Python)
```python
import asyncio
import json

from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

CONNECTION_STRING = "Endpoint=sb://prod-events-ns.servicebus.windows.net/;SharedAccessKeyName=producers-policy;SharedAccessKey=abc123=="
EVENT_HUB_NAME = "telemetry"

async def send_events():
    producer = EventHubProducerClient.from_connection_string(
        conn_str=CONNECTION_STRING,
        eventhub_name=EVENT_HUB_NAME
    )
    async with producer:
        # Create a batch
        event_batch = await producer.create_batch()
        # Add events to the batch
        for i in range(10):
            sensor_reading = {
                "deviceId": f"sensor-{i:02d}",
                "temperature": 72.4 + i,
                "humidity": 55.2,
                "timestamp": "2024-06-15T10:00:00Z"
            }
            # Add event to batch (automatically checks batch size limit)
            event_batch.add(EventData(json.dumps(sensor_reading)))
        # Send the entire batch in one call
        await producer.send_batch(event_batch)
        print(f"Sent batch of {len(event_batch)} events")

asyncio.run(send_events())
```
Sending with Partition Key
To ensure events for the same device always go to the same partition, set a partition key when creating the batch. Event Hubs uses the key's hash to select the partition.
```python
# All events from "sensor-05" always go to the same partition
event_batch = await producer.create_batch(partition_key="sensor-05")
event_batch.add(EventData(json.dumps(sensor_reading)))
await producer.send_batch(event_batch)
```
Producer Throughput Guidance
| Practice | Impact |
|---|---|
| Use batching (EventDataBatch) | Reduces network calls; increases throughput significantly |
| Use async/await pattern | Enables concurrent sends without blocking threads |
| Reuse ProducerClient instances | Creating a new client per event is expensive; create once and reuse |
| Handle ServerBusy (429) with retry | Implement exponential backoff on throttling errors |
| Use partition key for ordering needs | Guarantees order only within a partition for same-key events |
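The SDK retries transient failures on its own, but an application-level backoff wrapper can sit around `send_batch` for throttling errors. In this sketch, `flaky_send` and the generic `Exception` are stand-ins for a real send call and the SDK's throttling error type:

```python
import asyncio
import random

async def send_with_backoff(send_fn, max_attempts=5, base_delay=0.5):
    """Retry a send coroutine with exponential backoff and jitter.

    send_fn is any zero-argument coroutine function (e.g. a wrapper around
    producer.send_batch); retriable errors are assumed to raise Exception.
    """
    for attempt in range(max_attempts):
        try:
            return await send_fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # 0.5s, 1s, 2s, 4s ... plus jitter to avoid synchronized retries
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)

# Demo with a fake send that fails twice, then succeeds
attempts = {"n": 0}

async def flaky_send():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise Exception("ServerBusy")  # stand-in for a throttling error
    return "ok"

result = asyncio.run(send_with_backoff(flaky_send, base_delay=0.01))
print(result)  # ok
```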
Receiving Events – Consumer Pattern
Option 1: EventHubConsumerClient (Testing and Simple Scenarios)
EventHubConsumerClient reads events directly from partitions — either a single partition or all of them. On its own it is useful for testing and simple scenarios; without a checkpoint store it does not coordinate load balancing across multiple consumer instances and is not recommended for production at scale.
Consumer Code Example – Single Partition (Python)
```python
import asyncio

from azure.eventhub.aio import EventHubConsumerClient

CONNECTION_STRING = "Endpoint=sb://prod-events-ns.servicebus.windows.net/;SharedAccessKeyName=consumers-policy;SharedAccessKey=xyz789=="
EVENT_HUB_NAME = "telemetry"
CONSUMER_GROUP = "$Default"

async def receive_events():
    consumer = EventHubConsumerClient.from_connection_string(
        conn_str=CONNECTION_STRING,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENT_HUB_NAME
    )

    async def on_event(partition_context, event):
        print(f"Partition: {partition_context.partition_id}")
        print(f"Offset: {event.offset}")
        print(f"Body: {event.body_as_str()}")
        # Update checkpoint after processing (has no effect unless
        # a checkpoint store is configured on the client)
        await partition_context.update_checkpoint(event)

    async with consumer:
        # Receive from all partitions, starting from the latest events
        await consumer.receive(
            on_event=on_event,
            starting_position="@latest"
        )

asyncio.run(receive_events())
```
Option 2: EventProcessorClient (Production Recommended)
EventProcessorClient (the .NET and Java class; the Python SDK provides the same capability through EventHubConsumerClient configured with a checkpoint store) is the production-grade consumer. It distributes partitions automatically across multiple consumer instances, checkpoints to Azure Blob Storage, recovers from failures, and rebalances when instances are added or removed.
EventProcessorClient Architecture
```
Event Hub: "telemetry" (4 partitions)
Consumer Group: "analytics"
Blob Storage: "checkpoints-container"
Running consumer instances: 2

Instance A (host1):
    Manages Partition 0 and Partition 1
    Saves checkpoints for both to Blob Storage

Instance B (host2):
    Manages Partition 2 and Partition 3
    Saves checkpoints for both to Blob Storage

New Instance C (host3) joins:
    EventProcessorClient rebalances automatically:
        Instance A: Partition 0
        Instance B: Partition 2
        Instance C: Partition 1 and Partition 3 (or other distribution)
```
Event Processor Code Example (Python)
```python
import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

EVENT_HUB_CONN_STR = "Endpoint=sb://prod-events-ns..."
EVENT_HUB_NAME = "telemetry"
CONSUMER_GROUP = "analytics"
STORAGE_CONN_STR = "DefaultEndpointsProtocol=https;AccountName=mystorage..."
CHECKPOINT_CONTAINER = "event-hub-checkpoints"

async def on_event(partition_context, event):
    """Called for each received event."""
    body = event.body_as_str()
    print(f"Partition {partition_context.partition_id} | Offset {event.offset}")
    print(f"Event body: {body}")
    # Save checkpoint after successful processing
    await partition_context.update_checkpoint(event)

async def on_error(partition_context, error):
    """Called when an error occurs during receiving or load balancing."""
    if partition_context:
        print(f"Error on partition {partition_context.partition_id}: {error}")
    else:
        print(f"Load-balancing error: {error}")

async def main():
    # Checkpoint store uses Azure Blob Storage
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        STORAGE_CONN_STR,
        CHECKPOINT_CONTAINER
    )
    # In the Python SDK, passing a checkpoint store to EventHubConsumerClient
    # enables the processor behavior: load balancing plus checkpointing
    client = EventHubConsumerClient.from_connection_string(
        EVENT_HUB_CONN_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENT_HUB_NAME,
        checkpoint_store=checkpoint_store
    )
    async with client:
        # receive() runs until cancelled; it resumes from the last checkpoint,
        # or from the start of each partition if no checkpoint exists yet
        await client.receive(
            on_event=on_event,
            on_error=on_error,
            starting_position="-1"
        )

asyncio.run(main())
```
Checkpointing Strategy
Checkpointing saves the consumer's current position (offset) to Azure Blob Storage. Checkpointing too frequently adds storage write overhead. Checkpointing too infrequently risks reprocessing large numbers of events after a consumer restart.
| Checkpointing Strategy | Description | Trade-off |
|---|---|---|
| After every event | Checkpoint updated after each event is processed | Minimal reprocessing risk; highest storage cost |
| After every N events | Checkpoint updated after processing N events | Balanced; N events may be reprocessed on restart |
| On a time schedule | Checkpoint updated every T seconds | Predictable overhead; variable reprocessing amount |
| After a successful batch | Checkpoint updated after completing a full processing batch | Good for batch-oriented consumers |
Reading from a Specific Offset or Timestamp
```python
from datetime import datetime, timezone

# Start from the beginning of the partition (replay all retained events)
starting_position = "-1"

# Start from the latest event (only new events)
starting_position = "@latest"

# Start from a specific offset (offsets are opaque strings from event.offset)
starting_position = "150"

# Start from a specific timestamp
starting_position = datetime(2024, 6, 15, 10, 0, 0, tzinfo=timezone.utc)
```
End-to-End Flow Summary
PRODUCER SIDE:
1. IoT device or application creates EventData objects
2. Adds events to an EventDataBatch (respects size limits)
3. Sends batch to Event Hub using EventHubProducerClient
4. Event Hub assigns offset and stores event in a partition

CONSUMER SIDE:
1. EventProcessorClient starts on one or more host machines
2. Client claims partitions from the checkpoint store
3. For each event received, on_event callback fires
4. Application processes the event (parse JSON, write to DB, trigger alert)
5. update_checkpoint() saves the current offset to Blob Storage
6. On restart, processor reads checkpoint and resumes from last saved offset
Summary
The EventHubProducerClient handles event publishing with batching support for maximum throughput. The EventProcessorClient is the production-grade consumer that balances partitions across multiple instances and manages checkpointing automatically. Combining both clients with Azure AD authentication and Blob Storage checkpointing creates a resilient, scalable event pipeline.
