Google Cloud Pub/Sub Lite Client Library
Google Cloud Pub/Sub Lite is a specialized messaging service for high-volume, low-cost event ingestion and delivery with partitioned topics. This Python client library, currently at version 1.13.0, provides a programmatic interface to interact with Pub/Sub Lite topics, subscriptions, and administrative operations, receiving regular updates from Google.
Warnings
- gotcha Publish idempotence, introduced in v1.8.0, was disabled by default in v1.8.2. If you relied on automatic idempotence, you must now explicitly enable it via `PublisherClient.publish(..., id='your-id')` or `PublisherSettings(..., publish_idempotence=True)`.
- gotcha Older versions of `google-cloud-pubsublite` (prior to v1.13.0) might inadvertently install pre-release versions of dependencies, leading to potential instability or unexpected behavior.
- gotcha Client objects (`PublisherClient`, `SubscriberClient`, `AdminClient`) should always be explicitly closed using their `.close()` method to ensure all resources are released and pending operations (like publishing messages) are completed.
- gotcha The `google-cloud-pubsublite` library requires Python 3.8 or newer. Using older Python versions will result in installation failures or runtime errors.
Install
-
pip install google-cloud-pubsublite
Imports
- PublisherClient
from google.cloud.pubsublite.cloudpubsub import PublisherClient
- SubscriberClient
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
- AdminClient
from google.cloud.pubsublite.admin import AdminClient
- TopicPath
from google.cloud.pubsublite.types import TopicPath
- FlowControlSettings
from google.cloud.pubsublite.types import FlowControlSettings
Quickstart
import os
import time
from concurrent.futures import TimeoutError
from google.cloud.pubsublite.cloudpubsub import PublisherClient, SubscriberClient
from google.cloud.pubsublite.types import (
CloudZone,
TopicPath,
SubscriptionPath,
FlowControlSettings,
)
# Configuration: Ensure GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_ZONE are set
# or replace placeholders with your actual GCP Project ID and a Lite zone (e.g., 'us-central1-a').
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT", "your-gcp-project-id")
zone = os.environ.get("GOOGLE_CLOUD_ZONE", "us-central1-a")
topic_id = "my-pubsublite-topic"
subscription_id = "my-pubsublite-subscription"
# Define topic and subscription paths
topic_path = TopicPath(project_id, CloudZone(zone), topic_id)
subscription_path = SubscriptionPath(project_id, CloudZone(zone), subscription_id)
if project_id == "your-gcp-project-id":
print("Please set the GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_ZONE environment variables.")
print("Alternatively, replace 'your-gcp-project-id' and 'us-central1-a' with your actual values.")
exit(1)
print(f"Using Topic: {topic_path}")
print(f"Using Subscription: {subscription_path}")
print("Ensure the topic and subscription already exist in your Pub/Sub Lite project.")
# --- Publisher: Send a message ---
print("\n--- Publishing messages ---")
publisher = PublisherClient()
message_data = b"Hello from Pub/Sub Lite Python client!"
try:
# publish returns a future; .result() waits for acknowledgement
future = publisher.publish(topic_path, message_data)
message_id = future.result(timeout=5)
print(f"Published message '{message_data.decode()}' with ID: {message_id}")
except TimeoutError:
print("Publishing timed out.")
except Exception as e:
print(f"Error publishing message: {e}")
finally:
publisher.close() # Important to close the client to flush messages
print("Publisher client closed.")
# Allow a moment for the message to propagate
time.sleep(2)
# --- Subscriber: Receive messages ---
print("\n--- Subscribing to messages ---")
subscriber = SubscriberClient()
# Configure flow control to limit outstanding messages and bytes
flow_control_settings = FlowControlSettings(
messages_outstanding=10, bytes_outstanding=10 * 1024 * 1024 # 10 MiB
)
received_messages = []
def callback(message):
payload = message.data.decode()
print(f"Received message: '{payload}' (ID: {message.message_id})")
received_messages.append(payload)
message.ack() # Acknowledge the message to allow it to be discarded
# The subscribe method returns a streaming_future that can be cancelled.
streaming_future = subscriber.subscribe(subscription_path, callback, flow_control_settings)
print(f"Listening for messages on {subscription_path} for 10 seconds...")
try:
# Wait for the future to complete (e.g., if cancelled) or timeout.
streaming_future.result(timeout=10)
except TimeoutError:
print("No more messages received within 10 seconds or subscription completed.")
except Exception as e:
print(f"Error during subscription: {e}")
finally:
streaming_future.cancel() # Stop the subscription stream
subscriber.close() # Important to close the client to release resources
print("Subscriber client closed.")
if received_messages:
print(f"\nSuccessfully processed {len(received_messages)} message(s).")
else:
print("\nNo messages were processed by the subscriber.")