{"id":16291,"library":"af-streams","title":"Database Table Change Data Capture Streamer","description":"af-streams is a Node.js library designed to generate data streams directly from database tables, acting as a lightweight change data capture (CDC) mechanism. It currently supports MSSQL and PostgreSQL databases. The library operates by monitoring a specified timestamp field and a set of primary key fields within a table to detect and stream new or updated records. It provides flexible output options, including sending data to the console, a callback function, an event emitter, a TCP JSON sender (in WSO2 event binary format), or a WebSocket JSON sender. Additionally, it offers various windowing functionalities like `SingleEventTimeWindow`, `TimeWindow`, `KeyedNumberWindow`, `KeyedSingleEventTimeWindow`, and `KeyedTimeWindow` for advanced stream processing. The current stable version is 3.1.60, and it targets Node.js environments version 14 and higher. While a specific release cadence isn't published, the version number indicates active development.","status":"active","version":"3.1.60","language":"javascript","source_language":"en","source_url":"https://github.com/Bazilio-san/af-streams","tags":["javascript","stream","database","table","change data capture","cdc","typescript"],"install":[{"cmd":"npm install af-streams","lang":"bash","label":"npm"},{"cmd":"yarn add af-streams","lang":"bash","label":"yarn"},{"cmd":"pnpm add af-streams","lang":"bash","label":"pnpm"}],"dependencies":[],"imports":[{"note":"Main class for initiating a database stream. ESM is preferred; CommonJS `require` is a common mistake in modern Node.js projects, especially with TypeScript.","wrong":"const { DatabaseStream } = require('af-streams');","symbol":"DatabaseStream","correct":"import { DatabaseStream } from 'af-streams';"},{"note":"One of the several named exports for stream windowing logic. 
Always use named imports for these utilities.","wrong":"import TimeWindow from 'af-streams';","symbol":"TimeWindow","correct":"import { TimeWindow } from 'af-streams';"},{"note":"A utility class for outputting stream data to the console. Follows the named import pattern.","wrong":"const ConsoleSender = require('af-streams').ConsoleSender;","symbol":"ConsoleSender","correct":"import { ConsoleSender } from 'af-streams';"}],"quickstart":{"code":"import { DatabaseStream, ConsoleSender } from 'af-streams';\nimport { type ConnectionOptions as MSSQLConnectionOptions } from 'mssql';\n\nconst mssqlConfig: MSSQLConnectionOptions = {\n  user: process.env.DB_USER ?? 'sa',\n  password: process.env.DB_PASSWORD ?? 'YourStrong@Password',\n  server: process.env.DB_HOST ?? 'localhost', \n  database: process.env.DB_NAME ?? 'MyDatabase',\n  options: {\n    encrypt: process.env.DB_ENCRYPT === 'true', // Use for Azure SQL Database, default: false\n    trustServerCertificate: process.env.DB_TRUST_SERVER_CERTIFICATE === 'true' // Change to true for local dev / self-signed certs\n  }\n};\n\nasync function startDatabaseStream() {\n  try {\n    const stream = new DatabaseStream({\n      dbType: 'mssql', // or 'postgres'\n      dbConfig: mssqlConfig,\n      table: 'YourTableName',\n      timestampField: 'UpdatedAt',\n      primaryKeyFields: ['Id', 'TenantId'], // Composite primary key\n      pollIntervalMs: 5000, // Poll every 5 seconds\n      // Additional options like initialTimestamp, batchSize, etc.\n    });\n\n    // Example: Using the ConsoleSender to output streamed data\n    stream.addSender(new ConsoleSender());\n\n    // Example: Using a callback sender for custom processing\n    stream.addSender({\n      send: (data) => {\n        console.log('Callback Sender received record:', data);\n        // Here you could process the data, send to a message queue, etc.\n      }\n    });\n\n    console.log('Starting database stream...');\n    await stream.start();\n\n    // To stop the stream 
after some time or on a specific event\n    // setTimeout(() => {\n    //   stream.stop();\n    //   console.log('Database stream stopped.');\n    // }, 60000); // Stop after 1 minute\n\n  } catch (error) {\n    console.error('Failed to start database stream:', error);\n    process.exit(1);\n  }\n}\n\nstartDatabaseStream();\n","lang":"typescript","description":"This quickstart demonstrates how to set up and start a data stream from an MSSQL database table using `af-streams`, configured to output to both the console and a custom callback function. It highlights essential configuration for database connection, table specifics, and stream polling, using environment variables for sensitive credentials."},"warnings":[{"fix":"Upgrade your Node.js environment to version 14 or newer. Consult your project's `engines` field in `package.json` for specific requirements.","message":"The package requires Node.js version 14 or higher. Running it on older Node.js versions will result in runtime errors due to unsupported syntax or APIs.","severity":"breaking","affected_versions":">=3.0.0"},{"fix":"Ensure your database table schema includes a timestamp field (e.g., `UpdatedAt`, `CreatedAt`) and that `timestampField` and `primaryKeyFields` are correctly specified in your `DatabaseStream` configuration.","message":"Database tables streamed by af-streams must have a timestamp field and a defined set of primary key fields. Incorrect or missing configuration for these fields will prevent the stream from identifying new or updated records correctly.","severity":"gotcha","affected_versions":">=1.0.0"},{"fix":"Review the official changelog or migration guide for `af-streams` v3 on its GitHub repository to understand specific breaking changes and update your code accordingly.","message":"Major version 3 introduced potential breaking changes from previous versions (v1 or v2). 
The specific changes are not listed here; new major versions typically involve API changes or modified behavior that require adaptation.","severity":"breaking","affected_versions":">=3.0.0"},{"fix":"Install the necessary database driver package, e.g., `npm install mssql` for MSSQL or `npm install pg` for PostgreSQL, in your project.","message":"Database drivers (e.g., `mssql` or `pg`) are not direct dependencies of `af-streams` itself. You must explicitly install the appropriate driver for your chosen database type (`dbType`) for the library to function.","severity":"gotcha","affected_versions":">=1.0.0"}],"env_vars":null,"last_verified":"2026-04-22T00:00:00.000Z","next_check":"2026-07-21T00:00:00.000Z","problems":[{"fix":"Update your Node.js environment to version 14 or higher. Use a Node Version Manager (nvm) for easy switching: `nvm install 18 && nvm use 18`.","cause":"Attempting to run `af-streams` on an incompatible Node.js version, typically older than v14.","error":"TypeError: Invalid Version: 12.x.x"},{"fix":"Add `timestampField: 'YourTimestampColumnName'` to your `DatabaseStream` configuration, pointing to a valid timestamp column in your database table.","cause":"The `timestampField` property, crucial for change detection, was not provided or was null in the `DatabaseStream` constructor.","error":"Error: Missing `timestampField` in stream configuration."},{"fix":"Specify `primaryKeyFields: ['Column1', 'Column2']` (or a single string for a single-column primary key) in your `DatabaseStream` configuration.","cause":"The `primaryKeyFields` array, required for uniquely identifying records, was empty or missing from the `DatabaseStream` configuration.","error":"Error: No primary key fields defined. 
Provide 'primaryKeyFields' in the configuration."},{"fix":"Install the correct database driver for your setup: `npm install mssql` for MSSQL, or `npm install pg` for PostgreSQL.","cause":"The database driver required for the specified `dbType` is not installed in the project.","error":"Cannot find module 'mssql' (or 'pg')"}],"ecosystem":"npm"}