Faktory Python Worker
Faktory is a polyglot background job system. The `faktory` Python library provides a complete client and worker implementation, enabling Python applications to push jobs to, and process jobs from, a Faktory server. It supports features like concurrency, retries, custom metadata, scheduled jobs, and graceful shutdown. Version 1.0.0 was recently released, focusing on improved connection reliability and robustness.
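To make the feature list concrete, the sketch below builds the kind of JSON job payload that a Faktory client pushes to the server. The field names (`jid`, `jobtype`, `args`, `queue`, `retry`, `at`, `custom`) follow the Faktory job format. The helper `build_job` is hypothetical and not part of the `faktory` library, which assembles this payload for you.

```python
import json
import uuid
from datetime import datetime, timedelta, timezone


def build_job(jobtype, args, queue="default", retry=25, delay_seconds=None, custom=None):
    """Build a Faktory-style job payload (hypothetical helper, for illustration only)."""
    job = {
        "jid": uuid.uuid4().hex,   # unique job id
        "jobtype": jobtype,        # name the worker registered its function under
        "args": list(args),        # positional arguments, must be JSON-serializable
        "queue": queue,
        "retry": retry,            # number of retries allowed on failure
    }
    if delay_seconds is not None:
        # Scheduled jobs carry an RFC 3339 timestamp in the "at" field.
        run_at = datetime.now(timezone.utc) + timedelta(seconds=delay_seconds)
        job["at"] = run_at.isoformat()
    if custom:
        # Custom metadata travels with the job in the "custom" field.
        job["custom"] = dict(custom)
    return job


job = build_job("add_numbers", (10, 5), delay_seconds=60, custom={"request_id": "abc123"})
print(json.dumps(job, indent=2))
```

In normal use you would not build this dictionary yourself; it is shown only to illustrate what retries, scheduling, and custom metadata look like on the wire.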
Common errors
- It simply hangs at "Connecting to Faktory..."
  cause The Faktory server is not running, or the `FAKTORY_URL` is incorrect or unreachable.
  fix Verify that the Faktory server is running (start it with the `faktory` command). Check the `FAKTORY_URL` environment variable, or the `faktory_url` parameter passed to `faktory.connection()` or the `faktory.Worker` constructor, and make sure it points to the correct host and port and includes authentication if required. For example, `export FAKTORY_URL=tcp://localhost:7419`.
- TypeError: 'module' object is not callable (when trying to import `Client` or `Worker` directly)
  cause A module is being called as though it were a class, typically because `faktory.Client` or `faktory.Worker` was imported or instantiated incorrectly, or because the API was confused with that of a different library.
  fix For the client, use `import faktory` and then `with faktory.connection() as client:`. For the worker, use `from faktory import Worker` and then `worker = Worker(...)`.
- faktory.exceptions.ConnectionError: AUTH required
  cause The Faktory server is configured to require a password, but the client or worker is not providing one.
  fix Provide the password in the `FAKTORY_URL` string, e.g., `FAKTORY_URL=tcp://:your_password@localhost:7419`.
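When a client appears to hang or is refused, it can help to check the URL and the TCP port before involving the library at all. The following is a minimal sketch using only the standard library. The helper names `parse_faktory_url` and `is_reachable` are hypothetical, and the fallback to port 7419 is an assumption taken from the example URLs in this document.

```python
import socket
from urllib.parse import urlparse


def parse_faktory_url(url):
    """Split a tcp://[:password@]host[:port] URL into (host, port, password)."""
    parts = urlparse(url)
    host = parts.hostname or "localhost"
    port = parts.port or 7419  # assumed default, matching this document's examples
    return host, port, parts.password


def is_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, timeouts, and DNS failures
        return False


host, port, password = parse_faktory_url("tcp://:your_password@localhost:7419")
print(host, port, "password set" if password else "no password")
```

Calling `is_reachable(host, port)` then tells you whether anything is listening at that address. It only confirms that the port is open, not that the password is accepted.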
Warnings
- gotcha Faktory server requires a password in production environments by default since version 0.7.0. The Python client/worker will fail to connect without proper authentication.
- gotcha By default, workers run jobs in a `ProcessPoolExecutor`. Multiprocessing adds overhead (process startup and pickling of arguments), so choose the concurrency level with that in mind. With `use_threads=True`, jobs run as threads inside one process and share global variables, so any module-level state they mutate must be thread-safe.
- breaking The 1.0.0 release of `faktory` introduces fixes for connection reliability and process pool recovery. While not explicit API breaking changes, code relying on previous, potentially unstable, connection behaviors might experience different outcomes or errors that were previously masked.
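The point about global variables in threaded mode can be illustrated without a Faktory server. The sketch below uses the standard library's `ThreadPoolExecutor` as a stand-in for a worker running with `use_threads=True`. That the library's threaded mode behaves the same way is an assumption, and `handle_job` is a hypothetical job function.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

processed = 0                      # module-level state shared by every thread
processed_lock = threading.Lock()  # guards updates to `processed`


def handle_job(x, y):
    """Stand-in for a registered job function that also updates shared state."""
    global processed
    result = x + y
    with processed_lock:  # without the lock, concurrent increments could be lost
        processed += 1
    return result


# Run 100 jobs across 4 threads; all of them see the same `processed` variable.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(handle_job, range(100), range(100)))

print(processed)  # 100
```

With a process pool, each worker process would hold its own copy of `processed`, so the parent process would not see these increments. State that must survive across jobs is better kept in an external store.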
Install
- pip install faktory
Imports
- Worker
from faktory import Worker
- Client connection
from faktory import Client
import faktory
with faktory.connection() as client:
    client.queue('add_numbers', args=(1, 2))
Quickstart
import faktory
import time
import logging
import os
import sys

logging.basicConfig(level=logging.INFO)


# --- Worker (Consumer) Example ---
def add_numbers(x, y):
    logging.info(f"Processing job: {x} + {y} = {x + y}")
    return x + y


def run_worker():
    logging.info("Starting Faktory Worker...")
    worker = faktory.Worker(
        queues=['default'],
        concurrency=1,  # Adjust for more parallelism
        faktory_url=os.environ.get('FAKTORY_URL', 'tcp://localhost:7419')
    )
    # Map the job type name to the function that handles it
    worker.register('add_numbers', add_numbers)
    try:
        worker.run()
    except KeyboardInterrupt:
        logging.info("Worker shutting down...")


# --- Client (Producer) Example ---
def push_jobs():
    faktory_url = os.environ.get('FAKTORY_URL', 'tcp://localhost:7419')
    logging.info(f"Connecting to Faktory server at {faktory_url}...")
    try:
        with faktory.connection(faktory_url=faktory_url) as client:
            for i in range(5):
                x = i * 10
                y = i + 5
                client.queue('add_numbers', args=(x, y))
                logging.info(f"Enqueued job: add_numbers({x}, {y})")
                time.sleep(0.5)
        logging.info("All jobs enqueued.")
    except Exception as e:
        logging.error(f"Failed to connect or enqueue jobs: {e}")


if __name__ == '__main__':
    # To run:
    # 1. Start Faktory server (e.g., `faktory` in terminal or Docker)
    # 2. Run the worker: `python your_script.py worker`
    # 3. Run the client: `python your_script.py client`
    if len(sys.argv) > 1 and sys.argv[1] == 'worker':
        run_worker()
    elif len(sys.argv) > 1 and sys.argv[1] == 'client':
        push_jobs()
    else:
        print("Usage: python your_script.py [worker|client]")
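The quickstart depends on one idea: a job type name such as `'add_numbers'` is mapped to a Python callable, and the job's `args` are passed to it. The sketch below mimics that mapping in memory so a job function can be exercised without a running server. `registry`, `register`, and `dispatch` are hypothetical names for illustration and do not describe the library's internals.

```python
registry = {}  # job type name -> callable


def register(name, func):
    """Associate a job type name with the function that handles it."""
    registry[name] = func


def dispatch(job):
    """Look up the handler for job['jobtype'] and call it with job['args']."""
    func = registry.get(job["jobtype"])
    if func is None:
        raise KeyError(f"no function registered for jobtype {job['jobtype']!r}")
    return func(*job["args"])


def add_numbers(x, y):
    return x + y


register("add_numbers", add_numbers)
print(dispatch({"jobtype": "add_numbers", "args": [10, 5]}))  # 15
```

A check like this catches mismatched job names or argument counts early, before the same function is registered on a real worker.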