Google Cloud Tasks Python Client

2.22.0 · active · verified Sat Mar 28

Cloud Tasks is a fully managed service for dispatching and delivering a large number of distributed tasks. The `google-cloud-tasks` Python client library, currently at version 2.22.0, provides an idiomatic way to interact with the Cloud Tasks API for creating, managing, and scheduling tasks. Google Cloud client libraries typically receive updates on an as-needed basis, often following changes in the underlying Google Cloud APIs.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to create an HTTP task with OIDC authentication, suitable for targeting services like Cloud Run or Cloud Functions that require identity verification. It includes setting a payload and an optional schedule time. Ensure your Google Cloud environment is configured for Application Default Credentials (ADC) or explicitly set `GOOGLE_APPLICATION_CREDENTIALS` for local development.

import os
import json
import datetime
from google.cloud import tasks_v2
from google.cloud.tasks_v2.types import Task, HttpRequest, HttpMethod
from google.protobuf import timestamp_pb2

def create_http_task_with_auth(
    project_id: str,
    location_id: str,
    queue_id: str,
    url: str,
    service_account_email: str,
    payload: dict = None,
    schedule_time_seconds: int = None,
) -> Task:
    """Create an HTTP task with OIDC authentication.

    Args:
        project_id: The GCP project ID.
        location_id: The location of the queue (e.g., 'us-central1').
        queue_id: The ID of the Cloud Tasks queue.
        url: The full URL of the HTTP target (e.g., a Cloud Run service).
        service_account_email: The email of the service account for OIDC token.
        payload: (Optional) Dictionary payload to send with the task.
        schedule_time_seconds: (Optional) Delay in seconds before task execution.

    Returns:
        The created Task object.
    """
    client = tasks_v2.CloudTasksClient()

    # Construct the queue path.
    parent = client.queue_path(project_id, location_id, queue_id)

    # Construct the HTTP request for the task.
    http_request = HttpRequest(
        http_method=HttpMethod.POST,
        url=url,
        oidc_token={'service_account_email': service_account_email}
    )

    if payload:
        http_request.headers['Content-Type'] = 'application/json'
        http_request.body = json.dumps(payload).encode('utf-8')

    # Construct the task.
    task = Task(http_request=http_request)

    if schedule_time_seconds:
        # Schedule the task for a future time.
        future_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=schedule_time_seconds)
        timestamp = timestamp_pb2.Timestamp()
        timestamp.FromDatetime(future_time)
        task.schedule_time = timestamp

    # Send the task creation request.
    response = client.create_task(parent=parent, task=task)
    print(f"Created task: {response.name}")
    return response

# --- Example Usage ---
if __name__ == "__main__":
    # Set environment variables for authentication (e.g., GOOGLE_APPLICATION_CREDENTIALS)
    # and for the task details.
    project = os.environ.get('GCP_PROJECT_ID', 'your-gcp-project-id')
    location = os.environ.get('GCP_LOCATION_ID', 'us-central1')
    queue = os.environ.get('GCP_TASK_QUEUE_ID', 'my-http-queue')
    target_url = os.environ.get('TASK_TARGET_URL', 'https://your-cloud-run-service-url.run.app/task-handler')
    sa_email = os.environ.get('TASK_SERVICE_ACCOUNT_EMAIL', 'your-service-account@your-gcp-project-id.iam.gserviceaccount.com')

    if project == 'your-gcp-project-id':
        print("Please set GCP_PROJECT_ID, GCP_LOCATION_ID, GCP_TASK_QUEUE_ID, TASK_TARGET_URL, and TASK_SERVICE_ACCOUNT_EMAIL environment variables.")
    else:
        task_payload = {"message": "Hello from Cloud Tasks!", "source": "python-client"}
        try:
            created_task = create_http_task_with_auth(
                project,
                location,
                queue,
                target_url,
                sa_email,
                payload=task_payload,
                schedule_time_seconds=60 # Schedule 1 minute in the future
            )
            print(f"Successfully scheduled task: {created_task.name}")
        except Exception as e:
            print(f"Error creating task: {e}")
            print("Ensure Cloud Tasks API is enabled, queue exists, and authentication is set up.")

view raw JSON →