Durable Task Azure Managed Provider

1.4.0 · active · verified Thu Apr 16

`durabletask-azuremanaged` is a Python library that implements an Azure-managed provider for the `durabletask` SDK, letting Python applications use Azure's Durable Task Scheduler. With it, developers can define, run, and manage long-running, stateful workflows and orchestrations on Azure, relying on the managed infrastructure for reliable task execution. Version 1.4.0 aligns with the `durabletask-py` SDK, and new releases typically ship alongside that SDK's own releases.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to set up `durabletask-azuremanaged` by configuring an `AzureManagedTaskHub` from environment variables. It then defines a simple orchestrator and activity, initializes a `Worker` to process them, and uses a `TaskHubClient` to schedule and monitor an orchestration. Set `AZURE_DURABLETASK_CONNECTION_STRING` in your environment before running; `AZURE_DURABLETASK_HUB_NAME` is optional and defaults to `MyPythonTaskHub` when unset.

import asyncio
import os
from durabletask.client import TaskHubClient
from durabletask.orchestration import OrchestrationContext, orchestrator
from durabletask.worker import Worker
from durabletask_azuremanaged.azure_managed_task_hub import AzureManagedTaskHub

async def run_orchestration_sample():
    """Run the quickstart: start a worker, schedule one orchestration, print its result.

    Configuration is read from the environment:

    * ``AZURE_DURABLETASK_CONNECTION_STRING`` -- required. When it is unset or
      empty, a hint is printed and the coroutine returns without creating
      any task hub, worker, or client.
    * ``AZURE_DURABLETASK_HUB_NAME`` -- optional, defaults to ``"MyPythonTaskHub"``.

    The worker is started as a background task, one orchestration is scheduled
    and awaited (30 second timeout), and the worker is always cancelled and
    shut down afterwards, even when scheduling, waiting, or the worker itself
    fails.

    Returns:
        None. Progress and the orchestration output are written with ``print``.
    """
    # NOTE(review): the import paths, class names, and async method names used
    # below are taken as given by this sample -- confirm them against the
    # installed durabletask / durabletask-azuremanaged versions.

    # Configure Azure Managed Task Hub with connection string and task hub name
    # AZURE_DURABLETASK_CONNECTION_STRING: Primary/Secondary connection string
    #     from the Azure Durable Task Hub resource.
    # AZURE_DURABLETASK_HUB_NAME: A globally unique name for your task hub within the region.
    connection_string = os.environ.get("AZURE_DURABLETASK_CONNECTION_STRING", "")
    task_hub_name = os.environ.get("AZURE_DURABLETASK_HUB_NAME", "MyPythonTaskHub")

    # Guard clause: nothing below can work without a connection string.
    if not connection_string:
        print("Please set the AZURE_DURABLETASK_CONNECTION_STRING environment variable.")
        return

    # 1. Initialize the Azure Managed Task Hub backend
    task_hub = AzureManagedTaskHub(
        connection_string=connection_string,
        task_hub_name=task_hub_name,
    )

    # 2. Define an orchestrator function
    @orchestrator
    async def my_orchestrator(context: OrchestrationContext, input_value: str):
        print(f"Orchestration '{context.instance_id}' started with input: {input_value}")
        result = await context.call_activity("my_activity", input_value)
        print(f"Activity returned: {result}")
        return f"Orchestration completed with result: {result}"

    # 3. Define an activity function
    async def my_activity(context: OrchestrationContext, value: str):
        print(f"Activity '{context.instance_id}' received: {value}")
        await asyncio.sleep(1)  # Simulate some work
        return f"Processed: {value.upper()}"

    # 4. Initialize the Worker and register orchestrator/activity
    worker = Worker(task_hub)
    worker.add_orchestrator(my_orchestrator)
    worker.add_activity("my_activity", my_activity)

    # 5. Initialize the TaskHubClient for scheduling orchestrations
    client = TaskHubClient(task_hub)

    # 6. Start the worker in the background (essential for processing tasks)
    worker_task = asyncio.create_task(worker.run())

    try:
        # 7. Schedule a new orchestration
        print("Scheduling new orchestration...")
        instance_id = await client.schedule_new_orchestration(my_orchestrator, "Hello DurableTask!")
        print(f"Orchestration instance ID: {instance_id}")

        # 8. Wait for the orchestration to complete
        status = await client.wait_for_completion(instance_id, timeout=30)
        print(f"\nOrchestration '{instance_id}' completed with status: {status.runtime_status}")
        print(f"Output: {status.output}")

    finally:
        # 9. Clean up: Shut down the worker
        print("\nShutting down worker...")
        worker_task.cancel()
        try:
            await worker_task
        except asyncio.CancelledError:
            pass  # Expected when cancelling
        finally:
            # Awaiting the task re-raises whatever error the worker itself ended
            # with; the nested finally makes sure shutdown still happens before
            # that error propagates to the caller.
            await worker.shutdown()

if __name__ == "__main__":
    # Script entry point: build the sample coroutine and drive it to
    # completion on a fresh event loop.
    sample = run_orchestration_sample()
    asyncio.run(sample)

view raw JSON →