Faktory Python Worker

1.0.0 · active · verified Thu Apr 16

Faktory is a polyglot background job system. The `faktory` Python library provides a complete client and worker implementation, enabling Python applications to push jobs to, and process jobs from, a Faktory server. It supports features like concurrency, retries, custom metadata, scheduled jobs, and graceful shutdown. Version 1.0.0 was recently released, focusing on improved connection reliability and robustness.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates both a Faktory client (producer) that enqueues jobs and a Faktory worker (consumer) that processes them. Ensure the Faktory server is running and the `FAKTORY_URL` environment variable is set if your server isn't on `tcp://localhost:7419`. Run the worker in one terminal and the client in another.

import faktory
import time
import logging
import os

logging.basicConfig(level=logging.INFO)

# --- Worker (Consumer) Example ---
def add_numbers(x, y):
    logging.info(f"Processing job: {x} + {y} = {x + y}")
    return x + y

def run_worker():
    logging.info("Starting Faktory Worker...")
    worker = faktory.Worker(
        queues=['default'],
        concurrency=1, # Adjust for more parallelism
        faktory_url=os.environ.get('FAKTORY_URL', 'tcp://localhost:7419')
    )
    worker.register('add_numbers', add_numbers)
    try:
        worker.run()
    except KeyboardInterrupt:
        logging.info("Worker shutting down...")

# --- Client (Producer) Example ---
def push_jobs():
    faktory_url = os.environ.get('FAKTORY_URL', 'tcp://localhost:7419')
    logging.info(f"Connecting to Faktory server at {faktory_url}...")
    try:
        with faktory.connection(faktory_url=faktory_url) as client:
            for i in range(5):
                x = i * 10
                y = i + 5
                client.queue('add_numbers', args=(x, y))
                logging.info(f"Enqueued job: add_numbers({x}, {y})")
                time.sleep(0.5)
        logging.info("All jobs enqueued.")
    except Exception as e:
        logging.error(f"Failed to connect or enqueue jobs: {e}")

if __name__ == '__main__':
    # To run:
    # 1. Start Faktory server (e.g., `faktory` in terminal or Docker)
    # 2. Run the worker: `python your_script.py worker`
    # 3. Run the client: `python your_script.py client`
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == 'worker':
        run_worker()
    elif len(sys.argv) > 1 and sys.argv[1] == 'client':
        push_jobs()
    else:
        print("Usage: python your_script.py [worker|client]")

view raw JSON →