Google Cloud Pub/Sub Client Library

2.36.0 · active · verified Sat Mar 28

The `google-cloud-pubsub` Python client library provides access to Google Cloud Pub/Sub, a fully-managed, real-time messaging service. Pub/Sub facilitates asynchronous communication by decoupling services that produce messages from those that consume them, offering 'at least once' delivery, low latency, and on-demand scalability. The library is actively maintained within the broader `google-cloud-python` monorepo, with frequent (often weekly) releases for bug fixes and minor features.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to publish a message to a Google Cloud Pub/Sub topic and then subscribe to and consume that message from a subscription. It handles topic and subscription creation if they don't exist and uses environment variables for project configuration.

import os
import time
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# Project selection: GOOGLE_CLOUD_PROJECT wins, then GCP_PROJECT. When neither
# is set, the placeholder is used and rejected by the check further down.
project_id = (
    os.environ.get('GOOGLE_CLOUD_PROJECT')
    or os.environ.get('GCP_PROJECT')
    or 'your-gcp-project-id'
)
topic_id = 'my-topic-id'
subscription_id = 'my-subscription-id'

# Fail fast instead of calling the API with the placeholder project.
if project_id == 'your-gcp-project-id' or not project_id:
    raise ValueError(
        "Please set the GOOGLE_CLOUD_PROJECT environment variable or replace 'your-gcp-project-id'."
    )

# One client per role; each client also builds the resource path it works on.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def _is_not_found(error):
    """Return True when *error* reports that the requested resource is missing.

    The Pub/Sub client raises ``google.api_core`` exceptions, which expose the
    HTTP status as a ``code`` attribute; 404 is the "not found" status.
    Exceptions without that attribute are never treated as "not found".
    """
    return getattr(error, "code", None) == 404


# Create topic if it doesn't exist
try:
    publisher.get_topic(request={"topic": topic_path})
except Exception as error:
    # Only a missing topic should trigger creation. Permission, credential and
    # network failures are re-raised instead of being mistaken for "missing".
    if not _is_not_found(error):
        raise
    print(f"Creating topic {topic_path}...")
    publisher.create_topic(request={"name": topic_path})
    print(f"Topic {topic_path} created.")
else:
    print(f"Topic {topic_path} already exists.")

# Create subscription if it doesn't exist
try:
    subscriber.get_subscription(request={"subscription": subscription_path})
except Exception as error:
    # Same rule as for the topic: create only on "not found", otherwise
    # surface the real error.
    if not _is_not_found(error):
        raise
    print(f"Creating subscription {subscription_path}...")
    subscriber.create_subscription(request={"name": subscription_path, "topic": topic_path})
    print(f"Subscription {subscription_path} created.")
else:
    print(f"Subscription {subscription_path} already exists.")


# --- Publisher --- 
message_data = "Hello, Pub/Sub!"
print(f"Publishing message: '{message_data}' to {topic_path}")

# publish() is handed the UTF-8 encoded bytes of the text and returns a future.
payload = message_data.encode('utf-8')
future = publisher.publish(topic_path, payload)

# Waiting on the future yields the ID assigned to the published message.
message_id = future.result()
print(f"Published message with ID: {message_id}")

# --- Subscriber --- 
def callback(message: pubsub_v1.subscriber.message.Message):
    """Print the text of a received message, then acknowledge it.

    Args:
        message: The message handed to this callback by the subscriber; its
            ``data`` is decoded as UTF-8 for display.
    """
    payload_text = message.data.decode('utf-8')
    print(f"Received message: {payload_text}")
    print(f"Acknowledging message: {message.message_id}")
    message.ack()

print(f"Listening for messages on {subscription_path}...")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

# Using the subscriber as a context manager makes sure close() runs on exit.
with subscriber:
    try:
        # subscribe() does not block, so wait on the future here to keep the
        # main thread alive while messages are delivered (at most 30 seconds).
        streaming_pull_future.result(timeout=30)
    except (TimeoutError, KeyboardInterrupt):
        # The listening window elapsed or the user interrupted the script:
        # request shutdown, then wait until it has completed.
        streaming_pull_future.cancel()
        streaming_pull_future.result()

print("Finished listening for messages.")

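# Clean up resources (optional)
# NOTE: the `with subscriber:` block above has already closed `subscriber`,
# so the subscription is deleted through a fresh client here.
# publisher.delete_topic(request={"topic": topic_path})
# with pubsub_v1.SubscriberClient() as cleanup_subscriber:
#     cleanup_subscriber.delete_subscription(request={"subscription": subscription_path})
# print(f"Topic {topic_id} and subscription {subscription_id} deleted.")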

view raw JSON →