Database Table Change Data Capture Streamer
af-streams is a Node.js library designed to generate data streams directly from database tables, acting as a lightweight change data capture (CDC) mechanism. It currently supports MSSQL and PostgreSQL databases. The library operates by monitoring a specified timestamp field and a set of primary key fields within a table to detect and stream new or updated records. It provides flexible output options, including sending data to the console, a callback function, an event emitter, a TCP JSON sender (in WSO2 event binary format), or a WebSocket JSON sender. Additionally, it offers various windowing functionalities like `SingleEventTimeWindow`, `TimeWindow`, `KeyedNumberWindow`, `KeyedSingleEventTimeWindow`, and `KeyedTimeWindow` for advanced stream processing. The current stable version is 3.1.60, and it targets Node.js environments version 14 and higher. While a specific release cadence isn't published, the version number indicates active development.
Common errors
-
TypeError: Invalid Version: 12.x.x
cause Attempting to run `af-streams` on an incompatible Node.js version, typically older than v14.fixUpdate your Node.js environment to version 14 or higher. Use a Node Version Manager (nvm) for easy switching: `nvm install 18 && nvm use 18`. -
Error: Missing `timestampField` in stream configuration.
cause The `timestampField` property, crucial for change detection, was not provided or was null in the `DatabaseStream` constructor.fixAdd `timestampField: 'YourTimestampColumnName'` to your `DatabaseStream` configuration, pointing to a valid timestamp column in your database table. -
Error: No primary key fields defined. Provide 'primaryKeyFields' in the configuration.
cause The `primaryKeyFields` array, required for uniquely identifying records, was empty or missing from the `DatabaseStream` configuration.fixSpecify `primaryKeyFields: ['Column1', 'Column2']` (or a single string for a single-column primary key) in your `DatabaseStream` configuration. -
Cannot find module 'mssql' (or 'pg')
cause The required database driver for the specified `dbType` was not installed as a peer dependency.fixInstall the correct database driver for your setup: `npm install mssql` for MSSQL, or `npm install pg` for PostgreSQL.
Warnings
- breaking The package requires Node.js version 14 or higher. Running it on older Node.js versions will result in runtime errors due to unsupported syntax or APIs.
- gotcha Database tables streamed by af-streams must have a timestamp field and a defined set of primary key fields. Incorrect or missing configuration for these fields will prevent the stream from identifying new or updated records correctly.
- breaking Major version 3 introduced potential breaking changes from previous versions (v1 or v2). While specific changes aren't detailed in the provided excerpt, typically new major versions involve API changes or modified behavior that requires adaptation.
- gotcha Database drivers (e.g., `mssql` or `pg`) are not direct dependencies of `af-streams` itself. You must explicitly install the appropriate driver for your chosen database type (`dbType`) for the library to function.
Install
-
npm install af-streams -
yarn add af-streams -
pnpm add af-streams
Imports
- DatabaseStream
const { DatabaseStream } = require('af-streams');import { DatabaseStream } from 'af-streams'; - TimeWindow
import TimeWindow from 'af-streams';
import { TimeWindow } from 'af-streams'; - ConsoleSender
const ConsoleSender = require('af-streams').ConsoleSender;import { ConsoleSender } from 'af-streams';
Quickstart
import { DatabaseStream, ConsoleSender } from 'af-streams';
import { type ConnectionOptions as MSSQLConnectionOptions } from 'mssql';
const mssqlConfig: MSSQLConnectionOptions = {
user: process.env.DB_USER ?? 'sa',
password: process.env.DB_PASSWORD ?? 'YourStrong@Password',
server: process.env.DB_HOST ?? 'localhost',
database: process.env.DB_NAME ?? 'MyDatabase',
options: {
encrypt: process.env.DB_ENCRYPT === 'true', // Use for Azure SQL Database, default: false
trustServerCertificate: process.env.DB_TRUST_SERVER_CERTIFICATE === 'true' // Change to true for local dev / self-signed certs
}
};
async function startDatabaseStream() {
try {
const stream = new DatabaseStream({
dbType: 'mssql', // or 'postgres'
dbConfig: mssqlConfig,
table: 'YourTableName',
timestampField: 'UpdatedAt',
primaryKeyFields: ['Id', 'TenantId'], // Composite primary key
pollIntervalMs: 5000, // Poll every 5 seconds
// Additional options like initialTimestamp, batchSize, etc.
});
// Example: Using the ConsoleSender to output streamed data
stream.addSender(new ConsoleSender());
// Example: Using a callback sender for custom processing
stream.addSender({
send: (data) => {
console.log('Callback Sender received record:', data);
// Here you could process the data, send to a message queue, etc.
}
});
console.log('Starting database stream...');
await stream.start();
// To stop the stream after some time or on a specific event
// setTimeout(() => {
// stream.stop();
// console.log('Database stream stopped.');
// }, 60000); // Stop after 1 minute
} catch (error) {
console.error('Failed to start database stream:', error);
process.exit(1);
}
}
startDatabaseStream();