Database Table Change Data Capture Streamer

3.1.60 · active · verified Wed Apr 22

af-streams is a Node.js library designed to generate data streams directly from database tables, acting as a lightweight change data capture (CDC) mechanism. It currently supports MSSQL and PostgreSQL databases. The library operates by monitoring a specified timestamp field and a set of primary key fields within a table to detect and stream new or updated records. It provides flexible output options, including sending data to the console, a callback function, an event emitter, a TCP JSON sender (in WSO2 event binary format), or a WebSocket JSON sender. Additionally, it offers various windowing functionalities like `SingleEventTimeWindow`, `TimeWindow`, `KeyedNumberWindow`, `KeyedSingleEventTimeWindow`, and `KeyedTimeWindow` for advanced stream processing. The current stable version is 3.1.60, and it targets Node.js environments version 14 and higher. While a specific release cadence isn't published, the version number indicates active development.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to set up and start a data stream from an MSSQL database table using `af-streams`, configured to output to both the console and a custom callback function. It highlights essential configuration for database connection, table specifics, and stream polling, using environment variables for sensitive credentials.

import { DatabaseStream, ConsoleSender } from 'af-streams';
import { type ConnectionOptions as MSSQLConnectionOptions } from 'mssql';

const mssqlConfig: MSSQLConnectionOptions = {
  user: process.env.DB_USER ?? 'sa',
  password: process.env.DB_PASSWORD ?? 'YourStrong@Password',
  server: process.env.DB_HOST ?? 'localhost', 
  database: process.env.DB_NAME ?? 'MyDatabase',
  options: {
    encrypt: process.env.DB_ENCRYPT === 'true', // Use for Azure SQL Database, default: false
    trustServerCertificate: process.env.DB_TRUST_SERVER_CERTIFICATE === 'true' // Change to true for local dev / self-signed certs
  }
};

async function startDatabaseStream() {
  try {
    const stream = new DatabaseStream({
      dbType: 'mssql', // or 'postgres'
      dbConfig: mssqlConfig,
      table: 'YourTableName',
      timestampField: 'UpdatedAt',
      primaryKeyFields: ['Id', 'TenantId'], // Composite primary key
      pollIntervalMs: 5000, // Poll every 5 seconds
      // Additional options like initialTimestamp, batchSize, etc.
    });

    // Example: Using the ConsoleSender to output streamed data
    stream.addSender(new ConsoleSender());

    // Example: Using a callback sender for custom processing
    stream.addSender({
      send: (data) => {
        console.log('Callback Sender received record:', data);
        // Here you could process the data, send to a message queue, etc.
      }
    });

    console.log('Starting database stream...');
    await stream.start();

    // To stop the stream after some time or on a specific event
    // setTimeout(() => {
    //   stream.stop();
    //   console.log('Database stream stopped.');
    // }, 60000); // Stop after 1 minute

  } catch (error) {
    console.error('Failed to start database stream:', error);
    process.exit(1);
  }
}

startDatabaseStream();

view raw JSON →