Pgqueuer: PostgreSQL-backed Job Queuing and Scheduling
Pgqueuer is a Python library that uses PostgreSQL as a robust, efficient backend for asynchronous job queuing and scheduling. It supports both one-off jobs and recurring tasks via cron-like schedules, and ships an in-memory driver for development and testing. The library is actively developed, with frequent releases bringing architectural improvements, new features, and bug fixes; the current version is 0.26.3.
Common errors
- `asyncpg.exceptions.InvalidCatalogNameError: database "non_existent_db" does not exist`
  - cause: The database name specified in the `DATABASE_URL` does not exist on the PostgreSQL server.
  - fix: Create the specified database on your PostgreSQL server, or correct the database name in your `DATABASE_URL`.
- `asyncpg.exceptions.CannotConnectNowError: could not connect to server: Connection refused`
  - cause: Pgqueuer could not establish a connection to the PostgreSQL server. This typically means the server is not running, is unreachable from where Pgqueuer runs, or the host/port in the `DATABASE_URL` is incorrect.
  - fix: Ensure your PostgreSQL server is running, listening on the correct host/port, and accessible (e.g., firewall rules allow the connection). Double-check the host and port in your `DATABASE_URL`.
- `TypeError: 'function' object is not awaitable`
  - cause: You registered a synchronous Python function with `pg.worker.register_task()`, but Pgqueuer expects an `async def` function that can be awaited.
  - fix: Change the task definition from `def my_task(...)` to `async def my_task(...)`, and ensure any internal asynchronous calls use `await`.
- `KeyError: 'my_unknown_task'`
  - cause: You attempted to enqueue a job with a task name (e.g., 'my_unknown_task') that has not been registered with `pg.worker.register_task()`.
  - fix: Before enqueuing a job, register the task function corresponding to that name using `pg.worker.register_task(your_async_function)`.
- `TypeError: Object of type <YourObject> is not JSON serializable`
  - cause: The payload passed to `pg.enqueue()` contains an object that cannot be serialized to JSON (e.g., a custom class instance, a set, or another non-JSON-compatible type).
  - fix: Ensure the `payload` argument for `pg.enqueue()` is a dictionary containing only JSON-serializable types (strings, numbers, booleans, lists, and nested dictionaries).
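The last error above can often be caught before enqueuing. As a sketch (the `ensure_json_payload` helper is illustrative, not part of Pgqueuer), round-trip the payload through the standard `json` module with a `default` hook that converts sets to lists:

```python
import json


def ensure_json_payload(payload: dict) -> dict:
    """Round-trip a payload through JSON, converting common
    non-JSON types (sets) to lists along the way. Raises TypeError
    for anything that still cannot be serialized."""
    def default(obj):
        if isinstance(obj, (set, frozenset)):
            return list(obj)
        raise TypeError(f"{type(obj).__name__} is not JSON serializable")
    return json.loads(json.dumps(payload, default=default))


# A set in the payload would normally raise TypeError at enqueue time:
clean = ensure_json_payload({"message": "hi", "tags": {"a", "b"}})
print(sorted(clean["tags"]))  # → ['a', 'b']
```

Running the cleaned payload through `pg.enqueue()` then avoids the serialization error, at the cost of silently turning sets into lists.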
Warnings
- breaking Starting from v0.26.3, `PgQueuer.run()` no longer silently swallows crashes from its internal `QueueManager` or `SchedulerManager`; exceptions from these managers now propagate immediately to the caller, preventing silent hangs or exits.
- gotcha All task functions registered with `pg.worker.register_task()` must be `async def` functions. Passing a synchronous function will lead to runtime errors when the worker attempts to execute it.
- gotcha Incorrect PostgreSQL connection details (host, port, user, password, database name) will prevent `PgQueuer` from connecting, leading to `asyncpg.exceptions.CannotConnectNowError` or `InvalidCatalogNameError`.
- gotcha Prior to v0.25.1, `PgQueuer` could, in certain crash scenarios, leave jobs stuck in a 'picked' state, never to be retried. This was addressed by improved retry/dequeue logic.
- gotcha Scheduler race conditions related to how `PgQueuer` handles concurrent scheduling of tasks were fixed in v0.26.1.
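The `async def` requirement above can be enforced at registration time rather than discovered when the worker executes the job. This is a hypothetical guard you would wrap around your own registration code, not a Pgqueuer API:

```python
import asyncio


def check_task_is_async(fn) -> None:
    """Fail fast at registration time instead of at job execution time."""
    if not asyncio.iscoroutinefunction(fn):
        raise TypeError(
            f"{fn.__name__} must be declared with 'async def' to be "
            "registered as a task"
        )


async def good_task(job_id: str, payload: dict) -> None:
    await asyncio.sleep(0)


def bad_task(job_id, payload):  # synchronous: would fail at runtime
    pass


check_task_is_async(good_task)  # passes silently
try:
    check_task_is_async(bad_task)
except TypeError as e:
    print(e)  # prints the TypeError message
```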
Install
-
pip install pgqueuer
Imports
- PgQueuer
from pgqueuer import PgQueuer
- JobResult
from pgqueuer.models import JobResult
- InMemoryDriver
from pgqueuer import InMemoryDriver
Quickstart
import asyncio
import os

from pgqueuer import PgQueuer
from pgqueuer.models import JobResult

# DATABASE_URL should point to your PostgreSQL instance.
# Example: "postgresql://user:password@localhost:5432/mydb"
# For local testing, ensure a PostgreSQL instance is running.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://postgres:postgres@localhost:5432/pgqueuer_test_db",  # Default for local testing
)


async def my_task(job_id: str, payload: dict) -> str:
    """An asynchronous task that processes data."""
    print(f"[TASK] Processing job {job_id} with payload: {payload}")
    await asyncio.sleep(0.5)  # Simulate async I/O or computation
    result = f"Task {job_id} processed: {payload['message']}"
    print(f"[TASK] {result}")
    return result


async def main():
    print(f"[MAIN] Connecting to PostgreSQL at: {DATABASE_URL}")

    # Initialize PgQueuer using your PostgreSQL database URL.
    # This requires a running PostgreSQL database accessible at DATABASE_URL.
    pg = await PgQueuer.from_asyncpg_url(DATABASE_URL)

    # Ensure the necessary database tables are set up.
    print("[MAIN] Running database migrations...")
    await pg.run_migrations()

    # Register your task function with the worker.
    pg.worker.register_task(my_task)
    print("[MAIN] Task 'my_task' registered.")

    # Enqueue a job with a specific task name and payload.
    payload = {"message": "Hello from PgQueuer!"}
    job_id = await pg.enqueue("my_task", payload)
    print(f"[MAIN] Enqueued job with ID: {job_id} for task 'my_task'.")

    # Run the PgQueuer event loop to process jobs.
    # For a quickstart, we process existing jobs once with a timeout;
    # in a production setup, this would typically run indefinitely.
    print("[MAIN] Running PgQueuer to process enqueued jobs (once, up to 10 seconds)...")
    await pg.run(once=True, timeout=10)  # Process currently enqueued jobs, then exit

    # Fetch and display the result of the enqueued job.
    print(f"[MAIN] Fetching result for job {job_id}...")
    job_result: JobResult = await pg.fetch_job_result(job_id)
    if job_result:
        print(f"[MAIN] Job {job_id} status: {job_result.status}")
        if job_result.status == "completed":
            print(f"[MAIN] Job {job_id} result: {job_result.result}")
        elif job_result.status == "failed":
            print(f"[MAIN] Job {job_id} failed with error: {job_result.error_message}")
    else:
        print(f"[MAIN] Job {job_id} result not found or not yet processed.")

    # Clean up resources (close database connections).
    await pg.close()
    print("[MAIN] PgQueuer resources closed.")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as e:
        print(f"\n[ERROR] An error occurred: {e}")
        print("Please ensure your PostgreSQL database is running and accessible at the DATABASE_URL provided.")
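Since bad connection details only surface at runtime as `CannotConnectNowError` or `InvalidCatalogNameError`, it can help to sanity-check the DSN before handing it to PgQueuer. A minimal sketch using only the standard library (`describe_dsn` is an illustrative helper, not part of Pgqueuer):

```python
from urllib.parse import urlsplit


def describe_dsn(url: str) -> dict:
    """Split a PostgreSQL DSN into its parts so obvious mistakes
    (wrong host or port, missing database name) are visible before
    any connection attempt."""
    parts = urlsplit(url)
    if parts.scheme not in ("postgresql", "postgres"):
        raise ValueError(f"unexpected scheme: {parts.scheme!r}")
    return {
        "host": parts.hostname or "localhost",
        "port": parts.port or 5432,
        "user": parts.username,
        "database": parts.path.lstrip("/") or None,
    }


info = describe_dsn("postgresql://postgres:postgres@localhost:5432/pgqueuer_test_db")
print(info)  # {'host': 'localhost', 'port': 5432, 'user': 'postgres', 'database': 'pgqueuer_test_db'}
```

Logging this breakdown at startup makes it immediately clear whether a failed connection is pointing at the wrong host or a database that was never created.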