Kazoo
Kazoo is a higher-level Python client for Apache ZooKeeper that provides robust abstractions for common distributed coordination tasks such as locks, leader election, and queues. Version 2.11.0 is the latest stable release. Development is active, with periodic releases that are often driven by Python version support updates and bug fixes.
Warnings
- breaking Kazoo frequently changes its supported Python versions, and new releases often drop older ones. Check that your interpreter is supported by the Kazoo release you install to avoid compatibility issues.
- gotcha KazooClient operates asynchronously: the connection is maintained in the background, and its state can change at any time. Manage the client's lifecycle explicitly (start, stop, close) and handle connection state changes (CONNECTED, SUSPENDED, LOST) with a listener, or check `zk.connected` before performing operations. Operations issued while the client is not connected can raise errors or behave unexpectedly.
- gotcha The `hosts` parameter for `KazooClient` expects a comma-separated string of `host:port` pairs (e.g., '127.0.0.1:2181,127.0.0.2:2181'). Using spaces or other delimiters can lead to connection failures.
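The `hosts` format described above can be produced programmatically rather than typed by hand. The following is a minimal sketch, not part of Kazoo: `build_hosts` and `normalize_hosts` are hypothetical helper names, and the fallback to port 2181 when a port is omitted is an assumption made for this example. It does not handle IPv6 literals or a chroot suffix.

```python
import re


def build_hosts(endpoints):
    """Join (host, port) pairs into a comma-separated 'host:port' string."""
    parts = []
    for host, port in endpoints:
        host = str(host).strip()
        if not host:
            raise ValueError("empty host name")
        port = int(port)  # raises ValueError for non-numeric ports
        if not 0 < port < 65536:
            raise ValueError(f"port out of range: {port}")
        parts.append(f"{host}:{port}")
    if not parts:
        raise ValueError("at least one endpoint is required")
    return ",".join(parts)


def normalize_hosts(raw, default_port=2181):
    """Clean a loosely formatted host list (stray spaces, semicolons).

    default_port is an assumption of this sketch, used when an entry
    has no explicit port.
    """
    pairs = []
    for chunk in re.split(r"[,;\s]+", raw):
        if not chunk:
            continue  # skip empty pieces left by leading/trailing separators
        host, sep, port = chunk.rpartition(":")
        if not sep:
            # No colon found: the whole chunk is the host name.
            host, port = chunk, default_port
        pairs.append((host, port))
    return build_hosts(pairs)
```

The result can be passed directly as `KazooClient(hosts=...)`. Validating the string up front turns a malformed value into an immediate `ValueError` instead of a connection failure later.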
Install
pip install kazoo
Imports
- KazooClient
from kazoo.client import KazooClient
- KazooException
from kazoo.exceptions import KazooException
Quickstart
import os
import time

from kazoo.client import KazooClient, KazooState
from kazoo.exceptions import KazooException

# Read the ZooKeeper host(s) from an environment variable, e.g. '127.0.0.1:2181,127.0.0.1:2182'
ZOOKEEPER_HOSTS = os.environ.get('ZOOKEEPER_HOSTS', '127.0.0.1:2181')

zk = KazooClient(hosts=ZOOKEEPER_HOSTS)


@zk.add_listener
def my_listener(state):
    """Listener for ZooKeeper connection state changes."""
    print(f"ZooKeeper state changed: {state}")
    if state == KazooState.CONNECTED:
        print("Successfully connected to ZooKeeper!")
    elif state == KazooState.LOST:
        # Also reported when the client is stopped
        print("ZooKeeper session lost (expired or client stopped).")
    elif state == KazooState.SUSPENDED:
        print("Connection to ZooKeeper suspended. Will attempt to reconnect.")


try:
    print(f"Attempting to connect to ZooKeeper at {ZOOKEEPER_HOSTS}...")
    zk.start()

    zk.ensure_path("/my/kazoo/path")
    print("Created /my/kazoo/path if it didn't exist.")

    # Create an ephemeral node that is deleted when the client's session ends
    node_path = "/my/kazoo/path/ephemeral_node"
    zk.create(node_path, b"hello_kazoo", ephemeral=True, sequence=False)
    print(f"Created ephemeral node: {node_path} with data 'hello_kazoo'")

    data, stat = zk.get(node_path)
    print(f"Retrieved data from {node_path}: {data.decode('utf-8')}, Stat: {stat}")

    # Keep the client alive for a few seconds to observe state changes or for other operations
    time.sleep(5)
except KazooException as e:
    print(f"A Kazoo-specific error occurred: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")
finally:
    # Always stop before closing, even if the connection is currently down:
    # close() expects a stopped client.
    zk.stop()
    print("KazooClient stopped.")
    zk.close()
    print("KazooClient closed.")
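The listener in the quickstart only prints state changes. To act on them, as the warning about connection state suggests, application code needs a way to ask whether the session is usable right now. The following is a minimal sketch of one approach: `ConnectionGate` is a hypothetical name, not a Kazoo class, and it relies on Kazoo passing the state values "CONNECTED", "SUSPENDED", and "LOST" to listeners, the same values the quickstart listener handles. The class has no Kazoo dependency, so it can be exercised without a running ZooKeeper.

```python
import threading


class ConnectionGate:
    """State listener that records whether the connection is usable."""

    def __init__(self):
        self._connected = threading.Event()
        self.session_lost = False  # set once a LOST state has been seen
        self.history = []          # every state received, in order

    def __call__(self, state):
        # Only flags are updated here, so the listener returns quickly.
        self.history.append(state)
        if state == "CONNECTED":
            self._connected.set()
        else:
            self._connected.clear()
            if state == "LOST":
                # The session is gone, so ephemeral nodes created in it
                # must be recreated by the application after reconnecting.
                self.session_lost = True

    def wait_connected(self, timeout=None):
        """Block until connected; return False if the timeout expires."""
        return self._connected.wait(timeout)

    def acknowledge_loss(self):
        """Clear the flag after the application has rebuilt its state."""
        self.session_lost = False
```

A possible usage, assuming the `zk` client from the quickstart: create `gate = ConnectionGate()`, register it with `zk.add_listener(gate)` before `zk.start()`, then guard operations with `if gate.wait_connected(timeout=10):`. After a reconnect, check `gate.session_lost` to decide whether ephemeral nodes need to be recreated, and call `gate.acknowledge_loss()` once that is done.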