{"library":"pgqueuer","title":"Pgqueuer: PostgreSQL-backed Job Queuing and Scheduling","description":"Pgqueuer is a Python library that utilizes PostgreSQL as a robust and efficient backend for asynchronous job queuing and scheduling. It supports both one-off jobs and recurring tasks via cron-like schedules, offering an in-memory driver for development and testing. The library is actively developed, with frequent releases bringing architectural improvements, new features, and bug fixes. The current version is 0.26.3.","language":"python","status":"active","last_verified":"Fri Apr 17","install":{"commands":["pip install pgqueuer"],"cli":null},"imports":["from pgqueuer import PgQueuer","from pgqueuer.models import JobResult","from pgqueuer import InMemoryDriver"],"auth":{"required":false,"env_vars":[]},"quickstart":{"code":"import asyncio\nimport os\nfrom pgqueuer import PgQueuer\nfrom pgqueuer.models import JobResult\n\n# DATABASE_URL should point to your PostgreSQL instance.\n# Example: \"postgresql://user:password@localhost:5432/mydb\"\n# For local testing, ensure a PostgreSQL instance is running.\nDATABASE_URL = os.environ.get(\n    \"DATABASE_URL\",\n    \"postgresql://postgres:postgres@localhost:5432/pgqueuer_test_db\" # Default for local testing\n)\n\nasync def my_task(job_id: str, payload: dict) -> str:\n    \"\"\"An asynchronous task that processes data.\"\"\"\n    print(f\"[TASK] Processing job {job_id} with payload: {payload}\")\n    await asyncio.sleep(0.5) # Simulate async I/O or computation\n    result = f\"Task {job_id} processed: {payload['message']}\"\n    print(f\"[TASK] {result}\")\n    return result\n\nasync def main():\n    print(f\"[MAIN] Connecting to PostgreSQL at: {DATABASE_URL}\")\n    # Initialize PgQueuer using your PostgreSQL database URL\n    # This requires a running PostgreSQL database accessible at DATABASE_URL.\n    pg = await PgQueuer.from_asyncpg_url(DATABASE_URL)\n\n    # Ensure necessary database tables are set up\n    print(\"[MAIN] Running database migrations...\")\n    await pg.run_migrations()\n\n    # Register your task function with the worker\n    pg.worker.register_task(my_task)\n    print(\"[MAIN] Task 'my_task' registered.\")\n\n    # Enqueue a job with a specific task name and payload\n    payload = {\"message\": \"Hello from PgQueuer!\"}\n    job_id = await pg.enqueue(\"my_task\", payload)\n    print(f\"[MAIN] Enqueued job with ID: {job_id} for task 'my_task'.\")\n\n    # Run the PgQueuer event loop to process jobs.\n    # For a quickstart, we run it once and process existing jobs for a short duration.\n    # In a production setup, this would typically run indefinitely.\n    print(\"[MAIN] Running PgQueuer to process enqueued jobs (running once for 10 seconds)...\")\n    await pg.run(once=True, timeout=10) # Process all currently enqueued jobs and then exit\n\n    # Fetch and display the result of the enqueued job\n    print(f\"[MAIN] Fetching result for job {job_id}...\")\n    job_result: JobResult = await pg.fetch_job_result(job_id)\n\n    if job_result:\n        print(f\"[MAIN] Job {job_id} status: {job_result.status}\")\n        if job_result.status == \"completed\":\n            print(f\"[MAIN] Job {job_id} result: {job_result.result}\")\n        elif job_result.status == \"failed\":\n            print(f\"[MAIN] Job {job_id} failed with error: {job_result.error_message}\")\n    else:\n        print(f\"[MAIN] Job {job_id} result not found or not yet processed.\")\n\n    # Clean up resources (close database connections)\n    await pg.close()\n    print(\"[MAIN] PgQueuer resources closed.\")\n\nif __name__ == \"__main__\":\n    try:\n        asyncio.run(main())\n    except Exception as e:\n        print(f\"\\n[ERROR] An error occurred: {e}\")\n        print(\"Please ensure your PostgreSQL database is running and accessible at the DATABASE_URL provided.\")","lang":"python","description":"This quickstart demonstrates how to set up `PgQueuer` with a PostgreSQL database, register an asynchronous task, enqueue a job, and run the queue to process it. It uses `os.environ.get` for the database URL, defaulting to a common local PostgreSQL setup. Ensure a PostgreSQL server is running and accessible at the specified `DATABASE_URL`.","tag":null,"tag_description":null,"last_tested":null,"results":[]},"compatibility":null}