Prefect GCP

0.6.17 · active · verified Sun Apr 12

Prefect GCP is an integration library that allows Prefect workflows to interact seamlessly with various Google Cloud Platform (GCP) services. It provides blocks and tasks for services like Google Cloud Storage, BigQuery, Secret Manager, Vertex AI, and Cloud Run. The library is currently at version 0.6.17 and generally follows the release cadence of the main Prefect orchestration engine.

Install

pip install -U "prefect-gcp[cloud_storage]" python-dotenv

Imports

import os
import asyncio
from prefect import flow, task
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import GcsBucket
from dotenv import load_dotenv

Quickstart

This quickstart demonstrates how to use the `GcpCredentials` block for authentication and the `GcsBucket` block to interact with Google Cloud Storage: it uploads a short text string to a specified bucket and then downloads it again. It can run either with Application Default Credentials or with service account JSON supplied via an environment variable.

import os
import asyncio
from prefect import flow, task
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import GcsBucket
from dotenv import load_dotenv

# Load environment variables from .env file (if present)
load_dotenv()

@task
async def upload_and_download_file(bucket_name: str, file_content: str, source_path: str, destination_path: str):
    """Uploads a string to GCS and downloads it to a different path."""
    # Ensure GcpCredentials block exists, or create one for testing
    # In a real scenario, you'd load a pre-configured block: GcpCredentials.load("my-gcp-creds")
    # For quickstart, we use env vars for service account info if available
    service_account_info_str = os.environ.get('GCP_SERVICE_ACCOUNT_INFO')
    service_account_info = None
    if service_account_info_str:
        import json
        service_account_info = json.loads(service_account_info_str)

    gcp_credentials = GcpCredentials(service_account_info=service_account_info)
    await gcp_credentials.save('gcp-quickstart-creds', overwrite=True)

    # Ensure GcsBucket block exists, or create one for testing
    gcs_bucket_block = GcsBucket(bucket_name=bucket_name, gcp_credentials=gcp_credentials)
    await gcs_bucket_block.save('gcs-quickstart-bucket', overwrite=True)

    # Use the saved block in the flow
    gcs_block = await GcsBucket.load("gcs-quickstart-bucket")

    print(f"Uploading content to gs://{bucket_name}/{source_path}")
    await gcs_block.write_path(source_path, file_content.encode('utf-8'))
    print("Upload complete.")

    print(f"Downloading content from gs://{bucket_name}/{source_path} to {destination_path}")
    downloaded_content = await gcs_block.read_bytes(source_path)
    print(f"Downloaded content: {downloaded_content.decode('utf-8')}")

    # Clean up (optional). GcsBucket does not expose a delete helper directly;
    # use the underlying client instead, e.g.:
    # bucket = await gcs_block.get_bucket()
    # bucket.blob(source_path).delete()

@flow(log_prints=True)
async def gcp_storage_flow(bucket_name: str = "your-prefect-gcp-bucket-name"):
    file_content = "Hello from Prefect GCP!"
    source_path = "prefect-gcp-test.txt"
    destination_path = "downloaded-prefect-gcp-test.txt"
    await upload_and_download_file(bucket_name, file_content, source_path, destination_path)

if __name__ == "__main__":
    # Set this environment variable or use actual service account info for testing
    # export GCP_SERVICE_ACCOUNT_INFO='{"type": "service_account", ...}'
    # Ensure you have a GCP bucket created and permissions for the service account
    
    # To run:
    # 1. pip install -U "prefect-gcp[cloud_storage]" python-dotenv
    # 2. prefect block register -m prefect_gcp
    # 3. Create a GCS bucket (e.g., 'my-prefect-gcp-bucket')
    # 4. Set GCP_SERVICE_ACCOUNT_INFO env var with your service account JSON, or rely on ADC.
    # 5. python your_script_name.py
    asyncio.run(gcp_storage_flow(bucket_name=os.environ.get("GCS_BUCKET_NAME", "your-prefect-gcp-bucket-name")))
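The credential-selection logic in the task above — parse service account JSON from `GCP_SERVICE_ACCOUNT_INFO` when it is set, otherwise pass `None` so `GcpCredentials` falls back to Application Default Credentials — can be isolated as a small stdlib helper. The environment variable name matches the quickstart; the helper itself is illustrative and not part of prefect-gcp:

```python
import json
import os

def load_service_account_info(var: str = "GCP_SERVICE_ACCOUNT_INFO"):
    """Return parsed service account info from the environment, or None.

    Returning None lets GcpCredentials fall back to Application
    Default Credentials (ADC).
    """
    raw = os.environ.get(var)
    return json.loads(raw) if raw else None

# With the variable unset, the helper signals "use ADC".
os.environ.pop("GCP_SERVICE_ACCOUNT_INFO", None)
assert load_service_account_info() is None

# With JSON set, the parsed dict is returned.
os.environ["GCP_SERVICE_ACCOUNT_INFO"] = '{"type": "service_account", "project_id": "demo"}'
info = load_service_account_info()
print(info["project_id"])
```

Passing the resulting value (dict or `None`) straight to `GcpCredentials(service_account_info=...)` keeps the same script runnable both locally with a key file and on GCP infrastructure where ADC is available.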
