Azure Event Hubs Checkpoint Store Blob
The `azure-eventhub-checkpointstoreblob` library provides a checkpoint store for Azure Event Hubs that uses Azure Blob Storage for persistence. It plugs into `EventHubConsumerClient` from `azure-eventhub`, which uses it to store checkpoints and partition ownership records. This lets consumers resume from their last checkpoint and share partitions among multiple instances. This is the synchronous version of the library; the asynchronous counterpart is published as `azure-eventhub-checkpointstoreblob-aio`. As part of the Azure SDK for Python, it follows a regular release cadence.
Warnings
- breaking Version 1.0.0b6 introduced breaking changes: `BlobPartitionManager` was renamed to `BlobCheckpointStore`, and its constructor now accepts the storage container details directly instead of a `ContainerClient` instance. The `blobstoragepm` module became internal, so import `BlobCheckpointStore` from `azure.eventhub.extensions.checkpointstoreblob` instead.
- gotcha For best performance and reliability, use a dedicated Blob Storage container for each consumer group. The storage account should be in the same region as the consumer application and should not be shared with other workloads. Also disable 'Hierarchical namespace', 'Blob soft delete', and 'Versioning'. These are storage account settings, not per-container settings.
- gotcha In environments such as Azure Stack Hub, which may support only older Azure Storage Service API versions, you may need to pass `api_version` explicitly (e.g., `api_version='2017-11-09'`) when creating `BlobCheckpointStore` to avoid `HttpResponseError` exceptions. The default API version is '2019-07-07'.
- deprecated Support for Python 2.7 officially ended on January 1, 2022. Versions of this library from 1.2.0 onward require Python 3.8 or later; earlier versions supported varying ranges of Python 3.x.
- gotcha Azure Event Hubs client libraries are moving toward using only sequence number-based checkpoints. External systems that read offsets from the checkpoint store, such as KEDA scalers, should not depend on them, because offset values are not guaranteed to change in a predictable way. Even if the library's public contract stays stable, the checkpoint data that external monitoring or scaling tools observe could change.
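For the Azure Stack Hub warning above, one option is to pass `api_version` only when a deployment needs it, so every other environment keeps the library default. This is a minimal sketch under stated assumptions. The `CHECKPOINT_STORE_API_VERSION` environment variable and both helper functions are hypothetical. It also assumes `BlobCheckpointStore.from_connection_string` accepts `api_version` the same way the constructor does.

```python
import os
from typing import Any, Dict, Mapping

# Hypothetical environment variable; neither the library nor Azure defines it.
API_VERSION_ENV_VAR = "CHECKPOINT_STORE_API_VERSION"


def checkpoint_store_kwargs(environ: Mapping[str, str]) -> Dict[str, Any]:
    """Extra keyword arguments for BlobCheckpointStore.

    api_version is included only when explicitly configured, so all other
    environments use the library's default Storage API version.
    """
    api_version = environ.get(API_VERSION_ENV_VAR, "").strip()
    return {"api_version": api_version} if api_version else {}


def create_checkpoint_store(conn_str: str, container_name: str,
                            environ: Mapping[str, str] = os.environ):
    # Imported lazily so checkpoint_store_kwargs() can be used without the
    # Azure packages installed.
    from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

    return BlobCheckpointStore.from_connection_string(
        conn_str, container_name, **checkpoint_store_kwargs(environ)
    )
```

On Azure Stack Hub you would then set `CHECKPOINT_STORE_API_VERSION=2017-11-09` for the consumer process and leave it unset elsewhere.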
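Following the sequence-number warning above, a monitoring or scaling script can compute consumer lag from sequence numbers and ignore offsets entirely. The sketch below is pure Python and deliberately ignores the `offset` field. It assumes the checkpoint entries are dicts with `partition_id` and `sequence_number` keys, shaped like those returned by the checkpoint store's `list_checkpoints(...)`. It also assumes the last enqueued sequence numbers come from something like `EventHubConsumerClient.get_partition_properties(partition_id)["last_enqueued_sequence_number"]`. The `sequence_number_lag` helper itself is hypothetical.

```python
from typing import Any, Dict, Iterable, Mapping, Optional


def sequence_number_lag(
    checkpoints: Iterable[Mapping[str, Any]],
    last_enqueued: Mapping[str, int],
) -> Dict[str, Optional[int]]:
    """Per-partition consumer lag derived only from sequence numbers."""
    # Map partition id -> sequence number of the last checkpointed event.
    checkpointed = {
        str(cp["partition_id"]): int(cp["sequence_number"])
        for cp in checkpoints
        if cp.get("sequence_number") is not None
    }
    lag: Dict[str, Optional[int]] = {}
    for partition_id, last_seq in last_enqueued.items():
        seq = checkpointed.get(partition_id)
        # Without a checkpoint, lag cannot be derived from checkpoint data alone.
        lag[partition_id] = None if seq is None else max(last_seq - seq, 0)
    return lag
```

Reporting `None` for partitions with no checkpoint leaves that case to the caller. For example, a scaler might treat it as "fully behind", while a dashboard might show it as unknown.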
Install
pip install azure-eventhub-checkpointstoreblob
pip install azure-identity  # only needed for DefaultAzureCredential in the Quickstart
Imports
- BlobCheckpointStore
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
Quickstart
import os
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential
# Environment variables for Event Hubs
EVENTHUB_CONNECTION_STR = os.environ.get('EVENTHUB_CONNECTION_STR', '')
EVENTHUB_NAME = os.environ.get('EVENTHUB_NAME', '')
CONSUMER_GROUP = os.environ.get('CONSUMER_GROUP', '$Default')
# Environment variables for Azure Blob Storage Checkpoint Store
BLOB_STORAGE_ACCOUNT_URL = os.environ.get('BLOB_STORAGE_ACCOUNT_URL', '') # e.g., 'https://<your_storage_account_name>.blob.core.windows.net/'
BLOB_CONTAINER_NAME = os.environ.get('BLOB_CONTAINER_NAME', '')

def on_event(partition_context, event):
    print(f"Received event from partition {partition_context.partition_id}, sequence_number: {event.sequence_number}")
    # Your event processing logic here.
    # Update the checkpoint so that the program doesn't re-read events it has
    # already processed the next time it runs. Checkpointing every event is
    # simple, but it costs one Blob Storage write per event.
    partition_context.update_checkpoint(event)


def main():
    if not all([EVENTHUB_CONNECTION_STR, EVENTHUB_NAME, BLOB_STORAGE_ACCOUNT_URL, BLOB_CONTAINER_NAME]):
        print("Please set EVENTHUB_CONNECTION_STR, EVENTHUB_NAME, BLOB_STORAGE_ACCOUNT_URL, and BLOB_CONTAINER_NAME environment variables.")
        return
    # DefaultAzureCredential will attempt to authenticate using environment variables,
    # managed identity, Azure CLI, etc. Here it authenticates to Blob Storage only;
    # Event Hubs is authenticated with the connection string.
    credential = DefaultAzureCredential()
    try:
        # Create an Azure Blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient.from_connection_string(
            conn_str=EVENTHUB_CONNECTION_STR,
            consumer_group=CONSUMER_GROUP,
            eventhub_name=EVENTHUB_NAME,
            checkpoint_store=checkpoint_store,
        )
        with client:
            print(f"Listening for events in consumer group '{CONSUMER_GROUP}' on Event Hub '{EVENTHUB_NAME}'...")
            # receive() blocks until interrupted. starting_position="-1" (the start of
            # the partition) applies only to partitions without a stored checkpoint;
            # otherwise receiving resumes from the checkpoint.
            client.receive(on_event=on_event, starting_position="-1")
    except KeyboardInterrupt:
        print("Stopped receiving.")
    finally:
        # Release resources held by the credential, even after Ctrl+C.
        credential.close()


if __name__ == "__main__":
    main()
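The Quickstart calls `update_checkpoint` for every event, which means one blob write per event. A common alternative is to checkpoint every N events or after T seconds per partition. The class below is a hypothetical, pure-Python sketch of that policy and is not part of the library. The `every_n_events` and `max_interval_s` defaults are arbitrary assumptions. The clock is injectable so the logic can be tested.

```python
import time
from typing import Callable, Dict


class CheckpointThrottle:
    """Decide, per partition, whether the current event should be checkpointed.

    A checkpoint is due after `every_n_events` events or once `max_interval_s`
    seconds have passed since the last checkpoint, whichever comes first.
    The time rule is only evaluated when an event arrives.
    """

    def __init__(self, every_n_events: int = 100, max_interval_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        if every_n_events < 1:
            raise ValueError("every_n_events must be >= 1")
        self._every_n = every_n_events
        self._max_interval = max_interval_s
        self._clock = clock
        self._pending: Dict[str, int] = {}          # events since last checkpoint
        self._last_checkpoint: Dict[str, float] = {}  # time of last checkpoint

    def should_checkpoint(self, partition_id: str) -> bool:
        now = self._clock()
        pending = self._pending.get(partition_id, 0) + 1
        # The first event seen on a partition starts its timer.
        last = self._last_checkpoint.setdefault(partition_id, now)
        if pending >= self._every_n or now - last >= self._max_interval:
            self._pending[partition_id] = 0
            self._last_checkpoint[partition_id] = now
            return True
        self._pending[partition_id] = pending
        return False
```

Inside `on_event`, you would call `partition_context.update_checkpoint(event)` only when `throttle.should_checkpoint(partition_context.partition_id)` returns `True`. The trade-off is that after a restart up to `every_n_events - 1` events per partition may be delivered again, so processing should tolerate duplicates.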