DBOS Transact Python Library
DBOS provides an ultra-lightweight Python library for durable execution, enabling fault-tolerant workflows and queues built on PostgreSQL (or SQLite by default). It allows developers to add resumable execution to applications using simple function annotations, eliminating the need for separate workflow orchestrators or task queue systems. As of version 2.18.0, it offers features like exactly-once execution, scheduled jobs, and built-in observability. The library is actively maintained with frequent updates and is suitable for building reliable backend services, data pipelines, and AI agents.
Warnings
- breaking Modifying the sequence or number of steps within an active workflow (a 'breaking change') can cause recovery failures for in-progress workflows. DBOS checkpoints the state of workflows in the database based on the expected flow.
- gotcha DBOS workflows must be deterministic: given the same inputs, they must always invoke the same steps in the same order with the same inputs and expect the same return values from those steps. Non-deterministic workflows can lead to incorrect recovery behavior and `DBOSUnexpectedStepError`s.
- gotcha Embedding synchronous (blocking) calls in Python or TypeScript asynchronous DBOS applications can block the event loop, preventing workflows and other async operations from making progress and leading to 'stuck' workflows.
- gotcha When using DBOS's retry mechanisms for steps that interact with external APIs, it's recommended to disable any built-in retry logic in the API client itself to avoid excessive or conflicting retries. For example, set `max_retries=0` for `OpenAIProvider` clients.
- gotcha Workflow inputs/outputs and step outputs are checkpointed using Python's `pickle` module. Ensure that all data passed through workflows or returned by steps is pickle-serializable. Large objects may also impact performance due to database write overhead.
- gotcha DBOS requires the system database to be available at application startup. If the configured database is unreachable during initialization, DBOS will exit rather than waiting for the connection to be established.
- gotcha Applying multiple DBOS decorators (e.g., `@DBOS.workflow()` and `@DBOS.transaction()`) to the same function or registering functions with conflicting names/types will raise a `DBOSConflictingRegistrationError`.
Install
-
pip install dbos -
pip install dbos[otel]
Imports
- DBOS
from dbos import DBOS
Quickstart
import os
import time
from dbos import DBOS
# Configure DBOS to use SQLite by default, or PostgreSQL if env var is set
# For production, PostgreSQL is recommended. SQLite is good for local development.
dbos_system_database_url = os.environ.get(
'DBOS_SYSTEM_DATABASE_URL',
'sqlite:///dbos_system.db'
)
# Initialize DBOS (this should typically happen once at application startup)
dbos_instance = DBOS(system_database_url=dbos_system_database_url)
@dbos_instance.step()
def greet_step(name: str) -> str:
"""A simple durable step that returns a greeting."""
print(f"Executing greet_step for {name}...")
time.sleep(0.5) # Simulate some work
return f"Hello, {name}!"
@dbos_instance.workflow()
def greeting_workflow(person_name: str) -> str:
"""A durable workflow composed of a single step."""
print(f"Starting greeting_workflow for {person_name}...")
result = greet_step(person_name)
print(f"Greeting workflow finished with: {result}")
return result
if __name__ == "__main__":
print(f"DBOS System Database: {dbos_system_database_url}")
try:
# Launch the DBOS application. This will discover and register workflows.
dbos_instance.launch()
# Start a workflow. DBOS ensures it will complete reliably.
workflow_handle = greeting_workflow.start(person_name="World")
# You can optionally wait for the workflow to complete and get its result
final_result = workflow_handle.get_result()
print(f"Workflow 'greeting_workflow' completed with result: {final_result}")
# Demonstrate recovery: if you crash the app here, and restart, the workflow will resume
# For example, comment out the line above and uncomment the sleep and crash below, then restart.
# print("Sleeping for 10 seconds. Try to crash the app (Ctrl+C or kill) and restart it!")
# time.sleep(10)
# workflow_handle = greeting_workflow.start(person_name="RecoveredUser") # This will start a new one, but if the previous was pending, it would resume.
except KeyboardInterrupt:
print("Application interrupted.")
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Properly shut down DBOS resources
dbos_instance.destroy()
print("DBOS application shut down.")