Google Cloud Pub/Sub Client Library
The `google-cloud-pubsub` package is the Python client library for Google Cloud Pub/Sub, a fully managed, real-time messaging service. Pub/Sub enables asynchronous communication by decoupling the services that produce messages from those that consume them, and offers at-least-once delivery, low latency, and on-demand scalability. The library is actively maintained within the broader `google-cloud-python` monorepo, with frequent (often weekly) releases for bug fixes and minor features.
Warnings
- breaking: Versions `2.35.0` and later of `google-cloud-pubsub` require Python 3.9 or newer. On Python 3.7 or 3.8, pin the library to `google-cloud-pubsub==2.34.0` or earlier.
- gotcha: Creating `PublisherClient` or `SubscriberClient` instances unnecessarily wastes resources. Each client handles connection pooling and caching internally, so create one and reuse it.
- gotcha: Misconfigured subscriber acknowledgment deadlines or flow control (prefetch settings) can cause "stuck" subscribers, repeated redelivery of the same message (the poison-pill effect), or excessive resource consumption.
- gotcha: Pub/Sub guarantees at-least-once delivery, so a message may be delivered more than once (e.g., after subscriber restarts or missed ack deadlines).
- gotcha: Failing to capture and log the `message_id` returned by `publisher.publish(...).result()` can severely hinder debugging and traceability in production.
- gotcha: When using the Pub/Sub emulator, incorrect `PUBSUB_EMULATOR_HOST` or `GOOGLE_CLOUD_PROJECT` environment variable settings can cause client libraries to connect to the production Pub/Sub service instead of the local emulator.
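Because delivery is at-least-once, subscriber callbacks are usually written to tolerate duplicates. The following is a minimal sketch of one way to do that, assuming a redelivered message carries the same `message_id` and that an in-memory set is acceptable (it is lost on restart and not shared between processes, so a real deployment would more likely use a database or cache). `make_idempotent` and `FakeMessage` are hypothetical names used only for illustration; they are not part of the library.

```python
import threading


def make_idempotent(handler):
    """Wrap `handler` so a message_id already handled in this process is skipped."""
    seen = set()
    lock = threading.Lock()  # callbacks may run on several threads

    def callback(message):
        with lock:
            duplicate = message.message_id in seen
        if duplicate:
            message.ack()  # ack the duplicate so it stops being redelivered
            return
        # If the handler raises, the message is neither recorded nor acked,
        # so it can be redelivered and retried later.
        # Simplification: two concurrent deliveries of the same ID could
        # both reach this point.
        handler(message)
        with lock:
            seen.add(message.message_id)
        message.ack()

    return callback


# Local demonstration with a stand-in object (not a real Pub/Sub message).
class FakeMessage:
    def __init__(self, message_id):
        self.message_id = message_id
        self.acked = False

    def ack(self):
        self.acked = True


processed = []
callback = make_idempotent(lambda m: processed.append(m.message_id))
first, redelivery = FakeMessage("1"), FakeMessage("1")
callback(first)
callback(redelivery)
```

In this sketch the wrapped function would be passed as the `callback` argument to `subscriber.subscribe(...)` in place of a handler that acks unconditionally.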
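For the `message_id` warning, one option is to attach a done-callback to the future returned by `publish()` so every outcome is logged without blocking the caller. The sketch below assumes the standard `logging` module is configured elsewhere; the logger name and the `log_publish_result` function are hypothetical. The demonstration uses a plain `concurrent.futures.Future` as a stand-in for a real publish future.

```python
import concurrent.futures
import logging

logger = logging.getLogger("pubsub.publish")  # hypothetical logger name


def log_publish_result(future):
    """Done-callback: log the server-assigned message ID, or the failure."""
    try:
        message_id = future.result()
    except Exception:
        logger.exception("Publish failed")
        return None
    logger.info("Published message_id=%s", message_id)
    return message_id  # ignored by add_done_callback, handy for direct calls


# Stand-in for the future returned by publisher.publish(...).
demo_future = concurrent.futures.Future()
demo_future.set_result("1234567890")
logged_id = log_publish_result(demo_future)
```

With a real client the wiring would look like `future = publisher.publish(topic_path, data)` followed by `future.add_done_callback(log_publish_result)`.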
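To reduce the risk of a test run reaching production, a script can refuse to start unless `PUBSUB_EMULATOR_HOST` is set to something shaped like `host:port`. This is a rough sketch only: `require_emulator` is a hypothetical helper, `localhost:8085` is just an example value, and the check does not verify that an emulator is actually listening or that `GOOGLE_CLOUD_PROJECT` is correct.

```python
import os


def require_emulator(environ=None):
    """Return PUBSUB_EMULATOR_HOST, or raise if it is missing or malformed."""
    environ = os.environ if environ is None else environ
    host = environ.get("PUBSUB_EMULATOR_HOST", "").strip()
    if not host:
        raise RuntimeError(
            "PUBSUB_EMULATOR_HOST is not set; clients would use the production service"
        )
    name, sep, port = host.rpartition(":")
    if not sep or not name or not port.isdigit():
        raise RuntimeError(f"PUBSUB_EMULATOR_HOST should look like host:port, got {host!r}")
    return host


# Example with an explicit mapping instead of the real environment.
emulator_host = require_emulator({"PUBSUB_EMULATOR_HOST": "localhost:8085"})
```

Calling `require_emulator()` before constructing any client keeps the check in one place.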
Install
pip install google-cloud-pubsub
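Given the Python 3.9 requirement noted in the warnings, a build or bootstrap script may want to choose the requirement string based on the interpreter. The helper below is a hypothetical sketch of that decision (`pubsub_requirement` is not part of any tool); it only encodes the `2.34.0` cutoff stated above.

```python
import sys


def pubsub_requirement(version_info=None):
    """Return a pip requirement string suited to the given Python version."""
    version_info = sys.version_info if version_info is None else version_info
    if tuple(version_info[:2]) < (3, 9):
        # Python 3.7 and 3.8 need 2.34.0 or earlier.
        return "google-cloud-pubsub<=2.34.0"
    return "google-cloud-pubsub"


requirement_for_38 = pubsub_requirement((3, 8))
requirement_for_311 = pubsub_requirement((3, 11))
```

The returned string could then be passed to `pip install` or placed in an `install_requires` list.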
Imports
- PublisherClient
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
- SubscriberClient
from google.cloud import pubsub_v1
subscriber = pubsub_v1.SubscriberClient()
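Since each client manages its own connections, applications often create one instance and share it. A minimal sketch of that pattern, assuming a module-level accessor is acceptable in your codebase: `get_publisher` and `get_subscriber` are hypothetical helper names, and the library import is deferred until first use so the module can be imported without creating a client.

```python
from functools import lru_cache


@lru_cache(maxsize=None)
def get_publisher():
    """Create the PublisherClient on first call and reuse it afterwards."""
    from google.cloud import pubsub_v1  # deferred import

    return pubsub_v1.PublisherClient()


@lru_cache(maxsize=None)
def get_subscriber():
    """Create the SubscriberClient on first call and reuse it afterwards."""
    from google.cloud import pubsub_v1  # deferred import

    return pubsub_v1.SubscriberClient()
```

Callers then use `get_publisher().publish(...)` instead of constructing a new client per request.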
Quickstart
import os
from concurrent.futures import TimeoutError

from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1

project_id = os.environ.get('GOOGLE_CLOUD_PROJECT') or os.environ.get('GCP_PROJECT') or 'your-gcp-project-id'
topic_id = 'my-topic-id'
subscription_id = 'my-subscription-id'

if project_id == 'your-gcp-project-id':
    raise ValueError("Please set the GOOGLE_CLOUD_PROJECT environment variable or replace 'your-gcp-project-id'.")

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Create topic if it doesn't exist
try:
    publisher.get_topic(request={"topic": topic_path})
    print(f"Topic {topic_path} already exists.")
except NotFound:  # only a missing topic triggers creation; other errors propagate
    print(f"Creating topic {topic_path}...")
    publisher.create_topic(request={"name": topic_path})
    print(f"Topic {topic_path} created.")

# Create subscription if it doesn't exist
try:
    subscriber.get_subscription(request={"subscription": subscription_path})
    print(f"Subscription {subscription_path} already exists.")
except NotFound:  # only a missing subscription triggers creation
    print(f"Creating subscription {subscription_path}...")
    subscriber.create_subscription(request={"name": subscription_path, "topic": topic_path})
    print(f"Subscription {subscription_path} created.")

# --- Publisher ---
message_data = "Hello, Pub/Sub!"
print(f"Publishing message: '{message_data}' to {topic_path}")
future = publisher.publish(topic_path, message_data.encode('utf-8'))
message_id = future.result()
print(f"Published message with ID: {message_id}")
# --- Subscriber ---
def callback(message: pubsub_v1.subscriber.message.Message):
    print(f"Received message: {message.data.decode('utf-8')}")
    print(f"Acknowledging message: {message.message_id}")
    message.ack()

print(f"Listening for messages on {subscription_path}...")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # `subscribe` is non-blocking, so we must keep the main thread from exiting to allow it to run.
        streaming_pull_future.result(timeout=30)  # Wait 30 seconds for messages
    except (TimeoutError, KeyboardInterrupt):
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
print("Finished listening for messages.")
# Clean up resources (optional)
# publisher.delete_topic(request={"topic": topic_path})
# subscriber.delete_subscription(request={"subscription": subscription_path})
# print(f"Topic {topic_id} and subscription {subscription_id} deleted.")
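The flow-control warning above can be addressed by passing explicit limits when subscribing, which bounds how many messages the client holds at once. The sketch below is illustrative only: `subscribe_with_limits` and `DEFAULT_LIMITS` are hypothetical names, and the values (100 messages, 10 MiB) are arbitrary examples, not recommendations. It relies on `pubsub_v1.types.FlowControl` and the `flow_control` argument of `subscribe()`.

```python
# Hypothetical example limits; tune them for your workload.
DEFAULT_LIMITS = {"max_messages": 100, "max_bytes": 10 * 1024 * 1024}


def subscribe_with_limits(subscriber, subscription_path, callback, **overrides):
    """Start a streaming pull that holds at most the configured backlog."""
    from google.cloud import pubsub_v1  # deferred import

    limits = {**DEFAULT_LIMITS, **overrides}
    flow_control = pubsub_v1.types.FlowControl(**limits)
    return subscriber.subscribe(
        subscription_path, callback=callback, flow_control=flow_control
    )
```

In the quickstart this would replace the direct call, for example `streaming_pull_future = subscribe_with_limits(subscriber, subscription_path, callback, max_messages=10)`.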