Durable Task for Microsoft Agent Framework
The `agent-framework-durabletask` library provides robust integration for orchestrating long-running agent operations within the Microsoft Agent Framework, leveraging the capabilities of Durable Task (often via Azure Durable Functions). It enables complex, stateful workflows for agent-based systems. The current version is `1.0.0b260409`, indicating a beta phase with potentially frequent updates as it evolves alongside the core `agent-framework` library.
Common errors
-
ModuleNotFoundError: No module named 'agent_framework_durabletask'
cause The library is not installed or the Python environment is not configured correctly.fixEnsure the library is installed using `pip install agent-framework-durabletask` in the active virtual environment. -
Orchestrator function 'MyOrchestrator' failed due to a non-deterministic operation. See the function log for more details.
cause The orchestrator code contains operations that are not deterministic across replays, which is forbidden in Durable Task.fixRefactor the orchestrator function to ensure all operations are deterministic. Use `DurableTaskOrchestrationContext` for time, random numbers, or external calls. Delegate non-deterministic logic to 'activity' functions. -
azure.core.exceptions.ClientAuthenticationError: Authentication failed.
cause The `AZURE_STORAGE_CONNECTION_STRING` provided for `AzureDurableTaskClient` is invalid, expired, or lacks necessary permissions.fixVerify that the `AZURE_STORAGE_CONNECTION_STRING` environment variable or constructor argument contains a valid connection string for an Azure Storage Account. Ensure the account has permissions for queue and table access. -
ValueError: The 'azure_storage_connection_string' must be provided.
cause The `AzureDurableTaskClient` was instantiated without the required Azure Storage connection string.fixEnsure the `AZURE_STORAGE_CONNECTION_STRING` environment variable is set or pass the connection string directly to the `AzureDurableTaskClient` constructor.
Warnings
- breaking The library is currently in a beta (`1.0.0b...`) state, which means its API is subject to change without strict backward compatibility guarantees between minor or even patch versions. Expect breaking changes.
- gotcha Orchestrator functions must be deterministic. Any non-deterministic operations (e.g., direct I/O, generating random numbers, using `datetime.now()` without `context.current_utc_datetime`) can lead to replay issues, where the orchestration behaves differently on replay, causing runtime errors.
- gotcha Using `AzureDurableTaskClient` (the most common client) requires an Azure Storage Account and an Azure Functions app configured to host Durable Functions. Without this backend, the client cannot communicate with an orchestrator runtime.
- deprecated While not explicitly deprecated yet, reliance on older versions of `azure-functions-durable` or `azure-storage-queue` can lead to compatibility issues with newer Azure services or Python runtimes. Always ensure these dependencies are up-to-date.
Install
-
pip install agent-framework-durabletask
Imports
- DurableTaskOrchestrator
from agent_framework_durabletask.orchestration import DurableTaskOrchestrator
- DurableTaskOrchestrationContext
from agent_framework_durabletask.orchestration import DurableTaskOrchestrationContext
- DurableTaskClient
from agent_framework_durabletask.client import DurableTaskClient
- AzureDurableTaskClient
from agent_framework_durabletask.azure import AzureDurableTaskClient
Quickstart
import os
from agent_framework_durabletask.orchestration import (
DurableTaskOrchestrator,
DurableTaskOrchestrationContext,
)
from agent_framework_durabletask.client import DurableTaskClient
from agent_framework_durabletask.azure import AzureDurableTaskClient
# 1. Define a Durable Task Orchestrator
class MySimpleOrchestrator(DurableTaskOrchestrator):
"""
A simple orchestrator that logs its input and returns a processed string.
In a real scenario, this would coordinate calls to 'activities'.
"""
async def orchestrate(self, context: DurableTaskOrchestrationContext, input_data: str):
print(f"Orchestrator '{context.instance_id}' received input: '{input_data}'")
# Simulate some async work by returning immediately for this example
return f"Processed '{input_data}' at {context.current_utc_datetime}"
# 2. Instantiate a Durable Task Client (e.g., for Azure Durable Functions)
# This client requires connection details to an Azure Storage Account
# which is used by Azure Durable Functions to manage orchestration state.
azure_storage_connection_string = os.environ.get("AZURE_STORAGE_CONNECTION_STRING", "")
task_hub_name = os.environ.get("DURABLETASK_HUB_NAME", "DefaultTaskHub")
if azure_storage_connection_string:
print("Initializing AzureDurableTaskClient...")
try:
client = AzureDurableTaskClient(
task_hub_name=task_hub_name,
azure_storage_connection_string=azure_storage_connection_string
)
print(f"AzureDurableTaskClient initialized for task hub '{task_hub_name}'.")
# In a real application, you would then use `client` to start and manage orchestrations:
# import asyncio
# async def run_orchestration():
# instance_id = await client.start_orchestration(MySimpleOrchestrator, "Hello DurableTask!")
# print(f"Started orchestration with ID: {instance_id}")
# # You'd typically poll or wait for the orchestration to complete
# # status = await client.get_orchestration_status(instance_id)
# # while status.runtime_status not in [OrchestrationRuntimeStatus.Completed, OrchestrationRuntimeStatus.Failed, OrchestrationRuntimeStatus.Terminated]:
# # await asyncio.sleep(5)
# # status = await client.get_orchestration_status(instance_id)
# # print(f"Orchestration {instance_id} finished with status: {status.runtime_status}")
# asyncio.run(run_orchestration())
except Exception as e:
print(f"Failed to initialize AzureDurableTaskClient: {e}")
print("Please ensure AZURE_STORAGE_CONNECTION_STRING is valid and points to an Azure Storage Account.")
else:
print("AZURE_STORAGE_CONNECTION_STRING environment variable not set.")
print("Cannot initialize AzureDurableTaskClient without storage connection details.")
print("To run this, ensure you have an Azure Storage Account and set its connection string.")
print("\n--- Example finished. This code defines an orchestrator and attempts to initialize a client. ---")
print("To execute orchestrations, you need a Durable Task backend host (e.g., Azure Functions).")