{"id":4180,"library":"pulsar-client","title":"Pulsar Python Client","description":"The `pulsar-client` library provides a Python API for interacting with Apache Pulsar, a distributed messaging and streaming platform. It allows applications to produce and consume messages from Pulsar topics, supporting various messaging patterns and schema management. The current version is 3.10.0, with releases occurring periodically to keep pace with the main Pulsar project and ensure compatibility with new broker features.","status":"active","version":"3.10.0","language":"en","source_language":"en","source_url":"https://github.com/apache/pulsar-client-python","tags":["messaging","queue","apache","pulsar","client","streaming"],"install":[{"cmd":"pip install pulsar-client","lang":"bash","label":"Install stable version"}],"dependencies":[],"imports":[{"symbol":"Client","correct":"from pulsar import Client"},{"symbol":"Producer","correct":"from pulsar import Producer"},{"symbol":"Consumer","correct":"from pulsar import Consumer"},{"symbol":"MessageId","correct":"from pulsar import MessageId"},{"note":"Base class for structured message schemas such as Avro or JSON; it lives in the `pulsar.schema` subpackage.","symbol":"Schema","correct":"from pulsar.schema import Schema"},{"note":"Defines how messages are delivered to consumers within a subscription.","symbol":"ConsumerType","correct":"from pulsar import ConsumerType"}],"quickstart":{"code":"import pulsar\nimport time\nimport os\n\n# --- Configuration ---\n# Ensure a Pulsar broker is running, e.g., locally via Docker:\n# docker run -it -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:latest bin/pulsar standalone\nPULSAR_BROKER_URL = os.environ.get('PULSAR_BROKER_URL', 'pulsar://localhost:6650')\nTOPIC_NAME = 'persistent://public/default/my-python-topic-qs'\nSUBSCRIPTION_NAME = 'my-python-subscription-qs'\n\nprint(f\"Connecting to Pulsar at: {PULSAR_BROKER_URL}\")\n\n# --- Producer ---\ntry:\n    print(\"\\n--- Starting Producer ---\")\n    client_producer = 
pulsar.Client(PULSAR_BROKER_URL)\n    producer = client_producer.create_producer(TOPIC_NAME)\n\n    for i in range(3):\n        message_data = f\"hello-pulsar-message-{i}\".encode('utf-8')\n        producer.send(message_data)\n        print(f\"Producer sent: {message_data.decode()}\")\n        time.sleep(0.1) # Small delay\n\n    producer.close()\n    client_producer.close()\n    print(\"--- Producer finished. ---\")\nexcept Exception as e:\n    print(f\"Producer error: {e}\")\n\ntime.sleep(1) # Give a moment for messages to be available in the topic\n\n# --- Consumer ---\ntry:\n    print(\"\\n--- Starting Consumer ---\")\n    client_consumer = pulsar.Client(PULSAR_BROKER_URL)\n    consumer = client_consumer.subscribe(\n        TOPIC_NAME,\n        SUBSCRIPTION_NAME,\n        consumer_type=pulsar.ConsumerType.Shared,\n        # Read from the earliest available message: the messages above were sent before this subscription existed\n        initial_position=pulsar.InitialPosition.Earliest,\n    )\n\n    print(\"Consumer waiting for messages...\")\n    received_count = 0\n    # We expect 3 messages from the producer\n    while received_count < 3:\n        try:\n            # receive() raises an exception on timeout; it does not return None\n            msg = consumer.receive(timeout_millis=3000) # Wait up to 3 seconds\n            print(f\"Consumer received: '{msg.data().decode('utf-8')}' (ID: {msg.message_id()})\")\n            consumer.acknowledge(msg)\n            received_count += 1\n        except Exception as msg_err:\n            print(f\"Timed out or failed while receiving a message: {msg_err}\")\n            break # Exit on timeout or error\n\n    consumer.close()\n    client_consumer.close()\n    print(\"--- Consumer finished. ---\")\nexcept Exception as e:\n    print(f\"Consumer error: {e}\")\n\nprint(\"\\nQuickstart demonstration complete.\")","lang":"python","description":"This quickstart demonstrates how to produce and consume messages using the `pulsar-client` library. It assumes a Pulsar broker is running and accessible (e.g., via `pulsar://localhost:6650`). 
The script first acts as a producer, sending three messages to a topic, then switches to a consumer, subscribing to the same topic and receiving those messages. It closes the producer, the consumer, and both clients explicitly with `.close()` once each phase completes. The Pulsar broker URL can be configured via the `PULSAR_BROKER_URL` environment variable."},"warnings":[{"fix":"Refer to the official Apache Pulsar documentation for the recommended client-broker compatibility matrix. Upgrade your client and/or broker to compatible versions.","message":"Pulsar Python client versions are generally aligned with Apache Pulsar Broker versions. Using a client version (e.g., 3.x) with an older broker (e.g., 2.x) or vice versa might lead to unexpected behavior, missing features, or connection failures. Always ensure compatibility between client and broker versions.","severity":"breaking","affected_versions":"All versions, especially major version bumps (e.g., 2.x to 3.x)"},{"fix":"Call `.close()` on these objects when they are no longer needed, typically in a `finally` block. Current `pulsar-client` objects do not directly support `with` statements for closing, so do not rely on a context manager.","message":"Failing to explicitly call `.close()` on `pulsar.Client`, `Producer`, and `Consumer` objects can lead to resource leaks (e.g., open connections, file descriptors) and connection issues in long-running applications.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Review the documentation for `pulsar.Client` constructor parameters such as `authentication`, `use_tls`, `tls_trust_certs_file`, and `tls_allow_insecure_connection`. Ensure paths to certificate files are correct and tokens are valid. For TLS connections, set `use_tls=True` or use a `pulsar+ssl://` service URL.","message":"Correctly configuring authentication (e.g., JWT tokens) and TLS for secure connections is a common source of initial setup errors. 
Incorrect parameters can lead to connection refused errors or authentication failures.","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-11T00:00:00.000Z","next_check":"2026-07-10T00:00:00.000Z"}