Kafka Connect Python Client
kafka-connect-py is a Python client library for interacting with the Confluent Platform Kafka Connect REST API. It provides a convenient way to manage Kafka Connect clusters, including listing, creating, updating, and deleting connectors. The current stable version is 1.0.0, and releases appear to be infrequent, driven by new feature additions or maintenance.
Common errors
- requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
  cause: The Python client could not establish a connection to the Kafka Connect REST API. This often means the API URL is wrong, the network is blocking the connection, or the Kafka Connect service is not running or accessible.
  fix: Double-check the `KAFKA_CONNECT_URL` environment variable or the URL provided to `KafkaConnect`. Ensure the Kafka Connect cluster is running and accessible from the machine where your Python code is executed. Verify firewall rules or network configuration if necessary.
- requests.exceptions.HTTPError: 404 Client Error: Not Found for url: http://localhost:8083/connectors/my_missing_connector/status
  cause: You attempted an operation (like getting status, pausing, or resuming) on a Kafka Connect connector name that does not exist on the connected cluster.
  fix: Verify the connector name for typos. Before attempting operations, you can list existing connectors using `client.get_connectors()` to confirm its presence. Implement `try-except` blocks to handle 404 errors gracefully.
- requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http://localhost:8083/connectors
  cause: This error, often occurring during `create_connector` or `update_connector_config`, indicates that the provided connector configuration is invalid based on the Kafka Connect cluster's validation rules (e.g., missing required properties, incorrect plugin names, invalid values).
  fix: Inspect the error response body for detailed validation messages from the Kafka Connect API. When catching the `HTTPError`, access `e.response.json()` or `e.response.text` to read the specific reasons for the 'Bad Request' and adjust your connector configuration dictionary.
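The advice about reading the response body can be sketched as a small helper. This is a minimal sketch, not part of kafka-connect-py: `describe_http_error` is a hypothetical name, and it assumes the caught exception carries a `response` attribute shaped like a `requests.Response` (`status_code`, `json()`, `text`), and that the Kafka Connect REST API reports failures as a JSON object with a `message` field.

```python
def describe_http_error(exc):
    """Return a short, readable description of an HTTPError-like exception.

    Hypothetical helper. Assumes `exc.response` behaves like a
    `requests.Response` (`status_code`, `json()`, `text`).
    """
    response = getattr(exc, "response", None)
    if response is None:
        # No HTTP response attached (for example, a connection failure).
        return f"no HTTP response: {exc}"

    try:
        body = response.json()
    except ValueError:
        # The body is not valid JSON; fall back to the raw text below.
        body = None

    # Assumption: Kafka Connect error bodies carry a "message" field.
    if isinstance(body, dict) and "message" in body:
        detail = body["message"]
    else:
        detail = response.text

    return f"HTTP {response.status_code}: {detail}"
```

Inside an `except requests.exceptions.HTTPError as e:` handler, `print(describe_http_error(e))` would then show the validation message for a 400 or the "not found" reason for a 404, rather than only the status line.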
Warnings
- gotcha The client raises `requests.exceptions.ConnectionError` if the Kafka Connect REST API URL is incorrect or unreachable, or if the service is down. Always confirm that `KAFKA_CONNECT_URL` is correct and that the Kafka Connect cluster is running before making calls.
- gotcha When interacting with non-existent connectors (e.g., calling `get_connector_status('my_non_existent_connector')`), the Kafka Connect API typically returns a 404 Not Found response. The client translates this into a `requests.exceptions.HTTPError`.
- gotcha Invalid connector configurations (e.g., missing required fields, invalid values) passed to `create_connector` or `update_connector_config` will result in a `requests.exceptions.HTTPError` (typically 400 Bad Request). The underlying Kafka Connect API provides detailed error messages in the response body.
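Since a malformed URL is one of the causes of `ConnectionError` listed above, it can help to reject obviously wrong values before creating the client. The following is a minimal sketch using only the standard library; `check_connect_url` is a hypothetical helper, not part of kafka-connect-py, and it catches only syntactic problems (it cannot tell whether the service is actually reachable).

```python
from urllib.parse import urlsplit


def check_connect_url(url):
    """Return a cleaned URL, or raise ValueError if it is clearly malformed.

    Hypothetical pre-flight check; it does not contact the server.
    """
    cleaned = url.strip()
    parts = urlsplit(cleaned)
    if parts.scheme not in ("http", "https"):
        raise ValueError(f"expected an http:// or https:// URL, got {cleaned!r}")
    if not parts.hostname:
        raise ValueError(f"no host found in {cleaned!r}")
    # Drop a trailing slash so later path joins do not produce "//".
    return cleaned.rstrip("/")
```

A typical call would be `check_connect_url(os.environ.get("KAFKA_CONNECT_URL", "http://localhost:8083"))`, with the result passed to `KafkaConnect`. A value such as `localhost:8083` (missing scheme) fails fast with a clear message instead of surfacing later as a connection error.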
Install
- pip install kafka-connect-py
Imports
- KafkaConnect
from kafka_connect import KafkaConnect
Quickstart
import os
from kafka_connect import KafkaConnect
# Ensure KAFKA_CONNECT_URL is set in your environment
# e.g., export KAFKA_CONNECT_URL="http://localhost:8083"
connect_url = os.environ.get('KAFKA_CONNECT_URL', 'http://localhost:8083')
try:
    client = KafkaConnect(connect_url)
    print(f"Connected to Kafka Connect at: {client.base_url}")

    # Get Kafka Connect cluster version
    version_info = client.get_connect_version()
    print(f"Kafka Connect Version: {version_info.get('version')}")

    # List active connectors
    connectors = client.get_connectors()
    if connectors:
        print(f"Active connectors: {', '.join(connectors)}")
    else:
        print("No active connectors found.")
except Exception as e:
    print(f"An error occurred: {e}")
    print("Please ensure the Kafka Connect REST API is running and accessible at the configured URL.")
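Building on the connector listing in the quickstart, a status lookup can be guarded so that a missing connector yields `None` rather than a 404 `HTTPError`. This is a sketch under assumptions: `get_status_if_present` is a hypothetical helper, and it relies on `get_connectors()` returning connector names and on `get_connector_status(name)` existing, as those methods are named on this page.

```python
def get_status_if_present(client, name):
    """Return the connector's status, or None if it is not listed.

    Hypothetical helper. `client` is assumed to expose `get_connectors()`
    (returning connector names) and `get_connector_status(name)`.
    """
    if name not in client.get_connectors():
        # Skip the status call for a connector the cluster does not report.
        return None
    return client.get_connector_status(name)
```

The check costs one extra request and is not atomic: a connector could be deleted between the two calls, so wrapping the call in a `try`/`except` for `requests.exceptions.HTTPError` is still worthwhile in long-running code.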