Durable Task SDK for Python

1.4.0 · active · verified Thu Apr 16

The `durabletask` library is a Python client SDK for the Azure Durable Task Scheduler. It lets developers define, schedule, and manage resilient, stateful workflows (orchestrations) in ordinary Python code, and it is designed for fault-tolerant, long-running processes. The current version is 1.4.0, and the project is actively maintained.

Quickstart

This quickstart demonstrates a simple 'Hello World' orchestration. It defines an activity (`hello_activity`) and an orchestrator (`hello_orchestrator`), registers them with a worker, and then schedules a new orchestration instance using the client. Orchestrators are generator functions that `yield` the tasks they wait on, and the client and worker calls are synchronous. The example assumes a Durable Task Scheduler emulator is running locally on `http://localhost:8080` (e.g., via Docker) and reads its configuration from environment variables.

import os

from durabletask import client, task, worker

# NOTE: For local development, ensure the Durable Task Scheduler emulator is running.
# For example, using Docker: docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
CONNECTION_STRING = os.environ.get("DURABLETASK_CONNECTION_STRING", "Endpoint=http://localhost:8080;Authentication=None")
TASK_HUB_NAME = os.environ.get("DURABLETASK_TASK_HUB_NAME", "default")

# The gRPC client and worker take a host address rather than a connection
# string, so extract the Endpoint field from it.
SETTINGS = dict(part.split("=", 1) for part in CONNECTION_STRING.split(";") if part)
HOST_ADDRESS = SETTINGS["Endpoint"]

# Assumption: the scheduler reads the task hub name from a "taskhub" gRPC
# metadata entry; adjust this if your setup differs.
METADATA = [("taskhub", TASK_HUB_NAME)]


def hello_activity(ctx: task.ActivityContext, name: str) -> str:
    """Activity: does the actual work and returns a greeting."""
    print(f"Executing hello_activity with input: {name}")
    return f"Hello, {name}!"


def hello_orchestrator(ctx: task.OrchestrationContext, name: str):
    """Orchestrator: a generator that yields the tasks it waits on."""
    # Orchestrator code is replayed, so only log on the first pass
    if not ctx.is_replaying:
        print(f"Starting hello_orchestrator with input: {name}")
    # Call an activity function and suspend until its result is available
    result = yield ctx.call_activity(hello_activity, input=name)
    return result


def main():
    # Register the orchestrator and activity with the worker, then start it
    with worker.TaskHubGrpcWorker(host_address=HOST_ADDRESS, metadata=METADATA) as w:
        w.add_orchestrator(hello_orchestrator)
        w.add_activity(hello_activity)
        w.start()
        print("Worker started. Starting orchestration...")

        # Start a new orchestration
        c = client.TaskHubGrpcClient(host_address=HOST_ADDRESS, metadata=METADATA)
        instance_id = c.schedule_new_orchestration(hello_orchestrator, input="World")
        print(f"Orchestration instance started: {instance_id}")

        # Wait for the orchestration to reach a terminal state
        try:
            state = c.wait_for_orchestration_completion(instance_id, timeout=60)
        except TimeoutError:
            print(f"Orchestration '{instance_id}' did not complete within the timeout.")
            return
        if state:
            print(f"Orchestration '{instance_id}' finished. Status: {state.runtime_status.name}, Output: {state.serialized_output}")
        else:
            print(f"Orchestration '{instance_id}' was not found.")


if __name__ == "__main__":
    main()
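
To make "resilient and stateful" more concrete, the following is a toy sketch of the replay idea behind durable orchestrations. It is plain Python and does not use or mirror the `durabletask` package's internals. It assumes, for illustration only, that an orchestrator is a generator yielding `(activity name, input)` pairs; every name in it (`greet_orchestrator`, `ACTIVITIES`, `run_with_replay`) is hypothetical.

```python
from typing import Any, Callable, Dict, Generator, List, Tuple

# Hypothetical model: an orchestrator yields (activity name, input) pairs
# and receives each activity's result back through the yield expression.
Call = Tuple[str, Any]


def greet_orchestrator(name: str) -> Generator[Call, Any, str]:
    greeting = yield ("hello", name)
    shouted = yield ("shout", greeting)
    return shouted


# Stand-ins for activities; in a real system these run on a worker.
ACTIVITIES: Dict[str, Callable[[Any], Any]] = {
    "hello": lambda name: f"Hello, {name}!",
    "shout": lambda text: text.upper(),
}


def run_with_replay(
    orchestrator: Callable[[Any], Generator[Call, Any, Any]],
    arg: Any,
    history: List[Any],
    executed: List[str],
) -> Any:
    """Drive the generator, answering each yield from history when possible.

    `history` holds results of activities that already completed. Only calls
    beyond the end of the history are executed; their results are appended to
    `history` and their names to `executed`.
    """
    gen = orchestrator(arg)
    step = 0
    try:
        call = next(gen)
        while True:
            name, payload = call
            if step < len(history):
                result = history[step]  # replayed, not re-executed
            else:
                result = ACTIVITIES[name](payload)
                history.append(result)
                executed.append(name)
            step += 1
            call = gen.send(result)
    except StopIteration as done:
        return done.value


if __name__ == "__main__":
    # Simulate a restart after the first activity had already completed.
    history = ["Hello, World!"]
    executed: List[str] = []
    print(run_with_replay(greet_orchestrator, "World", history, executed))
    print(executed)  # only "shout" ran after the restart
```

Because the orchestrator is re-run from the top and fed recorded results, it reaches the same point without repeating finished work. This is also why orchestrator code has to be deterministic: a different sequence of yields on replay would no longer line up with the recorded history.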
