Durable Task for Microsoft Agent Framework

raw JSON →
1.0.0b260409 verified Thu Apr 16 auth: no python

The `agent-framework-durabletask` library provides robust integration for orchestrating long-running agent operations within the Microsoft Agent Framework, leveraging the capabilities of Durable Task (often via Azure Durable Functions). It enables complex, stateful workflows for agent-based systems. The current version is `1.0.0b260409`, indicating a beta phase with potentially frequent updates as it evolves alongside the core `agent-framework` library.

pip install agent-framework-durabletask
error ModuleNotFoundError: No module named 'agent_framework_durabletask'
cause The library is not installed or the Python environment is not configured correctly.
fix
Ensure the library is installed using pip install agent-framework-durabletask in the active virtual environment.
error Orchestrator function 'MyOrchestrator' failed due to a non-deterministic operation. See the function log for more details.
cause The orchestrator code contains operations that are not deterministic across replays, which is forbidden in Durable Task.
fix
Refactor the orchestrator function to ensure all operations are deterministic. Use DurableTaskOrchestrationContext for time, random numbers, or external calls. Delegate non-deterministic logic to 'activity' functions.
error azure.core.exceptions.ClientAuthenticationError: Authentication failed.
cause The `AZURE_STORAGE_CONNECTION_STRING` provided for `AzureDurableTaskClient` is invalid, expired, or lacks necessary permissions.
fix
Verify that the AZURE_STORAGE_CONNECTION_STRING environment variable or constructor argument contains a valid connection string for an Azure Storage Account. Ensure the account has permissions for queue and table access.
error ValueError: The 'azure_storage_connection_string' must be provided.
cause The `AzureDurableTaskClient` was instantiated without the required Azure Storage connection string.
fix
Ensure the AZURE_STORAGE_CONNECTION_STRING environment variable is set or pass the connection string directly to the AzureDurableTaskClient constructor.
breaking The library is currently in a beta (`1.0.0b...`) state, which means its API is subject to change without strict backward compatibility guarantees between minor or even patch versions. Expect breaking changes.
fix Always pin to a specific version (`agent-framework-durabletask==X.Y.Z`) and thoroughly test updates. Monitor the official GitHub repository for release notes.
gotcha Orchestrator functions must be deterministic. Any non-deterministic operations (e.g., direct I/O, generating random numbers, using `datetime.now()` without `context.current_utc_datetime`) can lead to replay issues, where the orchestration behaves differently on replay, causing runtime errors.
fix Use `DurableTaskOrchestrationContext` methods for all side-effecting operations (e.g., `context.call_activity`, `context.current_utc_datetime`). Avoid any non-deterministic code paths within the orchestrator function itself.
gotcha Using `AzureDurableTaskClient` (the most common client) requires an Azure Storage Account and an Azure Functions app configured to host Durable Functions. Without this backend, the client cannot communicate with an orchestrator runtime.
fix Set up an Azure Storage Account and an Azure Functions app with Durable Functions enabled. Provide the storage account's connection string via the `AZURE_STORAGE_CONNECTION_STRING` environment variable or directly to the client constructor.
deprecated While not explicitly deprecated yet, reliance on older versions of `azure-functions-durable` or `azure-storage-queue` can lead to compatibility issues with newer Azure services or Python runtimes. Always ensure these dependencies are up-to-date.
fix Regularly update `agent-framework-durabletask` and its core `azure-*` dependencies. Check PyPI for the latest compatible versions.

This quickstart demonstrates how to define a `DurableTaskOrchestrator` and initialize an `AzureDurableTaskClient`. To make the client fully functional and execute orchestrations, you must set the `AZURE_STORAGE_CONNECTION_STRING` environment variable, pointing to an Azure Storage Account that backs an Azure Functions app running Durable Functions.

import os
from agent_framework_durabletask.orchestration import (
    DurableTaskOrchestrator,
    DurableTaskOrchestrationContext,
)
from agent_framework_durabletask.client import DurableTaskClient
from agent_framework_durabletask.azure import AzureDurableTaskClient

# 1. Define a Durable Task Orchestrator
class MySimpleOrchestrator(DurableTaskOrchestrator):
    """
    A simple orchestrator that logs its input and returns a processed string.
    In a real scenario, this would coordinate calls to 'activities'.
    """
    async def orchestrate(self, context: DurableTaskOrchestrationContext, input_data: str):
        print(f"Orchestrator '{context.instance_id}' received input: '{input_data}'")
        # Simulate some async work by returning immediately for this example
        return f"Processed '{input_data}' at {context.current_utc_datetime}"

# 2. Instantiate a Durable Task Client (e.g., for Azure Durable Functions)
# This client requires connection details to an Azure Storage Account
# which is used by Azure Durable Functions to manage orchestration state.
azure_storage_connection_string = os.environ.get("AZURE_STORAGE_CONNECTION_STRING", "")
task_hub_name = os.environ.get("DURABLETASK_HUB_NAME", "DefaultTaskHub")

if azure_storage_connection_string:
    print("Initializing AzureDurableTaskClient...")
    try:
        client = AzureDurableTaskClient(
            task_hub_name=task_hub_name,
            azure_storage_connection_string=azure_storage_connection_string
        )
        print(f"AzureDurableTaskClient initialized for task hub '{task_hub_name}'.")
        # In a real application, you would then use `client` to start and manage orchestrations:
        # import asyncio
        # async def run_orchestration():
        #     instance_id = await client.start_orchestration(MySimpleOrchestrator, "Hello DurableTask!")
        #     print(f"Started orchestration with ID: {instance_id}")
        #     # You'd typically poll or wait for the orchestration to complete
        #     # status = await client.get_orchestration_status(instance_id)
        #     # while status.runtime_status not in [OrchestrationRuntimeStatus.Completed, OrchestrationRuntimeStatus.Failed, OrchestrationRuntimeStatus.Terminated]:
        #     #     await asyncio.sleep(5)
        #     #     status = await client.get_orchestration_status(instance_id)
        #     # print(f"Orchestration {instance_id} finished with status: {status.runtime_status}")
        # asyncio.run(run_orchestration())
    except Exception as e:
        print(f"Failed to initialize AzureDurableTaskClient: {e}")
        print("Please ensure AZURE_STORAGE_CONNECTION_STRING is valid and points to an Azure Storage Account.")
else:
    print("AZURE_STORAGE_CONNECTION_STRING environment variable not set.")
    print("Cannot initialize AzureDurableTaskClient without storage connection details.")
    print("To run this, ensure you have an Azure Storage Account and set its connection string.")

print("\n--- Example finished. This code defines an orchestrator and attempts to initialize a client. ---")
print("To execute orchestrations, you need a Durable Task backend host (e.g., Azure Functions).")