Service Bus with Python SDK
The azure-servicebus Python package is the official SDK for Azure Service Bus. It supports both synchronous and asynchronous (asyncio) programming models and covers sending, receiving, sessions, dead-letter queues, scheduled messages, and Managed Identity authentication. This topic walks through an example for each major scenario. Later snippets reuse the connection_str and queue_name variables and the imports introduced in earlier ones, so they are meant to be read in sequence rather than run in isolation.
Installation
pip install azure-servicebus
pip install azure-identity  # For Managed Identity / DefaultAzureCredential
SDK Client Types
azure.servicebus
|-- ServiceBusClient
|   |-- get_queue_sender("queue")          --> ServiceBusSender
|   |-- get_queue_receiver("queue")        --> ServiceBusReceiver
|   |-- get_topic_sender("topic")          --> ServiceBusSender
|   |-- get_subscription_receiver(t, sub)  --> ServiceBusReceiver
|
|-- ServiceBusMessage          (outgoing message)
|-- ServiceBusReceivedMessage  (incoming message, read-only)

azure.servicebus.aio
|-- ServiceBusClient  (async version of everything above)
1. Client Authentication
Connection String (Development)
from azure.servicebus import ServiceBusClient

connection_str = (
    "Endpoint=sb://myshopns.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey=<key>"
)

with ServiceBusClient.from_connection_string(connection_str) as client:
    print("Connected.")
Managed Identity (Production — No Secrets)
from azure.servicebus import ServiceBusClient
from azure.identity import DefaultAzureCredential

fq_namespace = "myshopns.servicebus.windows.net"
credential = DefaultAzureCredential()

with ServiceBusClient(fq_namespace, credential) as client:
    print("Connected using Managed Identity.")
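One way to switch between the two authentication styles without editing code is to read the settings from the environment. The sketch below is only an illustration: the variable names SERVICEBUS_FQDN and SERVICEBUS_CONNECTION_STRING and both helper functions are hypothetical, not SDK conventions. The SDK imports sit inside build_client so the selection logic can be exercised on its own.

```python
import os


def resolve_auth_settings(env):
    """Decide which authentication style to use from a mapping of settings.

    Returns ("managed_identity", namespace) or ("connection_string", conn_str).
    The variable names are hypothetical and chosen for this example only.
    """
    namespace = env.get("SERVICEBUS_FQDN")
    if namespace:
        # Prefer the secret-free option whenever a namespace is configured
        return ("managed_identity", namespace)
    conn_str = env.get("SERVICEBUS_CONNECTION_STRING")
    if conn_str:
        return ("connection_string", conn_str)
    raise RuntimeError("Set SERVICEBUS_FQDN or SERVICEBUS_CONNECTION_STRING")


def build_client(env=None):
    """Create a ServiceBusClient according to resolve_auth_settings."""
    # Imported lazily so the selection logic above has no SDK dependency
    from azure.identity import DefaultAzureCredential
    from azure.servicebus import ServiceBusClient

    mode, value = resolve_auth_settings(os.environ if env is None else env)
    if mode == "managed_identity":
        return ServiceBusClient(value, DefaultAzureCredential())
    return ServiceBusClient.from_connection_string(value)
```

With this arrangement a developer machine would set only the connection string, while a deployed workload would set only the namespace and rely on its Managed Identity.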
2. Sending Messages
Send a Single Message to a Queue
import json
from datetime import timedelta

from azure.servicebus import ServiceBusClient, ServiceBusMessage

connection_str = "Endpoint=sb://myshopns.servicebus.windows.net/;..."
queue_name = "orders"
order = {"orderId": 101, "product": "Laptop", "amount": 1200.00, "region": "India"}

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_sender(queue_name) as sender:
        msg = ServiceBusMessage(
            body=json.dumps(order),
            message_id="order-101",
            subject="NewOrder",
            content_type="application/json",
            time_to_live=timedelta(hours=24),
        )
        # Custom key-value metadata carried alongside the body
        msg.application_properties = {
            "Region": "India",
            "Priority": "High",
            "Amount": 1200.00,
        }
        sender.send_messages(msg)
        print("Message sent.")
Send to a Topic
with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_topic_sender("order-events") as sender:
        msg = ServiceBusMessage(
            body=json.dumps(order),
            message_id="event-order-101",
            subject="OrderPlaced",
        )
        msg.application_properties = {"Region": "India"}
        sender.send_messages(msg)
        print("Event published to topic.")
Send a Batch of Messages
with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_sender(queue_name) as sender:
        batch = sender.create_message_batch()
        for i in range(1, 51):
            msg = ServiceBusMessage(
                body=json.dumps({"orderId": i}),
                message_id=f"order-{i}",
            )
            try:
                batch.add_message(msg)
            except ValueError:
                # Batch full: send it and start a new batch with this message
                sender.send_messages(batch)
                print("Batch sent.")
                batch = sender.create_message_batch()
                batch.add_message(msg)
        # Send whatever remains in the final batch
        sender.send_messages(batch)
        print("All batches sent.")
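The fill-and-flush loop above can be factored into a reusable helper. The following is a minimal sketch, not part of the SDK: send_in_batches is a hypothetical name, and it assumes only that the sender exposes create_message_batch() and send_messages() and that add_message raises ValueError when the batch is full, as in the example above. Unlike the inline loop, it skips the final send when nothing is pending and re-raises if a single message cannot fit even in an empty batch.

```python
def send_in_batches(sender, messages):
    """Pack messages into as few batches as possible and send them.

    `sender` is expected to behave like ServiceBusSender: it provides
    create_message_batch() and send_messages(). Returns the number of
    batches sent.
    """
    batch = sender.create_message_batch()
    pending = 0  # messages added to the current batch
    sent = 0
    for msg in messages:
        try:
            batch.add_message(msg)
        except ValueError:
            if pending == 0:
                # Even an empty batch cannot hold this message
                raise
            sender.send_messages(batch)
            sent += 1
            batch = sender.create_message_batch()
            batch.add_message(msg)  # raises if the message alone is too large
            pending = 0
        pending += 1
    if pending:
        # Flush the final, partially filled batch
        sender.send_messages(batch)
        sent += 1
    return sent
```

Inside the `with client.get_queue_sender(queue_name) as sender:` block it would be called as `send_in_batches(sender, msgs)`, where msgs is any iterable of ServiceBusMessage objects.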
3. Receiving Messages — Pull Mode
Receive and Complete a Single Message
from azure.servicebus import ServiceBusClient, ServiceBusReceiveMode

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_receiver(
        queue_name,
        receive_mode=ServiceBusReceiveMode.PEEK_LOCK,
        max_wait_time=10,
    ) as receiver:
        for msg in receiver:
            try:
                body = str(msg)
                print(f"MessageId    : {msg.message_id}")
                print(f"Body         : {body}")
                print(f"DeliveryCount: {msg.delivery_count}")
                # business logic here
                receiver.complete_message(msg)
                print("Completed.")
            except Exception as ex:
                print(f"Error: {ex}. Abandoning.")
                receiver.abandon_message(msg)
            break  # process one message then exit loop
Receive Multiple Messages
with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_receiver(queue_name, max_wait_time=5) as receiver:
        messages = receiver.receive_messages(max_message_count=20, max_wait_time=5)
        for msg in messages:
            try:
                print(f"Processing: {str(msg)}")
                receiver.complete_message(msg)
            except Exception as ex:
                print(f"Error: {ex}. Abandoning.")
                receiver.abandon_message(msg)
Receive from a Topic Subscription
with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_subscription_receiver(
        topic_name="order-events",
        subscription_name="inventory-sub",
        max_wait_time=10,
    ) as receiver:
        for msg in receiver:
            print(f"Event received: {str(msg)}")
            receiver.complete_message(msg)
            break
4. Dead-Letter Queue Access
from azure.servicebus import ServiceBusSubQueue

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_receiver(
        queue_name,
        sub_queue=ServiceBusSubQueue.DEAD_LETTER,
        max_wait_time=5,
    ) as dlq_receiver:
        for msg in dlq_receiver:
            print(f"DLQ Body   : {str(msg)}")
            print(f"DLQ Reason : {msg.dead_letter_reason}")
            print(f"DLQ Desc   : {msg.dead_letter_error_description}")
            print(f"Deliveries : {msg.delivery_count}")
            dlq_receiver.complete_message(msg)
            break
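When a dead-letter queue holds many messages, a per-reason count is often a more useful first look than printing each one. The helper below is a small sketch under that assumption; summarize_dead_letters is a hypothetical name, and it relies only on each message exposing the dead_letter_reason attribute used in the example above.

```python
from collections import Counter


def summarize_dead_letters(messages):
    """Count dead-lettered messages per reason.

    `messages` is any iterable of objects exposing dead_letter_reason,
    as ServiceBusReceivedMessage does. Missing reasons are grouped
    under "<unknown>".
    """
    return Counter(
        getattr(msg, "dead_letter_reason", None) or "<unknown>" for msg in messages
    )
```

It could be fed the messages returned by the dead-letter receiver, for example `summarize_dead_letters(dlq_receiver.receive_messages(max_message_count=20, max_wait_time=5))`. Messages received that way remain locked until they are completed or abandoned, or until the lock expires.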
5. Message Sessions
Send Session Messages
with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_sender("order-session-queue") as sender:
        for step in ["Created", "PaymentConfirmed", "Packed", "Shipped"]:
            msg = ServiceBusMessage(
                body=f"order-101: {step}",
                message_id=f"order-101-{step}",
                session_id="order-101",
            )
            sender.send_messages(msg)
            print(f"Sent: {step}")
Receive Session Messages
with ServiceBusClient.from_connection_string(connection_str) as client:
    # Accept a specific session by passing session_id to the receiver
    with client.get_queue_receiver(
        queue_name="order-session-queue",
        session_id="order-101",
        max_wait_time=5,
    ) as session_receiver:
        print(f"Accepted session: {session_receiver.session.session_id}")
        for msg in session_receiver:
            print(f"Step: {str(msg)}")
            session_receiver.complete_message(msg)
6. Scheduled Messages
from datetime import datetime, timezone, timedelta

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_sender(queue_name) as sender:
        msg = ServiceBusMessage(
            body=json.dumps({"orderId": 999, "type": "scheduled"}),
            message_id="sched-order-999",
        )
        delivery_time = datetime.now(timezone.utc) + timedelta(hours=2)
        # schedule_messages returns a list of sequence numbers, one per message
        seq_nos = sender.schedule_messages(msg, delivery_time)
        print(f"Scheduled at {delivery_time}. SequenceNumbers: {seq_nos}")
        # Cancel if needed
        sender.cancel_scheduled_messages(seq_nos)
        print("Scheduled message cancelled.")
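The example above schedules relative to the current time. When delivery should instead happen at a fixed time of day, the target has to be computed as a timezone-aware UTC datetime. The helper below is one possible sketch; next_utc_occurrence is a hypothetical name and uses only the standard library.

```python
from datetime import datetime, timedelta, timezone


def next_utc_occurrence(now, hour, minute=0):
    """Return the next time-of-day hour:minute (UTC) strictly after `now`.

    `now` must be timezone-aware; naive datetimes are rejected so that a
    local time is never silently treated as UTC.
    """
    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    now_utc = now.astimezone(timezone.utc)
    candidate = now_utc.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now_utc:
        # Today's slot has already passed; use tomorrow's
        candidate += timedelta(days=1)
    return candidate
```

The result can be passed as the second argument to schedule_messages, for example `sender.schedule_messages(msg, next_utc_occurrence(datetime.now(timezone.utc), 6))` for the next 06:00 UTC.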
7. Deferral
step1_done = False      # placeholder: set to True once Step1 has been processed
deferred_seq_no = None

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_receiver(queue_name, max_wait_time=5) as receiver:
        for msg in receiver:
            if "Step3" in str(msg) and not step1_done:
                # Keep the sequence number: a deferred message can only be
                # retrieved by it
                deferred_seq_no = msg.sequence_number
                receiver.defer_message(msg)
                print(f"Deferred. SeqNo: {deferred_seq_no}")
                break
            else:
                receiver.complete_message(msg)

    # Retrieve deferred later
    if deferred_seq_no is not None:
        with client.get_queue_receiver(queue_name) as receiver:
            deferred_msgs = receiver.receive_deferred_messages([deferred_seq_no])
            for msg in deferred_msgs:
                print(f"Processing deferred: {str(msg)}")
                receiver.complete_message(msg)
8. Async (asyncio) Client
import asyncio
import json

from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage


async def send_async():
    async with ServiceBusClient.from_connection_string(connection_str) as client:
        async with client.get_queue_sender("orders") as sender:
            msg = ServiceBusMessage(
                body=json.dumps({"orderId": 200}),
                message_id="async-order-200",
            )
            await sender.send_messages(msg)
            print("Async message sent.")


async def receive_async():
    async with ServiceBusClient.from_connection_string(connection_str) as client:
        async with client.get_queue_receiver("orders", max_wait_time=10) as receiver:
            async for msg in receiver:
                print(f"Async received: {str(msg)}")
                await receiver.complete_message(msg)
                break


asyncio.run(send_async())
asyncio.run(receive_async())
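The async example handles one message at a time. If the per-message business logic is itself I/O-bound, a batch of received messages can be processed with bounded concurrency. The following is a generic asyncio sketch, not an SDK feature; process_concurrently and its limit parameter are hypothetical names.

```python
import asyncio


async def process_concurrently(items, handler, limit=5):
    """Run handler(item) for every item with at most `limit` in flight.

    Returns the results in the same order as `items`. An exception from
    any handler propagates to the caller.
    """
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item):
        # The semaphore caps how many handlers run at the same time
        async with semaphore:
            return await handler(item)

    return await asyncio.gather(*(guarded(item) for item in items))
```

A cautious way to combine this with a receiver is to let the handler run only the business logic and then settle the messages one by one from a single coroutine afterwards, since sharing one receiver across concurrent tasks may not be safe.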
9. Manual Dead-Lettering
with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_receiver(queue_name, max_wait_time=5) as receiver:
        for msg in receiver:
            body = str(msg)
            if "orderId" not in body:
                receiver.dead_letter_message(
                    msg,
                    reason="SchemaValidationFailed",
                    error_description="Message body missing required field: orderId",
                )
                print("Invalid message dead-lettered.")
            else:
                print(f"Processing: {body}")
                receiver.complete_message(msg)
            break
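The substring test above accepts any body that merely mentions orderId, such as `{"note": "orderId"}`, and says nothing about whether the body is valid JSON. A stricter check might parse the body first. The function below is a sketch of that idea; validate_order_body is a hypothetical name, and the rule it enforces (a JSON object with an orderId key) is an assumption based on the messages sent earlier in this topic.

```python
import json


def validate_order_body(body):
    """Check that a message body is a JSON object containing orderId.

    Returns (True, None) when valid, otherwise (False, description), where
    description is suitable for the error_description argument of
    dead_letter_message.
    """
    try:
        payload = json.loads(body)
    except (TypeError, ValueError) as ex:
        # json.JSONDecodeError is a subclass of ValueError
        return False, f"Body is not valid JSON: {ex}"
    if not isinstance(payload, dict):
        return False, "Body is not a JSON object"
    if "orderId" not in payload:
        return False, "Message body missing required field: orderId"
    return True, None
```

In the receive loop, `ok, description = validate_order_body(str(msg))` would replace the substring test, with description passed as error_description when ok is False.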
10. Error Handling
from azure.servicebus.exceptions import (
    ServiceBusError,
    MessageLockLostError,
    SessionLockLostError,
    ServiceBusConnectionError,
)

try:
    with ServiceBusClient.from_connection_string(connection_str) as client:
        with client.get_queue_sender(queue_name) as sender:
            sender.send_messages(ServiceBusMessage("Test"))
except MessageLockLostError:
    # Raised when settling a received message whose lock has already expired
    print("Lock expired during processing. Increase lock duration.")
except ServiceBusConnectionError:
    print("Connection failed. Check network and connection string.")
except ServiceBusError as ex:
    # Base class: keep this handler last so the specific ones run first
    print(f"Service Bus error: {ex}")
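Printing the error is enough for a demo, but transient failures such as a dropped connection are usually worth retrying. The SDK applies its own retry policy internally; the wrapper below is a separate, application-level sketch of exponential backoff. retry_with_backoff and its parameters are hypothetical names, and the choice of which exceptions count as retryable is left to the caller.

```python
import time


def retry_with_backoff(operation, retryable, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call operation() until it succeeds or the attempts are exhausted.

    Only exceptions matching `retryable` trigger a retry. The delay doubles
    after each failure (base_delay, 2 * base_delay, ...). The last failure
    is re-raised unchanged.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retryable:
            if attempt == attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
```

For the send shown above, a call might look like `retry_with_backoff(lambda: sender.send_messages(msg), ServiceBusConnectionError)`. The sleep parameter exists so the delay can be replaced in tests.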
Common Properties on Received Messages
| Property | Description |
|---|---|
| msg.message_id | Unique ID set by sender |
| msg.subject | Short label / subject |
| msg.content_type | MIME type of body (e.g., application/json) |
| msg.delivery_count | Number of delivery attempts |
| msg.sequence_number | Unique sequential number from Service Bus |
| msg.enqueued_time_utc | Time message entered the queue |
| msg.locked_until_utc | Lock expiry timestamp |
| msg.session_id | Session group identifier |
| msg.application_properties | Custom key-value metadata from sender |
| msg.dead_letter_reason | Why the message was dead-lettered (DLQ only) |
Best Practices
| Practice | Reason |
|---|---|
| Use context managers (with) | Auto-closes connections and releases AMQP links |
| Use PEEK_LOCK mode | Prevents message loss if processing fails |
| Set meaningful message_id | Enables duplicate detection |
| Use async client for FastAPI / async apps | Non-blocking I/O avoids event loop stalls |
| Use DefaultAzureCredential in production | Managed Identity — no secrets in code |
| Use batch sending for high volume | Reduces network round trips |
Summary
The azure-servicebus Python SDK provides both synchronous and async interfaces for all Service Bus operations. Context managers (with blocks) ensure clean connection handling. Sending supports single messages, batches, topic publishing, and scheduled delivery. Receiving supports pull-based iteration, dead-letter access, session handling, and message deferral. The async client integrates naturally with FastAPI, aiohttp, and any asyncio-based Python application. Using DefaultAzureCredential with Managed Identity removes secrets entirely from Python workloads running in Azure.
