Google Cloud Pub/Sub Client (Async)
Asynchronous Python client for Google Cloud Pub/Sub, built on `aiohttp`. It is part of the `gcloud-aio` monorepo, which provides async clients for various Google Cloud services. The monorepo is actively maintained, with regular releases across its sub-packages.
Warnings
- breaking As of `gcloud-aio-auth` version 5.4.4 (a dependency of `gcloud-aio-pubsub`), support for Python 3.9 has been dropped. While `gcloud-aio-pubsub`'s `requires_python` might still indicate `>=3.9`, its transitive dependency on `gcloud-aio-auth>=5.4.4` means Python 3.9 is no longer supported across the `gcloud-aio` ecosystem.
- gotcha All network operations in `gcloud-aio-pubsub` are asynchronous. Every call to a `PublisherClient` or `SubscriberClient` coroutine method must be `await`ed, and the calling code must run inside an event loop (e.g., an `async` function started with `asyncio.run()`). A call that is not awaited returns a coroutine object and never sends a request.
- gotcha Authentication is handled by `gcloud-aio-auth`. Point the `GOOGLE_APPLICATION_CREDENTIALS` environment variable at a service account key file, or pass `service_file` to the client explicitly; without either, the library falls back to the gcloud SDK's application default credentials and then to the GCE metadata server. The Pub/Sub clients do not read a project ID from the environment: topics and subscriptions are addressed by fully qualified names such as `projects/<project>/topics/<topic>`.
- gotcha Proper `aiohttp.ClientSession` lifecycle management is crucial. It's recommended to create a single `ClientSession` per application and pass it to all `gcloud-aio` clients. Ensure the session is closed when no longer needed, typically using an `async with` statement.
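The `await` gotcha above can be illustrated without touching the network. The sketch below uses `fake_publish`, a hypothetical stand-in coroutine that is not part of `gcloud-aio-pubsub`, to show that calling an async method only creates a coroutine object and that nothing runs until it is awaited inside an event loop.

```python
import asyncio
import inspect


async def fake_publish(topic: str, payload: bytes) -> dict:
    """Hypothetical stand-in for an async client call; not a gcloud-aio API."""
    await asyncio.sleep(0)  # yield to the event loop, as real network I/O would
    return {"topic": topic, "size": len(payload)}


async def main() -> dict:
    # Calling the coroutine function does not execute its body yet.
    pending = fake_publish("my-test-topic", b"hello")
    assert inspect.iscoroutine(pending)
    # Awaiting the coroutine object is what actually runs the call.
    return await pending


# asyncio.run() creates the event loop and drives main() to completion.
result = asyncio.run(main())
```

The same rule applies to the real client methods: a missing `await` tends to show up as a "coroutine was never awaited" warning rather than as an exception.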
Install
pip install gcloud-aio-pubsub
Imports
- PublisherClient
from gcloud.aio.pubsub import PublisherClient
- SubscriberClient
from gcloud.aio.pubsub import SubscriberClient
- PubsubMessage
from gcloud.aio.pubsub import PubsubMessage
Quickstart
import asyncio
import json
import os

from aiohttp import ClientSession
from gcloud.aio.pubsub import PublisherClient, PubsubMessage, SubscriberClient


async def main():
    # GCP_PROJECT is this script's own convention for passing the project ID.
    # Credentials are resolved by gcloud-aio-auth, for example from a key file:
    # export GOOGLE_APPLICATION_CREDENTIALS=/path/to/keyfile.json
    project = os.environ.get('GCP_PROJECT', '')
    if not project:
        print("Error: GCP_PROJECT environment variable not set.")
        return

    topic_name = 'my-test-topic'  # Replace with an existing topic
    subscription_name = 'my-test-subscription'  # Replace with an existing subscription
    # Pub/Sub addresses resources by fully qualified name.
    topic = f'projects/{project}/topics/{topic_name}'
    subscription = f'projects/{project}/subscriptions/{subscription_name}'

    async with ClientSession() as session:
        # --- Publisher Example ---
        publisher = PublisherClient(session=session)
        data = {'message': 'Hello, gcloud-aio-pubsub!'}
        # Payloads are bytes, so serialize the dict before wrapping it.
        messages = [PubsubMessage(json.dumps(data).encode('utf-8'))]
        response = await publisher.publish(topic, messages)
        print(f"Published to '{topic_name}' with IDs: {response['messageIds']}")

        # --- Subscriber Example ---
        subscriber = SubscriberClient(session=session)
        print(f"Pulling messages from '{subscription_name}'...")
        try:
            # Fetch up to 1 message for this example
            received = await subscriber.pull(subscription, max_messages=1)
            for message in received:
                print(f"Received message: {message.data.decode('utf-8')}")
                # Acknowledge by ack ID so the message is not redelivered.
                await subscriber.acknowledge(subscription, [message.ack_id])
                print("Message acknowledged.")
        except Exception as e:
            print(f"Error during subscription: {e}")


if __name__ == '__main__':
    asyncio.run(main())
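The quickstart sends a small dictionary and decodes the received payload as UTF-8 text. Since a Pub/Sub payload is a byte string, structured data has to be serialized on the way in and parsed on the way out. The sketch below assumes JSON over UTF-8 as the wire convention; `encode_payload` and `decode_payload` are hypothetical helper names, not part of `gcloud-aio-pubsub`.

```python
import json
from typing import Any


def encode_payload(obj: Any) -> bytes:
    """Serialize a JSON-compatible object to UTF-8 bytes for publishing."""
    # Compact separators keep the message small; sort_keys makes output stable.
    return json.dumps(obj, separators=(",", ":"), sort_keys=True).encode("utf-8")


def decode_payload(data: bytes) -> Any:
    """Parse bytes received from a subscription back into a Python object."""
    return json.loads(data.decode("utf-8"))


# Round trip: what the subscriber decodes matches what the publisher encoded.
original = {"message": "Hello, gcloud-aio-pubsub!"}
wire = encode_payload(original)
restored = decode_payload(wire)
```

On the publishing side the bytes from `encode_payload` would be used as the message data, and on the subscribing side `decode_payload` would be applied to the received message's `data` before acknowledging it.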