{"id":5573,"library":"aws-kinesis-agg","title":"AWS Kinesis Aggregation and Deaggregation","description":"The `aws-kinesis-agg` Python module handles the Kinesis Producer Library (KPL) message aggregation format. It can aggregate multiple user records into a single Kinesis record, which improves throughput and lowers per-record cost. It can also deaggregate such records back into the original user records. The latest GitHub release is 2.0.3, while PyPI offers 1.2.3. The GitHub repository also hosts implementations in other languages, so a GitHub release number does not necessarily correspond to a Python package version. The project is active, with an irregular release cadence.","status":"active","version":"2.0.3","language":"en","source_language":"en","source_url":"https://github.com/awslabs/kinesis-aggregation","tags":["aws","kinesis","kpl","aggregation","deaggregation","streaming"],"install":[{"cmd":"pip install aws-kinesis-agg","lang":"bash","label":"Install the latest PyPI version (may lag behind GitHub; see warnings)"}],"dependencies":[{"reason":"Required for interacting with AWS services such as Kinesis. The library itself only builds and parses record payloads. boto3 performs the actual service calls, for example `put_records` to send aggregated records or `get_records` to read them.","package":"boto3","optional":false},{"reason":"Required for parsing and serializing the KPL's Protocol Buffers message format.","package":"protobuf","optional":false}],"imports":[{"note":"Class that packs user records into KPL-aggregated records.","symbol":"RecordAggregator","correct":"from aws_kinesis_agg.aggregator import RecordAggregator"},{"note":"Function that takes a list of Kinesis records in the AWS Lambda event format and returns a list of deaggregated user records in the same format. Records that are not KPL-aggregated are returned unchanged.","symbol":"deaggregate_records","correct":"from aws_kinesis_agg.deaggregator import deaggregate_records"},{"note":"Generator variant of `deaggregate_records` that yields user records one at a time (memory efficient for large batches).","symbol":"iter_deaggregate_records","correct":"from aws_kinesis_agg.deaggregator import iter_deaggregate_records"}],
"quickstart":{"code":"import base64\nfrom aws_kinesis_agg.aggregator import RecordAggregator\nfrom aws_kinesis_agg.deaggregator import deaggregate_records, iter_deaggregate_records\n\n# --- Aggregation Example ---\nprint(\"--- Aggregation Example ---\")\naggregator = RecordAggregator()\n\n# Add user records with partition keys and optional explicit hash keys.\n# add_user_record() returns a completed AggRecord only when the current\n# aggregated record is full (None otherwise); these small records all fit.\naggregator.add_user_record(\"my_app_pk\", b\"data_for_record_1\")\naggregator.add_user_record(\"my_app_pk\", b\"data_for_record_2\", explicit_hash_key=\"12345\")\naggregator.add_user_record(\"another_pk\", b\"data_for_record_3\")\n\n# Flush the partially filled record. get_contents() returns the\n# (partition_key, explicit_hash_key, data) values to send to Kinesis,\n# e.g. with boto3's put_record(Data=..., PartitionKey=..., ExplicitHashKey=...).\nagg_record = aggregator.clear_and_get()\nagg_pk, agg_ehk, aggregated_bytes = agg_record.get_contents()\nprint(f\"Aggregated record size: {len(aggregated_bytes)} bytes\")\n\n# --- Deaggregation Example ---\nprint(\"\\n--- Deaggregation Example ---\")\n\n# Create a fresh aggregated record to use as valid deaggregation input\nfresh_aggregator = RecordAggregator()\nfresh_aggregator.add_user_record(\"deagg_pk_a\", b\"deaggregated_data_A\")\nfresh_aggregator.add_user_record(\"deagg_pk_b\", b\"deaggregated_data_B\", explicit_hash_key=\"98765\")\nfresh_aggregator.add_user_record(\"deagg_pk_c\", b\"deaggregated_data_C\")\nouter_pk, _, kpl_record_bytes = fresh_aggregator.clear_and_get().get_contents()\n\n# Simulate the Records list of an AWS Lambda Kinesis event. Lambda delivers\n# kinesis.data as a base64-encoded string; the deaggregation functions accept\n# these record dicts as-is and decode the data themselves.\nlambda_records = [{\n    \"eventSource\": \"aws:kinesis\",\n    \"kinesis\": {\n        \"partitionKey\": outer_pk,\n        \"sequenceNumber\": \"1\",\n        \"data\": base64.b64encode(kpl_record_bytes).decode(\"utf-8\"),\n    },\n}]\nprint(f\"Simulated Lambda record data (base64): {lambda_records[0]['kinesis']['data'][:70]}...\")\n\n# 1. deaggregate_records returns a list of user records in the same dict format\nprint(\"\\nDeaggregated records (using deaggregate_records, returns a list):\")\nfor i, record in enumerate(deaggregate_records(lambda_records)):\n    # Each user record's data is base64-encoded again, so decode it here\n    payload = base64.b64decode(record[\"kinesis\"][\"data\"])\n    print(f\"  Record {i+1}: PartitionKey={record['kinesis']['partitionKey']}, Data={payload.decode()}\")\n\n# 2. iter_deaggregate_records yields the same records lazily (memory efficient)\nprint(\"\\nDeaggregated records (using iter_deaggregate_records, returns an iterator):\")\nfor i, record in enumerate(iter_deaggregate_records(lambda_records)):\n    payload = base64.b64decode(record[\"kinesis\"][\"data\"])\n    print(f\"  Record {i+1}: PartitionKey={record['kinesis']['partitionKey']}, Data={payload.decode()}\")\n","lang":"python","description":"This quickstart demonstrates both aggregation and deaggregation. It uses `RecordAggregator` to pack several user records into one KPL-formatted record and `get_contents()` to obtain the partition key, explicit hash key, and data bytes to send to Kinesis. It then passes a simulated AWS Lambda Kinesis event record list to `deaggregate_records` and `iter_deaggregate_records`. These functions decode the base64-encoded `kinesis.data` field themselves and return user records whose `data` the caller must base64-decode. In a real producer, you can instead register a callback with `on_record_complete()` to send each aggregated record as soon as it fills up."},
"warnings":[{"fix":"Check the GitHub repository's releases for the latest version. If you need a newer version than PyPI provides, install from the GitHub source, or pin the exact version once it is published to PyPI (e.g., `pip install aws-kinesis-agg==2.0.3`).","message":"The PyPI package (`pip install aws-kinesis-agg`) can lag well behind the latest GitHub release: PyPI currently offers 1.2.3, while GitHub lists 2.0.3. Relying solely on `pip install` may therefore give you an older version than the one documented on GitHub.","severity":"gotcha","affected_versions":"<2.0.3"},{"fix":"Review your Kinesis producer and consumer applications to ensure compatibility, especially if you are using older versions of AWS SDKs elsewhere in your stack. 
Test thoroughly after upgrading.","message":"Version 2.0.0 introduced support for AWS SDK V2. AWS SDK V2 refers to SDKs for other languages (the Python SDK is boto3), so the change most likely affects KPL producers and consumers written in those languages (for example, Java applications using the AWS SDK for Java 2.x) that exchange aggregated records with your Python code. Users migrating from versions before 2.0.0 should re-test these end-to-end integrations.","severity":"breaking","affected_versions":"<2.0.0"},{"fix":"Feed this library only data written in the KPL aggregation format. A KPL-aggregated record begins with the 4-byte magic number 0xF3 0x89 0x9A 0xC2 and ends with a 16-byte MD5 digest of the protobuf payload. The deaggregation functions check both and, instead of raising an error, return any record that fails the check unchanged, so a non-aggregated record comes back as a single user record.","message":"This library handles only records aggregated in the Kinesis Producer Library (KPL) format. Standard (non-aggregated) Kinesis records are passed through as-is, but records packed with other custom aggregation schemes are not unpacked: they come back as one opaque user record, which can look like malformed output downstream.","severity":"gotcha","affected_versions":"All"},{"fix":"Pass the Lambda event's `Records` list to `deaggregate_records` or `iter_deaggregate_records` as delivered, then apply `base64.b64decode()` to each returned record's `kinesis.data` field.","message":"When consuming Kinesis records via AWS Lambda, each record's `kinesis.data` field is a base64-encoded string. The deaggregation functions decode this field themselves, and the user records they return carry base64-encoded `data` again, so decode after deaggregating, not before.","severity":"gotcha","affected_versions":"All"}],"env_vars":null,"last_verified":"2026-04-11T00:00:00.000Z","next_check":"2026-07-10T00:00:00.000Z"}