Service Bus with Python SDK

The azure-servicebus Python package is the official SDK for Azure Service Bus. It supports both synchronous and asynchronous (asyncio) programming models and covers sending, receiving, sessions, dead-letter queues, scheduled messages, and Managed Identity authentication. This topic walks through each major scenario with example code. Later snippets reuse the imports and the connection_str and queue_name variables defined in earlier ones, so copy those along when running a snippet on its own.

Installation

pip install azure-servicebus
pip install azure-identity    # For Managed Identity / DefaultAzureCredential

SDK Client Types

azure.servicebus
  |-- ServiceBusClient
  |       |-- get_queue_sender("queue")          --> ServiceBusSender
  |       |-- get_queue_receiver("queue")        --> ServiceBusReceiver
  |       |-- get_topic_sender("topic")          --> ServiceBusSender
  |       |-- get_subscription_receiver(t, sub)  --> ServiceBusReceiver
  |
  |-- ServiceBusMessage       (outgoing message)
  |-- ServiceBusReceivedMessage  (incoming message — read-only)

azure.servicebus.aio
  |-- ServiceBusClient        (async version of everything above)

1. Client Authentication

Connection String (Development)

from azure.servicebus import ServiceBusClient

connection_str = (
    "Endpoint=sb://myshopns.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey=<key>"
)

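with ServiceBusClient.from_connection_string(connection_str) as client:
    # Creating the client does not open a network connection yet;
    # that happens when a sender or receiver is first used.
    print("Client created.")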
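
The snippet above embeds the key in source code, which is easy to commit by accident. A minimal sketch of one alternative, reading the connection string from an environment variable, follows. The variable name SERVICEBUS_CONNECTION_STR and the helper load_connection_string are assumptions made for illustration and are not part of the SDK.

```python
import os

# Hypothetical variable name; use whatever your deployment defines.
ENV_VAR = "SERVICEBUS_CONNECTION_STR"


def load_connection_string(env=os.environ):
    """Return the connection string from the environment, or raise a clear error."""
    value = env.get(ENV_VAR, "").strip()
    if not value:
        raise RuntimeError(f"Set {ENV_VAR} before starting the application.")
    return value
```

The result can then be passed to ServiceBusClient.from_connection_string() exactly as in the example above.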

Managed Identity (Production — No Secrets)

from azure.servicebus import ServiceBusClient
from azure.identity import DefaultAzureCredential

fq_namespace = "myshopns.servicebus.windows.net"
credential   = DefaultAzureCredential()

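# DefaultAzureCredential tries several sources in turn (environment variables,
# managed identity, Azure CLI login, ...), so the same code runs locally and in Azure.
with ServiceBusClient(fq_namespace, credential) as client:
    print("Client created with DefaultAzureCredential.")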

2. Sending Messages

Send a Single Message to a Queue

import json
from datetime import timedelta   # needed for time_to_live below
from azure.servicebus import ServiceBusClient, ServiceBusMessage

connection_str = "Endpoint=sb://myshopns.servicebus.windows.net/;..."
queue_name     = "orders"

order = {"orderId": 101, "product": "Laptop", "amount": 1200.00, "region": "India"}

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_sender(queue_name) as sender:

        msg = ServiceBusMessage(
            body           = json.dumps(order),
            message_id     = "order-101",
            subject        = "NewOrder",
            content_type   = "application/json",
            time_to_live   = timedelta(hours=24)
        )
        msg.application_properties = {
            "Region":   "India",
            "Priority": "High",
            "Amount":   1200.00
        }

        sender.send_messages(msg)
        print("Message sent.")

Send to a Topic

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_topic_sender("order-events") as sender:
        msg = ServiceBusMessage(
            body       = json.dumps(order),
            message_id = "event-order-101",
            subject    = "OrderPlaced"
        )
        msg.application_properties = {"Region": "India"}
        sender.send_messages(msg)
        print("Event published to topic.")

Send a Batch of Messages

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_sender(queue_name) as sender:

        batch = sender.create_message_batch()

        for i in range(1, 51):
            msg = ServiceBusMessage(
                body       = json.dumps({"orderId": i}),
                message_id = f"order-{i}"
            )
            try:
                batch.add_message(msg)
            except ValueError:
                # Batch full — send and start new batch
                sender.send_messages(batch)
                print("Batch sent.")
                batch = sender.create_message_batch()
                batch.add_message(msg)

        sender.send_messages(batch)
        print("All batches sent.")
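
The fill-and-flush loop above can be wrapped in a reusable helper. The sketch below is one possible shape, not an SDK function: send_in_batches is a hypothetical name, and FakeBatch and FakeSender are in-memory stand-ins that mimic only the three calls used above (create_message_batch, add_message, send_messages) so the logic can be exercised without a namespace.

```python
def send_in_batches(sender, messages):
    """Send messages in as few batches as possible and return the batch count.

    `sender` is any object exposing create_message_batch() and send_messages(),
    such as the sender used in the example above.
    """
    batch = sender.create_message_batch()
    pending = 0   # messages added to the current batch
    sent = 0      # batches sent so far
    for msg in messages:
        try:
            batch.add_message(msg)
        except ValueError:
            if pending == 0:
                raise  # a single message that does not fit even an empty batch
            sender.send_messages(batch)
            sent += 1
            batch = sender.create_message_batch()
            batch.add_message(msg)
            pending = 0
        pending += 1
    if pending:  # skip the final send when nothing is left over
        sender.send_messages(batch)
        sent += 1
    return sent


class FakeBatch:
    """In-memory stand-in that holds at most `capacity` messages."""
    def __init__(self, capacity):
        self.capacity, self.items = capacity, []

    def add_message(self, msg):
        if len(self.items) >= self.capacity:
            raise ValueError("batch full")
        self.items.append(msg)


class FakeSender:
    """In-memory stand-in that records every batch it is asked to send."""
    def __init__(self, capacity=20):
        self.capacity, self.sent = capacity, []

    def create_message_batch(self):
        return FakeBatch(self.capacity)

    def send_messages(self, batch):
        self.sent.append(list(batch.items))


fake = FakeSender(capacity=20)
batches = send_in_batches(fake, [f"order-{i}" for i in range(1, 51)])
print(batches, [len(b) for b in fake.sent])
```

Unlike the inline loop, the helper re-raises when a single message cannot fit into an empty batch, and it does not send an empty trailing batch.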

3. Receiving Messages — Pull Mode

Receive and Complete a Single Message

from azure.servicebus import ServiceBusClient, ServiceBusReceiveMode

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_receiver(
        queue_name,
        receive_mode = ServiceBusReceiveMode.PEEK_LOCK,
        max_wait_time = 10
    ) as receiver:

        for msg in receiver:
            try:
                body = str(msg)
                print(f"MessageId    : {msg.message_id}")
                print(f"Body         : {body}")
                print(f"DeliveryCount: {msg.delivery_count}")

                # business logic here

                receiver.complete_message(msg)
                print("Completed.")
            except Exception as ex:
                print(f"Error: {ex}. Abandoning.")
                receiver.abandon_message(msg)
            break  # process one message then exit loop

Receive Multiple Messages

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_receiver(queue_name, max_wait_time=5) as receiver:
        messages = receiver.receive_messages(max_message_count=20, max_wait_time=5)

        for msg in messages:
            try:
                print(f"Processing: {str(msg)}")
                receiver.complete_message(msg)
            except Exception as ex:
                print(f"Error: {ex}. Abandoning.")
                receiver.abandon_message(msg)
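
The complete-or-abandon pattern used in both receive examples can be factored into one function. The sketch below assumes JSON message bodies; settle and RecordingReceiver are hypothetical names, and the fake receiver only records which settlement call was made. Plain strings stand in for received messages, since the examples above read the body with str(msg).

```python
import json


def settle(receiver, msg, handler):
    """Decode the body as JSON and run handler; complete on success, abandon on failure."""
    try:
        payload = json.loads(str(msg))
        handler(payload)
    except Exception:
        receiver.abandon_message(msg)
        return False
    # Completing outside the try block means a failure while completing
    # is not followed by an abandon attempt on the same message.
    receiver.complete_message(msg)
    return True


class RecordingReceiver:
    """In-memory stand-in that records settlement calls."""
    def __init__(self):
        self.completed, self.abandoned = [], []

    def complete_message(self, msg):
        self.completed.append(msg)

    def abandon_message(self, msg):
        self.abandoned.append(msg)


seen = []
fake_receiver = RecordingReceiver()
ok = settle(fake_receiver, '{"orderId": 101}', seen.append)
bad = settle(fake_receiver, "not json", seen.append)
print(ok, bad, seen)
```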

Receive from a Topic Subscription

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_subscription_receiver(
        topic_name        = "order-events",
        subscription_name = "inventory-sub",
        max_wait_time     = 10
    ) as receiver:
        for msg in receiver:
            print(f"Event received: {str(msg)}")
            receiver.complete_message(msg)
            break

4. Dead-Letter Queue Access

from azure.servicebus import ServiceBusSubQueue

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_receiver(
        queue_name,
        sub_queue     = ServiceBusSubQueue.DEAD_LETTER,
        max_wait_time = 5
    ) as dlq_receiver:

        for msg in dlq_receiver:
            print(f"DLQ Body   : {str(msg)}")
            print(f"DLQ Reason : {msg.dead_letter_reason}")
            print(f"DLQ Desc   : {msg.dead_letter_error_description}")
            print(f"Deliveries : {msg.delivery_count}")
            dlq_receiver.complete_message(msg)
            break
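
When inspecting a dead-letter queue it is often handier to collect the fields printed above into one record for logging or later resubmission. A minimal sketch, assuming only the attributes already used in the example; dead_letter_record and FakeDeadLetter are hypothetical names, and the fake class stands in for a message read from the DLQ.

```python
def dead_letter_record(msg):
    """Collect the dead-letter details of a message into a plain dict."""
    return {
        "message_id": msg.message_id,
        "reason": msg.dead_letter_reason,
        "description": msg.dead_letter_error_description,
        "delivery_count": msg.delivery_count,
        "body": str(msg),
    }


class FakeDeadLetter:
    """Stand-in exposing the same attributes as the DLQ message above."""
    message_id = "order-7"
    dead_letter_reason = "MaxDeliveryCountExceeded"
    dead_letter_error_description = "Delivery attempts exhausted."
    delivery_count = 10

    def __str__(self):
        return '{"orderId": 7}'


record = dead_letter_record(FakeDeadLetter())
print(record["reason"], record["delivery_count"])
```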

5. Message Sessions

Send Session Messages

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_sender("order-session-queue") as sender:
        for step in ["Created", "PaymentConfirmed", "Packed", "Shipped"]:
            msg = ServiceBusMessage(
                body       = f"order-101: {step}",
                message_id = f"order-101-{step}",
                session_id = "order-101"
            )
            sender.send_messages(msg)
            print(f"Sent: {step}")

Receive Session Messages

with ServiceBusClient.from_connection_string(connection_str) as client:

    # Accept a specific session by passing session_id to the receiver
    with client.get_queue_receiver(
        queue_name    = "order-session-queue",
        session_id    = "order-101",
        max_wait_time = 5
    ) as session_receiver:

        print(f"Accepted session: {session_receiver.session.session_id}")

        for msg in session_receiver:
            print(f"Step: {str(msg)}")
            session_receiver.complete_message(msg)

6. Scheduled Messages

from datetime import datetime, timezone, timedelta

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_sender(queue_name) as sender:

        msg = ServiceBusMessage(
            body       = json.dumps({"orderId": 999, "type": "scheduled"}),
            message_id = "sched-order-999"
        )

        delivery_time = datetime.now(timezone.utc) + timedelta(hours=2)
        # schedule_messages returns a list of sequence numbers, one per message
        seq_nos = sender.schedule_messages(msg, delivery_time)
        print(f"Scheduled at {delivery_time}. SequenceNumbers: {seq_nos}")

        # Cancel if needed
        sender.cancel_scheduled_messages(seq_nos)
        print("Scheduled message cancelled.")
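
The schedule time is expressed in UTC, so it helps to build it from a timezone-aware datetime rather than a naive one. The helper below is a small sketch of that idea; delivery_time_after is a hypothetical name, and rejecting naive datetimes is a design choice made here, not SDK behaviour.

```python
from datetime import datetime, timedelta, timezone


def delivery_time_after(delay, now=None):
    """Return an aware UTC datetime that lies `delay` after `now` (default: current time)."""
    now = now or datetime.now(timezone.utc)
    if now.tzinfo is None:
        raise ValueError("use a timezone-aware datetime")
    return now.astimezone(timezone.utc) + delay


# A local time in UTC+05:30 is converted to UTC before the delay is added.
ist = timezone(timedelta(hours=5, minutes=30))
when = delivery_time_after(timedelta(hours=2), now=datetime(2024, 1, 1, 12, 0, tzinfo=ist))
print(when.isoformat())
```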

7. Deferral

step1_done      = False   # set to True once the prerequisite step has been processed
deferred_seq_no = None    # filled in when a message is deferred

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_receiver(queue_name, max_wait_time=5) as receiver:
        for msg in receiver:
            if "Step3" in str(msg) and not step1_done:
                # Keep the sequence number: it is the only way to fetch the message later
                deferred_seq_no = msg.sequence_number
                receiver.defer_message(msg)
                print(f"Deferred. SeqNo: {deferred_seq_no}")
                break
            else:
                receiver.complete_message(msg)

    # Retrieve deferred later (only if a message was actually deferred)
    if deferred_seq_no is not None:
        with client.get_queue_receiver(queue_name) as receiver:
            deferred_msgs = receiver.receive_deferred_messages([deferred_seq_no])
            for msg in deferred_msgs:
                print(f"Processing deferred: {str(msg)}")
                receiver.complete_message(msg)

8. Async (asyncio) Client

import asyncio
import json
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage

async def send_async():
    async with ServiceBusClient.from_connection_string(connection_str) as client:
        async with client.get_queue_sender("orders") as sender:
            msg = ServiceBusMessage(
                body       = json.dumps({"orderId": 200}),
                message_id = "async-order-200"
            )
            await sender.send_messages(msg)
            print("Async message sent.")

async def receive_async():
    async with ServiceBusClient.from_connection_string(connection_str) as client:
        async with client.get_queue_receiver("orders", max_wait_time=10) as receiver:
            async for msg in receiver:
                print(f"Async received: {str(msg)}")
                await receiver.complete_message(msg)
                break

asyncio.run(send_async())
asyncio.run(receive_async())
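
Each asyncio.run() call above starts and tears down its own event loop. In an application it is more common to have one entry point that awaits both steps inside a single loop. The sketch below shows that shape only; its send_async and receive_async are placeholders standing in for the coroutines defined above and do not talk to Service Bus.

```python
import asyncio


async def send_async():
    """Placeholder for the send coroutine defined above."""
    await asyncio.sleep(0)
    return "sent"


async def receive_async():
    """Placeholder for the receive coroutine defined above."""
    await asyncio.sleep(0)
    return "received"


async def main():
    # Send first, then receive, inside one event loop.
    return [await send_async(), await receive_async()]


results = asyncio.run(main())
print(results)
```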

9. Manual Dead-Lettering

with ServiceBusClient.from_connection_string(connection_str) as client:
    with client.get_queue_receiver(queue_name, max_wait_time=5) as receiver:
        for msg in receiver:
            body = str(msg)
            if "orderId" not in body:
                receiver.dead_letter_message(
                    msg,
                    reason             = "SchemaValidationFailed",
                    error_description  = "Message body missing required field: orderId"
                )
                print("Invalid message dead-lettered.")
            else:
                print(f"Processing: {body}")
                receiver.complete_message(msg)
            break

10. Error Handling

from azure.servicebus.exceptions import (
    ServiceBusError,
    MessageLockLostError,
    SessionLockLostError,
    ServiceBusConnectionError
)

try:
    with ServiceBusClient.from_connection_string(connection_str) as client:
        with client.get_queue_sender(queue_name) as sender:
            sender.send_messages(ServiceBusMessage("Test"))

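except MessageLockLostError:
    # Raised when completing or abandoning a received message after its lock expired.
    # A send-only block like this one will not raise it; it is listed to show that
    # specific exceptions are handled before the ServiceBusError base class.
    print("Lock expired during processing. Increase lock duration.")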

except ServiceBusConnectionError:
    print("Connection failed. Check network and connection string.")

except ServiceBusError as ex:
    print(f"Service Bus error: {ex}")
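
Transient failures such as a dropped connection are usually worth retrying before giving up. The SDK applies its own retry policy internally; the sketch below shows an application-level retry with exponential backoff for cases where more control is wanted. with_retries is a hypothetical helper, and the built-in ConnectionError stands in for the SDK exception you would pass as retry_on (for example ServiceBusConnectionError).

```python
import time


def with_retries(operation, retry_on, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call operation(); on a retry_on exception wait and retry, doubling the delay."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the last error
            sleep(base_delay * 2 ** (attempt - 1))


# Demo with a function that fails twice and then succeeds.
calls = {"count": 0}
delays = []


def flaky_send():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"


result = with_retries(flaky_send, ConnectionError, sleep=delays.append)
print(result, calls["count"], delays)
```

Passing sleep as a parameter keeps the demo instant; in real use the default time.sleep applies.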

Common Properties on Received Messages

Property                      Description
msg.message_id                Unique ID set by sender
msg.subject                   Short label / subject
msg.content_type              MIME type of body (e.g., application/json)
msg.delivery_count            Number of delivery attempts
msg.sequence_number           Unique sequential number from Service Bus
msg.enqueued_time_utc         Time message entered the queue
msg.locked_until_utc          Lock expiry timestamp
msg.session_id                Session group identifier
msg.application_properties    Custom key-value metadata from sender
msg.dead_letter_reason        Why the message was dead-lettered (DLQ only)

Best Practices

Practice                                    Reason
Use context managers (with)                 Auto-closes connections and releases AMQP links
Use PEEK_LOCK mode                          Prevents message loss if processing fails
Set meaningful message_id                   Enables duplicate detection
Use async client for FastAPI / async apps   Non-blocking I/O avoids event loop stalls
Use DefaultAzureCredential in production    Managed Identity, no secrets in code
Use batch sending for high volume           Reduces network round trips
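
Duplicate detection compares message_id values, so a resend of the same logical message needs to carry the same ID. One way to get that is to derive the ID from the payload itself. The sketch below is an illustration under that assumption: message_id_for is a hypothetical helper, and duplicate detection must also be enabled on the queue or topic for the ID to have any effect.

```python
import hashlib
import json


def message_id_for(payload, prefix="order"):
    """Derive a stable ID from the payload so a retried send reuses the same ID."""
    # Sorting keys makes the ID independent of dict ordering.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
    return f"{prefix}-{digest}"


first = message_id_for({"orderId": 101, "product": "Laptop"})
again = message_id_for({"product": "Laptop", "orderId": 101})
other = message_id_for({"orderId": 102, "product": "Laptop"})
print(first == again, first == other)
```

When the payload already contains a natural business key, as in the "order-101" examples above, using that key directly is simpler and easier to read in logs.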

Summary

The azure-servicebus Python SDK provides both synchronous and async interfaces for all Service Bus operations. Context managers (with blocks) ensure clean connection handling. Sending supports single messages, batches, topic publishing, and scheduled delivery. Receiving supports pull-based iteration, dead-letter access, session handling, and message deferral. The async client integrates naturally with FastAPI, aiohttp, and any asyncio-based Python application. Using DefaultAzureCredential with Managed Identity removes secrets entirely from Python workloads running in Azure.
