Database Table Change Data Capture Streamer

3.1.60 verified Wed Apr 22 auth: no javascript

af-streams is a Node.js library that generates data streams directly from database tables, acting as a lightweight change data capture (CDC) mechanism. It currently supports MSSQL and PostgreSQL. The library monitors a specified timestamp field together with a set of primary key fields in a table to detect and stream new or updated records. Output can go to the console, a callback function, an event emitter, a TCP JSON sender (in WSO2 event binary format), or a WebSocket JSON sender. For further stream processing it also provides windowing classes: `SingleEventTimeWindow`, `TimeWindow`, `KeyedNumberWindow`, `KeyedSingleEventTimeWindow`, and `KeyedTimeWindow`. The current stable version is 3.1.60, and it targets Node.js 14 and later. No release cadence is published, though the high patch number suggests frequent releases.
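The timestamp-plus-primary-key detection described above can be illustrated with a minimal sketch. It is not the af-streams implementation: `Row`, `Watermark`, `keyOf`, and `detectChanges` are hypothetical names, and timestamps are assumed to be numeric (epoch milliseconds). The idea is to keep a watermark of the highest timestamp already emitted, plus the keys seen at exactly that timestamp, so that re-polled rows are not emitted twice.

```typescript
// Hypothetical types and helpers for illustration; not part of the af-streams API.
type Row = Record<string, unknown>;

interface Watermark {
  ts: number;            // highest timestamp (epoch ms) emitted so far
  keysAtTs: Set<string>; // composite keys already emitted at exactly `ts`
}

// Build a composite key string from the configured primary key fields.
function keyOf(row: Row, primaryKeyFields: string[]): string {
  return primaryKeyFields.map((f) => String(row[f])).join('|');
}

// Return the rows that are new or updated relative to the watermark,
// together with the advanced watermark for the next poll.
function detectChanges(
  rows: Row[],
  timestampField: string,
  primaryKeyFields: string[],
  mark: Watermark,
): { changed: Row[]; next: Watermark } {
  // Process rows in timestamp order so the watermark only moves forward.
  const sorted = [...rows].sort(
    (a, b) => Number(a[timestampField]) - Number(b[timestampField]),
  );
  const changed: Row[] = [];
  let next: Watermark = { ts: mark.ts, keysAtTs: new Set(mark.keysAtTs) };
  for (const row of sorted) {
    const ts = Number(row[timestampField]);
    const key = keyOf(row, primaryKeyFields);
    if (ts < next.ts) continue;                             // older than the watermark
    if (ts === next.ts && next.keysAtTs.has(key)) continue; // already emitted
    if (ts > next.ts) next = { ts, keysAtTs: new Set<string>() };
    next.keysAtTs.add(key);
    changed.push(row);
  }
  return { changed, next };
}
```

In a polling loop, each query result would be passed through `detectChanges` and the returned `next` watermark carried into the following poll.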

error TypeError: Invalid Version: 12.x.x
cause Attempting to run `af-streams` on an incompatible Node.js version, typically older than v14.
fix
Update your Node.js environment to version 14 or higher. A Node version manager such as nvm makes switching easy: `nvm install 18 && nvm use 18`.
error Error: Missing `timestampField` in stream configuration.
cause The `timestampField` property, crucial for change detection, was not provided or was null in the `DatabaseStream` constructor.
fix
Add `timestampField: 'YourTimestampColumnName'` to your `DatabaseStream` configuration, pointing to a valid timestamp column in your database table.
error Error: No primary key fields defined. Provide 'primaryKeyFields' in the configuration.
cause The `primaryKeyFields` array, required for uniquely identifying records, was empty or missing from the `DatabaseStream` configuration.
fix
Specify `primaryKeyFields: ['Column1', 'Column2']` (or a single string for a single-column primary key) in your `DatabaseStream` configuration.
error Cannot find module 'mssql' (or 'pg')
cause The required database driver for the specified `dbType` was not installed as a peer dependency.
fix
Install the correct database driver for your setup: `npm install mssql` for MSSQL, or `npm install pg` for PostgreSQL.
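Two of the errors above come from missing configuration fields, so it can help to check them before constructing the stream. The following is a minimal sketch under assumptions: `StreamConfigLike` and `validateStreamConfig` are hypothetical names, not part of af-streams, and the returned messages are illustrative rather than the library's own error text.

```typescript
// Hypothetical shape covering only the two fields discussed above.
interface StreamConfigLike {
  timestampField?: string | null;
  primaryKeyFields?: string | string[] | null;
}

// Return a list of human-readable problems; an empty list means the fields look usable.
function validateStreamConfig(cfg: StreamConfigLike): string[] {
  const problems: string[] = [];

  // The timestamp field must be a non-empty string.
  if (typeof cfg.timestampField !== 'string' || cfg.timestampField.trim() === '') {
    problems.push('timestampField is missing or empty');
  }

  // Accept either a single column name or an array of column names.
  const pk = cfg.primaryKeyFields;
  const keys: string[] = typeof pk === 'string' ? [pk] : Array.isArray(pk) ? pk : [];
  if (keys.length === 0 || keys.some((k) => k.trim() === '')) {
    problems.push('primaryKeyFields is missing or empty');
  }

  return problems;
}
```

Calling this on the configuration object before starting the stream surfaces both problems at once instead of one runtime error at a time.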
breaking The package requires Node.js version 14 or higher. Running it on older Node.js versions will result in runtime errors due to unsupported syntax or APIs.
fix Upgrade your Node.js environment to version 14 or newer. Check the `engines` field in the `af-streams` `package.json` for the exact requirement.
gotcha Database tables streamed by af-streams must have a timestamp field and a defined set of primary key fields. Incorrect or missing configuration for these fields will prevent the stream from identifying new or updated records correctly.
fix Ensure your database table schema includes a timestamp field (e.g., `UpdatedAt`, `CreatedAt`) and that `timestampField` and `primaryKeyFields` are correctly specified in your `DatabaseStream` configuration.
breaking Major version 3 may contain breaking changes relative to v1 and v2. The specific changes are not documented here; as with most major releases, expect API changes or modified behavior that requires adaptation.
fix Review the official changelog or migration guide for `af-streams` v3 on its GitHub repository to understand specific breaking changes and update your code accordingly.
gotcha Database drivers (e.g., `mssql` or `pg`) are not direct dependencies of `af-streams` itself. You must explicitly install the appropriate driver for your chosen database type (`dbType`) for the library to function.
fix Install the necessary database driver package, e.g., `npm install mssql` for MSSQL or `npm install pg` for PostgreSQL, in your project.
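The Node.js version and driver requirements above can be checked up front. This is a minimal sketch, not something af-streams provides: `nodeMajor`, `preflight`, and `DRIVER_PACKAGE` are hypothetical names, and the list of installed packages is passed in by the caller (for example, the dependency names from your own `package.json`). The `dbType` values and driver package names are the ones mentioned in this document.

```typescript
type DbType = 'mssql' | 'postgres';

// Driver package expected for each dbType, as described in the notes above.
const DRIVER_PACKAGE: Record<DbType, string> = { mssql: 'mssql', postgres: 'pg' };

// Extract the major version from strings such as "18.17.0" or "v18.17.0".
function nodeMajor(version: string): number {
  const major = Number.parseInt(version.replace(/^v/, '').split('.')[0] ?? '', 10);
  if (Number.isNaN(major)) {
    throw new Error(`Unrecognized Node.js version: ${version}`);
  }
  return major;
}

// Collect problems with the runtime version and the installed driver.
function preflight(
  version: string,
  dbType: DbType,
  installedPackages: readonly string[],
): string[] {
  const problems: string[] = [];
  if (nodeMajor(version) < 14) {
    problems.push(`Node.js ${version} is older than the required version 14`);
  }
  const driver = DRIVER_PACKAGE[dbType];
  if (!installedPackages.includes(driver)) {
    problems.push(`Missing driver "${driver}": run "npm install ${driver}"`);
  }
  return problems;
}
```

In an application, `process.versions.node` would be the natural value to pass as `version`.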
npm install af-streams
yarn add af-streams
pnpm add af-streams

This quickstart demonstrates how to set up and start a data stream from an MSSQL database table using `af-streams`, configured to output to both the console and a custom callback function. It highlights essential configuration for database connection, table specifics, and stream polling, using environment variables for sensitive credentials.

import { DatabaseStream, ConsoleSender } from 'af-streams';
// The mssql type definitions export the connection settings interface as `config`.
import type { config as MSSQLConfig } from 'mssql';

const mssqlConfig: MSSQLConfig = {
  user: process.env.DB_USER ?? 'sa',
  password: process.env.DB_PASSWORD ?? 'YourStrong@Password',
  server: process.env.DB_HOST ?? 'localhost', 
  database: process.env.DB_NAME ?? 'MyDatabase',
  options: {
    encrypt: process.env.DB_ENCRYPT === 'true', // Use for Azure SQL Database, default: false
    trustServerCertificate: process.env.DB_TRUST_SERVER_CERTIFICATE === 'true' // Change to true for local dev / self-signed certs
  }
};

async function startDatabaseStream() {
  try {
    const stream = new DatabaseStream({
      dbType: 'mssql', // or 'postgres'
      dbConfig: mssqlConfig,
      table: 'YourTableName',
      timestampField: 'UpdatedAt',
      primaryKeyFields: ['Id', 'TenantId'], // Composite primary key
      pollIntervalMs: 5000, // Poll every 5 seconds
      // Additional options like initialTimestamp, batchSize, etc.
    });

    // Example: Using the ConsoleSender to output streamed data
    stream.addSender(new ConsoleSender());

    // Example: Using a callback sender for custom processing
    stream.addSender({
      send: (data) => {
        console.log('Callback Sender received record:', data);
        // Here you could process the data, send to a message queue, etc.
      }
    });

    console.log('Starting database stream...');
    await stream.start();

    // To stop the stream after some time or on a specific event
    // setTimeout(() => {
    //   stream.stop();
    //   console.log('Database stream stopped.');
    // }, 60000); // Stop after 1 minute

  } catch (error) {
    console.error('Failed to start database stream:', error);
    process.exit(1);
  }
}

startDatabaseStream();
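The callback sender in the quickstart is an object with a `send` function. Building on that shape only, the following sketch shows one way to buffer records and hand them off in batches, for example before writing to a message queue. `Sender` and `createBatchingSender` are hypothetical names and are not part of af-streams; whether the library accepts such an object is an assumption based on the quickstart above.

```typescript
// Minimal sender shape, mirroring the `{ send }` object used in the quickstart.
interface Sender<T> {
  send: (data: T) => void;
}

// Create a sender that collects records and passes them on in groups of `batchSize`.
// `flush` hands over whatever is still buffered, e.g. before shutting down.
function createBatchingSender<T>(
  batchSize: number,
  onBatch: (batch: T[]) => void,
): Sender<T> & { flush: () => void } {
  let buffer: T[] = [];

  const flush = (): void => {
    if (buffer.length === 0) return;
    const batch = buffer;
    buffer = []; // start a fresh buffer before invoking the callback
    onBatch(batch);
  };

  return {
    send: (data: T): void => {
      buffer.push(data);
      if (buffer.length >= batchSize) flush();
    },
    flush,
  };
}
```

Calling `flush` when the stream stops avoids losing a partially filled batch.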