Azure Event Hubs Checkpoint Store Blob

1.2.0 · active · verified Sun Apr 12

The `azure-eventhub-checkpointstoreblob` library provides a checkpoint store implementation for Azure Event Hubs that uses Azure Blob Storage as the persistent store. It plugs into `EventHubConsumerClient` from `azure-eventhub` and stores two kinds of state: checkpoints (how far each partition has been processed) and partition ownership (which consumer is currently reading which partition). This is the synchronous version of the library; an asynchronous counterpart, `azure-eventhub-checkpointstoreblob-aio`, is also available. The package is released as part of the Azure SDK for Python.
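
To make "checkpoints and partition ownership" concrete, here is a minimal in-memory sketch of the two kinds of state a checkpoint store tracks. It is a toy for illustration only, not the library's implementation: the class name `InMemoryCheckpointStore`, its method names, and the time-based claim expiry (`ttl`) are all assumptions, and the real store keeps this state in blobs rather than in dictionaries.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


@dataclass
class InMemoryCheckpointStore:
    """Toy stand-in for a checkpoint store (hypothetical, illustration only)."""

    # partition_id -> (owner_id, time the claim was made)
    ownership: Dict[str, Tuple[str, float]] = field(default_factory=dict)
    # partition_id -> sequence number of the last checkpointed event
    checkpoints: Dict[str, int] = field(default_factory=dict)

    def claim(self, partition_id: str, owner_id: str, now: float, ttl: float = 30.0) -> bool:
        """Claim a partition if it is unowned, already ours, or its claim has expired."""
        current = self.ownership.get(partition_id)
        if current is not None:
            holder, claimed_at = current
            if holder != owner_id and now - claimed_at < ttl:
                return False  # another consumer holds a live claim
        self.ownership[partition_id] = (owner_id, now)
        return True

    def update_checkpoint(self, partition_id: str, sequence_number: int) -> None:
        """Record how far the partition has been processed."""
        self.checkpoints[partition_id] = sequence_number

    def resume_position(self, partition_id: str) -> Optional[int]:
        """Return the last checkpointed sequence number, or None if there is none."""
        return self.checkpoints.get(partition_id)


store = InMemoryCheckpointStore()
store.claim("0", "consumer-a", now=0.0)          # consumer-a owns partition 0
blocked = store.claim("0", "consumer-b", now=10.0)  # False: claim is still live
store.update_checkpoint("0", 41)                 # consumer-a processed up to 41
# consumer-a stops; its claim expires and consumer-b takes over
store.claim("0", "consumer-b", now=45.0)
print(blocked, store.resume_position("0"))  # False 41
```

Because the checkpoint outlives the owner, the second consumer can pick up from sequence number 41 instead of re-reading the partition from the start.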

Warnings

Install

pip install azure-eventhub-checkpointstoreblob azure-identity

`azure-identity` is needed only for the `DefaultAzureCredential` used in the quickstart.

Imports

from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

Quickstart

This quickstart consumes events from an Azure Event Hub with `EventHubConsumerClient` and persists checkpoints in Azure Blob Storage through `BlobCheckpointStore`. The Event Hubs client authenticates with a connection string, while the checkpoint store authenticates to Blob Storage with `DefaultAzureCredential` from the `azure-identity` package. Events are handled in a simple `on_event` callback that updates the checkpoint after each event. Before running, set the `EVENTHUB_CONNECTION_STR`, `EVENTHUB_NAME`, `BLOB_STORAGE_ACCOUNT_URL`, and `BLOB_CONTAINER_NAME` environment variables; `CONSUMER_GROUP` is optional and defaults to `$Default`.

import os
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential

# Environment variables for Event Hubs
EVENTHUB_CONNECTION_STR = os.environ.get('EVENTHUB_CONNECTION_STR', '')
EVENTHUB_NAME = os.environ.get('EVENTHUB_NAME', '')
CONSUMER_GROUP = os.environ.get('CONSUMER_GROUP', '$Default')

# Environment variables for Azure Blob Storage Checkpoint Store
BLOB_STORAGE_ACCOUNT_URL = os.environ.get('BLOB_STORAGE_ACCOUNT_URL', '') # e.g., 'https://<your_storage_account_name>.blob.core.windows.net/'
BLOB_CONTAINER_NAME = os.environ.get('BLOB_CONTAINER_NAME', '')

# Credential for the Blob Storage checkpoint store. DefaultAzureCredential tries
# environment variables, managed identity, Azure CLI, and other sources in turn.
credential = DefaultAzureCredential()

def on_event(partition_context, event):
    print(f"Received event from partition {partition_context.partition_id}, sequence_number: {event.sequence_number}")
    # Your event processing logic here

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    partition_context.update_checkpoint(event)

def main():
    if not all([EVENTHUB_CONNECTION_STR, EVENTHUB_NAME, BLOB_STORAGE_ACCOUNT_URL, BLOB_CONTAINER_NAME]):
        print("Please set EVENTHUB_CONNECTION_STR, EVENTHUB_NAME, BLOB_STORAGE_ACCOUNT_URL, and BLOB_CONTAINER_NAME environment variables.")
        return

    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore(
        blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
        container_name=BLOB_CONTAINER_NAME,
        credential=credential,
    )

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(
        conn_str=EVENTHUB_CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENTHUB_NAME,
        checkpoint_store=checkpoint_store,
    )

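    try:
        with client:
            print(f"Listening for events in consumer group '{CONSUMER_GROUP}' on Event Hub '{EVENTHUB_NAME}'...")
            # receive() blocks until interrupted. starting_position="-1" reads from
            # the beginning of a partition, but only when the store holds no
            # checkpoint for it; otherwise receiving resumes from the checkpoint.
            client.receive(on_event=on_event, starting_position="-1")
    except KeyboardInterrupt:
        print("Stopped receiving.")
    finally:
        # Release the credential's resources even if receiving fails.
        credential.close()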

if __name__ == "__main__":
    main()
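
The quickstart checkpoints after every event, which means one write to Blob Storage per event. A common alternative is to checkpoint once every N events per partition. The sketch below shows one way to do that; `BatchedCheckpointer` and `FakePartitionContext` are hypothetical names introduced here, and the fake context exists only so the example runs without an Event Hub. The helper relies on nothing beyond the `partition_id` attribute and `update_checkpoint(event)` method already used in `on_event` above.

```python
from collections import defaultdict


class BatchedCheckpointer:
    """Checkpoint once every `every` events per partition (hypothetical helper)."""

    def __init__(self, every: int = 100):
        if every < 1:
            raise ValueError("every must be at least 1")
        self.every = every
        self._counts = defaultdict(int)  # partition_id -> events since last checkpoint

    def __call__(self, partition_context, event) -> bool:
        """Count the event; checkpoint and return True when the batch is full."""
        pid = partition_context.partition_id
        self._counts[pid] += 1
        if self._counts[pid] >= self.every:
            partition_context.update_checkpoint(event)
            self._counts[pid] = 0
            return True
        return False


class FakePartitionContext:
    """Stand-in for the real partition context, for demonstration only."""

    def __init__(self, partition_id: str):
        self.partition_id = partition_id
        self.checkpointed = []

    def update_checkpoint(self, event) -> None:
        self.checkpointed.append(event)


checkpointer = BatchedCheckpointer(every=3)
ctx = FakePartitionContext("0")
for seq in range(1, 8):  # events 1..7
    checkpointer(ctx, seq)
print(ctx.checkpointed)  # [3, 6]
```

In the quickstart, `on_event` would call `checkpointer(partition_context, event)` instead of `partition_context.update_checkpoint(event)`. The trade-off is that events received after the last checkpoint (event 7 above) are delivered again after a restart, so the processing logic should tolerate duplicates.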
