Microsoft Azure Event Hubs Checkpoint Store for Blob Storage (Async)

1.2.0 · active · verified Mon Apr 13

This asynchronous Python library provides a checkpoint store implementation for Azure Event Hubs, using Azure Blob Storage as the persistent store. It lets Event Hubs consumers persist checkpoints and partition ownership information, so that a restarted consumer can resume from its last checkpoint and multiple consumers in the same consumer group can share partitions between them. The library is part of the Azure SDK for Python; the current version is 1.2.0.

Warnings

Install

Imports

Quickstart

This quickstart sets up an `EventHubConsumerClient` to receive events from Azure Event Hubs, using `BlobCheckpointStore` to manage checkpoints and partition ownership. It authenticates with `DefaultAzureCredential` from `azure-identity`, which avoids connection strings and is the recommended approach for production. Before running it, set the environment variables `EVENT_HUB_FULLY_QUALIFIED_NAMESPACE`, `EVENT_HUB_NAME`, `AZURE_STORAGE_BLOB_URL`, and `BLOB_CONTAINER_NAME` (`EVENT_HUB_CONSUMER_GROUP` is optional and defaults to `$Default`), and make sure the blob container already exists. The consuming identity needs the 'Azure Event Hubs Data Receiver' role on the Event Hub (the broader 'Azure Event Hubs Data Owner' role also works) and the 'Storage Blob Data Contributor' role on the storage account or container.

import asyncio
import os
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from azure.identity.aio import DefaultAzureCredential

# Environment variables for Event Hubs
FULLY_QUALIFIED_NAMESPACE = os.environ.get('EVENT_HUB_FULLY_QUALIFIED_NAMESPACE', 'your_namespace.servicebus.windows.net')
EVENT_HUB_NAME = os.environ.get('EVENT_HUB_NAME', 'your_event_hub_name')
CONSUMER_GROUP = os.environ.get('EVENT_HUB_CONSUMER_GROUP', '$Default')

# Environment variables for Blob Storage
BLOB_STORAGE_ACCOUNT_URL = os.environ.get('AZURE_STORAGE_BLOB_URL', 'https://yourstorageaccount.blob.core.windows.net')
BLOB_CONTAINER_NAME = os.environ.get('BLOB_CONTAINER_NAME', 'your_container_name')

async def on_event(partition_context, event):
    # Decode the body once and reuse it
    body = event.body_as_str()
    if body:
        print(f"Received event from partition ID: {partition_context.partition_id}, Body: {body}")
    # Update the checkpoint to mark the event as processed.
    # Each call writes to Blob Storage, so high-throughput consumers usually checkpoint less often.
    await partition_context.update_checkpoint(event)

async def main():
    credential = None
    checkpoint_store = None
    client = None
    try:
        # Authenticate using DefaultAzureCredential (Managed Identity, Environment variables, etc.)
        credential = DefaultAzureCredential()

        # Create an Azure Blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential
        )

        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group=CONSUMER_GROUP,
            checkpoint_store=checkpoint_store,
            credential=credential,
        )

        async with client:
            # Start receiving events
            print(f"Listening for events in consumer group: {CONSUMER_GROUP} from Event Hub: {EVENT_HUB_NAME}")
            # "-1" starts from the beginning of each partition when no checkpoint exists
            await client.receive(on_event=on_event, starting_position="-1")

    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        if client:
            await client.close()
        if checkpoint_store:
            await checkpoint_store.close()
        if credential:
            await credential.close()

if __name__ == "__main__":
    asyncio.run(main())
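
The quickstart above falls back to placeholder values when an environment variable is unset, so a missing setting only shows up later as a connection or authentication error. A minimal sketch of a fail-fast check follows, assuming the same variable names as the quickstart; `require_env` and `REQUIRED_VARS` are hypothetical names used for illustration and are not part of the Azure SDK.

```python
import os


def require_env(names, environ=None):
    """Return {name: value} for every name, or raise listing all missing ones.

    Hypothetical helper for illustration; not part of the Azure SDK.
    """
    environ = os.environ if environ is None else environ
    # Treat unset and empty values the same way: both count as missing.
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in names}


# Variable names taken from the quickstart above.
REQUIRED_VARS = [
    "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE",
    "EVENT_HUB_NAME",
    "AZURE_STORAGE_BLOB_URL",
    "BLOB_CONTAINER_NAME",
]
```

Calling `require_env(REQUIRED_VARS)` at the start of `main()` would stop the program with one message naming every missing variable, before any client is created.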

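The `on_event` handler in the quickstart checkpoints after every event, and each checkpoint is a write to Blob Storage. One common alternative is to checkpoint once every N events per partition. The sketch below shows that idea under stated assumptions: `checkpoint_every` is a hypothetical helper, not part of the Azure SDK, and it relies only on the `partition_id` attribute and the `update_checkpoint(event)` coroutine that the quickstart already uses.

```python
from collections import defaultdict


def checkpoint_every(n, handler):
    """Wrap an async handler so a checkpoint is written every `n` events per partition.

    Hypothetical helper for illustration; not part of the Azure SDK.
    """
    if n < 1:
        raise ValueError("n must be at least 1")
    counts = defaultdict(int)  # partition_id -> events seen since the last checkpoint

    async def on_event(partition_context, event):
        # Process first, so a checkpoint never covers an unprocessed event.
        await handler(partition_context, event)
        pid = partition_context.partition_id
        counts[pid] += 1
        if counts[pid] >= n:
            await partition_context.update_checkpoint(event)
            counts[pid] = 0

    return on_event
```

It could be passed to the client as `on_event=checkpoint_every(100, handle)`, where `handle` is the application's own async function taking `(partition_context, event)`. The trade-off is that after a restart or a partition rebalance, up to N-1 already-processed events may be delivered again, so the processing logic should tolerate duplicates.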