Prefect GCP
Prefect GCP is an integration library that allows Prefect workflows to interact seamlessly with various Google Cloud Platform (GCP) services. It provides blocks and tasks for services like Google Cloud Storage, BigQuery, Secret Manager, Vertex AI, and Cloud Run. The library is currently at version 0.6.17 and generally follows the release cadence of the main Prefect orchestration engine.
Warnings
- breaking The `CloudRunJob` and `VertexAICustomTrainingJob` infrastructure blocks have been deprecated in favor of Cloud Run and Vertex AI workers. These blocks will not be available after September 2024. Users should migrate to the new worker-based execution model for deploying flows to these services.
- gotcha Always register `prefect-gcp` blocks after installation with `prefect block register -m prefect_gcp` so that they appear in the Prefect UI and can be retrieved via `Block.load()`.
- gotcha Authentication with GCP services via `GcpCredentials` relies on Google's Application Default Credentials (ADC) by default if `service_account_file` or `service_account_info` are not provided. Ensure your environment (e.g., local machine, VM, Cloud Run service) has proper ADC configured or explicitly provide service account credentials. Missing or incorrect permissions on the service account are a common source of errors.
- gotcha There's a distinction between `prefect.filesystems.GCS` (from Prefect Core) and `prefect_gcp.cloud_storage.GcsBucket`. The latter, part of `prefect-gcp`, utilizes the `google-cloud-storage` package for more comprehensive functionality and is generally recommended for deeper GCP Cloud Storage integrations.
- gotcha For production deployments, it's highly recommended to pin specific versions of `prefect` and `prefect-gcp` (e.g., `prefecthq/prefect-gcp:0.6.17-python3.12-prefect3.6.19`) in your Docker images or `requirements.txt` to ensure stability and avoid unexpected behavior from automatic updates.
Install
- pip install -U "prefect-gcp"
- pip install -U "prefect-gcp[all_extras]"
- prefect block register -m prefect_gcp
Imports
- GcpCredentials
from prefect_gcp import GcpCredentials
- GcsBucket
from prefect_gcp.cloud_storage import GcsBucket
- BigQueryWarehouse
from prefect_gcp.bigquery import BigQueryWarehouse
- GcpSecret
from prefect_gcp.secret_manager import GcpSecret
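A minimal sketch of reading a secret with `GcpSecret`; the block name `my-gcp-creds` and the secret name are hypothetical, and the example assumes a credentials block has already been saved:

```python
def decode_secret(payload: bytes) -> str:
    """Secret Manager payloads come back as bytes; decode before use."""
    return payload.decode("utf-8")


def read_database_password() -> str:
    # Imported lazily; requires `pip install "prefect-gcp[secret_manager]"`.
    from prefect_gcp import GcpCredentials
    from prefect_gcp.secret_manager import GcpSecret

    secret = GcpSecret(
        gcp_credentials=GcpCredentials.load("my-gcp-creds"),  # hypothetical block name
        secret_name="database-password",                      # hypothetical secret name
    )
    return decode_secret(secret.read_secret())
```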
Quickstart
import asyncio
import json
import os

from dotenv import load_dotenv
from prefect import flow, task
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import GcsBucket

# Load environment variables from a .env file (if present)
load_dotenv()


@task
async def upload_and_download_file(bucket_name: str, file_content: str, source_path: str, destination_path: str):
    """Upload a string to GCS, then download it back to a local path."""
    # In a real scenario you would load a pre-configured block:
    #   gcp_credentials = await GcpCredentials.load("my-gcp-creds")
    # For this quickstart, use service account info from the environment if available;
    # otherwise GcpCredentials falls back to Application Default Credentials (ADC).
    service_account_info_str = os.environ.get("GCP_SERVICE_ACCOUNT_INFO")
    service_account_info = json.loads(service_account_info_str) if service_account_info_str else None
    gcp_credentials = GcpCredentials(service_account_info=service_account_info)
    await gcp_credentials.save("gcp-quickstart-creds", overwrite=True)

    gcs_bucket_block = GcsBucket(bucket_name=bucket_name, gcp_credentials=gcp_credentials)
    await gcs_bucket_block.save("gcs-quickstart-bucket", overwrite=True)

    # Use the saved block, as a deployed flow would
    gcs_block = await GcsBucket.load("gcs-quickstart-bucket")

    print(f"Uploading content to gs://{bucket_name}/{source_path}")
    await gcs_block.write_path(source_path, file_content.encode("utf-8"))
    print("Upload complete.")

    print(f"Downloading gs://{bucket_name}/{source_path} to {destination_path}")
    await gcs_block.download_object_to_path(source_path, destination_path)
    downloaded_content = await gcs_block.read_path(source_path)
    print(f"Downloaded content: {downloaded_content.decode('utf-8')}")

    # Optional clean-up via the underlying google-cloud-storage client:
    # gcp_credentials.get_cloud_storage_client().bucket(bucket_name).blob(source_path).delete()


@flow(log_prints=True)
async def gcp_storage_flow(bucket_name: str = "your-prefect-gcp-bucket-name"):
    file_content = "Hello from Prefect GCP!"
    source_path = "prefect-gcp-test.txt"
    destination_path = "downloaded-prefect-gcp-test.txt"
    await upload_and_download_file(bucket_name, file_content, source_path, destination_path)


if __name__ == "__main__":
    # To run:
    # 1. pip install -U "prefect-gcp[cloud_storage]" python-dotenv
    # 2. prefect block register -m prefect_gcp
    # 3. Create a GCS bucket (e.g., 'my-prefect-gcp-bucket')
    # 4. Set GCP_SERVICE_ACCOUNT_INFO to your service account JSON, or rely on ADC.
    # 5. python your_script_name.py
    asyncio.run(gcp_storage_flow(bucket_name=os.environ.get("GCS_BUCKET_NAME", "your-prefect-gcp-bucket-name")))
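`BigQueryWarehouse` is imported above but not demonstrated; a minimal sketch follows. It assumes a pre-saved credentials block named `my-gcp-creds` (hypothetical) and queries the real public dataset `bigquery-public-data.samples.shakespeare`:

```python
def build_query(table: str) -> str:
    """Parameterized query using pyformat placeholders, as the BigQuery DB-API expects."""
    return f"SELECT word, word_count FROM `{table}` WHERE corpus = %(corpus)s LIMIT 3"


def fetch_sample_rows():
    # Imported lazily; requires `pip install "prefect-gcp[bigquery]"`.
    from prefect_gcp import GcpCredentials
    from prefect_gcp.bigquery import BigQueryWarehouse

    # "my-gcp-creds" is a hypothetical, pre-saved credentials block.
    with BigQueryWarehouse(gcp_credentials=GcpCredentials.load("my-gcp-creds")) as warehouse:
        return warehouse.fetch_many(
            build_query("bigquery-public-data.samples.shakespeare"),
            parameters={"corpus": "hamlet"},
            size=3,
        )
```

Using the block as a context manager ensures the underlying DB-API connection is closed when the query completes.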