Durable Task SDK for Python
The `durabletask` library is a Python Client SDK for the Azure Durable Task Scheduler, enabling developers to define, schedule, and manage resilient and stateful workflows (orchestrations) using ordinary Python code. It is designed for building fault-tolerant, long-running processes. The current version is 1.4.0, and the project maintains an active release cadence.
Common errors
-
ModuleNotFoundError: No module named 'durabletask'
cause The `durabletask` package is not installed in the current Python environment, or the environment is not activated.
fix Run `pip install durabletask` to install the package. Ensure your virtual environment is activated if you are using one.
-
Orchestration is stuck in the 'Running' state
cause The orchestration is usually waiting on a task (an activity or sub-orchestration) that has not completed. Common reasons: an activity is not registered with the worker, the orchestration is waiting for an external event that was never raised, or an activity is throwing an unhandled exception.
fix Check the Durable Task Scheduler dashboard (usually on port 8082 for the emulator) for errors. Verify that every activity the orchestrator calls is registered with the worker, that expected external events are actually raised, and that activities are not failing silently.
-
TypeError: 'coroutine' object is not callable (or similar async/await issues)
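cause In the SDK's documented programming model, orchestrators are generator functions that `yield` the tasks returned by the context (for example `yield ctx.call_activity(...)`); they are not `async def` coroutines. Declaring an orchestrator as `async def`, or using `await` inside it, typically leaves the worker holding a coroutine object it does not run, which surfaces as this or a similar error. The same error also appears in your own `asyncio` code when an `async` function is called without `await`.
fix Write orchestrators as plain `def` generators and `yield` each task. If the surrounding application uses `asyncio`, make sure every coroutine is awaited and the entry point is started with `asyncio.run()`.
-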
DurableTask.Netherite.AzureFunctions: Could not load file or assembly 'Microsoft.Azure.WebJobs.Extensions.DurableTask, Version=...' (or similar assembly loading error)
cause This error points to an issue with Durable Functions extensions rather than the standalone Durable Task SDK. It is often related to how extension bundles are cached or loaded, typically in Azure Functions environments where the Durable Task SDK is used by mistake or where dependencies conflict.
fix If you are building a Durable Function, use the Durable Functions SDK (`azure-functions-durable`) and check that your `host.json` references the correct extension bundle version. If you are using the Durable Task SDK, do not deploy it into a Durable Functions runtime environment unless a custom host explicitly supports it.
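For the `ModuleNotFoundError` entry above, the usual culprit is that the interpreter running the code is not the one `pip` installed into. The following is a minimal, standard-library-only sketch that reports which interpreter is active and whether it can see the package. The helper name `describe_install` is hypothetical and is not part of the SDK.

```python
import importlib.util
import sys


def describe_install(package: str = "durabletask") -> str:
    """Report whether `package` is importable from the running interpreter."""
    # find_spec returns None when a top-level package cannot be located.
    spec = importlib.util.find_spec(package)
    if spec is None:
        return f"{package} is NOT importable from {sys.executable}"
    return f"{package} found at {spec.origin} (interpreter: {sys.executable})"


if __name__ == "__main__":
    print(describe_install())
```

If the reported interpreter is not the one inside your virtual environment, activate the environment (or run `python -m pip install durabletask` with that interpreter) and check again.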
Warnings
- breaking Python 3.9 support was removed in version 1.3.0. The library now requires Python 3.10 or newer.
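- gotcha Orchestrator functions must be deterministic, because the runtime replays them from recorded history to rebuild their state. Do not call `datetime.datetime.now()`, `random.random()`, or other non-deterministic operations directly in orchestrator code, as this can lead to replay mismatches and unexpected behavior. Use the context's deterministic alternatives instead (for example `ctx.current_utc_datetime` for the current time) and move any other non-deterministic work into an activity.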
- gotcha When `task.when_all` is used to run multiple activities in parallel, the combined task fails as soon as the *first* activity fails, even if the other activities would succeed. Unless the orchestrator catches the error around the `yield`, the orchestration fails with it. Error details are carried by `task.TaskFailedError`.
- gotcha This SDK is *not* directly compatible with Azure Durable Functions. If you are building Durable Functions, use the `azure-functions-durable` package and its associated tools. This SDK targets the standalone Durable Task Scheduler.
- gotcha Incorrect connection string formats are a common cause of startup failures, especially when switching between local development (emulator) and Azure deployments.
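The determinism warning above is easier to see with a toy model of replay. The sketch below is not the SDK's engine: `run`, `History`, and both orchestrators are hypothetical names, and activity results are faked as strings. It only illustrates the idea that a generator-style orchestrator is re-executed from the top and fed recorded results, so any branch that depends on a changing value can ask for a task that the history does not contain. A fixed `flips` iterator stands in for `random.random()` so that the failure is reproducible.

```python
from typing import Callable, Generator, Iterator, List, Tuple

History = List[Tuple[str, str]]  # (activity name, recorded result)


def run(orchestrator: Callable[[], Generator[str, str, str]], history: History) -> str:
    """Drive a generator orchestrator, replaying recorded results first.

    Each yielded value is the name of an activity to call. Steps already in
    `history` are answered from the record; new steps are "executed" (faked
    here) and appended to `history`.
    """
    gen = orchestrator()
    step = 0
    try:
        requested = next(gen)
        while True:
            if step < len(history):
                recorded_name, result = history[step]
                if recorded_name != requested:
                    raise RuntimeError(
                        f"replay mismatch at step {step}: history has "
                        f"{recorded_name!r}, code asked for {requested!r}"
                    )
            else:
                result = f"result of {requested}"
                history.append((requested, result))
            step += 1
            requested = gen.send(result)
    except StopIteration as done:
        return done.value


def deterministic() -> Generator[str, str, str]:
    # Same inputs and same history always produce the same sequence of calls.
    a = yield "step_a"
    b = yield "step_b"
    return f"{a}; {b}"


def make_flaky(flips: Iterator[bool]) -> Callable[[], Generator[str, str, str]]:
    def flaky() -> Generator[str, str, str]:
        # BAD: the branch depends on a value that changes between executions,
        # just as random.random() or datetime.now() would.
        name = "step_a" if next(flips) else "step_b"
        result = yield name
        return result
    return flaky


if __name__ == "__main__":
    good_history: History = []
    print(run(deterministic, good_history))   # first execution records history
    print(run(deterministic, good_history))   # replay gives the same answer

    flaky = make_flaky(iter([True, False]))
    bad_history: History = []
    run(flaky, bad_history)                   # first execution picks step_a
    try:
        run(flaky, bad_history)               # replay picks step_b instead
    except RuntimeError as err:
        print(err)
```

In real orchestrator code the equivalent fix is to take the value from the context or to compute it inside an activity, so that the recorded result is what gets replayed.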
Install
-
pip install durabletask
Imports
- TaskHubGrpcClient
from durabletask.client import TaskHubGrpcClient
- TaskHubGrpcWorker
from durabletask.worker import TaskHubGrpcWorker
- OrchestrationContext
from durabletask.task import OrchestrationContext
- Task
from durabletask.task import Task
- ActivityContext
from durabletask.task import ActivityContext
Quickstart
import os

# The scheduler-aware client and worker ship in a companion package:
#   pip install durabletask durabletask-azuremanaged
from durabletask import task
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker

# NOTE: For local development, ensure the Durable Task Scheduler emulator is running.
# For example, using Docker: docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
CONNECTION_STRING = os.environ.get("DURABLETASK_CONNECTION_STRING", "Endpoint=http://localhost:8080;Authentication=None")
TASK_HUB_NAME = os.environ.get("DURABLETASK_TASK_HUB_NAME", "default")

# The client and worker take an endpoint, not a connection string,
# so pull the Endpoint field out of it here.
SETTINGS = dict(part.split("=", 1) for part in CONNECTION_STRING.split(";") if part)
ENDPOINT = SETTINGS["Endpoint"]


def hello_activity(ctx: task.ActivityContext, name: str) -> str:
    """Activity: ordinary code, free to do I/O."""
    print(f"Executing hello_activity with input: {name}")
    return f"Hello, {name}!"


def hello_orchestrator(ctx: task.OrchestrationContext, name: str):
    """Orchestrator: a generator that yields tasks and must stay deterministic."""
    # Call an activity function
    result = yield ctx.call_activity(hello_activity, input=name)
    return result


def main():
    # token_credential=None only suits the unauthenticated emulator; an Azure
    # endpoint needs a credential such as azure.identity.DefaultAzureCredential().
    options = dict(host_address=ENDPOINT, secure_channel=ENDPOINT.startswith("https://"),
                   taskhub=TASK_HUB_NAME, token_credential=None)

    # Register orchestrators and activities with the worker
    with DurableTaskSchedulerWorker(**options) as worker:
        worker.add_orchestrator(hello_orchestrator)
        worker.add_activity(hello_activity)
        worker.start()
        print("Worker started. Starting orchestration...")

        # Start a new orchestration
        client = DurableTaskSchedulerClient(**options)
        instance_id = client.schedule_new_orchestration(hello_orchestrator, input="World")
        print(f"Orchestration instance started: {instance_id}")

        # Wait for the orchestration to complete
        try:
            state = client.wait_for_orchestration_completion(instance_id, timeout=60)
        except TimeoutError:
            print(f"Orchestration '{instance_id}' did not complete within the timeout.")
            return
        if state:
            print(f"Orchestration '{instance_id}' finished. Status: {state.runtime_status}, Output: {state.serialized_output}")
        else:
            print(f"Orchestration '{instance_id}' was not found.")


if __name__ == "__main__":
    main()
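Because malformed connection strings are listed above as a common startup failure, it can help to validate the value before constructing a client or worker. The sketch below assumes only the `Key=Value;Key=Value` shape shown in the quickstart (`Endpoint=...;Authentication=...`). The function name `parse_connection_string` is hypothetical, it is not the SDK's own parser, and keys other than `Endpoint` are passed through without validation.

```python
from urllib.parse import urlparse


def parse_connection_string(value: str) -> dict:
    """Split 'Key=Value;Key=Value' into a dict and sanity-check the Endpoint."""
    settings = {}
    for part in value.split(";"):
        part = part.strip()
        if not part:
            continue  # tolerate a trailing or doubled semicolon
        key, sep, val = part.partition("=")
        if not sep or not key.strip() or not val.strip():
            raise ValueError(f"malformed segment: {part!r}")
        settings[key.strip()] = val.strip()

    endpoint = settings.get("Endpoint")
    if endpoint is None:
        raise ValueError("connection string has no Endpoint")

    # An endpoint without a scheme (e.g. 'localhost:8080') is rejected here.
    parsed = urlparse(endpoint)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"Endpoint is not an http(s) URL: {endpoint!r}")
    return settings


if __name__ == "__main__":
    print(parse_connection_string("Endpoint=http://localhost:8080;Authentication=None"))
```

Failing early with a specific message is a deliberate choice: it separates a configuration typo from a genuine connectivity problem when switching between the emulator and an Azure deployment.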