Pgqueuer: PostgreSQL-backed Job Queuing and Scheduling

0.26.3 · active · verified Fri Apr 17

Pgqueuer is a Python library that uses PostgreSQL as a robust and efficient backend for asynchronous job queuing and scheduling. It supports both one-off jobs and recurring tasks via cron-like schedules, and offers an in-memory driver for development and testing. The library is actively developed, with frequent releases bringing architectural improvements, new features, and bug fixes; the current version is 0.26.3.

Common errors

Warnings

Install
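A minimal install sketch. It assumes pip, a Python 3.10+ environment, and the `pgq` CLI that the package provides; the asyncpg driver used in the quickstart below is installed alongside:

```shell
# Install pgqueuer together with the asyncpg driver used in the quickstart.
pip install pgqueuer asyncpg

# Create pgqueuer's tables in the target database before running workers.
# (Assumes the `pgq` CLI is on PATH and can locate your database DSN,
# e.g. via its --pg-dsn option or environment configuration.)
pgq install
```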

Imports

Quickstart

This quickstart demonstrates how to set up `PgQueuer` with a PostgreSQL database, register an asynchronous task, enqueue a job, and run the queue to process it. It uses `os.environ.get` for the database URL, defaulting to a common local PostgreSQL setup. Ensure a PostgreSQL server is running and accessible at the specified `DATABASE_URL`, and that pgqueuer's schema has been installed once beforehand (`pgq install`).

import asyncio
import os

import asyncpg

from pgqueuer import PgQueuer
from pgqueuer.db import AsyncpgDriver
from pgqueuer.models import Job
from pgqueuer.queries import Queries

# DATABASE_URL should point to your PostgreSQL instance.
# Example: "postgresql://user:password@localhost:5432/mydb"
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://postgres:postgres@localhost:5432/pgqueuer_test_db",  # Default for local testing
)

async def main() -> None:
    print(f"[MAIN] Connecting to PostgreSQL at: {DATABASE_URL}")
    # PgQueuer wraps a database driver; here we use the asyncpg driver.
    # The schema must already exist (run `pgq install` once beforehand).
    connection = await asyncpg.connect(DATABASE_URL)
    driver = AsyncpgDriver(connection)
    pgq = PgQueuer(driver)

    # Register a task ("entrypoint") with the queue manager.
    # Handlers receive a Job whose payload is raw bytes (or None).
    @pgq.entrypoint("my_task")
    async def my_task(job: Job) -> None:
        message = job.payload.decode() if job.payload else ""
        print(f"[TASK] Processing job {job.id} with payload: {message!r}")
        await asyncio.sleep(0.5)  # Simulate async I/O or computation
        print(f"[TASK] Job {job.id} done.")

    # Enqueue a job for the 'my_task' entrypoint; payloads are bytes.
    queries = Queries(driver)
    await queries.enqueue("my_task", b"Hello from PgQueuer!")
    print("[MAIN] Enqueued one job for entrypoint 'my_task'.")

    # Run the PgQueuer event loop to process jobs. In production this
    # runs indefinitely (typically via the CLI: `pgq run myapp:main`);
    # for this quickstart we cancel the worker after 10 seconds.
    # Completed jobs are recorded in pgqueuer's log tables rather than
    # returned to the producer.
    print("[MAIN] Running PgQueuer for 10 seconds...")
    try:
        await asyncio.wait_for(pgq.run(), timeout=10)
    except asyncio.TimeoutError:
        pass

    # Clean up resources (close the database connection).
    await connection.close()
    print("[MAIN] Done; connection closed.")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as e:
        print(f"\n[ERROR] An error occurred: {e}")
        print("Ensure PostgreSQL is running and reachable at DATABASE_URL, "
              "and that the pgqueuer schema is installed (`pgq install`).")
