Type annotations for boto3 Kafka
mypy-boto3-kafka provides type annotations for the AWS boto3 Kafka service. It is a generated library, currently at version 1.42.65, built with mypy-boto3-builder 8.12.0. Its release cadence generally follows updates to the underlying boto3 library and the builder.
Warnings
- breaking The underlying mypy-boto3-builder (version 8.12.0) has removed support for Python 3.8. Therefore, mypy-boto3-kafka no longer supports Python 3.8.
- breaking mypy-boto3-builder version 8.9.0 introduced breaking changes to `TypeDef` naming conventions. Type definitions for packed method arguments use shorter names (e.g., `CreateDistributionRequestRequestTypeDef` -> `CreateDistributionRequestTypeDef`), and conflicting `Extra` postfixes were moved (e.g., `CreateDistributionExtraRequestTypeDef` -> `CreateDistributionRequestExtraTypeDef`).
- gotcha This package provides only type stubs. For actual runtime functionality, you must also install and configure the `boto3` library.
- gotcha When using PyCharm, performance issues with `Literal` overloads might occur. It's recommended to disable PyCharm's internal type checker or use `boto3-stubs-lite` variants for better performance.
- gotcha For Pylint compatibility and to avoid runtime dependencies on type stubs, it is common to guard imports with `if TYPE_CHECKING:` and set types to `object` in the `else` branch. Pylint might otherwise complain about undefined variables.
Install
-
pip install mypy-boto3-kafka -
pip install boto3-stubs[kafka]
Imports
- KafkaClient
from mypy_boto3_kafka import KafkaClient
- AmazonMskClusterTypeDef
from mypy_boto3_kafka.type_defs import AmazonMskClusterTypeDef
- BrokerAZDistributionType
from mypy_boto3_kafka.literals import BrokerAZDistributionType
Quickstart
import os
import boto3
from mypy_boto3_kafka import KafkaClient
# Ensure boto3 is configured, e.g., via AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION_NAME env vars
region = os.environ.get('AWS_REGION_NAME', 'us-east-1')
def get_kafka_client() -> KafkaClient:
session = boto3.Session(region_name=region)
client: KafkaClient = session.client('kafka')
return client
kafka_client = get_kafka_client()
response = kafka_client.list_clusters_v2()
print(f"Kafka Clusters: {response.get('ClusterInfoList')}")