Google Cloud Pub/Sub Client (Async)

6.3.0 · active · verified Sun Apr 12

Python Client for Google Cloud Pub/Sub using `aiohttp` for asynchronous operations. It is part of the `gcloud-aio` monorepo, which provides async clients for various Google Cloud services. The library is actively maintained with regular updates across its sub-packages.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to publish a message to a Pub/Sub topic and then receive and acknowledge a message from a subscription using `gcloud-aio-pubsub`. It assumes that the `GCP_PROJECT` environment variable is set to your project ID, that Google Cloud credentials are discoverable by the library (for example, `GOOGLE_APPLICATION_CREDENTIALS` pointing at a service-account key file), and that the specified topic and subscription already exist in your Google Cloud project.

import asyncio
import json
import os

from aiohttp import ClientSession
from gcloud.aio.pubsub import PublisherClient, PubsubMessage, SubscriberClient


async def main():
    """Publish one JSON message, then pull and acknowledge one message.

    Reads the project ID from the ``GCP_PROJECT`` environment variable and
    returns early (after printing an error) when it is missing. The topic
    and subscription named below must already exist in that project.

    Credentials are not handled here; they are resolved by the library's
    auth layer (typically ``GOOGLE_APPLICATION_CREDENTIALS`` pointing at a
    service-account key file, or application default credentials).
    """
    project = os.environ.get('GCP_PROJECT', '')
    if not project:
        print("Error: GCP_PROJECT environment variable not set.")
        return

    topic_name = 'my-test-topic'  # Replace with an existing topic
    subscription_name = 'my-test-subscription'  # Replace with an existing subscription

    async with ClientSession() as session:
        # --- Publisher Example ---
        publisher = PublisherClient(session=session)
        # The API addresses topics by full resource path
        # ("projects/<project>/topics/<name>"), not by bare name.
        topic_path = publisher.topic_path(project, topic_name)
        data = {'message': 'Hello, gcloud-aio-pubsub!'}
        # Payloads must be wrapped in PubsubMessage; serialize the dict to
        # JSON text first so the subscriber can decode it again.
        response = await publisher.publish(
            topic_path,
            [PubsubMessage(json.dumps(data))],
        )
        # The publish response carries one server-assigned ID per message.
        message_ids = response.get('messageIds', [])
        print(
            f"Published message to '{topic_name}' "
            f"with ID: {', '.join(message_ids)}"
        )

        # --- Subscriber Example ---
        subscriber = SubscriberClient(session=session)
        subscription_path = subscriber.subscription_path(
            project, subscription_name,
        )
        print(f"Listening for messages on '{subscription_name}'...")
        try:
            # Fetch up to 1 message for this example. A pull may legitimately
            # come back empty if nothing is available yet.
            messages = await subscriber.pull(subscription_path, max_messages=1)
            if not messages:
                print("No messages received.")
            for message in messages:
                # data is raw bytes and may be absent for attribute-only
                # messages.
                payload = message.data.decode('utf-8') if message.data else ''
                print(f"Received message: {payload}")
                # Acknowledgement goes through the client using the
                # message's ack_id; unacknowledged messages are redelivered.
                await subscriber.acknowledge(
                    subscription_path, [message.ack_id],
                )
                print("Message acknowledged.")
        except Exception as e:
            # Top-level boundary of a demo script: report and exit cleanly.
            print(f"Error during subscription: {e}")


if __name__ == '__main__':
    asyncio.run(main())

view raw JSON →