AWS Kinesis Aggregation and Deaggregation
The `aws-kinesis-agg` Python module handles the Kinesis Producer Library (KPL) message aggregation format. It provides utilities for aggregating multiple user records into a single Kinesis record (for better throughput and cost efficiency) and for deaggregating such records back into the original user records. The latest GitHub release is 2.0.3, while PyPI serves 1.2.3; releases are active but irregular.
Warnings
- gotcha The PyPI version (`pip install aws-kinesis-agg`) often lags significantly behind the latest GitHub releases. For example, PyPI currently offers 1.2.3 while GitHub has 2.0.3. Users should be aware they might install an outdated version if relying solely on `pip install`.
- breaking Version 2.0.0 added support for AWS SDK V2. The Python API mainly deals with data formats, but the release may still change behavior or compatibility with KPL clients built on newer AWS SDKs, so re-test integrations when migrating from a pre-2.0.0 version.
- gotcha This library only unpacks records in the Kinesis Producer Library (KPL) aggregation format: a 4-byte magic header, a protobuf payload, and a trailing MD5 checksum. Records that lack this header are returned as-is rather than split, so standard Kinesis records pass through unchanged and data aggregated by any other custom scheme will not be deaggregated into user records.
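- gotcha When consuming Kinesis records via AWS Lambda events, the `data` field of each Kinesis record in the event payload is Base64-encoded. `deaggregate_records` and `iter_deaggregate_records` take the event records (dicts) as delivered and decode that field themselves. The user records they return are dicts in the same shape, with `kinesis.data` still Base64-encoded, so call `base64.b64decode()` on each returned payload before using it.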
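When inspecting a payload by hand, it can help to check whether it is KPL-aggregated at all. The following is a minimal standard-library sketch, not part of `aws-kinesis-agg`: the helper names `looks_like_kpl_aggregate` and `payload_from_lambda_record` are hypothetical, and the sample records are fabricated placeholders. It assumes the published KPL layout of magic bytes, protobuf body, and a 16-byte MD5 of that body.

```python
import base64
import hashlib

# Magic prefix defined by the KPL aggregation format.
KPL_MAGIC = b"\xf3\x89\x9a\xc2"
MD5_DIGEST_SIZE = 16


def looks_like_kpl_aggregate(payload: bytes) -> bool:
    """Return True if payload has the KPL magic header and a matching MD5 trailer."""
    if len(payload) <= len(KPL_MAGIC) + MD5_DIGEST_SIZE:
        return False
    if not payload.startswith(KPL_MAGIC):
        return False
    body = payload[len(KPL_MAGIC):-MD5_DIGEST_SIZE]
    trailer = payload[-MD5_DIGEST_SIZE:]
    return hashlib.md5(body).digest() == trailer


def payload_from_lambda_record(record: dict) -> bytes:
    """Decode the Base64 `data` field of one Lambda Kinesis event record."""
    return base64.b64decode(record["kinesis"]["data"])


# Hypothetical inputs: a plain record and a fake aggregate with a placeholder body
# (a real body would be a protobuf-encoded message, not this literal).
plain = {"kinesis": {"data": base64.b64encode(b"hello").decode("ascii")}}
fake_body = b"placeholder-protobuf-bytes"
fake_aggregate = KPL_MAGIC + fake_body + hashlib.md5(fake_body).digest()
aggregated = {"kinesis": {"data": base64.b64encode(fake_aggregate).decode("ascii")}}

for name, record in (("plain", plain), ("aggregated", aggregated)):
    print(name, looks_like_kpl_aggregate(payload_from_lambda_record(record)))
```

This check only validates the envelope; it does not parse the protobuf body, which remains the library's job.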
Install
pip install aws-kinesis-agg
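Because the PyPI and GitHub versions can differ, it may be worth confirming which version pip actually installed. A small sketch using only the standard library follows; `installed_version` is a hypothetical helper, and the distribution name passed to it is assumed to match the name used with `pip install`.

```python
from importlib import metadata
from typing import Optional


def installed_version(distribution: str = "aws-kinesis-agg") -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


version = installed_version()
if version is None:
    print("aws-kinesis-agg is not installed")
else:
    print(f"aws-kinesis-agg {version} is installed")
```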
Imports
- RecordAggregator
from aws_kinesis_agg.aggregator import RecordAggregator
- deaggregate_records
from aws_kinesis_agg.deaggregator import deaggregate_records
- iter_deaggregate_records
from aws_kinesis_agg.deaggregator import iter_deaggregate_records
Quickstart
import base64
from aws_kinesis_agg.aggregator import RecordAggregator
from aws_kinesis_agg.deaggregator import deaggregate_records, iter_deaggregate_records

# --- Aggregation Example ---
print("--- Aggregation Example ---")
aggregator = RecordAggregator()
# Add multiple user records with partition keys and optional explicit hash keys.
# add_user_record() returns a completed AggRecord only once the buffer is full;
# for a few small records it returns None and keeps buffering.
aggregator.add_user_record("my_app_pk", b"data_for_record_1")
aggregator.add_user_record("my_app_pk", b"data_for_record_2", explicit_hash_key="12345")
aggregator.add_user_record("another_pk", b"data_for_record_3")
# Flush the buffer and unpack the aggregated record, ready to be sent to Kinesis
agg_record = aggregator.clear_and_get()
agg_partition_key, agg_explicit_hash_key, aggregated_bytes = agg_record.get_contents()
print(f"Aggregated record size: {len(aggregated_bytes)} bytes")

# --- Deaggregation Example ---
print("\n--- Deaggregation Example ---")
# Simulate one record of an AWS Lambda Kinesis event. Lambda delivers the payload
# Base64-encoded under record["kinesis"]["data"]; every other value is a placeholder.
sequence_number = "49545115243490985018280067714973144582180062593244200961"
simulated_lambda_record = {
    "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": agg_partition_key,
        "sequenceNumber": sequence_number,
        "data": base64.b64encode(aggregated_bytes).decode("utf-8"),
        "approximateArrivalTimestamp": 1428537600.0,
    },
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": f"shardId-000000000000:{sequence_number}",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/example-role",
    "awsRegion": "us-east-1",
    "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/example-stream",
}
print(f"Simulated KPL record (base64-encoded, as in a Lambda event): {simulated_lambda_record['kinesis']['data'][:70]}...")

# 1. Using deaggregate_records (returns a list of user records as dicts)
print("\nDeaggregated records (using deaggregate_records, returns a list):")
user_records_list = deaggregate_records([simulated_lambda_record])
for i, record in enumerate(user_records_list):
    kinesis = record["kinesis"]
    # Each returned payload is Base64-encoded, so decode it before use
    print(f"  Record {i+1}: PartitionKey={kinesis['partitionKey']}, Data={base64.b64decode(kinesis['data']).decode()}")

# 2. Using iter_deaggregate_records (returns an iterator, memory efficient for many records)
print("\nDeaggregated records (using iter_deaggregate_records, returns an iterator):")
user_records_iterator = iter_deaggregate_records([simulated_lambda_record])
for i, record in enumerate(user_records_iterator):
    kinesis = record["kinesis"]
    print(f"  Record {i+1}: PartitionKey={kinesis['partitionKey']}, Data={base64.b64decode(kinesis['data']).decode()}")