{"id":10031,"library":"pgqueuer","title":"Pgqueuer: PostgreSQL-backed Job Queuing and Scheduling","description":"Pgqueuer is a Python library that utilizes PostgreSQL as a robust and efficient backend for asynchronous job queuing and scheduling. It supports both one-off jobs and recurring tasks via cron-like schedules, offering an in-memory driver for development and testing. The library is actively developed, with frequent releases bringing architectural improvements, new features, and bug fixes. The current version is 0.26.3.","status":"active","version":"0.26.3","language":"en","source_language":"en","source_url":"https://github.com/janbjorge/pgqueuer/","tags":["job queue","async","postgresql","background tasks","scheduler","asyncio"],"install":[{"cmd":"pip install pgqueuer","lang":"bash","label":"Install pgqueuer"}],"dependencies":[{"reason":"Default asynchronous PostgreSQL driver for production use.","package":"asyncpg","optional":false}],"imports":[{"symbol":"PgQueuer","correct":"from pgqueuer import PgQueuer"},{"symbol":"JobResult","correct":"from pgqueuer.models import JobResult"},{"note":"Use this for local development/testing without a real PostgreSQL database.","symbol":"InMemoryDriver","correct":"from pgqueuer import InMemoryDriver"}],"quickstart":{"code":"import asyncio\nimport os\nfrom pgqueuer import PgQueuer\nfrom pgqueuer.models import JobResult\n\n# DATABASE_URL should point to your PostgreSQL instance.\n# Example: \"postgresql://user:password@localhost:5432/mydb\"\n# For local testing, ensure a PostgreSQL instance is running.\nDATABASE_URL = os.environ.get(\n    \"DATABASE_URL\",\n    \"postgresql://postgres:postgres@localhost:5432/pgqueuer_test_db\" # Default for local testing\n)\n\nasync def my_task(job_id: str, payload: dict) -> str:\n    \"\"\"An asynchronous task that processes data.\"\"\"\n    print(f\"[TASK] Processing job {job_id} with payload: {payload}\")\n    await asyncio.sleep(0.5) # Simulate async I/O or computation\n    result = f\"Task {job_id} processed: {payload['message']}\"\n    print(f\"[TASK] {result}\")\n    return result\n\nasync def main():\n    print(f\"[MAIN] Connecting to PostgreSQL at: {DATABASE_URL}\")\n    # Initialize PgQueuer using your PostgreSQL database URL\n    # This requires a running PostgreSQL database accessible at DATABASE_URL.\n    pg = await PgQueuer.from_asyncpg_url(DATABASE_URL)\n\n    # Ensure necessary database tables are set up\n    print(\"[MAIN] Running database migrations...\")\n    await pg.run_migrations()\n\n    # Register your task function with the worker\n    pg.worker.register_task(my_task)\n    print(\"[MAIN] Task 'my_task' registered.\")\n\n    # Enqueue a job with a specific task name and payload\n    payload = {\"message\": \"Hello from PgQueuer!\"}\n    job_id = await pg.enqueue(\"my_task\", payload)\n    print(f\"[MAIN] Enqueued job with ID: {job_id} for task 'my_task'.\")\n\n    # Run the PgQueuer event loop to process jobs.\n    # For a quickstart, we run it once and process existing jobs for a short duration.\n    # In a production setup, this would typically run indefinitely.\n    print(\"[MAIN] Running PgQueuer to process enqueued jobs (running once for 10 seconds)...\")\n    await pg.run(once=True, timeout=10) # Process all currently enqueued jobs and then exit\n\n    # Fetch and display the result of the enqueued job\n    print(f\"[MAIN] Fetching result for job {job_id}...\")\n    job_result: JobResult = await pg.fetch_job_result(job_id)\n\n    if job_result:\n        print(f\"[MAIN] Job {job_id} status: {job_result.status}\")\n        if job_result.status == \"completed\":\n            print(f\"[MAIN] Job {job_id} result: {job_result.result}\")\n        elif job_result.status == \"failed\":\n            print(f\"[MAIN] Job {job_id} failed with error: {job_result.error_message}\")\n    else:\n        print(f\"[MAIN] Job {job_id} result not found or not yet processed.\")\n\n    # Clean up resources (close database connections)\n    await pg.close()\n    print(\"[MAIN] PgQueuer resources closed.\")\n\nif __name__ == \"__main__\":\n    try:\n        asyncio.run(main())\n    except Exception as e:\n        print(f\"\\n[ERROR] An error occurred: {e}\")\n        print(\"Please ensure your PostgreSQL database is running and accessible at the DATABASE_URL provided.\")","lang":"python","description":"This quickstart demonstrates how to set up `PgQueuer` with a PostgreSQL database, register an asynchronous task, enqueue a job, and run the queue to process it. It uses `os.environ.get` for the database URL, defaulting to a common local PostgreSQL setup. Ensure a PostgreSQL server is running and accessible at the specified `DATABASE_URL`."},"warnings":[{"fix":"Update your error handling logic around `PgQueuer.run()` to catch and manage potential exceptions. This change improves observability of critical failures.","message":"Starting from v0.26.3, `PgQueuer.run()` no longer silently swallows crashes from its internal `QueueManager` or `SchedulerManager`. Instead, exceptions from these managers will now propagate immediately to the caller, preventing silent hangs or exits.","severity":"breaking","affected_versions":">=0.26.3"},{"fix":"Ensure your task functions are defined using `async def` and `await` any asynchronous operations within them.","message":"All task functions registered with `pg.worker.register_task()` must be `async def` functions. Passing a synchronous function will lead to runtime errors when the worker attempts to execute it.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Verify your `DATABASE_URL` string or the parameters passed to `PgQueuer.from_asyncpg_url` match your PostgreSQL server's configuration and ensure the database exists.","message":"Incorrect PostgreSQL connection details (host, port, user, password, database name) will prevent `PgQueuer` from connecting, leading to `asyncpg.exceptions.CannotConnectNowError` or `InvalidCatalogNameError`.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Upgrade to `pgqueuer` v0.25.1 or newer to benefit from the improved job reclamation logic that automatically retries stale 'picked' jobs after worker crashes.","message":"Prior to v0.25.1, `PgQueuer` could, in certain crash scenarios, leave jobs in a 'picked' state, making them stuck and not retried. This was addressed by improved retry dequeue logic.","severity":"gotcha","affected_versions":"<0.25.1"},{"fix":"Upgrade to `pgqueuer` v0.26.1 or newer to ensure more robust and reliable scheduling, especially under heavy load or in distributed environments.","message":"Scheduler race conditions related to how `PgQueuer` handles concurrent scheduling of tasks were fixed in v0.26.1.","severity":"gotcha","affected_versions":"<0.26.1"}],"env_vars":null,"last_verified":"2026-04-17T00:00:00.000Z","next_check":"2026-07-16T00:00:00.000Z","problems":[{"fix":"Create the specified database on your PostgreSQL server or correct the database name in your `DATABASE_URL`.","cause":"The database name specified in the `DATABASE_URL` does not exist on the PostgreSQL server.","error":"asyncpg.exceptions.InvalidCatalogNameError: database \"non_existent_db\" does not exist"},{"fix":"Ensure your PostgreSQL server is running, listening on the correct host/port, and is accessible (e.g., firewall rules). Double-check the host and port in your `DATABASE_URL`.","cause":"Pgqueuer could not establish a connection to the PostgreSQL server. This typically means the server is not running, is inaccessible from where Pgqueuer is running, or the host/port in the `DATABASE_URL` is incorrect.","error":"asyncpg.exceptions.CannotConnectNowError: could not connect to server: Connection refused"},{"fix":"Change your task function definition from `def my_task(...)` to `async def my_task(...)` and ensure any internal asynchronous calls use `await`.","cause":"You registered a synchronous Python function with `pg.worker.register_task()` but Pgqueuer expects an `async def` function that can be awaited.","error":"TypeError: 'function' object is not awaitable"},{"fix":"Before enqueuing a job, ensure that the task function corresponding to its name has been registered using `pg.worker.register_task(your_async_function)`.","cause":"You attempted to enqueue a job with a task name (e.g., 'my_unknown_task') that has not been registered with `pg.worker.register_task()`.","error":"KeyError: 'my_unknown_task'"},{"fix":"Ensure that the `payload` argument for `pg.enqueue()` is a dictionary containing only JSON-serializable types (strings, numbers, booleans, lists, and other dictionaries).","cause":"The payload passed to `pg.enqueue()` contains an object that cannot be serialized to JSON (e.g., a custom class instance, a set, or other non-JSON compatible types).","error":"TypeError: Object of type <YourObject> is not JSON serializable"}]}