DBOS Transact Python Library

2.18.0 · active · verified Wed Apr 15

DBOS provides an ultra-lightweight Python library for durable execution, with fault-tolerant workflows and queues backed by PostgreSQL (or by SQLite when no database is configured). Developers add resumable execution to an application by annotating ordinary functions, so no separate workflow orchestrator or task queue system is needed. As of version 2.18.0, its features include exactly-once execution, scheduled jobs, and built-in observability. The library is actively maintained with frequent updates and is suitable for building reliable backend services, data pipelines, and AI agents.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates a basic durable workflow using DBOS. It defines a step function and a workflow function, each decorated with a DBOS annotation. The `DBOS` instance is created once at startup and takes its system database location from the `DBOS_SYSTEM_DATABASE_URL` environment variable, which can hold a PostgreSQL connection string; if the variable is unset, the example falls back to a local SQLite file. The `launch()` method starts DBOS once the decorated functions have been defined, and the workflow is then started in the background, returning a handle that can be used to wait for its result. DBOS checkpoints the output of each completed step, so after a crash and restart the workflow resumes from the last completed step instead of starting over.

import os
import time
from dbos import DBOS

# Use PostgreSQL if DBOS_SYSTEM_DATABASE_URL is set, otherwise a local SQLite file.
# PostgreSQL is recommended for production; SQLite is fine for local development.
dbos_system_database_url = os.environ.get(
    'DBOS_SYSTEM_DATABASE_URL',
    'sqlite:///dbos_system.db'
)

# Create the DBOS instance once at application startup.
# The constructor takes a config mapping rather than bare keyword arguments;
# "name" is the application name ("dbos-quickstart" is a placeholder).
dbos_instance = DBOS(config={
    "name": "dbos-quickstart",
    "system_database_url": dbos_system_database_url,
})

@dbos_instance.step()
def greet_step(name: str) -> str:
    """A simple durable step that returns a greeting."""
    print(f"Executing greet_step for {name}...")
    time.sleep(0.5) # Simulate some work
    return f"Hello, {name}!"

@dbos_instance.workflow()
def greeting_workflow(person_name: str) -> str:
    """A durable workflow composed of a single step."""
    print(f"Starting greeting_workflow for {person_name}...")
    result = greet_step(person_name)
    print(f"Greeting workflow finished with: {result}")
    return result

if __name__ == "__main__":
    print(f"DBOS System Database: {dbos_system_database_url}")
    try:
        # Launch DBOS. Call this after all decorated functions have been defined.
        dbos_instance.launch()

        # Start the workflow in the background; DBOS checkpoints its progress.
        workflow_handle = dbos_instance.start_workflow(greeting_workflow, "World")

        # You can optionally wait for the workflow to complete and get its result
        final_result = workflow_handle.get_result()
        print(f"Workflow 'greeting_workflow' completed with result: {final_result}")

        # To observe recovery, kill the process (Ctrl+C or kill) while a workflow
        # is still running, for example during a longer time.sleep() inside a step,
        # and then restart the app. After the restart, DBOS resumes the pending
        # workflow from its last completed step instead of starting it over.

    except KeyboardInterrupt:
        print("Application interrupted.")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # Properly shut down DBOS resources
        dbos_instance.destroy()
        print("DBOS application shut down.")
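
The checkpoint-and-resume behavior described above can be illustrated with a small standalone sketch. This is not how DBOS is implemented and it does not use the `dbos` package; it is a toy model, using only the standard library, of the general idea that each step's output is saved under a workflow ID so that a rerun skips steps that already completed. The names `open_store`, `run_step`, `toy_workflow`, and the `checkpoints` table are hypothetical and exist only for this illustration.

```python
import json
import sqlite3
from typing import Any, Callable, List


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open a SQLite database holding one row per completed step (toy schema)."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        "workflow_id TEXT, step_no INTEGER, output TEXT, "
        "PRIMARY KEY (workflow_id, step_no))"
    )
    return conn


def run_step(conn: sqlite3.Connection, workflow_id: str, step_no: int,
             fn: Callable[..., Any], *args: Any) -> Any:
    """Run fn unless this step was already checkpointed; then replay its output."""
    row = conn.execute(
        "SELECT output FROM checkpoints WHERE workflow_id = ? AND step_no = ?",
        (workflow_id, step_no),
    ).fetchone()
    if row is not None:
        return json.loads(row[0])  # completed earlier: do not execute again
    output = fn(*args)
    conn.execute(
        "INSERT INTO checkpoints (workflow_id, step_no, output) VALUES (?, ?, ?)",
        (workflow_id, step_no, json.dumps(output)),
    )
    conn.commit()
    return output


calls: List[str] = []  # records which step bodies actually executed


def greet(name: str) -> str:
    calls.append("greet")
    return f"Hello, {name}!"


def shout(text: str) -> str:
    calls.append("shout")
    return text.upper()


def toy_workflow(conn: sqlite3.Connection, workflow_id: str, name: str,
                 crash_after_first_step: bool = False) -> str:
    greeting = run_step(conn, workflow_id, 0, greet, name)
    if crash_after_first_step:
        raise RuntimeError("simulated crash")
    return run_step(conn, workflow_id, 1, shout, greeting)


conn = open_store()
try:
    toy_workflow(conn, "wf-1", "World", crash_after_first_step=True)
except RuntimeError:
    pass  # the first attempt dies after step 0 has been checkpointed

# Rerunning with the same workflow ID replays step 0 and executes only step 1.
result = toy_workflow(conn, "wf-1", "World")
print(result, calls)
```

In this toy model the caller must rerun the workflow with the same ID by hand; a durable execution library takes care of that bookkeeping, which is why the quickstart needs only the decorators and a call to `launch()`.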
