Astronomer Cosmos
Astronomer Cosmos is an open-source Python library that seamlessly integrates dbt (data build tool) Core projects with Apache Airflow. It allows users to orchestrate dbt models as native Airflow DAGs and Task Groups, providing enhanced observability, retries, and test execution directly within the Airflow UI. The library is actively maintained with frequent releases, evolving to support new dbt features like Fusion and various execution modes for optimal performance.
Warnings
- breaking Cosmos v1.14.0 (alpha releases) drops support for Apache Airflow versions earlier than 2.9.
- gotcha Airflow's `DagBag` import timeout can occur with large dbt projects due to slow manifest parsing, especially when using `ExecutionMode.LOCAL` without a pre-generated `manifest.json`.
- gotcha Dependency conflicts between `dbt-core`/dbt adapters and Airflow packages are common; the usual workaround is to install dbt into a separate virtual environment and point `ExecutionConfig(dbt_executable_path=...)` at it.
- gotcha Cosmos's default dbt test behavior (`TestBehavior.AFTER_EACH`) runs tests immediately after each model, which differs from dbt Core's default of running all tests after all models have completed. This 'fail-fast' approach can sometimes lead to unexpected failures, especially for tests that depend on multiple upstream models.
- gotcha Intermittent `FileNotFoundError: [Errno 2] No such file or directory` errors for `/tmp` files can occur during dbt model run tasks, particularly with `ExecutionMode.LOCAL`.
- gotcha When incorporating dbt snapshots, ensure they are run in a separate `DbtTaskGroup` from dbt models to guarantee the correct execution order.
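Two of the gotchas above (slow DAG-parse-time rendering and the `AFTER_EACH` test default) can be addressed in configuration. A minimal sketch; the project and manifest paths are illustrative and assume `manifest.json` was pre-generated (e.g. via `dbt parse` in CI):

```python
from cosmos import ProjectConfig, RenderConfig
from cosmos.constants import LoadMode, TestBehavior

# Point Cosmos at a pre-generated manifest so DAG parsing reads a static
# file instead of invoking dbt (paths here are illustrative).
project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dags/dbt/jaffle_shop",
    manifest_path="/usr/local/airflow/dags/dbt/jaffle_shop/target/manifest.json",
)

render_config = RenderConfig(
    load_method=LoadMode.DBT_MANIFEST,     # parse the static manifest, not the live project
    test_behavior=TestBehavior.AFTER_ALL,  # match dbt Core: run all tests after all models
)
```

Both objects are then passed to `DbtDag`/`DbtTaskGroup` alongside the profile configuration.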
Install
- Base package
pip install astronomer-cosmos
- With a dbt adapter extra (replace <adapter> with e.g. postgres, snowflake, bigquery)
pip install 'astronomer-cosmos[dbt-<adapter>]'
Imports
- DbtDag
from cosmos import DbtDag
- DbtTaskGroup
from cosmos import DbtTaskGroup
- ProjectConfig
from cosmos import ProjectConfig
- ProfileConfig
from cosmos import ProfileConfig
- ExecutionConfig
from cosmos import ExecutionConfig
- PostgresUserPasswordProfileMapping
from cosmos.profiles import PostgresUserPasswordProfileMapping
Quickstart
import os
from datetime import datetime
from pathlib import Path

from airflow.models.dag import DAG
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

# Path to your dbt project, relative to the DAG file.
# Example structure: dags/my_cosmos_dag.py, dags/dbt/jaffle_shop/dbt_project.yml
DEFAULT_DBT_PROJECT_PATH = Path(__file__).parent / "dbt" / "jaffle_shop"

# Map a dbt profile to an Airflow connection.
# Replace 'airflow_db' with an actual Airflow connection ID and adjust
# profile_args for your database schema.
profile_config = ProfileConfig(
    profile_name="default",  # must match the `profile:` key in dbt_project.yml
    target_name="dev",       # target name within the generated profiles.yml
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id=os.environ.get("AIRFLOW_DB_CONN_ID", "airflow_db"),  # Airflow connection ID
        profile_args={"schema": "public"},  # example schema
    ),
)

# Point Cosmos at the dbt executable, e.g. one installed in a dedicated
# virtual environment. If dbt is installed in the same environment as
# Airflow, this can be omitted (Cosmos defaults to `dbt` on the PATH).
execution_config = ExecutionConfig(
    dbt_executable_path=os.environ.get("DBT_EXECUTABLE_PATH", "/opt/airflow/dbt_venv/bin/dbt"),
)

with DAG(
    dag_id="cosmos_dbt_example_dag",
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["dbt", "cosmos", "example"],
    default_args={
        "retries": 2,  # recommended for dbt tasks
    },
) as dag:
    # DbtTaskGroup renders the dbt project as a task group inside this DAG.
    # (DbtDag is itself a DAG subclass, so it is used standalone rather than
    # nested inside another DAG.)
    dbt_project = DbtTaskGroup(
        group_id="jaffle_shop",
        project_config=ProjectConfig(DEFAULT_DBT_PROJECT_PATH),
        profile_config=profile_config,
        execution_config=execution_config,
        operator_args={
            "install_deps": True,  # install dbt dependencies before each run
        },
    )

# The DbtTaskGroup automatically creates Airflow tasks for your dbt models
# (and tests), observable in the Airflow UI with their lineage. Upstream and
# downstream Airflow tasks can be chained to dbt_project as with any task group.
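When the whole pipeline is the dbt project, `DbtDag` can render it as its own Airflow DAG rather than embedding it in another one. A minimal standalone sketch, reusing the `profile_config` defined above (the `dag_id` and project path are illustrative):

```python
from datetime import datetime

from cosmos import DbtDag, ProjectConfig

# Assumes profile_config is defined as in the Quickstart above.
jaffle_shop_dag = DbtDag(
    project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
    profile_config=profile_config,
    # DbtDag forwards the usual airflow.DAG keyword arguments:
    dag_id="jaffle_shop_dag",
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
)
```

Because `DbtDag` subclasses Airflow's `DAG`, the resulting object is discovered by the scheduler like any other DAG defined at module level.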