Google Cloud Pub/Sub Client Library
Version 2.36.0 | verified Tue May 12 | auth: no | python | install: verified | quickstart: stale
The `google-cloud-pubsub` Python client library provides access to Google Cloud Pub/Sub, a fully managed, real-time messaging service. Pub/Sub enables asynchronous communication by decoupling services that produce messages from those that consume them, and offers at-least-once delivery, low latency, and on-demand scalability. The library is actively maintained within the broader `google-cloud-python` monorepo, with frequent (often weekly) releases for bug fixes and minor features.
pip install google-cloud-pubsub
Common errors
error UNAUTHENTICATED / 403 (Forbidden) / PERMISSION_DENIED ↓
cause The client application lacks valid authentication credentials or the necessary IAM permissions to access Pub/Sub resources.
fix Ensure the service account key is valid and correctly configured, the OAuth token has not expired, and the service account or user has the required Pub/Sub IAM roles (e.g., `roles/pubsub.publisher`, `roles/pubsub.subscriber`) on the specific topic or subscription in the correct project.
error ModuleNotFoundError: No module named 'google' ↓
cause The `google-cloud-pubsub` library or its dependencies are not correctly installed or accessible in the Python environment, often seen in deployed environments like App Engine or Docker.
fix Ensure `google-cloud-pubsub` is listed in your `requirements.txt` file and installed (e.g., `pip install google-cloud-pubsub`). For deployment environments, verify that the `pip install -r requirements.txt` step is executed and that packages are correctly bundled.
error DEADLINE_EXCEEDED ↓
cause A publish or subscribe operation failed to complete within the allocated timeout duration, often due to client-side bottlenecks, network issues, or insufficient retry settings.
fix Increase the overall timeout for publish operations in the publisher client's retry settings (e.g., raise the `Retry` object's total timeout, called `deadline` in older `google-api-core` releases, to 600 seconds). For subscribers, ensure adequate client resources (memory, CPU) and stable network connectivity, and let the client library handle deadline extensions automatically.
error AttributeError: 'Client' object has no attribute 'pull' ↓
cause This error occurs when using an outdated `google-cloud-pubsub` client API, specifically attempting to call `pull` directly on a generic `Client` object, instead of the dedicated `SubscriberClient`.
fix Update your code to use the `SubscriberClient` for pull operations, which was introduced in newer versions of the library (typically after 0.27.0). Example: `from google.cloud.pubsub_v1 import SubscriberClient` and then `subscriber = SubscriberClient()`.
Warnings
breaking Versions of `google-cloud-pubsub` from `2.35.0` and higher require Python 3.9 or newer. If you are using Python 3.7 or 3.8, you must pin the library version to `google-cloud-pubsub==2.34.0` or earlier. ↓
fix Upgrade your Python environment to 3.9+ or pin the library version: `pip install google-cloud-pubsub==2.34.0`.
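As a rough sketch of the rule above, the helper below picks a requirement string from the interpreter version. The function name `pubsub_requirement` is hypothetical, and both the 3.9 cutoff and the `2.34.0` pin are taken from this entry rather than verified independently.

```python
import sys


def pubsub_requirement(version_info=sys.version_info):
    """Return a pip requirement string suited to the given Python version.

    Hypothetical helper: the 3.9 cutoff and the 2.34.0 pin come from the
    warning above and are assumptions of this sketch.
    """
    if tuple(version_info[:2]) >= (3, 9):
        return "google-cloud-pubsub"
    # Python 3.7 / 3.8: stay on the last release said to support them.
    return "google-cloud-pubsub==2.34.0"


if __name__ == "__main__":
    print(pubsub_requirement())
```

In a `requirements.txt`, the same choice can be expressed with an environment marker, for example `google-cloud-pubsub==2.34.0; python_version < "3.9"`.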
gotcha Instantiating multiple `PublisherClient` or `SubscriberClient` instances unnecessarily can lead to resource inefficiencies. These clients handle connection pooling and caching internally. ↓
fix For most applications, create a single `PublisherClient` and a single `SubscriberClient` instance per process and reuse them across operations to optimize resource utilization.
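One way to get a single client per process is a cached factory. This is a minimal sketch, assuming the hypothetical helper names `get_publisher` and `get_subscriber`; the library import is done lazily inside each function so the module loads even where the package is absent.

```python
from functools import lru_cache


@lru_cache(maxsize=None)
def get_publisher():
    """Create the process-wide PublisherClient on first use, then reuse it."""
    from google.cloud import pubsub_v1  # lazy import (choice made for this sketch)

    return pubsub_v1.PublisherClient()


@lru_cache(maxsize=None)
def get_subscriber():
    """Create the process-wide SubscriberClient on first use, then reuse it."""
    from google.cloud import pubsub_v1  # lazy import (choice made for this sketch)

    return pubsub_v1.SubscriberClient()
```

Every later call to `get_publisher()` returns the cached instance. If the first call can happen on several threads at once, the factory body may run more than once, so creating the clients during startup is the safer variant.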
gotcha Incorrectly configuring subscriber acknowledgment deadlines or flow control (prefetch settings) can cause 'stuck subscribers', messages being redelivered repeatedly (poison pill effect), or excessive resource consumption. ↓
fix Carefully tune your subscription's acknowledgment deadline to allow sufficient time for message processing. Implement appropriate flow control settings (`max_messages`, `max_bytes`) to prevent your application from being overwhelmed by messages. Always call `message.ack()` or `message.nack()` after processing.
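The sketch below illustrates both halves of that advice: a wrapper that always ends with `ack()` or `nack()`, and a subscribe call with explicit flow control. The names `make_callback` and `start_subscriber` are hypothetical, and the limits (100 messages, 10 MiB) are arbitrary illustrative values, not recommendations.

```python
def make_callback(handler):
    """Wrap a handler so every message is either acked or nacked."""

    def callback(message):
        try:
            handler(message.data)
        except Exception:
            # Processing failed: ask Pub/Sub to redeliver the message.
            message.nack()
        else:
            message.ack()

    return callback


def start_subscriber(subscriber, subscription_path, handler,
                     max_messages=100, max_bytes=10 * 1024 * 1024):
    """Start a streaming pull with bounded outstanding messages and bytes."""
    from google.cloud import pubsub_v1  # lazy import (choice made for this sketch)

    flow_control = pubsub_v1.types.FlowControl(
        max_messages=max_messages,  # cap on unprocessed messages held by the client
        max_bytes=max_bytes,        # cap on their total size
    )
    return subscriber.subscribe(
        subscription_path,
        callback=make_callback(handler),
        flow_control=flow_control,
    )
```

A handler that keeps failing will still see the same message again after each `nack()`, so pairing this with a dead-letter topic on the subscription is worth considering.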
gotcha Pub/Sub guarantees at-least-once delivery, meaning a message might be delivered more than once in certain scenarios (e.g., subscriber restarts, ack deadline issues). ↓
fix Design your subscriber logic to be *idempotent*. Your message processing should produce the same result whether it's executed once or multiple times for the same message. Utilize unique business keys (like a transaction ID) and check against a fast-access store to prevent duplicate processing.
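A minimal sketch of that idea follows. `SeenKeys` is an in-memory stand-in for the fast-access store, and both it and the `transaction_id` attribute name are assumptions made for illustration; a real deployment would typically use a shared store with expiry.

```python
import threading


class SeenKeys:
    """In-memory stand-in for a fast-access store of processed keys."""

    def __init__(self):
        self._keys = set()
        self._lock = threading.Lock()  # callbacks may run on several threads

    def add_if_new(self, key):
        """Record the key; return False if it was already recorded."""
        with self._lock:
            if key in self._keys:
                return False
            self._keys.add(key)
            return True

    def discard(self, key):
        with self._lock:
            self._keys.discard(key)


def make_idempotent_callback(process, seen, key_attribute="transaction_id"):
    """Build a callback that processes each business key at most once."""

    def callback(message):
        # Prefer the business key; fall back to the Pub/Sub message ID.
        key = message.attributes.get(key_attribute) or message.message_id
        if not seen.add_if_new(key):
            message.ack()  # duplicate delivery: acknowledge without reprocessing
            return
        try:
            process(message.data)
        except Exception:
            seen.discard(key)  # let a redelivery retry the work
            message.nack()
        else:
            message.ack()

    return callback
```

The key is recorded before processing so that two concurrent deliveries of the same message cannot both run, and it is released again if processing fails.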
gotcha Failing to capture and log the `message_id` returned by `publisher.publish().result()` can severely hinder debugging and traceability in production. ↓
fix Always store or log the `message_id` returned by the publish operation. This ID is the primary way to correlate a published message with its ingestion in Google Cloud Logs Explorer and track its lifecycle.
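A small wrapper can make this hard to forget. The sketch below assumes a hypothetical helper name, `publish_and_log`, and a logger name chosen for illustration; it relies only on `publish()` returning a future whose `result()` yields the message ID.

```python
import logging

logger = logging.getLogger("pubsub.publish")  # logger name is an arbitrary choice


def publish_and_log(publisher, topic_path, data, result_timeout=60.0, **attributes):
    """Publish one message, log its message_id, and return the ID."""
    future = publisher.publish(topic_path, data, **attributes)
    # result() blocks until the server confirms the publish or the wait times out.
    message_id = future.result(timeout=result_timeout)
    logger.info("published message_id=%s topic=%s", message_id, topic_path)
    return message_id
```

Blocking on every result keeps the example simple but limits throughput; for high-volume publishing, attaching the logging step with `future.add_done_callback(...)` avoids waiting on each message.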
breaking The Pub/Sub client libraries need the Google Cloud project ID to identify the project where resources reside. The client constructors do not take a `project` argument; instead, the ID is part of every resource path, built with helpers such as `publisher.topic_path(project_id, topic_id)` and `subscriber.subscription_path(project_id, subscription_id)`. A common pattern is to read the ID from the `GOOGLE_CLOUD_PROJECT` environment variable. A missing ID or a leftover placeholder (such as 'your-gcp-project-id') produces resource paths that do not match any real project, so requests fail. ↓
fix Ensure the `GOOGLE_CLOUD_PROJECT` environment variable is set to your actual Google Cloud project ID (e.g., `my-project-123`) before running your code, or pass the ID explicitly when building resource paths, for example: `publisher.topic_path('my-project-123', 'my-topic-id')`.
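The check can be done once at startup, before any client call. This is a sketch under stated assumptions: `resolve_project_id` and `build_topic_path` are hypothetical helpers, and `build_topic_path` only mirrors the `projects/{project}/topics/{topic}` format that the library's own `topic_path` helper produces.

```python
import os


def resolve_project_id(explicit=None, environ=os.environ):
    """Return a usable project ID or raise early with a clear message."""
    project_id = explicit or environ.get("GOOGLE_CLOUD_PROJECT")
    if not project_id or project_id == "your-gcp-project-id":
        raise ValueError(
            "Set GOOGLE_CLOUD_PROJECT or pass a real project ID explicitly."
        )
    return project_id


def build_topic_path(project_id, topic_id):
    """Build the fully qualified topic name used in Pub/Sub requests."""
    return f"projects/{project_id}/topics/{topic_id}"
```

In application code the library helper is the better choice for building paths; the hand-written version here only makes the format visible.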
Install compatibility verified last tested: 2026-05-12
python os / libc status wheel install import disk
3.10 alpine (musl) wheel - 2.27s 75.6M
3.10 alpine (musl) - - 2.06s 74.5M
3.10 slim (glibc) wheel 6.8s 1.23s 73M
3.10 slim (glibc) - - 1.17s 72M
3.11 alpine (musl) wheel - 2.62s 81.0M
3.11 alpine (musl) - - 3.07s 79.8M
3.11 slim (glibc) wheel 6.6s 1.76s 79M
3.11 slim (glibc) - - 1.75s 78M
3.12 alpine (musl) wheel - 2.69s 72.3M
3.12 alpine (musl) - - 3.17s 71.1M
3.12 slim (glibc) wheel 5.3s 2.07s 70M
3.12 slim (glibc) - - 2.41s 69M
3.13 alpine (musl) wheel - 2.56s 71.9M
3.13 alpine (musl) - - 4.04s 70.7M
3.13 slim (glibc) wheel 5.1s 1.98s 70M
3.13 slim (glibc) - - 2.59s 68M
3.9 alpine (musl) wheel - 2.06s 75.7M
3.9 alpine (musl) - - 1.86s 74.6M
3.9 slim (glibc) wheel 7.8s 1.49s 73M
3.9 slim (glibc) - - 1.32s 72M
Imports
- PublisherClient
  wrong: `from google.cloud import pubsub` followed by `publisher = pubsub.PublisherClient()`
  correct: `from google.cloud import pubsub_v1` followed by `publisher = pubsub_v1.PublisherClient()`
- SubscriberClient
  wrong: `from google.cloud import pubsub` followed by `subscriber = pubsub.SubscriberClient()`
  correct: `from google.cloud import pubsub_v1` followed by `subscriber = pubsub_v1.SubscriberClient()`
Quickstart stale last tested: 2026-04-24
import os
from concurrent.futures import TimeoutError

from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1

project_id = os.environ.get('GOOGLE_CLOUD_PROJECT') or os.environ.get('GCP_PROJECT') or 'your-gcp-project-id'
topic_id = 'my-topic-id'
subscription_id = 'my-subscription-id'

if not project_id or project_id == 'your-gcp-project-id':
    raise ValueError("Please set the GOOGLE_CLOUD_PROJECT environment variable or replace 'your-gcp-project-id'.")

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Create topic if it doesn't exist.
# Only NotFound triggers creation; auth or network errors are allowed to surface.
try:
    publisher.get_topic(request={"topic": topic_path})
    print(f"Topic {topic_path} already exists.")
except NotFound:
    print(f"Creating topic {topic_path}...")
    publisher.create_topic(request={"name": topic_path})
    print(f"Topic {topic_path} created.")

# Create subscription if it doesn't exist
try:
    subscriber.get_subscription(request={"subscription": subscription_path})
    print(f"Subscription {subscription_path} already exists.")
except NotFound:
    print(f"Creating subscription {subscription_path}...")
    subscriber.create_subscription(request={"name": subscription_path, "topic": topic_path})
    print(f"Subscription {subscription_path} created.")

# --- Publisher ---
message_data = "Hello, Pub/Sub!"
print(f"Publishing message: '{message_data}' to {topic_path}")
future = publisher.publish(topic_path, message_data.encode('utf-8'))
message_id = future.result()
print(f"Published message with ID: {message_id}")
# --- Subscriber ---
def callback(message: pubsub_v1.subscriber.message.Message):
    print(f"Received message: {message.data.decode('utf-8')}")
    print(f"Acknowledging message: {message.message_id}")
    message.ack()

print(f"Listening for messages on {subscription_path}...")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # `subscribe` is non-blocking, so we must keep the main thread from exiting to allow it to run.
        streaming_pull_future.result(timeout=30)  # Wait 30 seconds for messages
    except (TimeoutError, KeyboardInterrupt):
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
print("Finished listening for messages.")
# Clean up resources (optional)
# publisher.delete_topic(request={"topic": topic_path})
# subscriber.delete_subscription(request={"subscription": subscription_path})
# print(f"Topic {topic_id} and subscription {subscription_id} deleted.")
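
The quickstart publishes with the default retry settings. Where `DEADLINE_EXCEEDED` shows up on publish, a custom retry object with a longer overall timeout can be passed to `publish()`. The sketch below is one way to do that, not the library's prescribed configuration: the helper names and the backoff values are assumptions, and `deadline` is the older keyword for the overall limit (recent `google-api-core` releases are understood to accept `timeout` as well).

```python
def build_publish_retry(total_timeout=600.0):
    """Build a retry policy with a longer overall limit for publish calls."""
    # Imported lazily so this module loads even without google-api-core.
    from google.api_core import exceptions, retry

    return retry.Retry(
        initial=0.1,             # first backoff delay, in seconds (illustrative)
        maximum=60.0,            # upper bound on a single backoff delay
        multiplier=1.3,          # growth factor between delays
        deadline=total_timeout,  # overall limit across all attempts
        predicate=retry.if_exception_type(
            exceptions.DeadlineExceeded,
            exceptions.ServiceUnavailable,
        ),
    )


def publish_with_retry(publisher, topic_path, data, retry, result_timeout=600.0):
    """Publish with the given retry policy and wait for the message ID."""
    future = publisher.publish(topic_path, data, retry=retry)
    return future.result(timeout=result_timeout)
```

With the quickstart's objects, a call would look like `publish_with_retry(publisher, topic_path, b"Hello, Pub/Sub!", build_publish_retry())`.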