AWS MSK IAM SASL Signer (Python)
This is an Amazon MSK library in Python, version 1.0.2. It provides a function to generate a base64-encoded signed URL that enables authentication and authorization with an Amazon MSK cluster using AWS IAM credentials. It acts as a pluggable library for any Python Kafka client that supports the SASL/OAUTHBEARER mechanism. Releases are cut as needed; several updates shipped between 2023 and 2025.
Warnings
- gotcha When using `confluent-kafka-python`, the `oauth_cb` callback (set via the `'oauth_cb'` configuration key) must return the token expiry time in *seconds* since the epoch, while `MSKAuthTokenProvider.generate_auth_token` returns it in *milliseconds*. You must divide the returned expiry by 1000 before returning it from your callback.
- breaking For users of `dpkp/kafka-python` client, versions 2.1.0 and later introduced a change in SASL module handling, including the `AWS_MSK_IAM` mechanism. The `aws-msk-iam-sasl-signer-python` library is designed for the `OAUTHBEARER` mechanism, and older examples might not work directly with `kafka-python` versions 2.1.0+.
- gotcha Incorrect Kafka SASL mechanism: This library facilitates IAM authentication for MSK using the `OAUTHBEARER` mechanism over `SASL_SSL`. Some users mistakenly configure their Kafka clients with `sasl_mechanism='AWS_MSK_IAM'`, a custom mechanism intended primarily for Java clients or specific forks. The Python clients `dpkp/kafka-python` and `confluent-kafka-python` do not natively support `AWS_MSK_IAM` and require `OAUTHBEARER` when used with this library.
- gotcha Network connectivity issues (e.g., 'Connection reset') are frequent if the client application (e.g., Lambda function, EC2 instance) is not in the same VPC as the MSK cluster or if the security groups are improperly configured. Traffic on TCP port 9098 (for SASL_SSL) must be allowed between the client and the MSK brokers.
- gotcha Disabling TLS host verification (`ssl_context.check_hostname = False`, `ssl_context.verify_mode = ssl.CERT_NONE`) in some client examples (e.g., `aiokafka`) can introduce security vulnerabilities. This should only be done if you fully understand and accept the risks.
- gotcha Transactional producers may not work correctly with IAM authentication. There's an open issue reporting connection failures when `init_transactions` is called, suggesting a potential timing issue with connection establishment.
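For the TLS gotcha above, the safe default is simply to not disable anything: Python's `ssl.create_default_context()` already enables certificate verification and hostname checking, and MSK brokers present certificates issued by a publicly trusted CA, so verification should normally just work. A minimal sketch:

```python
import ssl

# create_default_context() enables both hostname checking and certificate
# verification by default - there is normally no need to turn either off
# when connecting to MSK brokers.
ssl_context = ssl.create_default_context()

print(ssl_context.check_hostname)                    # True
print(ssl_context.verify_mode == ssl.CERT_REQUIRED)  # True
```

With `aiokafka`, pass this context via the client's `ssl_context` parameter instead of one with `check_hostname = False` / `verify_mode = ssl.CERT_NONE`.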
Install
pip install aws-msk-iam-sasl-signer-python
Imports
- MSKAuthTokenProvider
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
Quickstart
import os
import socket
from confluent_kafka import Consumer, KafkaError, KafkaException
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider


def oauth_cb(oauth_config):
    # MSKAuthTokenProvider.generate_auth_token returns expiry in milliseconds;
    # confluent-kafka-python expects expiry in seconds since the epoch.
    aws_region = os.environ.get('AWS_REGION', 'us-east-1')  # Or the specific region of your MSK cluster
    try:
        auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(aws_region)
        return auth_token, expiry_ms / 1000
    except Exception as e:
        print(f"Error generating token: {e}")
        raise KafkaException(f"Failed to get MSK Auth Token: {e}")


# Configure the Kafka consumer.
# Ensure the KAFKA_BOOTSTRAP_SERVERS environment variable is set
# (e.g., b-1.yourcluster.abcdef.c5.kafka.us-east-1.amazonaws.com:9098),
# AWS_REGION is set, and your AWS credentials are configured
# (e.g., via ~/.aws/credentials or environment variables).
consumer_conf = {
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
    'client.id': socket.gethostname(),
    'group.id': os.environ.get('KAFKA_GROUP_ID', 'my-consumer-group'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,
    'debug': 'broker,protocol,security'  # Optional: for debugging connection issues
}

topic = os.environ.get('KAFKA_TOPIC', 'my-test-topic')
consumer = None
try:
    consumer = Consumer(consumer_conf)
    consumer.subscribe([topic])
    print(f"Consumer configured for topic: {topic}")
    print("Consuming messages. Press Ctrl+C to exit.")
    while True:
        msg = consumer.poll(timeout=1.0)  # Poll for messages with a 1-second timeout
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event - not an error
                print(f"%% {msg.topic()} [{msg.partition()}] reached end offset {msg.offset()}")
            else:
                raise KafkaException(msg.error())
        else:
            key = msg.key().decode('utf-8') if msg.key() else 'None'
            print(f"Received message: key={key}, value={msg.value().decode('utf-8')}, "
                  f"topic={msg.topic()}, partition={msg.partition()}, offset={msg.offset()}")
except KeyboardInterrupt:
    pass
except KafkaException as e:
    print(f"Kafka error: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")
finally:
    if consumer:
        print("Closing consumer...")
        consumer.close()
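The quickstart above targets `confluent-kafka-python`. `dpkp/kafka-python` instead takes an object with a `token()` method via its `sasl_oauth_token_provider` setting, and it handles re-authentication itself, so `token()` only needs to return the token string. A minimal duck-typed sketch (the injectable `generate` parameter is an assumption added here as a test seam, not part of the library's API; inheriting kafka-python's `AbstractTokenProvider` is also an option):

```python
class MSKTokenProvider:
    """Token provider for kafka-python's sasl_oauth_token_provider setting."""

    def __init__(self, region, generate=None):
        self._region = region
        if generate is None:
            # Default to the real signer; imported lazily so the class can be
            # exercised with a stub when AWS credentials are absent.
            from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
            generate = MSKAuthTokenProvider.generate_auth_token
        self._generate = generate

    def token(self):
        # kafka-python expects only the token string; the expiry is unused here.
        auth_token, _expiry_ms = self._generate(self._region)
        return auth_token


# Hypothetical usage with kafka-python (broker address is a placeholder):
# consumer = KafkaConsumer(
#     'my-test-topic',
#     bootstrap_servers='b-1.yourcluster.abcdef.c5.kafka.us-east-1.amazonaws.com:9098',
#     security_protocol='SASL_SSL',
#     sasl_mechanism='OAUTHBEARER',
#     sasl_oauth_token_provider=MSKTokenProvider('us-east-1'),
# )
```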