{"id":2874,"library":"aws-msk-iam-sasl-signer-python","title":"AWS MSK IAM SASL Signer (Python)","description":"This is an Amazon MSK Library in Python, version 1.0.2. It provides a function to generate a base64 encoded signed URL to enable authentication and authorization with an Amazon MSK cluster using AWS IAM credentials. It acts as a pluggable library for any Python Kafka client that supports the SASL/OAUTHBEARER mechanism. The library sees releases as needed, with several updates occurring in 2023-2025.","status":"active","version":"1.0.2","language":"en","source_language":"en","source_url":"https://github.com/aws/aws-msk-iam-sasl-signer-python","tags":["aws","msk","kafka","iam","authentication","sasl","oauthbearer"],"install":[{"cmd":"pip install aws-msk-iam-sasl-signer-python","lang":"bash","label":"Install stable version"}],"dependencies":[{"reason":"Required for AWS IAM credential handling and token generation.","package":"boto3","optional":false},{"reason":"Core AWS SDK functionality, a dependency of boto3, used for credential providers.","package":"botocore","optional":false},{"reason":"Command Line Interface creation kit, used internally.","package":"click","optional":false}],"imports":[{"note":"The `MSKAuthTokenProvider` class is directly available under the `aws_msk_iam_sasl_signer` package, not within a nested module of the same name.","wrong":"from aws_msk_iam_sasl_signer.MSKAuthTokenProvider import MSKAuthTokenProvider","symbol":"MSKAuthTokenProvider","correct":"from aws_msk_iam_sasl_signer import MSKAuthTokenProvider"}],"quickstart":{"code":"import os\nimport socket\nimport time\nfrom confluent_kafka import Consumer, KafkaException\nfrom aws_msk_iam_sasl_signer import MSKAuthTokenProvider\n\ndef oauth_cb(oauth_config):\n    # MSKAuthTokenProvider.generate_auth_token returns expiry in milliseconds\n    # confluent-kafka-python expects expiry in seconds since epoch\n    aws_region = os.environ.get('AWS_REGION', 'us-east-1') # Or specific region for 
your MSK cluster\n    try:\n        auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(aws_region)\n        return auth_token, expiry_ms / 1000\n    except Exception as e:\n        print(f\"Error generating token: {e}\")\n        raise KafkaException(f\"Failed to get MSK Auth Token: {e}\")\n\n\n# Configure Kafka Consumer\n# Ensure KAFKA_BOOTSTRAP_SERVERS environment variable is set (e.g., b-1.yourcluster.abcdef.c5.kafka.us-east-1.amazonaws.com:9098)\n# Ensure AWS_REGION environment variable is set\n# Ensure your AWS credentials (e.g., via ~/.aws/credentials or environment variables) are configured\n\nconsumer_conf = {\n    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),\n    'client.id': socket.gethostname(),\n    'group.id': os.environ.get('KAFKA_GROUP_ID', 'my-consumer-group'),\n    'security.protocol': 'SASL_SSL',\n    'sasl.mechanism': 'OAUTHBEARER',\n    'oauth_cb': oauth_cb, # confluent-kafka-python config key for the OAUTHBEARER token refresh callback\n    'debug': 'broker,protocol,security' # Optional: for debugging connection issues\n}\n\ntopic = os.environ.get('KAFKA_TOPIC', 'my-test-topic')\n\nconsumer = None\ntry:\n    consumer = Consumer(consumer_conf)\n    consumer.subscribe([topic])\n\n    print(f\"Consumer configured for topic: {topic}\")\n    print(f\"Consuming messages. 
Press Ctrl+C to exit.\")\n\n    while True:\n        msg = consumer.poll(timeout=1.0) # Poll for messages, 1-second timeout\n        if msg is None:\n            continue\n        if msg.error():\n            # KafkaError.name() returns the enum name of the error/event\n            if msg.error().name() == '_PARTITION_EOF':\n                # End of partition event - not an error\n                print(f\"%% {msg.topic()} [{msg.partition()}] reached end offset {msg.offset()}\")\n            else:\n                raise KafkaException(msg.error())\n        else:\n            print(f\"Received message: key={msg.key().decode('utf-8') if msg.key() else 'None'}, value={msg.value().decode('utf-8')}, topic={msg.topic()}, partition={msg.partition()}, offset={msg.offset()}\")\n\nexcept KafkaException as e:\n    print(f\"Kafka error: {e}\")\nexcept Exception as e:\n    print(f\"An unexpected error occurred: {e}\")\nfinally:\n    if consumer:\n        print(\"Closing consumer...\")\n        consumer.close()\n","lang":"python","description":"This quickstart demonstrates how to configure a `confluent-kafka-python` consumer to connect to an Amazon MSK cluster using IAM authentication via `aws-msk-iam-sasl-signer-python`. It defines a callback function, `oauth_cb`, which calls `MSKAuthTokenProvider.generate_auth_token` to obtain the SASL/OAUTHBEARER token. Ensure that your AWS credentials are configured (e.g., via `~/.aws/credentials`, environment variables, or an IAM role for EC2/Lambda) and that the `KAFKA_BOOTSTRAP_SERVERS` and `AWS_REGION` environment variables are set."},"warnings":[{"fix":"Divide `expiry_ms` by 1000 in your `oauth_cb` implementation before returning it (e.g., `return auth_token, expiry_ms / 1000`).","message":"When using `confluent-kafka-python`, the `oauth_cb` callback must return the token expiry time in *seconds* since the epoch, while `MSKAuthTokenProvider.generate_auth_token` returns it in *milliseconds*. 
You must divide the returned expiry by 1000 before returning it from your callback.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Ensure your Kafka client configuration specifies `sasl_mechanism='OAUTHBEARER'` and not `AWS_MSK_IAM` when using this signer library. Refer to the updated examples in the GitHub README or the AWS documentation. For `kafka-python`, use `sasl_oauth_token_provider` with your custom token provider class. For `confluent-kafka-python`, set the `oauth_cb` configuration key to your token callback.","message":"For users of the `dpkp/kafka-python` client: versions 2.1.0 and later changed SASL module handling, including the `AWS_MSK_IAM` mechanism. The `aws-msk-iam-sasl-signer-python` library is designed for the `OAUTHBEARER` mechanism, and older examples might not work directly with `kafka-python` versions 2.1.0+.","severity":"breaking","affected_versions":"kafka-python >= 2.1.0"},{"fix":"Always set the SASL mechanism to `OAUTHBEARER` in your Python Kafka client configuration when using `aws-msk-iam-sasl-signer-python` (`sasl_mechanism='OAUTHBEARER'` in `kafka-python`, `'sasl.mechanism': 'OAUTHBEARER'` in `confluent-kafka-python`).","message":"Incorrect Kafka SASL mechanism: This library facilitates IAM authentication for MSK using the SASL/`OAUTHBEARER` mechanism. Some users mistakenly attempt to configure their Kafka clients with `sasl_mechanism='AWS_MSK_IAM'`, which is a custom mechanism primarily for Java clients or specific forks. Python clients `dpkp/kafka-python` and `confluent-kafka-python` do not natively support `AWS_MSK_IAM` and require `OAUTHBEARER` when used with this library.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Verify that your client and MSK cluster are in the same VPC or have appropriate VPC peering/connectivity. 
Ensure security groups allow inbound traffic on port 9098 from your client's security group to the MSK broker security group.","message":"Network connectivity issues (e.g., 'Connection reset') are frequent if the client application (e.g., Lambda function, EC2 instance) is not in the same VPC as the MSK cluster or if the security groups are improperly configured. Traffic on TCP port 9098 (for SASL_SSL) must be allowed between the client and the MSK brokers.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Avoid disabling TLS host verification unless absolutely necessary and with a clear understanding of the security implications. If using a custom SSL context, ensure proper certificate validation is in place.","message":"Disabling TLS host verification (`ssl_context.check_hostname = False`, `ssl_context.verify_mode = ssl.CERT_NONE`) in some client examples (e.g., `aiokafka`) can introduce security vulnerabilities. This should only be done if you fully understand and accept the risks.","severity":"gotcha","affected_versions":"All versions"},{"fix":"If using transactional producers, ensure a connection is established by polling or flushing before calling `init_transactions`. Consult the GitHub issues for any updates or workarounds.","message":"Transactional producers may not work correctly with IAM authentication. There's an open issue reporting connection failures when `init_transactions` is called, suggesting a potential timing issue with connection establishment.","severity":"gotcha","affected_versions":"All versions, specific to confluent-kafka >= 2.4.0 (reported)"}],"env_vars":null,"last_verified":"2026-04-11T00:00:00.000Z","next_check":"2026-07-10T00:00:00.000Z"}