Microsoft Azure Event Hubs Checkpoint Store for Blob Storage (Async)
This asynchronous Python library implements a checkpoint store for Azure Event Hubs, with Azure Blob Storage as the persistent backend. Event Hubs consumers use it to persist checkpoints and partition ownership information, so that processing can resume where it left off after a restart and partitions can be shared among consumer instances. The library is part of the Azure SDK for Python; the version described here is 1.2.0.
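To make the idea of a checkpoint concrete, here is a minimal in-memory sketch. It is a toy model, not the library's implementation or API: `ToyCheckpointStore`, `events_after_checkpoint`, and the four-part key are hypothetical names chosen for illustration, and events are reduced to bare sequence numbers.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical key: one checkpoint per
# (namespace, event hub, consumer group, partition id).
Key = Tuple[str, str, str, str]


class ToyCheckpointStore:
    """In-memory stand-in for illustration only; not the library's API."""

    def __init__(self) -> None:
        self._checkpoints: Dict[Key, int] = {}

    def update_checkpoint(self, key: Key, sequence_number: int) -> None:
        # Record the last sequence number that was fully processed.
        self._checkpoints[key] = sequence_number

    def last_checkpoint(self, key: Key) -> Optional[int]:
        # None means nothing has been checkpointed for this partition yet.
        return self._checkpoints.get(key)


def events_after_checkpoint(
    sequence_numbers: List[int], store: ToyCheckpointStore, key: Key
) -> List[int]:
    """Return the sequence numbers a restarted consumer still has to process."""
    last = store.last_checkpoint(key)
    return [n for n in sequence_numbers if last is None or n > last]
```

With no checkpoint the consumer sees every event; after `update_checkpoint(key, 2)` a restarted consumer would resume from sequence number 3. The real library keeps the equivalent state in blobs so that it survives process restarts.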
Warnings
- breaking The `BlobPartitionManager` class was renamed to `BlobCheckpointStore` in version 1.0.0b6. The constructor signature also changed, now taking storage container details directly instead of a `ContainerClient` instance.
- breaking As of version 1.2.0, support for Python 2.7, 3.6, and 3.7 has been dropped. The library now requires Python 3.8 or later.
- gotcha This is an asynchronous (aio) library. Its I/O methods are coroutines and must be awaited inside a running `asyncio` event loop. Use it with the async `EventHubConsumerClient` from `azure.eventhub.aio`, not the synchronous client.
- gotcha When using Azure Blob Storage for checkpointing, it's recommended to use a separate container for each consumer group, locate the storage account in the same region as the deployed application, and disable Hierarchical Namespace, Blob Soft Delete, and Versioning on the storage account for optimal performance and to prevent issues.
- gotcha If deploying on Azure Stack Hub or environments using older Azure Storage Service APIs, you may need to explicitly specify the `api_version` in the `BlobCheckpointStore` constructor (e.g., `api_version='2017-11-09'`) to avoid `HttpResponseError` due to incorrect header values.
- gotcha Connection strings work for authentication, but for production applications Azure recommends passwordless authentication with Microsoft Entra ID (formerly Azure Active Directory) through the `azure-identity` library. Ensure the principal has the 'Azure Event Hubs Data Owner' and 'Storage Blob Data Contributor' roles.
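The constructor-related warnings above can be sketched as a pair of small helpers. The helper names `checkpoint_store_kwargs` and `make_checkpoint_store` are hypothetical; the keyword names `blob_account_url`, `container_name`, `credential`, and `api_version` are the ones used on this page. The import is deferred so that the first helper can be exercised without the package installed.

```python
from typing import Any, Dict, Optional


def checkpoint_store_kwargs(
    blob_account_url: str,
    container_name: str,
    api_version: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble constructor keyword arguments (hypothetical helper)."""
    kwargs: Dict[str, Any] = {
        "blob_account_url": blob_account_url,
        "container_name": container_name,
    }
    # Pin the Storage service API version only when asked to, for example
    # "2017-11-09" on Azure Stack Hub; otherwise the library default applies.
    if api_version is not None:
        kwargs["api_version"] = api_version
    return kwargs


def make_checkpoint_store(credential: Any, **kwargs: Any):
    """Build a BlobCheckpointStore; the import is deferred until first use."""
    from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

    return BlobCheckpointStore(credential=credential, **kwargs)
```

For example, `make_checkpoint_store(credential, **checkpoint_store_kwargs(url, name, api_version="2017-11-09"))` would target the older service API, where `credential`, `url`, and `name` are values supplied by the application.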
Install
pip install azure-eventhub-checkpointstoreblob-aio
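After installing, a quick environment check can confirm the Python requirement stated in the warnings and that the package is importable. This is a minimal sketch using only the standard library; `python_is_supported` and `is_importable` are hypothetical helper names.

```python
import importlib.util
import sys

MIN_PYTHON = (3, 8)  # minimum version stated in the warnings on this page


def python_is_supported(version_info=sys.version_info) -> bool:
    """True when the major.minor version meets the stated minimum."""
    return tuple(version_info[:2]) >= MIN_PYTHON


def is_importable(module_name: str) -> bool:
    """True when the module can be located.

    For dotted names, find_spec imports the parent packages, so a missing
    parent surfaces as an ImportError, which is treated as "not importable".
    """
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        return False


if __name__ == "__main__":
    print("Python OK:", python_is_supported())
    print(
        "Package importable:",
        is_importable("azure.eventhub.extensions.checkpointstoreblobaio"),
    )
```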
Imports
- BlobCheckpointStore
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
Quickstart
import asyncio
import os
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from azure.identity.aio import DefaultAzureCredential
# Environment variables for Event Hubs
FULLY_QUALIFIED_NAMESPACE = os.environ.get('EVENT_HUB_FULLY_QUALIFIED_NAMESPACE', 'your_namespace.servicebus.windows.net')
EVENT_HUB_NAME = os.environ.get('EVENT_HUB_NAME', 'your_event_hub_name')
CONSUMER_GROUP = os.environ.get('EVENT_HUB_CONSUMER_GROUP', '$Default')
# Environment variables for Blob Storage
BLOB_STORAGE_ACCOUNT_URL = os.environ.get('AZURE_STORAGE_BLOB_URL', 'https://yourstorageaccount.blob.core.windows.net')
BLOB_CONTAINER_NAME = os.environ.get('BLOB_CONTAINER_NAME', 'your_container_name')
async def on_event(partition_context, event):
    # Process the event here
    if event.body_as_str():
        print(f"Received event from partition ID: {partition_context.partition_id}, Body: {event.body_as_str()}")
    # Update the checkpoint to mark the event as processed
    await partition_context.update_checkpoint(event)

async def main():
    credential = None
    checkpoint_store = None
    client = None
    try:
        # Authenticate using DefaultAzureCredential (Managed Identity, Environment variables, etc.)
        credential = DefaultAzureCredential()
        # Create an Azure Blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential
        )
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group=CONSUMER_GROUP,
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Start receiving events
            print(f"Listening for events in consumer group: {CONSUMER_GROUP} from Event Hub: {EVENT_HUB_NAME}")
            await client.receive(on_event=on_event, starting_position="-1")  # Start from beginning if no checkpoint
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        if client:
            await client.close()
        if checkpoint_store:
            await checkpoint_store.close()
        if credential:
            await credential.close()

if __name__ == "__main__":
    asyncio.run(main())
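The quickstart checkpoints after every event, and each checkpoint is a write to Blob Storage. One possible variation is to checkpoint once every N events per partition. The sketch below assumes only `partition_context.partition_id` and `partition_context.update_checkpoint(event)` as they are used in the quickstart; `make_checkpointing_handler`, `process`, and `every` are hypothetical names.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict


def make_checkpointing_handler(
    process: Callable[[Any], Awaitable[None]],
    every: int = 20,
) -> Callable[[Any, Any], Awaitable[None]]:
    """Wrap `process` in an on_event callback that checkpoints every N events."""
    if every < 1:
        raise ValueError("every must be at least 1")

    # Events handled since the last checkpoint, tracked per partition.
    pending: DefaultDict[str, int] = defaultdict(int)

    async def on_event(partition_context: Any, event: Any) -> None:
        # Handle the event first so a checkpoint never covers unprocessed work.
        await process(event)
        partition_id = partition_context.partition_id
        pending[partition_id] += 1
        if pending[partition_id] >= every:
            await partition_context.update_checkpoint(event)
            pending[partition_id] = 0

    return on_event
```

The returned callback could be passed as `on_event=make_checkpointing_handler(process, every=50)` to `client.receive`. The trade-off is that after a restart, up to `every - 1` already-processed events per partition may be delivered again, so `process` should tolerate duplicates.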