Pulsar Python Client

3.10.0 · active · verified Sat Apr 11

The `pulsar-client` library provides a Python API for Apache Pulsar, a distributed messaging and streaming platform. Applications use it to produce messages to Pulsar topics and consume messages from them, with support for several subscription types and for message schemas. The current version is 3.10.0; new releases appear periodically to stay compatible with new broker features in the main Pulsar project.

Warnings

Install

Imports

Quickstart

This quickstart produces and consumes messages with the `pulsar-client` library. It assumes a Pulsar broker is running and reachable at `pulsar://localhost:6650`; set the `PULSAR_BROKER_URL` environment variable to point it at a different broker. The script first acts as a producer, sending three messages to a topic, then acts as a consumer, subscribing to the same topic and receiving those messages. Each half calls `.close()` on its producer or consumer and on its client once it has finished.

import pulsar
import time
import os

# --- Configuration ---
# Ensure a Pulsar broker is running, e.g., locally via Docker:
# docker run -it -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:latest bin/pulsar standalone
PULSAR_BROKER_URL = os.environ.get('PULSAR_BROKER_URL', 'pulsar://localhost:6650')
TOPIC_NAME = 'persistent://public/default/my-python-topic-qs'
SUBSCRIPTION_NAME = 'my-python-subscription-qs'

print(f"Connecting to Pulsar at: {PULSAR_BROKER_URL}")

# --- Producer ---
try:
    print("\n--- Starting Producer ---")
    client_producer = pulsar.Client(PULSAR_BROKER_URL)
    producer = client_producer.create_producer(TOPIC_NAME)

    for i in range(3):
        message_data = f"hello-pulsar-message-{i}".encode('utf-8')
        producer.send(message_data)
        print(f"Producer sent: {message_data.decode()}")
        time.sleep(0.1) # Small delay

    producer.close()
    client_producer.close()
    print("--- Producer finished. ---")
except Exception as e:
    print(f"Producer error: {e}")

time.sleep(1) # Give a moment for messages to be available in the topic

# --- Consumer ---
try:
    print("\n--- Starting Consumer ---")
    client_consumer = pulsar.Client(PULSAR_BROKER_URL)
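    consumer = client_consumer.subscribe(
        TOPIC_NAME,
        SUBSCRIPTION_NAME,
        consumer_type=pulsar.ConsumerType.Shared,
        # A new subscription starts at the latest message by default. Start from the
        # earliest one so the messages sent above, before subscribing, are delivered.
        initial_position=pulsar.InitialPosition.Earliest,
    )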

    print("Consumer waiting for messages...")
    received_count = 0
    # We expect 3 messages from the producer
    while received_count < 3:
        try:
            msg = consumer.receive(timeout_millis=3000)  # Wait up to 3 seconds
        except pulsar.Timeout:
            # receive() raises on timeout; it does not return None
            print("Consumer timed out waiting for a message. Stopping.")
            break
        print(f"Consumer received: '{msg.data().decode('utf-8')}' (ID: {msg.message_id()})")
        consumer.acknowledge(msg)
        received_count += 1
    
    consumer.close()
    client_consumer.close()
    print("--- Consumer finished. ---")
except Exception as e:
    print(f"Consumer error: {e}")

print("\nQuickstart demonstration complete.")
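
The script above closes its clients only when every step succeeds; if a send fails, the `close()` calls are skipped. One way to guarantee cleanup is to wrap the client in `contextlib.closing`. The following is a minimal sketch under stated assumptions, not the library's prescribed pattern: `send_all` and its `client_factory` parameter are hypothetical names introduced here so the helper can be exercised without a running broker. It relies only on the client exposing `create_producer()` and `close()`, as `pulsar.Client` does in the quickstart.

```python
from contextlib import closing


def send_all(client_factory, url, topic, payloads):
    """Send each payload to `topic`, closing the client even if a send fails.

    `client_factory` is a hypothetical parameter: any callable that takes a
    service URL and returns an object with create_producer() and close(),
    for example pulsar.Client.
    """
    with closing(client_factory(url)) as client:
        producer = client.create_producer(topic)
        sent = 0
        for payload in payloads:
            producer.send(payload)
            sent += 1
        return sent
    # closing() calls client.close() on normal exit and when an exception propagates.
```

With a broker running, a call might look like `send_all(pulsar.Client, PULSAR_BROKER_URL, TOPIC_NAME, [b"hello"])`. Passing the factory rather than a ready-made client keeps construction and cleanup in one place.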
