Faktory Worker for Node.js
faktory-worker is a Node.js client and worker library for the Faktory job server, enabling asynchronous background job processing in Node.js applications. It provides a client for pushing jobs to the Faktory server and a worker for fetching and executing them. The current stable version is 4.7.1; recent minor releases added weighted-random queue fetching (v4.4.0) and CLI improvements (v4.5.0). Key features include handling of Faktory job payloads, bulk job pushing, flexible queue configuration (strictly ordered or weighted-random processing), and graceful worker shutdown. The library requires Node.js >=16 and Faktory server versions greater than 1.6.1, and it ships with TypeScript types.
Common errors
- faktory worker failed to start: Error: connect ECONNREFUSED 127.0.0.1:7419
  cause: The Faktory job server is not running or is not reachable at the default host and port (localhost:7419), or the `FAKTORY_URL` environment variable is misconfigured.
  fix: Ensure the Faktory server is running and reachable from the worker process. Verify the `FAKTORY_URL` environment variable, or pass connection options explicitly to `faktory.connect()` or `faktory.work()`.
- Error: Job not found <jid>
  cause: The Faktory server cannot locate a job that a worker attempted to process. This can occur intermittently, especially with large job backlogs or during worker deployments.
  fix: Review the Faktory server logs for related errors. Avoid abruptly restarting workers while jobs are in flight, and check the Faktory server configuration and network stability. This may be a server-side issue or a transient network problem.
- TypeError: faktory.connect is not a function (when using require)
  cause: ES module import syntax and CommonJS `require()` were mixed, so `connect` is looked up on an object that does not expose it. With CommonJS, the functions are properties of the object returned by `require()`.
  fix: For CommonJS, use `const faktory = require('faktory-worker');` and then call `faktory.connect()`, `faktory.register()`, and `faktory.work()`. For ES modules, use `import { connect, register, work } from 'faktory-worker';`.
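When diagnosing `ECONNREFUSED`, it can help to print the host and port the process will actually try to reach. The following is a minimal sketch, assuming the target comes from `FAKTORY_URL` in the `tcp://:password@host:port` form and otherwise falls back to `localhost:7419`. `describeFaktoryTarget` is a hypothetical helper for illustration, not part of `faktory-worker`, and the example URL is made up.

```typescript
// Hypothetical diagnostic helper; not part of faktory-worker.
interface FaktoryTarget {
  host: string;
  port: number;
  hasPassword: boolean;
}

function describeFaktoryTarget(env: Record<string, string | undefined>): FaktoryTarget {
  // Assumed fallback: the default address named in the error above.
  const raw = env.FAKTORY_URL ?? 'tcp://localhost:7419';
  const url = new URL(raw); // throws a TypeError if the value is not a valid URL
  return {
    host: url.hostname,
    port: url.port === '' ? 7419 : Number(url.port),
    hasPassword: url.password !== '',
  };
}

// Example with a made-up URL; in a real process, pass process.env instead.
const target = describeFaktoryTarget({ FAKTORY_URL: 'tcp://:secret@faktory.internal:7419' });
console.log(`Faktory target: ${target.host}:${target.port} (password set: ${target.hasPassword})`);
```

Logging the resolved target at startup makes it easier to tell a stopped server apart from a misconfigured environment variable.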
Warnings
- gotcha It is crucial to properly manage Faktory client connections. Clients should be reused across multiple job pushes rather than creating a new client for each job. Remember to call `client.close()` when the client is no longer needed to release resources, especially in short-lived scripts or server shutdowns.
- gotcha Job functions registered with `faktory.register()` must return a promise that settles only when all of their work is done. If an `async` job function starts asynchronous work without awaiting it (or returning its promise), the function resolves early and the worker `ACK`s the job to the Faktory server prematurely, potentially leaving work incomplete.
- breaking The library requires Node.js version 16 or higher. Older Node.js versions are not supported and will result in runtime errors or unexpected behavior.
- breaking This `faktory-worker` library is compatible with Faktory server versions `>v1.6.1`. Using it with older Faktory server versions may lead to protocol incompatibilities, unexpected job processing failures, or connection issues.
- gotcha If a Faktory worker process crashes while processing a job, the job will sit in the 'busy' state until its reservation timeout expires on the Faktory server. Only after the timeout will Faktory consider it failed and potentially re-enqueue it for retry, depending on job configuration. This can cause delays.
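The warning about asynchronous job functions can be illustrated without a Faktory server. This is a minimal sketch, assuming only that a worker acknowledges a job once the promise returned by the handler settles. `runAndAck` is a hypothetical stand-in for the worker loop, not the library's implementation.

```typescript
// Hypothetical stand-in for a worker loop: it "ACKs" once the handler's promise settles.
type Handler = () => Promise<void> | void;

async function runAndAck(handler: Handler, log: string[]): Promise<void> {
  await handler();
  log.push('ACK');
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Correct: the handler awaits its work, so the work finishes before the ACK.
async function awaitedJob(log: string[]): Promise<void> {
  await sleep(20);
  log.push('work done');
}

// Buggy: the promise is neither awaited nor returned, so the handler settles immediately.
async function fireAndForgetJob(log: string[]): Promise<void> {
  void sleep(20).then(() => log.push('work done'));
}

async function demo(): Promise<{ good: string[]; bad: string[] }> {
  const good: string[] = [];
  const bad: string[] = [];
  await runAndAck(() => awaitedJob(good), good);
  await runAndAck(() => fireAndForgetJob(bad), bad);
  await sleep(50); // let the stray promise finish so the ordering is visible
  return { good, bad };
}

const demoResult = demo();
demoResult.then(({ good, bad }) => {
  console.log(good); // [ 'work done', 'ACK' ]
  console.log(bad); // [ 'ACK', 'work done' ]
});
```

In the second case the acknowledgement precedes the work, which is the situation the warning describes: if the process stops in between, the job is already marked as done.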
Install
- npm install faktory-worker
- yarn add faktory-worker
- pnpm add faktory-worker
Imports
- connect
  CommonJS: const faktory = require('faktory-worker'); const client = await faktory.connect();
  ES modules: import { connect } from 'faktory-worker';
- register
  CommonJS: const faktory = require('faktory-worker'); faktory.register('JobType', ...);
  ES modules: import { register } from 'faktory-worker';
- work
  CommonJS: const faktory = require('faktory-worker'); faktory.work();
  ES modules: import { work } from 'faktory-worker';
- Client
  ES modules: import { Client } from 'faktory-worker';
Quickstart
import { connect, register, work } from 'faktory-worker';

// 1. Define your job processing logic
interface ResizeImagePayload {
  id: number;
  size: string;
}

register('ResizeImage', async (payload: ResizeImagePayload) => {
  console.log(`[Worker] Processing job ResizeImage for image ${payload.id} with size ${payload.size}`);
  // Simulate an asynchronous operation, e.g., image resizing or a database update
  await new Promise((resolve) => setTimeout(resolve, Math.random() * 1000 + 500));
  console.log(`[Worker] Finished ResizeImage for image ${payload.id}`);
});

// 2. Worker startup (typically run in a long-running background process)
async function startFaktoryWorker() {
  console.log('Starting Faktory worker...');
  try {
    // queues and concurrency are worker options, not client options
    const workerOptions = {
      queues: ['default', 'images'], // Listen to the specified queues
      concurrency: 5, // Process up to 5 jobs concurrently
      // The Faktory server URL can be set via the FAKTORY_URL environment variable,
      // or the connection can be passed explicitly:
      // host: 'localhost', port: 7419,
    };
    await work(workerOptions);
    console.log('Faktory worker started and waiting for jobs...');
  } catch (error) {
    console.error(`Faktory worker failed to start: ${error}`);
    // Exit with a non-zero code so a process supervisor can restart the worker
    process.exit(1);
  }
}

// 3. Client job pushing (typically run from an application server or another process)
async function pushFaktoryJob(payload: ResizeImagePayload) {
  let client;
  try {
    console.log(`[Client] Connecting to Faktory server to push job for image ${payload.id}...`);
    client = await connect(); // Connects to the Faktory server
    console.log(`[Client] Pushing job 'ResizeImage' with payload:`, payload);
    await client.job('ResizeImage', payload).push();
    console.log(`[Client] Job 'ResizeImage' for image ${payload.id} pushed successfully.`);

    // Example of pushing several jobs in bulk
    // const job1 = client.job('ResizeImage', { id: 102, size: 'medium' });
    // const job2 = client.job('ResizeImage', { id: 103, size: 'small' });
    // const rejected = await client.pushBulk([job1, job2]);
    // if (Object.keys(rejected).length > 0) {
    //   console.error('[Client] Some bulk jobs were rejected:', rejected);
    // }
  } catch (error) {
    console.error(`[Client] Failed to push job: ${error}`);
  } finally {
    if (client) {
      // Close when done; a long-running application should reuse one client instead
      await client.close();
      console.log('[Client] Disconnected from Faktory server.');
    }
  }
}

// To run this quickstart:
// 1. Ensure a Faktory server is running (e.g., docker run --rm -p 7419:7419 -p 7420:7420 contribsys/faktory)
// 2. Save this file (e.g., `app.ts`).
// 3. Run the worker in one terminal: `ts-node app.ts worker`
// 4. Run the client to push a job in another terminal: `ts-node app.ts client`
const mode = process.argv[2];
if (mode === 'worker') {
  startFaktoryWorker();
} else if (mode === 'client') {
  pushFaktoryJob({ id: 101, size: 'large' });
} else {
  console.log('Usage: ts-node app.ts [worker|client]');
  console.log('Example: ts-node app.ts worker (to start the job processor)');
  console.log('Example: ts-node app.ts client (to push a job)');
  // For a truly single-file runnable demo, you'd push then immediately start a worker,
  // but in practice these are separate long-running processes.
}
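The quickstart opens and closes a connection for a single push, which suits a one-off script. In a long-running application, one way to follow the advice about reusing clients is to memoize the connection promise. The sketch below is an illustration under assumptions: `createSharedClient` is a hypothetical helper, not part of `faktory-worker`, and `connectFn` stands in for `connect` so the example runs without a server.

```typescript
// Hypothetical helper; not part of faktory-worker.
interface Closable {
  close(): Promise<void>;
}

function createSharedClient<T extends Closable>(connectFn: () => Promise<T>) {
  let pending: Promise<T> | undefined;

  return {
    // Returns the same client to every caller, connecting only on first use.
    get(): Promise<T> {
      if (!pending) {
        pending = connectFn().catch((error) => {
          pending = undefined; // allow a retry after a failed connection attempt
          throw error;
        });
      }
      return pending;
    },
    // Closes the client if one was opened, e.g. during server shutdown.
    async close(): Promise<void> {
      if (!pending) return;
      const current = pending;
      pending = undefined;
      const client = await current;
      await client.close();
    },
  };
}

// Usage with a fake connection that counts how often it is opened and closed.
let opened = 0;
let closed = 0;
const shared = createSharedClient(async () => {
  opened += 1;
  return { close: async () => { closed += 1; } };
});

async function demoShared(): Promise<void> {
  await Promise.all([shared.get(), shared.get(), shared.get()]);
  await shared.close();
  console.log({ opened, closed }); // { opened: 1, closed: 1 }
}

const sharedDemoDone = demoShared();
```

In an application, `connectFn` would typically be `() => connect()`, and `shared.close()` would be called from the shutdown path.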