Pulsar Python Client
The `pulsar-client` library provides a Python API for Apache Pulsar, a distributed messaging and streaming platform. Applications use it to produce messages to and consume messages from Pulsar topics, with support for several subscription types (exclusive, shared, failover, key-shared) and for typed message schemas. The current version is 3.10.0; new releases follow the main Pulsar project to stay compatible with new broker features.
Warnings
- breaking Since 3.0.0 the Python client is released from its own repository and versioned independently of the Apache Pulsar broker, so a client 3.x release does not imply a broker 3.x release. Pairing a newer client with a much older broker (e.g., 2.x), or vice versa, can still lead to missing features or unexpected behavior when one side relies on something the other does not support. Check the client release notes against your broker version before upgrading either side.
- gotcha Failing to explicitly call `.close()` on `pulsar.Client`, `Producer`, and `Consumer` objects can lead to resource leaks (e.g., open connections, file descriptors) and connection issues in long-running applications.
- gotcha Correctly configuring authentication (e.g., JWT tokens) and TLS for secure connections is a common source of initial setup errors. Incorrect parameters can lead to connection refused errors or authentication failures.
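The authentication and TLS warning above is easier to act on when the settings are gathered in one place. The sketch below builds keyword arguments for `pulsar.Client` from environment variables. The helper names (`client_kwargs_from_env`, `connect`) and the variable names `PULSAR_JWT_TOKEN` and `PULSAR_TLS_CA_CERT` are illustrative assumptions, not part of the library. `authentication`, `tls_trust_certs_file_path`, and `tls_validate_hostname` are `pulsar.Client` parameters and `pulsar.AuthenticationToken` is the token provider, but verify them against the version you have installed.

```python
import os


def client_kwargs_from_env(env=os.environ):
    """Collect connection settings for pulsar.Client from environment variables.

    The variable names are hypothetical; rename them to match your deployment.
    Returns (service_url, kwargs) without importing pulsar, so it is easy to test.
    """
    service_url = env.get("PULSAR_BROKER_URL", "pulsar://localhost:6650")
    kwargs = {}

    if service_url.startswith("pulsar+ssl://"):
        # TLS endpoints use the pulsar+ssl:// scheme (broker port 6651 by default).
        kwargs["tls_validate_hostname"] = True
        ca_cert = env.get("PULSAR_TLS_CA_CERT")
        if ca_cert:
            kwargs["tls_trust_certs_file_path"] = ca_cert

    token = env.get("PULSAR_JWT_TOKEN")
    if token:
        # Kept as a plain string here; connect() wraps it in AuthenticationToken.
        kwargs["token"] = token
    return service_url, kwargs


def connect(env=os.environ):
    """Create a pulsar.Client from the settings above (needs pulsar-client installed)."""
    import pulsar  # imported lazily so the helper above works without the library

    service_url, kwargs = client_kwargs_from_env(env)
    token = kwargs.pop("token", None)
    if token:
        kwargs["authentication"] = pulsar.AuthenticationToken(token)
    return pulsar.Client(service_url, **kwargs)
```

Keeping the settings in a plain dictionary makes a misconfiguration visible before any connection attempt: print the result of `client_kwargs_from_env()` (without the token) when a connection is refused or authentication fails.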
Install
pip install pulsar-client
Imports
- Client
from pulsar import Client
- Producer
from pulsar import Producer
- Consumer
from pulsar import Consumer
- MessageId
from pulsar import MessageId
- Schema
from pulsar.schema import Schema
- ConsumerType
from pulsar import ConsumerType
Quickstart
import pulsar
import time
import os
# --- Configuration ---
# Ensure a Pulsar broker is running, e.g., locally via Docker:
# docker run -it -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:latest bin/pulsar standalone
PULSAR_BROKER_URL = os.environ.get('PULSAR_BROKER_URL', 'pulsar://localhost:6650')
TOPIC_NAME = 'persistent://public/default/my-python-topic-qs'
SUBSCRIPTION_NAME = 'my-python-subscription-qs'
print(f"Connecting to Pulsar at: {PULSAR_BROKER_URL}")
# --- Producer ---
try:
    print("\n--- Starting Producer ---")
    client_producer = pulsar.Client(PULSAR_BROKER_URL)
    producer = client_producer.create_producer(TOPIC_NAME)
    for i in range(3):
        message_data = f"hello-pulsar-message-{i}".encode('utf-8')
        producer.send(message_data)
        print(f"Producer sent: {message_data.decode()}")
        time.sleep(0.1)  # Small delay
    producer.close()
    client_producer.close()
    print("--- Producer finished. ---")
except Exception as e:
    print(f"Producer error: {e}")
time.sleep(1) # Give a moment for messages to be available in the topic
# --- Consumer ---
try:
    print("\n--- Starting Consumer ---")
    client_consumer = pulsar.Client(PULSAR_BROKER_URL)
    consumer = client_consumer.subscribe(
        TOPIC_NAME,
        SUBSCRIPTION_NAME,
        consumer_type=pulsar.ConsumerType.Shared,
        # A new subscription starts at the latest message by default; start at
        # the earliest so the messages produced above are delivered.
        initial_position=pulsar.InitialPosition.Earliest,
    )
    print("Consumer waiting for messages...")
    received_count = 0
    # We expect 3 messages from the producer
    while received_count < 3:
        try:
            msg = consumer.receive(timeout_millis=3000)  # Wait up to 3 seconds
        except pulsar.Timeout:
            # receive() raises pulsar.Timeout on expiry; it does not return None
            print("Consumer timed out waiting for message. Stopping.")
            break
        except Exception as msg_err:
            print(f"Error receiving individual message: {msg_err}")
            break  # Exit on error
        print(f"Consumer received: '{msg.data().decode('utf-8')}' (ID: {msg.message_id()})")
        consumer.acknowledge(msg)
        received_count += 1
    consumer.close()
    client_consumer.close()
    print("--- Consumer finished. ---")
except Exception as e:
    print(f"Consumer error: {e}")
print("\nQuickstart demonstration complete.")
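The quickstart closes its producer and client only on the success path, so an exception raised by `send()` would skip both `.close()` calls, which is the leak described under Warnings. Below is a minimal sketch of one way to guarantee cleanup with the standard library's `contextlib.ExitStack` and `closing`. The helper name `produce_all` and its `make_client` factory argument are illustrative, not part of `pulsar-client`; the sketch relies only on the `create_producer`, `send`, and `close` methods used in the quickstart.

```python
from contextlib import ExitStack, closing


def produce_all(make_client, topic, payloads):
    """Send every payload, closing the producer and client even if send() raises.

    make_client is any zero-argument callable that returns a client object,
    for example: lambda: pulsar.Client("pulsar://localhost:6650")
    """
    sent = 0
    with ExitStack() as stack:
        # closing() calls .close() on exit. ExitStack unwinds in reverse order,
        # so the producer is closed before the client that created it.
        client = stack.enter_context(closing(make_client()))
        producer = stack.enter_context(closing(client.create_producer(topic)))
        for payload in payloads:
            producer.send(payload)
            sent += 1
    return sent
```

With the real library this would be called as `produce_all(lambda: pulsar.Client(PULSAR_BROKER_URL), TOPIC_NAME, [b"hello"])`. Passing a factory instead of a ready-made client keeps construction inside the `with` block, so a client is never left open if a later step fails.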