Durable Task Azure Managed Provider
`durabletask-azuremanaged` is a Python library that provides an implementation for the `durabletask` SDK, enabling Python applications to use Azure's Durable Task Scheduler. It lets developers define, run, and manage long-running, stateful workflows and orchestrations on Azure, relying on the managed infrastructure for reliable task execution. As of version 1.4.0, it tracks the `durabletask-py` SDK, and new releases typically ship alongside releases of the broader SDK.
Common errors
- ModuleNotFoundError: No module named 'durabletask_azuremanaged'
  cause: The `durabletask-azuremanaged` library is not installed in the current Python environment.
  fix: Install the library using pip: `pip install durabletask-azuremanaged`.
- durabletask.exceptions.TaskHubError: Failed to connect to Azure Durable Task Hub. Please check connection string and task hub name.
  cause: The provided Azure connection string is invalid, the `task_hub_name` is incorrect, or there are network/firewall issues preventing access to the Azure Durable Task backend.
  fix: Verify the `AZURE_DURABLETASK_CONNECTION_STRING` and `AZURE_DURABLETASK_HUB_NAME` environment variables or directly provided values. Ensure the connection string grants appropriate permissions and your network allows outbound connections to Azure.
- TypeError: Object of type <YourCustomClass> is not JSON serializable
  cause: Inputs and outputs for orchestrations and activities in Durable Task must be JSON-serializable. Custom Python objects are not automatically handled.
  fix: Ensure all data passed to and from orchestrators and activities consists of JSON-serializable types (e.g., strings, numbers, lists, dictionaries, booleans, None). Convert custom objects to a serializable format (like a dict) before passing them.
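The JSON-serialization fix above can be sketched with the standard library alone. This is a minimal illustration, not part of the library: `OrderRequest`, `to_payload`, and `from_payload` are hypothetical names, and the sketch assumes the custom object is a dataclass whose fields are already JSON-friendly.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class OrderRequest:
    """Hypothetical custom input type, used only for illustration."""
    order_id: str
    quantity: int


def to_payload(order: OrderRequest) -> dict:
    # Convert the dataclass into a plain dict of JSON-friendly values.
    return asdict(order)


def from_payload(payload: dict) -> OrderRequest:
    # Rebuild the typed object on the receiving side.
    return OrderRequest(**payload)


order = OrderRequest(order_id="A-100", quantity=3)
payload = to_payload(order)

# json.dumps rejects the raw object with TypeError but accepts the dict.
encoded = json.dumps(payload)
restored = from_payload(json.loads(encoded))
```

Passing `payload` (rather than `order`) as the orchestration or activity input keeps the data within the types the error message asks for, and the receiving function can call `from_payload` if it wants the typed object back.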
Warnings
- breaking The `durabletask` SDK underwent significant breaking changes leading up to its `1.0.0` release. `durabletask-azuremanaged` versions 1.x are specifically designed for `durabletask` 1.x and are not compatible with older `durabletask` 0.x SDK versions.
- gotcha The `task_hub_name` provided to `AzureManagedTaskHub` must be unique within its Azure region when using public endpoints. Reusing a name can lead to conflicts, unexpected behavior, or data corruption.
- gotcha Orchestrations and activities will not execute or progress unless a `durabletask.worker.Worker` instance configured with the same `AzureManagedTaskHub` is running and connected.
- gotcha Azure connection strings for the Durable Task Hub typically include shared access keys. Managing these securely, for example, via Azure Key Vault or environment variables, is crucial. Hardcoding them is a security risk.
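One way to act on the connection-string warning is to read both settings from the environment and fail fast when either is missing. The sketch below uses the environment variable names mentioned on this page; `HubSettings` and `load_hub_settings` are hypothetical helpers, not library APIs.

```python
import os
from typing import Mapping, NamedTuple


class HubSettings(NamedTuple):
    """Hypothetical container for the two values the provider needs."""
    connection_string: str
    task_hub_name: str


def load_hub_settings(env: Mapping[str, str] = os.environ) -> HubSettings:
    # Read both values from the environment instead of hardcoding them.
    values = {
        "AZURE_DURABLETASK_CONNECTION_STRING": env.get(
            "AZURE_DURABLETASK_CONNECTION_STRING", ""
        ).strip(),
        "AZURE_DURABLETASK_HUB_NAME": env.get(
            "AZURE_DURABLETASK_HUB_NAME", ""
        ).strip(),
    }
    # Report every missing variable at once, without echoing secret values.
    missing = [name for name, value in values.items() if not value]
    if missing:
        raise RuntimeError("Missing required settings: " + ", ".join(missing))
    return HubSettings(
        connection_string=values["AZURE_DURABLETASK_CONNECTION_STRING"],
        task_hub_name=values["AZURE_DURABLETASK_HUB_NAME"],
    )
```

Unlike the quickstart, this sketch does not fall back to a default hub name. Requiring both values is a design choice that avoids accidentally sharing a task hub between environments.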
Install
-
pip install durabletask-azuremanaged
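After installing, the version-compatibility warning above can be checked with `importlib.metadata`. This is a rough sketch under two assumptions: that the SDK's distribution name is `durabletask`, and that matching major versions is a reasonable proxy for compatibility. The helper names are hypothetical.

```python
from importlib import metadata
from typing import Optional


def major_version(version: str) -> int:
    # Take the leading numeric component of a version such as "1.4.0".
    return int(version.split(".")[0])


def installed_major(package: str) -> Optional[int]:
    # Return the installed major version, or None if the package is absent.
    try:
        return major_version(metadata.version(package))
    except metadata.PackageNotFoundError:
        return None


def check_sdk_compatibility() -> str:
    # Assumed distribution names; adjust if your environment differs.
    provider = installed_major("durabletask-azuremanaged")
    sdk = installed_major("durabletask")
    if provider is None or sdk is None:
        return "not installed"
    return "ok" if provider == sdk else "mismatch"


print(check_sdk_compatibility())
```

A result of "mismatch" suggests a 1.x provider paired with a 0.x SDK (or the reverse), which is the combination the breaking-change warning describes.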
Imports
- AzureManagedTaskHub
from durabletask_azuremanaged.azure_managed_task_hub import AzureManagedTaskHub
Quickstart
import asyncio
import os

from durabletask.client import TaskHubClient
from durabletask.orchestration import OrchestrationContext, orchestrator
from durabletask.worker import Worker
from durabletask_azuremanaged.azure_managed_task_hub import AzureManagedTaskHub


async def run_orchestration_sample():
    # Configure Azure Managed Task Hub with connection string and task hub name
    # AZURE_DURABLETASK_CONNECTION_STRING: Primary/Secondary connection string
    # from the Azure Durable Task Hub resource.
    # AZURE_DURABLETASK_HUB_NAME: A globally unique name for your task hub within the region.
    connection_string = os.environ.get("AZURE_DURABLETASK_CONNECTION_STRING", "")
    task_hub_name = os.environ.get("AZURE_DURABLETASK_HUB_NAME", "MyPythonTaskHub")

    if not connection_string:
        print("Please set the AZURE_DURABLETASK_CONNECTION_STRING environment variable.")
        return

    # 1. Initialize the Azure Managed Task Hub backend
    task_hub = AzureManagedTaskHub(
        connection_string=connection_string,
        task_hub_name=task_hub_name
    )

    # 2. Define an orchestrator function
    @orchestrator
    async def my_orchestrator(context: OrchestrationContext, input_value: str):
        print(f"Orchestration '{context.instance_id}' started with input: {input_value}")
        result = await context.call_activity("my_activity", input_value)
        print(f"Activity returned: {result}")
        return f"Orchestration completed with result: {result}"

    # 3. Define an activity function
    async def my_activity(context: OrchestrationContext, value: str):
        print(f"Activity '{context.instance_id}' received: {value}")
        await asyncio.sleep(1)  # Simulate some work
        return f"Processed: {value.upper()}"

    # 4. Initialize the Worker and register orchestrator/activity
    worker = Worker(task_hub)
    worker.add_orchestrator(my_orchestrator)
    worker.add_activity("my_activity", my_activity)

    # 5. Initialize the TaskHubClient for scheduling orchestrations
    client = TaskHubClient(task_hub)

    # 6. Start the worker in the background (essential for processing tasks)
    worker_task = asyncio.create_task(worker.run())

    try:
        # 7. Schedule a new orchestration
        print("Scheduling new orchestration...")
        instance_id = await client.schedule_new_orchestration(my_orchestrator, "Hello DurableTask!")
        print(f"Orchestration instance ID: {instance_id}")

        # 8. Wait for the orchestration to complete
        status = await client.wait_for_completion(instance_id, timeout=30)
        print(f"\nOrchestration '{instance_id}' completed with status: {status.runtime_status}")
        print(f"Output: {status.output}")
    finally:
        # 9. Clean up: Shut down the worker
        print("\nShutting down worker...")
        worker_task.cancel()
        try:
            await worker_task
        except asyncio.CancelledError:
            pass  # Expected when cancelling
        await worker.shutdown()


if __name__ == "__main__":
    asyncio.run(run_orchestration_sample())