Dagster GCP
`dagster-gcp` is a Python library that provides components for interacting with Google Cloud Platform (GCP) services within the Dagster data orchestration framework. It includes resources and I/O managers for BigQuery, Google Cloud Storage (GCS), and Dataproc. The library is actively maintained, with releases typically published alongside the core Dagster framework to ensure compatibility and introduce new features.
Warnings
- gotcha Dagster integration libraries, including `dagster-gcp`, follow a pre-1.0 versioning track (e.g., `0.x.y`) even though Dagster core is at `1.x.y`. While `0.16+` library releases are generally compatible with Dagster `1.x`, their APIs are less mature than core's: 'Beta APIs may have breaking changes in minor version releases, with behavior changes in patch releases'.
- gotcha Proper GCP authentication is critical. `dagster-gcp` components rely on standard Google Cloud authentication mechanisms. Misconfiguration of `GOOGLE_APPLICATION_CREDENTIALS` environment variable or incorrect `gcp_credentials` in resource configuration is a common issue.
- gotcha When using `BigQueryIOManager` for assets or ops, if a dataset is not explicitly specified via the `project` and `dataset` configuration, a `key_prefix` on assets, or a `schema` in op output metadata, the I/O manager falls back to a dataset named `public`. This can silently write data to an unintended or incorrect dataset.
- deprecated APIs marked as 'preview' or 'beta' (e.g., `PipesDataprocJobClient`) are not considered ready for production use and may introduce breaking changes in patch or minor releases.
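To avoid the `public` dataset fallback described above, the target dataset can be pinned explicitly in the I/O manager configuration. A minimal sketch, assuming the `dagster-gcp-pandas` extra and its `BigQueryPandasIOManager` (the asset name, dataset `analytics`, and `GCP_PROJECT_ID` env var are illustrative):

```python
import pandas as pd
from dagster import Definitions, EnvVar, asset
from dagster_gcp_pandas import BigQueryPandasIOManager


@asset
def my_table() -> pd.DataFrame:
    # The I/O manager writes this DataFrame to BigQuery as <dataset>.my_table
    return pd.DataFrame({"id": [1, 2, 3]})


defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": BigQueryPandasIOManager(
            project=EnvVar("GCP_PROJECT_ID"),
            dataset="analytics",  # explicit dataset; omitting this falls back to 'public'
        )
    },
)
```

Alternatively, a `key_prefix` on the asset (e.g., `@asset(key_prefix=["analytics"])`) maps to the dataset without I/O manager configuration.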
Install
- pip install dagster-gcp
Imports
- BigQueryResource
from dagster_gcp import BigQueryResource
- BigQueryIOManager
from dagster_gcp import BigQueryIOManager
- GCSResource
from dagster_gcp.gcs import GCSResource
- GCSPickleIOManager
from dagster_gcp.gcs import GCSPickleIOManager
- DataprocResource
from dagster_gcp.dataproc import DataprocResource
- PipesDataprocJobClient
from dagster_gcp.pipes import PipesDataprocJobClient
Quickstart
import os
from dagster import Definitions, EnvVar, asset
from dagster_gcp import BigQueryResource

# Ensure GOOGLE_APPLICATION_CREDENTIALS or another supported auth mechanism is set for local execution.
# For simplicity, project/dataset/table are read from env vars with fallbacks. In production, prefer
# robust, centralized auth configuration.
@asset
def my_bq_table(bigquery: BigQueryResource):
    """An asset that queries a BigQuery table."""
    project_id = os.environ.get("GCP_PROJECT_ID", "your-gcp-project")
    dataset_id = os.environ.get("BIGQUERY_DATASET", "my_dataset")
    table_id = os.environ.get("BIGQUERY_TABLE", "my_table")
    # Example: execute a simple query
    query = f"SELECT COUNT(*) FROM `{project_id}.{dataset_id}.{table_id}`"
    with bigquery.get_client() as client:
        query_job = client.query(query)
        results = query_job.result()
        print(f"Query executed successfully. First row: {list(results)[0]}")

defs = Definitions(
    assets=[my_bq_table],
    resources={
        "bigquery": BigQueryResource(
            project=EnvVar("GCP_PROJECT_ID"),  # Use EnvVar to keep config out of code
            location=os.environ.get("GCP_REGION", "us-central1"),  # EnvVar has no default param; use os.environ for fallbacks
            # You can also pass gcp_credentials as a base64-encoded service account JSON string via EnvVar
        )
    },
)
# To run this locally:
# 1. Set environment variables, e.g., GOOGLE_APPLICATION_CREDENTIALS, GCP_PROJECT_ID, BIGQUERY_DATASET, BIGQUERY_TABLE
# 2. Run `dagster dev -f your_file.py`
# 3. Navigate to the Dagster UI, find the 'my_bq_table' asset, and materialize it.
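For asset outputs that should be pickled to Cloud Storage instead of written to BigQuery, `GCSPickleIOManager` can be wired in as the default I/O manager. A minimal configuration sketch (the project, bucket, and prefix values are illustrative):

```python
from dagster import Definitions, asset
from dagster_gcp.gcs import GCSPickleIOManager, GCSResource


@asset
def my_numbers():
    # Returned value is pickled and uploaded to GCS by the I/O manager
    return [1, 2, 3]


defs = Definitions(
    assets=[my_numbers],
    resources={
        "io_manager": GCSPickleIOManager(
            gcs=GCSResource(project="your-gcp-project"),  # illustrative project
            gcs_bucket="my-dagster-bucket",   # illustrative bucket name
            gcs_prefix="dagster-storage",     # optional key prefix within the bucket
        ),
    },
)
```

Authentication follows the same mechanisms as the BigQuery example above (e.g., `GOOGLE_APPLICATION_CREDENTIALS`).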