Google Cloud Pub/Sub Lite Client Library

1.13.0 · active · verified Thu Apr 09

Google Cloud Pub/Sub Lite is a specialized messaging service for high-volume, low-cost event ingestion and delivery using partitioned topics. This Python client library, currently at version 1.13.0, provides a programmatic interface for working with Pub/Sub Lite topics and subscriptions and for performing administrative operations. The library receives regular updates from Google.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to publish a message to a Pub/Sub Lite topic and then subscribe to and consume that message from a subscription. It assumes you have already created the necessary Pub/Sub Lite topic and subscription within your GCP project and set the required environment variables (GOOGLE_CLOUD_PROJECT, GOOGLE_CLOUD_ZONE).

import os
import time
from concurrent.futures import TimeoutError

from google.cloud.pubsublite.cloudpubsub import PublisherClient, SubscriberClient
from google.cloud.pubsublite.types import (
    CloudZone,
    TopicPath,
    SubscriptionPath,
    FlowControlSettings,
)

# Configuration: set GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_ZONE in the environment,
# or replace the placeholders with your GCP project ID and a Lite zone (e.g., 'us-central1-a').
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT", "your-gcp-project-id")
zone = os.environ.get("GOOGLE_CLOUD_ZONE", "us-central1-a")
topic_id = "my-pubsublite-topic"
subscription_id = "my-pubsublite-subscription"

# Stop before building any resource paths if the project placeholder was never replaced.
if project_id == "your-gcp-project-id":
    print("Please set the GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_ZONE environment variables.")
    print("Alternatively, replace 'your-gcp-project-id' and 'us-central1-a' with your actual values.")
    # Raise SystemExit directly: the bare `exit` helper is injected by the `site`
    # module and is not guaranteed to exist in every interpreter setup.
    raise SystemExit(1)

# CloudZone is a (region, zone_id) pair, so a zone string such as 'us-central1-a'
# has to be parsed; passing the string straight to the constructor raises TypeError.
location = CloudZone.parse(zone)

# Define topic and subscription paths
topic_path = TopicPath(project_id, location, topic_id)
subscription_path = SubscriptionPath(project_id, location, subscription_id)

print(f"Using Topic: {topic_path}")
print(f"Using Subscription: {subscription_path}")
print("Ensure the topic and subscription already exist in your Pub/Sub Lite project.")

# --- Publisher: Send a message ---
print("\n--- Publishing messages ---")
message_data = b"Hello from Pub/Sub Lite Python client!"
try:
    # PublisherClient must be entered before publish() is called; the `with`
    # block starts the client and shuts it down again on exit.
    with PublisherClient() as publisher:
        # publish returns a future; .result() waits for acknowledgement
        future = publisher.publish(topic_path, message_data)
        message_id = future.result(timeout=5)
        print(f"Published message '{message_data.decode()}' with ID: {message_id}")
except TimeoutError:
    print("Publishing timed out.")
except Exception as e:
    # Top-level boundary of the quickstart: report the failure and carry on to
    # the subscriber section rather than aborting with a traceback.
    print(f"Error publishing message: {e}")
finally:
    print("Publisher client closed.")

# Allow a moment for the message to propagate
time.sleep(2)

# --- Subscriber: Receive messages ---
print("\n--- Subscribing to messages ---")

# Flow control caps how many messages and bytes may be outstanding at once.
flow_control_settings = FlowControlSettings(
    messages_outstanding=10,
    bytes_outstanding=10 * 1024 * 1024,  # 10 MiB
)
subscriber = SubscriberClient()

# Decoded payloads of every message the callback has handled so far.
received_messages = []


def callback(message):
    """Decode, print, record, and acknowledge a single delivered message."""
    text = message.data.decode()
    print(f"Received message: '{text}' (ID: {message.message_id})")
    received_messages.append(text)
    # Acknowledge only after the payload has been recorded above.
    message.ack()

# SubscriberClient must be entered before subscribe() is called; the `with`
# block starts the client and releases its resources on exit.
with subscriber:
    # The subscribe method returns a streaming_future that can be cancelled.
    streaming_future = subscriber.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=flow_control_settings,
    )
    print(f"Listening for messages on {subscription_path} for 10 seconds...")

    try:
        # Wait for the future to complete (e.g., if cancelled) or timeout.
        streaming_future.result(timeout=10)
    except TimeoutError:
        print("No more messages received within 10 seconds or subscription completed.")
    except Exception as e:
        print(f"Error during subscription: {e}")
    finally:
        # Stop the subscription stream before the client itself is shut down.
        streaming_future.cancel()
print("Subscriber client closed.")

if received_messages:
    print(f"\nSuccessfully processed {len(received_messages)} message(s).")
else:
    print("\nNo messages were processed by the subscriber.")

view raw JSON →