AWS MSK IAM SASL Signer (Python)

1.0.2 · active · verified Sat Apr 11

This is an Amazon MSK Library in Python, version 1.0.2. It provides a function to generate a base64 encoded signed URL to enable authentication and authorization with an Amazon MSK cluster using AWS IAM credentials. It acts as a pluggable library for any Python Kafka client that supports the SASL/OAUTHBEARER mechanism. The library sees releases as needed, with several updates occurring in 2023-2025.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to configure a `confluent-kafka-python` consumer to connect to an Amazon MSK cluster using IAM authentication via `aws-msk-iam-sasl-signer-python`. It defines a callback function `oauth_cb` which uses `MSKAuthTokenProvider.generate_auth_token` to retrieve the necessary SASL/OAUTHBEARER token. Ensure that your AWS credentials are configured (e.g., via `~/.aws/credentials`, environment variables, or IAM role for EC2/Lambda) and that `KAFKA_BOOTSTRAP_SERVERS` and `AWS_REGION` environment variables are set.

import os
import socket
import time
from confluent_kafka import Consumer, KafkaException
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

def oauth_cb(oauth_config):
    # MSKAuthTokenProvider.generate_auth_token returns expiry in milliseconds
    # confluent-kafka-python expects expiry in seconds since epoch
    aws_region = os.environ.get('AWS_REGION', 'us-east-1') # Or specific region for your MSK cluster
    try:
        auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(aws_region)
        return auth_token, expiry_ms / 1000
    except Exception as e:
        print(f"Error generating token: {e}")
        raise KafkaException(f"Failed to get MSK Auth Token: {e}")


# Configure Kafka Consumer
# Ensure KAFKA_BOOTSTRAP_SERVERS environment variable is set (e.g., b-1.yourcluster.abcdef.c5.kafka.us-east-1.amazonaws.com:9098)
# Ensure AWS_REGION environment variable is set
# Ensure your AWS credentials (e.g., via ~/.aws/credentials or environment variables) are configured

consumer_conf = {
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
    'client.id': socket.gethostname(),
    'group.id': os.environ.get('KAFKA_GROUP_ID', 'my-consumer-group'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'sasl.oauthbearer.token.cb': oauth_cb,
    'debug': 'broker,protocol,security' # Optional: for debugging connection issues
}

topic = os.environ.get('KAFKA_TOPIC', 'my-test-topic')

consumer = None
try:
    consumer = Consumer(consumer_conf)
    consumer.subscribe([topic])

    print(f"Consumer configured for topic: {topic}")
    print(f"Consuming messages. Press Ctrl+C to exit.")

    while True:
        msg = consumer.poll(timeout=1.0) # Poll for messages, 1-second timeout
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaException._PARTITION_EOF:
                # End of partition event - not an error
                print(f"%% {msg.topic()} [{msg.partition()}] reached end offset {msg.offset()}")
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            print(f"Received message: key={msg.key().decode('utf-8') if msg.key() else 'None'}, value={msg.value().decode('utf-8')}, topic={msg.topic()}, partition={msg.partition()}, offset={msg.offset()}")

except KafkaException as e:
    print(f"Kafka error: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")
finally:
    if consumer:
        print("Closing consumer...")
        consumer.close()

view raw JSON →