Database Table Change Data Capture Streamer
af-streams is a Node.js library that generates data streams directly from database tables, acting as a lightweight change data capture (CDC) mechanism. It currently supports MSSQL and PostgreSQL. The library monitors a specified timestamp field and a set of primary key fields within a table to detect and stream new or updated records. Output can be sent to the console, a callback function, an event emitter, a TCP JSON sender (in WSO2 event binary format), or a WebSocket JSON sender. For stream processing it also offers several window classes: `SingleEventTimeWindow`, `TimeWindow`, `KeyedNumberWindow`, `KeyedSingleEventTimeWindow`, and `KeyedTimeWindow`. The current stable version is 3.1.60, and it targets Node.js 14 and higher. No release cadence is published, though the high patch number suggests frequent releases.
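The timestamp-plus-primary-key detection described above can be illustrated with a small in-memory sketch. This is a generic illustration of the technique, not af-streams' implementation: `Row`, `Cursor`, `newCursor`, and `pollChanges` are hypothetical names, and an array stands in for the database table.

```typescript
// Generic sketch of timestamp + primary-key change detection.
// All names here are hypothetical; none of them come from af-streams.

interface Row {
  id: number;
  tenantId: number;
  updatedAt: number; // epoch milliseconds
  payload: string;
}

interface Cursor {
  lastTs: number;            // highest timestamp emitted so far
  keysAtLastTs: Set<string>; // primary keys already emitted at lastTs
}

function newCursor(): Cursor {
  return { lastTs: Number.NEGATIVE_INFINITY, keysAtLastTs: new Set<string>() };
}

// Composite primary key rendered as a single string.
const keyOf = (r: Row): string => `${r.id}|${r.tenantId}`;

// One polling pass. Roughly equivalent to:
//   SELECT * FROM t WHERE updatedAt >= :lastTs ORDER BY updatedAt
// followed by dropping rows already emitted at the boundary timestamp.
function pollChanges(table: readonly Row[], cursor: Cursor): Row[] {
  const candidates = table
    .filter((r) => r.updatedAt >= cursor.lastTs)
    .sort((a, b) => a.updatedAt - b.updatedAt);

  const fresh: Row[] = [];
  for (const r of candidates) {
    const key = keyOf(r);
    // Same timestamp and same key as before: already emitted, skip it.
    if (r.updatedAt === cursor.lastTs && cursor.keysAtLastTs.has(key)) continue;
    if (r.updatedAt > cursor.lastTs) {
      // The boundary moved forward, so the remembered keys no longer apply.
      cursor.lastTs = r.updatedAt;
      cursor.keysAtLastTs = new Set<string>();
    }
    cursor.keysAtLastTs.add(key);
    fresh.push(r);
  }
  return fresh;
}
```

Re-reading rows at the boundary timestamp and filtering them by key is what lets several rows share one timestamp without being duplicated or missed, which is why both a timestamp field and primary key fields are needed.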
Common errors
- error: TypeError: Invalid Version: 12.x.x
  fix: Upgrade to Node.js 14 or higher, for example with nvm install 18 && nvm use 18.
- error: Error: Missing `timestampField` in stream configuration.
  fix: Add timestampField: 'YourTimestampColumnName' to your DatabaseStream configuration, pointing to a valid timestamp column in your database table.
- error: Error: No primary key fields defined. Provide 'primaryKeyFields' in the configuration.
  fix: Add primaryKeyFields: ['Column1', 'Column2'] (or a single string for a single-column primary key) to your DatabaseStream configuration.
- error: Cannot find module 'mssql' (or 'pg')
  fix: Install the driver for your database: npm install mssql for MSSQL, or npm install pg for PostgreSQL.
Warnings
- breaking: The package requires Node.js version 14 or higher. Running it on older Node.js versions results in runtime errors due to unsupported syntax or APIs.
- gotcha: Database tables streamed by af-streams must have a timestamp field and a defined set of primary key fields. If either is missing or misconfigured, the stream cannot correctly identify new or updated records.
- breaking: Major version 3 may contain breaking changes relative to v1 and v2. The specific changes are not listed here, so test your integration before upgrading from an earlier major version.
- gotcha: Database drivers (e.g., `mssql` or `pg`) are not direct dependencies of `af-streams` itself. You must explicitly install the driver that matches your chosen database type (`dbType`) for the library to function.
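The configuration requirements in these warnings can be checked before a stream is created. The following is a minimal sketch of such a pre-flight check, assuming the option names used on this page (`dbType`, `table`, `timestampField`, `primaryKeyFields`). The function `checkStreamOptions`, its types, and its error messages are hypothetical and are not part of af-streams.

```typescript
// Hypothetical pre-flight check; not part of af-streams.
// Option names mirror the ones used on this page.

const DRIVER_FOR = { mssql: 'mssql', postgres: 'pg' } as const;
type DbType = keyof typeof DRIVER_FOR;

interface StreamOptionsSketch {
  dbType?: string;
  table?: string;
  timestampField?: string;
  primaryKeyFields?: string | string[];
}

interface CheckedOptions {
  dbType: DbType;
  table: string;
  timestampField: string;
  primaryKeyFields: string[]; // always normalized to an array
  driverPackage: string;      // npm package that must be installed separately
}

function isDbType(value: unknown): value is DbType {
  return value === 'mssql' || value === 'postgres';
}

function checkStreamOptions(opts: StreamOptionsSketch): CheckedOptions {
  const problems: string[] = [];
  const dbType = opts.dbType;
  const table = (opts.table ?? '').trim();
  const timestampField = (opts.timestampField ?? '').trim();

  // Accept a single string or an array, then drop empty entries.
  const rawKeys = Array.isArray(opts.primaryKeyFields)
    ? opts.primaryKeyFields
    : [opts.primaryKeyFields ?? ''];
  const primaryKeyFields = rawKeys.map((k) => k.trim()).filter((k) => k.length > 0);

  if (!isDbType(dbType)) problems.push("dbType must be 'mssql' or 'postgres'");
  if (table === '') problems.push('table is required');
  if (timestampField === '') problems.push('timestampField is required');
  if (primaryKeyFields.length === 0) problems.push('primaryKeyFields is required');

  // Report every problem at once instead of failing on the first one.
  if (problems.length > 0 || !isDbType(dbType)) {
    throw new Error(`Invalid stream options: ${problems.join('; ')}`);
  }

  return {
    dbType,
    table,
    timestampField,
    primaryKeyFields,
    driverPackage: DRIVER_FOR[dbType],
  };
}
```

The returned `driverPackage` names the npm package to install for the chosen `dbType`, so a caller can surface a clear message before a missing-module error occurs.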
Install
npm install af-streams
yarn add af-streams
pnpm add af-streams
Imports
- DatabaseStream
  wrong: const { DatabaseStream } = require('af-streams');
  correct: import { DatabaseStream } from 'af-streams';
- TimeWindow
  wrong: import TimeWindow from 'af-streams';
  correct: import { TimeWindow } from 'af-streams';
- ConsoleSender
  wrong: const ConsoleSender = require('af-streams').ConsoleSender;
  correct: import { ConsoleSender } from 'af-streams';
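This page names window classes such as `TimeWindow` without showing what a time window does. As a generic illustration only, the sketch below keeps the events whose timestamps fall within a fixed width of the newest event. `TimeWindowSketch`, `Timed`, and the `ts` field are hypothetical and do not reflect the constructor or methods of af-streams' own `TimeWindow`.

```typescript
// Generic event-time window sketch; not af-streams' TimeWindow API.

interface Timed {
  ts: number; // event time in epoch milliseconds
}

class TimeWindowSketch<T extends Timed> {
  private readonly widthMs: number;
  private items: T[] = [];

  constructor(widthMs: number) {
    if (!(widthMs > 0)) throw new RangeError('widthMs must be positive');
    this.widthMs = widthMs;
  }

  // Insert an event in timestamp order, then evict everything that is
  // no longer within widthMs of the newest event.
  add(item: T): void {
    const idx = this.items.findIndex((e) => e.ts > item.ts);
    if (idx === -1) {
      this.items.push(item);
    } else {
      this.items.splice(idx, 0, item); // late event: place it before newer ones
    }
    this.evict();
  }

  // Events currently inside the window, oldest first.
  values(): readonly T[] {
    return this.items;
  }

  private evict(): void {
    if (this.items.length === 0) return;
    const newestTs = this.items[this.items.length - 1].ts;
    const cutoff = newestTs - this.widthMs;
    // Keep events with ts in the half-open interval (cutoff, newestTs].
    const firstKept = this.items.findIndex((e) => e.ts > cutoff);
    if (firstKept > 0) this.items.splice(0, firstKept);
  }
}
```

A keyed variant would hold one such window per key, for example in a `Map<string, TimeWindowSketch<T>>`.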
Quickstart
import { DatabaseStream, ConsoleSender } from 'af-streams';
// The mssql typings export the connection settings type as `config`.
import type { config as MSSQLConfig } from 'mssql';

// Connection settings come from the environment, with local-development fallbacks.
const mssqlConfig: MSSQLConfig = {
  user: process.env.DB_USER ?? 'sa',
  password: process.env.DB_PASSWORD ?? 'YourStrong@Password',
  server: process.env.DB_HOST ?? 'localhost',
  database: process.env.DB_NAME ?? 'MyDatabase',
  options: {
    // Set DB_ENCRYPT=true for Azure SQL Database; false when unset
    encrypt: process.env.DB_ENCRYPT === 'true',
    // Set DB_TRUST_SERVER_CERTIFICATE=true for local dev / self-signed certs
    trustServerCertificate: process.env.DB_TRUST_SERVER_CERTIFICATE === 'true'
  }
};

async function startDatabaseStream(): Promise<void> {
  try {
    const stream = new DatabaseStream({
      dbType: 'mssql', // or 'postgres'
      dbConfig: mssqlConfig,
      table: 'YourTableName',
      timestampField: 'UpdatedAt',
      primaryKeyFields: ['Id', 'TenantId'], // Composite primary key
      pollIntervalMs: 5000, // Poll every 5 seconds
      // Additional options like initialTimestamp, batchSize, etc.
    });

    // Example: Using the ConsoleSender to output streamed data
    stream.addSender(new ConsoleSender());

    // Example: Using a callback sender for custom processing
    stream.addSender({
      send: (data: unknown) => {
        console.log('Callback Sender received record:', data);
        // Here you could process the data, send to a message queue, etc.
      }
    });

    console.log('Starting database stream...');
    await stream.start();

    // To stop the stream after some time or on a specific event
    // setTimeout(() => {
    //   stream.stop();
    //   console.log('Database stream stopped.');
    // }, 60000); // Stop after 1 minute
  } catch (error) {
    console.error('Failed to start database stream:', error);
    process.exit(1);
  }
}

// Errors are handled inside the function, so the returned promise is not awaited.
void startDatabaseStream();
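The callback sender in the quickstart handles one record at a time. When the downstream target prefers batches, a sender object with the same `send` shape can buffer records first. The sketch below assumes only the `{ send(data) }` shape shown in the quickstart. `createBatchingSender`, `SenderLike`, and `flush` are hypothetical names and are not part of af-streams.

```typescript
// Hypothetical batching sender; relies only on the `{ send(data) }` shape
// used in the quickstart above. Not part of af-streams.

interface SenderLike {
  send(data: unknown): void;
}

interface BatchingSender extends SenderLike {
  flush(): void; // emit whatever is buffered, even if the batch is not full
}

function createBatchingSender(
  batchSize: number,
  onBatch: (batch: unknown[]) => void,
): BatchingSender {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }
  let buffer: unknown[] = [];

  const flush = (): void => {
    if (buffer.length === 0) return;
    const batch = buffer;
    buffer = []; // start a new buffer before handing the old one over
    onBatch(batch);
  };

  return {
    send(data: unknown): void {
      buffer.push(data);
      if (buffer.length >= batchSize) flush();
    },
    flush,
  };
}

// Possible usage with the quickstart's stream (assumption, untested):
//   const sender = createBatchingSender(100, (batch) => console.log(batch.length));
//   stream.addSender(sender);
//   // call sender.flush() when stopping the stream so no records are left behind
```

Swapping the buffer before invoking `onBatch` means a record that arrives while the handler is running goes into the next batch rather than being lost or double-counted.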