Google Cloud Tasks Python Client
Cloud Tasks is a fully managed service for dispatching and delivering a large number of distributed tasks. The `google-cloud-tasks` Python client library, currently at version 2.22.0, provides an idiomatic way to interact with the Cloud Tasks API for creating, managing, and scheduling tasks. Google Cloud client libraries typically receive updates on an as-needed basis, often following changes in the underlying Google Cloud APIs.
Warnings
- gotcha Authentication is crucial. For local development, set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to point to a service account key file. For deployed applications, ensure the service account running your code has the `Cloud Tasks Enqueuer` role (or the `cloudtasks.tasks.create` permission) on the queue. If the task targets an authenticated endpoint (such as Cloud Run), the service account named in the task's OIDC token must also have permission to invoke that target.
- gotcha Cloud Tasks aims for exactly-once delivery but can occasionally execute tasks more than once due to system trade-offs. Your task handlers should be idempotent to safely handle duplicate executions without unintended side effects.
- gotcha Overwhelming your target service or improperly configuring queue rate limits can lead to backlogs and HTTP 429 (Too Many Requests) or 503 (Service Unavailable) errors. Cloud Tasks will implement backoff, but sustained issues require intervention.
- gotcha When sending a payload with an HTTP task, the body must be explicitly encoded to bytes (e.g., `json.dumps(payload).encode('utf-8')`). Failing to encode will result in type errors.
- breaking The client library requires Python 3.9 or newer. Older Python versions (3.8 and below) are unsupported.
- gotcha Log events from `google-cloud-tasks` (and other `google`-prefixed libraries) do not propagate to the root logger by default. If you expect to see these logs via your application's root logger, you must explicitly enable propagation.
Install
-
pip install google-cloud-tasks
Imports
- CloudTasksClient
from google.cloud import tasks_v2
- Task
from google.cloud.tasks_v2.types import Task
- HttpRequest
from google.cloud.tasks_v2.types import HttpRequest
- HttpMethod
from google.cloud.tasks_v2.types import HttpMethod
- OidcToken
from google.cloud.tasks_v2.types import OidcToken
- timestamp_pb2
from google.protobuf import timestamp_pb2
Quickstart
import datetime
import json
import os
from typing import Optional

from google.cloud import tasks_v2
from google.cloud.tasks_v2.types import Task, HttpRequest, HttpMethod
from google.protobuf import timestamp_pb2
def create_http_task_with_auth(
    project_id: str,
    location_id: str,
    queue_id: str,
    url: str,
    service_account_email: str,
    payload: Optional[dict] = None,
    schedule_time_seconds: Optional[int] = None,
) -> Task:
    """Create an HTTP POST task that authenticates with an OIDC token.

    Args:
        project_id: The GCP project ID.
        location_id: The location of the queue (e.g., 'us-central1').
        queue_id: The ID of the Cloud Tasks queue.
        url: The full URL of the HTTP target (e.g., a Cloud Run service).
        service_account_email: The email of the service account used for
            the task's OIDC token.
        payload: (Optional) Dictionary sent as the JSON request body. Any
            dictionary, including an empty one, is serialized and sent;
            only ``None`` means "no body".
        schedule_time_seconds: (Optional) Delay in seconds before task
            execution. ``None`` or ``0`` leaves the schedule time unset.

    Returns:
        The created Task object returned by ``create_task``.

    Raises:
        TypeError: If ``payload`` contains values ``json.dumps`` cannot
            serialize.

    Any exception raised by ``CloudTasksClient.create_task`` propagates to
    the caller unchanged.
    """
    client = tasks_v2.CloudTasksClient()

    # Fully-qualified queue resource name the task is created under.
    parent = client.queue_path(project_id, location_id, queue_id)

    # Describe the HTTP call the task will make, including the service
    # account whose OIDC token is attached to the request.
    http_request = HttpRequest(
        http_method=HttpMethod.POST,
        url=url,
        oidc_token={'service_account_email': service_account_email},
    )

    # Compare against None rather than relying on truthiness so that an
    # explicit empty payload ({}) is still delivered as a JSON body.
    if payload is not None:
        http_request.headers['Content-Type'] = 'application/json'
        # The body field holds bytes, so the JSON text is encoded explicitly.
        http_request.body = json.dumps(payload).encode('utf-8')

    task = Task(http_request=http_request)

    if schedule_time_seconds:
        # Compute the target time from a timezone-aware "now" in UTC and
        # convert it to a protobuf Timestamp for the task.
        future_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
            seconds=schedule_time_seconds
        )
        timestamp = timestamp_pb2.Timestamp()
        timestamp.FromDatetime(future_time)
        task.schedule_time = timestamp

    # Send the task creation request.
    response = client.create_task(parent=parent, task=task)
    print(f"Created task: {response.name}")
    return response
# --- Example Usage ---
if __name__ == "__main__":
    # Credentials are expected to come from the environment (e.g.,
    # GOOGLE_APPLICATION_CREDENTIALS). The task details are also read from
    # environment variables, with placeholder defaults when they are unset.
    placeholder_project = 'your-gcp-project-id'
    gcp_project = os.getenv('GCP_PROJECT_ID', placeholder_project)
    gcp_location = os.getenv('GCP_LOCATION_ID', 'us-central1')
    queue_name = os.getenv('GCP_TASK_QUEUE_ID', 'my-http-queue')
    handler_url = os.getenv(
        'TASK_TARGET_URL',
        'https://your-cloud-run-service-url.run.app/task-handler',
    )
    oidc_sa_email = os.getenv(
        'TASK_SERVICE_ACCOUNT_EMAIL',
        'your-service-account@your-gcp-project-id.iam.gserviceaccount.com',
    )

    if gcp_project != placeholder_project:
        task_payload = {"message": "Hello from Cloud Tasks!", "source": "python-client"}
        try:
            created_task = create_http_task_with_auth(
                project_id=gcp_project,
                location_id=gcp_location,
                queue_id=queue_name,
                url=handler_url,
                service_account_email=oidc_sa_email,
                payload=task_payload,
                schedule_time_seconds=60,  # Schedule 1 minute in the future
            )
            print(f"Successfully scheduled task: {created_task.name}")
        except Exception as e:
            # Top-level boundary of the example script: report the failure
            # and a troubleshooting hint instead of a traceback.
            print(f"Error creating task: {e}")
            print("Ensure Cloud Tasks API is enabled, queue exists, and authentication is set up.")
    else:
        # The project ID is still the placeholder, so nothing is configured.
        print("Please set GCP_PROJECT_ID, GCP_LOCATION_ID, GCP_TASK_QUEUE_ID, TASK_TARGET_URL, and TASK_SERVICE_ACCOUNT_EMAIL environment variables.")