Google Cloud Task Queue Client (Asyncio)
An asynchronous Python client for Google Cloud Task Queue. The library is part of the `gcloud-aio` ecosystem, which provides idiomatic `asyncio` support for various Google Cloud services. It is released as an independent sub-package, so its version numbers are decoupled from other `gcloud-aio` components; the current version is 7.0.0.
Warnings
- breaking: The `add_task` method now returns a `Task` object instead of `None`.
- breaking: The `payload` parameter of `add_task` was removed and replaced with `body`.
- breaking: The `method` parameter of `add_task` now requires an `HttpMethod` enum member instead of a string.
- breaking: The `Client` class was renamed to `TaskQueueClient` for clarity and consistency.
- gotcha: Always use `gcloud_aio_core.session.Session` or `aiohttp.ClientSession` as an async context manager (`async with`) so that the session is closed when the block exits, even if an error occurs.
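The last warning can be illustrated without touching the network. The following is a minimal sketch, assuming a hypothetical stand-in class `FakeSession` (it is not part of `gcloud-aio` or `aiohttp`). It only demonstrates that `async with` runs the cleanup step even when the body raises, which is the property the warning relies on.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for a session object; not the library's class."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on normal exit and when the body raises.
        self.closed = True
        return False  # do not swallow the exception


async def use_session():
    session = FakeSession()
    try:
        async with session:
            raise RuntimeError("request failed")
    except RuntimeError:
        pass
    # The session was closed even though the body raised.
    return session.closed


print(asyncio.run(use_session()))
```

Without `async with`, the equivalent cleanup would need an explicit `try`/`finally` around every use of the session.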
Install
pip install gcloud-aio-taskqueue
Imports
- TaskQueueClient
from gcloud_aio_taskqueue import Client  # pre-rename name (see Warnings)
from gcloud_aio_taskqueue import TaskQueueClient  # current name
- Session
from gcloud_aio_core.session import Session
- build_credentials
from gcloud_aio_core.auth import Authenticator
from gcloud_aio_core.auth import build_credentials
- HttpMethod
from gcloud_aio_taskqueue.types import HttpMethod
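Because `method` now takes an `HttpMethod` member rather than a string, call sites that still hold strings need a conversion step. The sketch below shows one way to do that, assuming a hypothetical stand-in enum `HttpMethodSketch`; the members and values of the real `HttpMethod` are not stated here and may differ.

```python
from enum import Enum


class HttpMethodSketch(Enum):
    """Hypothetical stand-in; the real HttpMethod members may differ."""

    GET = "GET"
    POST = "POST"


def to_http_method(value):
    """Convert a legacy string such as "post" into an enum member."""
    if isinstance(value, HttpMethodSketch):
        return value
    # Enum lookup by value raises ValueError for unknown strings,
    # so a typo fails here instead of being sent to the service.
    return HttpMethodSketch(value.upper())


print(to_http_method("post"))
```

A misspelled method such as `"psot"` raises `ValueError` at conversion time, which is the main practical benefit of an enum over a free-form string.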
Quickstart
import asyncio
import os

from gcloud_aio_taskqueue import TaskQueueClient
from gcloud_aio_taskqueue.types import HttpMethod
from gcloud_aio_core.session import Session
from gcloud_aio_core.auth import build_credentials


async def main():
    project_id = os.environ.get("GCP_PROJECT_ID", "your-project-id")
    queue_name = os.environ.get("GCP_TASK_QUEUE_NAME", "your-queue-name")

    # Authenticate using GOOGLE_APPLICATION_CREDENTIALS env var or a service account path
    # Example: GOOGLE_APPLICATION_CREDENTIALS=/path/to/key.json
    credentials = build_credentials()

    async with Session(credentials) as session:
        client = TaskQueueClient(
            project=project_id,
            queue=queue_name,
            session=session,
        )
        try:
            # Add a task with required parameters (changes from v6.0.0 onwards)
            task = await client.add_task(
                url="https://example.com/mytaskendpoint",
                body=b"Hello World!",
                payload_type="APPLICATION_OCTET_STREAM",
                method=HttpMethod.POST,
                headers={
                    "Content-Type": "application/octet-stream",
                    "X-Cloud-Task-ETA": "2025-01-01T00:00:00Z",  # Example custom header
                },
            )
            print(f"Task added: {task.name} (ID: {task.id})")

            # Example: Get a task
            retrieved_task = await client.get_task(task.name)
            print(f"Retrieved task: {retrieved_task.name}")

            # Example: Delete a task
            await client.delete_task(task.name)
            print(f"Task {task.name} deleted.")
        except Exception as e:
            print(f"An error occurred: {e}")


if __name__ == "__main__":
    asyncio.run(main())
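The quickstart awaits one call at a time. Since the client is asynchronous, several calls can also be awaited concurrently. The following is a rough sketch of that pattern using only the standard library; `fake_add_task` is a hypothetical stand-in for an awaitable client call, and the URLs are placeholders.

```python
import asyncio


async def fake_add_task(url):
    """Hypothetical stand-in for an awaitable client call."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    if "bad" in url:
        raise ValueError(f"rejected: {url}")
    return f"task-for-{url}"


async def add_many(urls):
    # return_exceptions=True collects failures as results
    # instead of raising the first one.
    results = await asyncio.gather(
        *(fake_add_task(u) for u in urls), return_exceptions=True
    )
    created = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]
    return created, failed


created, failed = asyncio.run(
    add_many(["https://a.example", "https://bad.example"])
)
print(len(created), len(failed))
```

Separating successes from failures this way lets the caller decide per task whether to retry, rather than handling everything in one broad `except` block.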