{"id":5124,"library":"azure-eventhub-checkpointstoreblob-aio","title":"Microsoft Azure Event Hubs Checkpoint Store for Blob Storage (Async)","description":"This asynchronous Python library provides a checkpoint store implementation for Azure Event Hubs, using Azure Blob Storage as the persistent store. It allows Event Hubs consumers to store checkpoints and partition ownership information, ensuring reliable event processing and fault tolerance. The library is part of the larger Azure SDK for Python and is currently at version 1.2.0, receiving updates as needed.","status":"active","version":"1.2.0","language":"en","source_language":"en","source_url":"https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio","tags":["azure","eventhubs","blob storage","checkpointing","async","cloud"],"install":[{"cmd":"pip install azure-eventhub-checkpointstoreblob-aio","lang":"bash","label":"Install stable version"}],"dependencies":[{"reason":"Required for core Event Hubs consumer functionality, as this library acts as a plug-in for EventHubConsumerClient.","package":"azure-eventhub","optional":false},{"reason":"Underlying client library used for interacting with Azure Blob Storage.","package":"azure-storage-blob","optional":false},{"reason":"Recommended for passwordless authentication using Azure Active Directory credentials.","package":"azure-identity","optional":true}],"imports":[{"note":"The class was renamed from `BlobPartitionManager` to `BlobCheckpointStore` and the internal module `blobstoragepmaio` should not be imported directly since version 1.0.0b6.","wrong":"from azure.eventhub.extensions.checkpointstoreblobaio.blobstoragepmaio import BlobPartitionManager","symbol":"BlobCheckpointStore","correct":"from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore"}],"quickstart":{"code":"import asyncio\nimport os\nfrom azure.eventhub.aio import EventHubConsumerClient\nfrom azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore\nfrom azure.identity.aio import DefaultAzureCredential\n\n# Environment variables for Event Hubs\nFULLY_QUALIFIED_NAMESPACE = os.environ.get('EVENT_HUB_FULLY_QUALIFIED_NAMESPACE', 'your_namespace.servicebus.windows.net')\nEVENT_HUB_NAME = os.environ.get('EVENT_HUB_NAME', 'your_event_hub_name')\nCONSUMER_GROUP = os.environ.get('EVENT_HUB_CONSUMER_GROUP', '$Default')\n\n# Environment variables for Blob Storage\nBLOB_STORAGE_ACCOUNT_URL = os.environ.get('AZURE_STORAGE_BLOB_URL', 'https://yourstorageaccount.blob.core.windows.net')\nBLOB_CONTAINER_NAME = os.environ.get('BLOB_CONTAINER_NAME', 'your_container_name')\n\nasync def on_event(partition_context, event):\n    # Process the event here\n    if event.body_as_str():\n        print(f\"Received event from partition ID: {partition_context.partition_id}, Body: {event.body_as_str()}\")\n    # Update the checkpoint to mark the event as processed\n    await partition_context.update_checkpoint(event)\n\nasync def main():\n    credential = None\n    checkpoint_store = None\n    client = None\n    try:\n        # Authenticate using DefaultAzureCredential (Managed Identity, Environment variables, etc.)\n        credential = DefaultAzureCredential()\n\n        # Create an Azure Blob checkpoint store to store the checkpoints.\n        checkpoint_store = BlobCheckpointStore(\n            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,\n            container_name=BLOB_CONTAINER_NAME,\n            credential=credential\n        )\n\n        # Create a consumer client for the event hub.\n        client = EventHubConsumerClient(\n            fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,\n            eventhub_name=EVENT_HUB_NAME,\n            consumer_group=CONSUMER_GROUP,\n            checkpoint_store=checkpoint_store,\n            credential=credential,\n        )\n\n        async with client:\n            # Start receiving events\n            print(f\"Listening for events in consumer group: {CONSUMER_GROUP} from Event Hub: {EVENT_HUB_NAME}\")\n            await client.receive(on_event=on_event, starting_position=\"-1\") # Start from beginning if no checkpoint\n\n    except Exception as e:\n        print(f\"An error occurred: {e}\")\n    finally:\n        if client:\n            await client.close()\n        if checkpoint_store:\n            await checkpoint_store.close()\n        if credential:\n            await credential.close()\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n","lang":"python","description":"This quickstart demonstrates how to set up an `EventHubConsumerClient` to receive events from Azure Event Hubs, using `BlobCheckpointStore` for managing checkpoints and partition ownership. It uses `DefaultAzureCredential` from `azure-identity` for passwordless authentication, which is recommended for production scenarios. Ensure the necessary environment variables for your Event Hubs namespace, Event Hub name, Blob Storage account URL, and container name are set, and the consuming identity has appropriate 'Azure Event Hubs Data Owner' and 'Storage Blob Data Contributor' roles."},"warnings":[{"fix":"Update class name to `BlobCheckpointStore` and adjust constructor parameters. Use `BlobCheckpointStore.from_connection_string` for connection string-based initialization or pass `blob_account_url`, `container_name`, and `credential`.","message":"The `BlobPartitionManager` class was renamed to `BlobCheckpointStore` in version 1.0.0b6. The constructor signature also changed, now taking storage container details directly instead of a `ContainerClient` instance.","severity":"breaking","affected_versions":"<1.0.0b6"},{"fix":"Upgrade your Python environment to version 3.8 or newer.","message":"As of version 1.2.0, support for Python 2.7, 3.6, and 3.7 has been dropped. The library now requires Python 3.8 or later.","severity":"breaking","affected_versions":"<1.2.0"},{"fix":"Ensure your application structure is asynchronous, utilizing `async def` functions and `await` calls for library methods.","message":"This is an asynchronous (aio) library. All operations and client instantiation require `async` and `await` keywords within an `asyncio` event loop.","severity":"gotcha","affected_versions":"All"},{"fix":"Follow Azure's best practices for Blob Storage configuration when used as a checkpoint store, as detailed in the documentation.","message":"When using Azure Blob Storage for checkpointing, it's recommended to use a separate container for each consumer group, locate the storage account in the same region as the deployed application, and disable Hierarchical Namespace, Blob Soft Delete, and Versioning on the storage account for optimal performance and to prevent issues.","severity":"gotcha","affected_versions":"All"},{"fix":"Determine the supported Storage Service API version for your environment and pass it as the `api_version` keyword argument during `BlobCheckpointStore` instantiation.","message":"If deploying on Azure Stack Hub or environments using older Azure Storage Service APIs, you may need to explicitly specify the `api_version` in the `BlobCheckpointStore` constructor (e.g., `api_version='2017-11-09'`) to avoid `HttpResponseError` due to incorrect header values.","severity":"gotcha","affected_versions":"All (specific deployment environments)"},{"fix":"Migrate to passwordless authentication using `DefaultAzureCredential` or other `azure-identity` credentials. Assign the necessary Azure RBAC roles to your application's identity.","message":"While connection strings can be used for authentication, for production applications, Azure recommends passwordless authentication using Azure Active Directory and the `azure-identity` library. Ensure the principal has 'Azure Event Hubs Data Owner' and 'Storage Blob Data Contributor' roles.","severity":"gotcha","affected_versions":"All"}],"env_vars":null,"last_verified":"2026-04-13T00:00:00.000Z","next_check":"2026-07-12T00:00:00.000Z"}