AWS Kinesis Aggregation and Deaggregation

2.0.3 · active · verified Sat Apr 11

The `aws-kinesis-agg` Python module (imported as `aws_kinesis_agg`) implements the Kinesis Producer Library (KPL) message aggregation format. It can aggregate multiple user records into a single Kinesis record, which raises throughput and lowers the cost per user record. It can also deaggregate such records back into the original user records. The latest release is 2.0.3 on GitHub, while PyPI lists 1.2.3. Releases are active but irregular.

Install

pip install aws_kinesis_agg

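Imports

from aws_kinesis_agg.aggregator import RecordAggregator
from aws_kinesis_agg.deaggregator import deaggregate_records, iter_deaggregate_records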

Quickstart

This quickstart demonstrates both directions. It uses `RecordAggregator` to combine several user records into a single KPL-formatted record. It then uses `deaggregate_records` or `iter_deaggregate_records` to recover the original user records. Both deaggregation functions take a list of Kinesis records in the AWS Lambda event format, where each record's `kinesis.data` field is base64-encoded. They return the user records in that same format.

import base64
from aws_kinesis_agg.aggregator import RecordAggregator
from aws_kinesis_agg.deaggregator import deaggregate_records, iter_deaggregate_records

# --- Aggregation Example ---
print("--- Aggregation Example ---")
aggregator = RecordAggregator()

# Add user records with a partition key and an optional explicit hash key.
# add_user_record() returns None while the record still fits, or a completed
# AggRecord once the size limit is reached; these small payloads never fill it.
aggregator.add_user_record("my_app_pk", b"data_for_record_1")
aggregator.add_user_record("my_app_pk", b"data_for_record_2", explicit_hash_key="12345")
aggregator.add_user_record("another_pk", b"data_for_record_3")

# Flush the buffered user records into one AggRecord and unpack it;
# these values map onto PutRecord's PartitionKey, ExplicitHashKey and Data.
agg_record = aggregator.clear_and_get()
partition_key, explicit_hash_key, aggregated_bytes = agg_record.get_contents()
print(f"Aggregated record size: {len(aggregated_bytes)} bytes")

# --- Deaggregation Example ---
print("\n--- Deaggregation Example ---")

# Build a fresh aggregated record to use as valid deaggregation input
fresh_aggregator = RecordAggregator()
fresh_aggregator.add_user_record("deagg_pk_a", b"deaggregated_data_A")
fresh_aggregator.add_user_record("deagg_pk_b", b"deaggregated_data_B", explicit_hash_key="98765")
fresh_aggregator.add_user_record("deagg_pk_c", b"deaggregated_data_C")
pk, _, kpl_bytes = fresh_aggregator.clear_and_get().get_contents()

# Simulate the Records list of an AWS Lambda Kinesis event: each entry is a dict
# whose kinesis.data field holds the base64-encoded record payload.
lambda_records = [{
    "kinesis": {
        "partitionKey": pk,
        "sequenceNumber": "0",  # placeholder value for the simulation
        "data": base64.b64encode(kpl_bytes).decode("utf-8"),
    }
}]
print(f"Simulated Lambda record data (base64): {lambda_records[0]['kinesis']['data'][:70]}...")

# 1. deaggregate_records returns a list of records in the input's format;
#    each user record's payload is base64-encoded again under kinesis.data.
print("\nDeaggregated records (using deaggregate_records, returns a list):")
for i, record in enumerate(deaggregate_records(lambda_records)):
    payload = base64.b64decode(record["kinesis"]["data"])
    print(f"  Record {i+1}: PartitionKey={record['kinesis']['partitionKey']}, Data={payload.decode()}")

# 2. iter_deaggregate_records is a generator that yields one record at a time,
#    which avoids building the whole list in memory for large batches.
print("\nDeaggregated records (using iter_deaggregate_records, returns an iterator):")
for i, record in enumerate(iter_deaggregate_records(lambda_records)):
    payload = base64.b64decode(record["kinesis"]["data"])
    print(f"  Record {i+1}: PartitionKey={record['kinesis']['partitionKey']}, Data={payload.decode()}")
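The reverse direction can be sketched the same way, under the same assumptions about the KPL format. A payload counts as aggregated only if it starts with the magic bytes and its trailing MD5 digest matches the protobuf body. Anything else is treated as an ordinary record; the library's deaggregators are documented to return non-aggregated records unchanged.

`deaggregate_blob` and its helpers are hypothetical names. The parser handles only the varint and length-delimited wire types the format uses, and it skips explicit hash keys and tags.

```python
from __future__ import annotations

import hashlib

# Same assumed format: magic prefix + protobuf AggregatedRecord + MD5(protobuf).
KPL_MAGIC = b"\xf3\x89\x9a\xc2"
DIGEST_SIZE = 16  # MD5 digest length in bytes


def _read_varint(buf: bytes, pos: int) -> tuple[int, int]:
    """Decode a protobuf varint at pos; return (value, next_pos)."""
    result = shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def _fields(buf: bytes):
    """Yield (field_number, value) pairs from a serialized protobuf message."""
    pos = 0
    while pos < len(buf):
        key, pos = _read_varint(buf, pos)
        field_number, wire_type = key >> 3, key & 0x7
        if wire_type == 0:  # varint
            value, pos = _read_varint(buf, pos)
        elif wire_type == 2:  # length-delimited
            length, pos = _read_varint(buf, pos)
            value = buf[pos:pos + length]
            pos += length
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        yield field_number, value


def deaggregate_blob(blob: bytes) -> list[tuple[str, bytes]] | None:
    """Hypothetical helper: return (partition_key, data) pairs, or None if
    blob is not a KPL aggregate (the caller would then keep it as-is)."""
    if len(blob) <= len(KPL_MAGIC) + DIGEST_SIZE or not blob.startswith(KPL_MAGIC):
        return None
    body, digest = blob[len(KPL_MAGIC):-DIGEST_SIZE], blob[-DIGEST_SIZE:]
    if hashlib.md5(body).digest() != digest:
        return None  # corrupted or coincidental prefix: treat as plain data
    pk_table: list[str] = []
    raw_records: list[bytes] = []
    for field_number, value in _fields(body):
        if field_number == 1:    # partition_key_table
            pk_table.append(value.decode("utf-8"))
        elif field_number == 3:  # records (field 2, the hash-key table, is skipped)
            raw_records.append(value)
    user_records = []
    for raw in raw_records:
        pk_idx, data = 0, b""
        for field_number, value in _fields(raw):
            if field_number == 1:    # partition_key_index
                pk_idx = value
            elif field_number == 3:  # data
                data = value
        user_records.append((pk_table[pk_idx], data))
    return user_records
```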
