Google Cloud Managed Kafka
The `google-cloud-managedkafka` client library for Python provides programmatic access to the Google Cloud Managed Service for Apache Kafka API, which manages Kafka clusters, topics, and consumer groups within Google Cloud. The library is currently at version `0.4.0`. Releases are frequent and are often tied to changes in the underlying API.
Warnings
- gotcha Google Cloud client libraries require proper authentication. The quickstart assumes `gcloud auth application-default login` has been run or `GOOGLE_APPLICATION_CREDENTIALS` is set. Without valid credentials, API calls will fail with authentication errors.
- gotcha Resource names for Google Cloud APIs follow a specific, hierarchical string format (e.g., `projects/PROJECT_ID/locations/LOCATION_ID/clusters/CLUSTER_ID`). Incorrect formatting will lead to `google.api_core.exceptions.InvalidArgument` errors.
- gotcha As a `0.x.x` library, the API surface may change more quickly than in `1.x.x` or later releases. While Google strives for stability, minor releases may add features or change behavior in ways that require updates to your code.
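The resource-name format described in the warnings can be built and checked locally before a request is sent. The following is a minimal sketch, not part of the library: `cluster_name` and `parse_cluster_name` are hypothetical helper names, and the only rule enforced here (non-empty segments without `/`) is an assumption that is looser than whatever the API itself validates.

```python
import re

# Hypothetical helpers for illustration; they are not part of
# google-cloud-managedkafka. They only check the hierarchical shape
# projects/PROJECT_ID/locations/LOCATION_ID/clusters/CLUSTER_ID.
_CLUSTER_NAME = re.compile(
    r"^projects/(?P<project>[^/]+)"
    r"/locations/(?P<location>[^/]+)"
    r"/clusters/(?P<cluster>[^/]+)$"
)


def cluster_name(project_id: str, location_id: str, cluster_id: str) -> str:
    """Build a cluster resource name, rejecting empty or slash-containing parts."""
    parts = (
        ("project_id", project_id),
        ("location_id", location_id),
        ("cluster_id", cluster_id),
    )
    for label, value in parts:
        if not value or "/" in value:
            raise ValueError(f"{label} must be non-empty and contain no '/': {value!r}")
    return f"projects/{project_id}/locations/{location_id}/clusters/{cluster_id}"


def parse_cluster_name(name: str) -> dict:
    """Split a cluster resource name into its parts, or raise ValueError."""
    match = _CLUSTER_NAME.match(name)
    if match is None:
        raise ValueError(f"not a cluster resource name: {name!r}")
    return match.groupdict()


name = cluster_name("my-project", "us-central1", "my-cluster")
print(name)
print(parse_cluster_name(name))
```

Catching a malformed name locally gives a clearer error message than waiting for the service to reject the request.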
Install
pip install google-cloud-managedkafka
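After installing, it can be useful to confirm which version is present, since a `0.x.x` release may behave differently from the one a script was written against. This is a small sketch using only the standard library; `installed_version` and `is_pre_1_0` are hypothetical helper names, and the version check assumes a plain dotted version string.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(distribution: str = "google-cloud-managedkafka") -> Optional[str]:
    """Return the installed version string, or None if the package is absent."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


def is_pre_1_0(version_string: str) -> bool:
    """Return True when the major version component is 0 (for example "0.4.0")."""
    return version_string.split(".", 1)[0] == "0"


found = installed_version()
if found is None:
    print("google-cloud-managedkafka is not installed")
elif is_pre_1_0(found):
    print(f"google-cloud-managedkafka {found} (pre-1.0, consider pinning this version)")
else:
    print(f"google-cloud-managedkafka {found}")
```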
Imports
- ManagedKafkaClient
from google.cloud.managedkafka_v1 import ManagedKafkaClient
Quickstart
import os
from google.cloud.managedkafka_v1 import ManagedKafkaClient
# Set your Google Cloud project ID and location
# Ensure these environment variables are set or replace with actual values
project_id = os.environ.get("GCP_PROJECT_ID", "your-gcp-project-id")
location_id = os.environ.get("GCP_LOCATION_ID", "us-central1") # e.g., us-central1
# Construct the parent resource name
parent = f"projects/{project_id}/locations/{location_id}"
client = ManagedKafkaClient()
try:
    print(f"Listing Managed Kafka clusters in {parent}...")
    # The 'parent' parameter expects a string in the format 'projects/PROJECT_ID/locations/LOCATION_ID'
    clusters = client.list_clusters(parent=parent)
    found_clusters = False
    for cluster in clusters:
        print(f" - Cluster name: {cluster.name}, Capacity: {cluster.capacity_config.vcpu_count} vCPUs")
        found_clusters = True
    if not found_clusters:
        print(" No clusters found. Ensure you have clusters deployed in this location.")
except Exception as e:
    print(f"An error occurred: {e}")
    print("Ensure you have authenticated (e.g., `gcloud auth application-default login`)")
    print("and have sufficient IAM permissions (e.g., `roles/managedkafka.viewer`).")
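The authentication hint printed by the quickstart can also be checked before any API call is made. The sketch below looks for a credentials file in the two places a local setup normally uses: the path in `GOOGLE_APPLICATION_CREDENTIALS`, and the file written by `gcloud auth application-default login`. `find_adc_file` is a hypothetical helper, not a library function. It is deliberately incomplete: it ignores a `CLOUDSDK_CONFIG` override and credentials supplied by a metadata server (for example on Compute Engine), so a `None` result is a hint rather than proof that authentication will fail.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def find_adc_file(
    env: Optional[Mapping[str, str]] = None,
    home: Optional[Path] = None,
) -> Optional[Path]:
    """Return the local credentials file that would likely be used, or None.

    Hypothetical pre-flight helper; it only inspects the filesystem.
    """
    env = os.environ if env is None else env
    home = Path.home() if home is None else home

    # An explicitly configured key file takes precedence.
    explicit = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if explicit:
        path = Path(explicit)
        return path if path.is_file() else None

    # Otherwise look for the file created by
    # `gcloud auth application-default login`.
    if os.name == "nt":
        base = Path(env.get("APPDATA", str(home))) / "gcloud"
    else:
        base = home / ".config" / "gcloud"
    candidate = base / "application_default_credentials.json"
    return candidate if candidate.is_file() else None


adc = find_adc_file()
if adc is None:
    print("No local credentials file found; run `gcloud auth application-default login`")
    print("or set GOOGLE_APPLICATION_CREDENTIALS.")
else:
    print(f"Found credentials file: {adc}")
```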