Astronomer Cosmos

1.13.1 · active · verified Sun Mar 29

Astronomer Cosmos is an open-source Python library that seamlessly integrates dbt (data build tool) Core projects with Apache Airflow. It allows users to orchestrate dbt models as native Airflow DAGs and Task Groups, providing enhanced observability, retries, and test execution directly within the Airflow UI. The library is actively maintained with frequent releases, evolving to support new dbt features like Fusion and various execution modes for optimal performance.


Install

pip install astronomer-cosmos

Imports

from cosmos import DbtDag, DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

Quickstart

This quickstart demonstrates how to create an Airflow DAG that orchestrates a dbt project using Astronomer Cosmos's `DbtDag` class. It configures a dbt project path, maps an Airflow connection to a dbt profile, and sets an execution environment for dbt. Replace placeholder values like `AIRFLOW_DB_CONN_ID` and `DBT_EXECUTABLE_PATH` with your actual environment variables or paths. Ensure your dbt project (e.g., 'jaffle_shop') is accessible from your Airflow environment.

import os
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

# Path to your dbt project, relative to this DAG file.
# Example layout: dags/my_cosmos_dag.py, dags/dbt/jaffle_shop/dbt_project.yml
DEFAULT_DBT_PROJECT_PATH = Path(__file__).parent / "dbt" / "jaffle_shop"

# Map an Airflow connection to a dbt profile.
# Replace 'airflow_db' (or set AIRFLOW_DB_CONN_ID) with a real Airflow
# connection ID, and adjust profile_args for your database schema.
profile_config = ProfileConfig(
    profile_name="default",  # The profile name referenced in your dbt_project.yml
    target_name="dev",       # The target within that profile
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id=os.environ.get("AIRFLOW_DB_CONN_ID", "airflow_db"),
        profile_args={"schema": "public"},  # Example schema
    ),
)

# Configure how dbt is executed, e.g., from a dedicated virtual environment.
# If dbt is installed in the same environment as Airflow, this can be omitted.
execution_config = ExecutionConfig(
    dbt_executable_path=os.environ.get("DBT_EXECUTABLE_PATH", "/opt/airflow/dbt_venv/bin/dbt"),
)

# DbtDag subclasses Airflow's DAG, so it is instantiated directly rather than
# inside a `with DAG(...)` block; DAG-level arguments are passed straight to it.
dbt_example_dag = DbtDag(
    project_config=ProjectConfig(DEFAULT_DBT_PROJECT_PATH),
    profile_config=profile_config,
    execution_config=execution_config,
    operator_args={
        "install_deps": True,  # Install dbt package dependencies before running
    },
    dag_id="cosmos_dbt_example_dag",
    schedule="@daily",  # Use schedule_interval instead on Airflow < 2.4
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["dbt", "cosmos", "example"],
    default_args={"retries": 2},  # Recommended for dbt tasks
)

# DbtDag renders one Airflow task (or task group) per dbt model, with model
# lineage visible in the Airflow UI. To embed the same project inside a larger
# DAG alongside other Airflow tasks, use DbtTaskGroup instead of DbtDag.
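For reference, the `ProfileConfig` with `PostgresUserPasswordProfileMapping` replaces a hand-written dbt `profiles.yml`: Cosmos renders the equivalent profile at runtime from the Airflow connection. A roughly equivalent static profile (host and credentials below are hypothetical placeholders) would look like:

```yaml
# Hypothetical static profiles.yml; Cosmos generates this at runtime
# from the mapped Airflow connection, so you do not write it yourself.
default:            # profile_name in ProfileConfig
  target: dev       # target_name in ProfileConfig
  outputs:
    dev:
      type: postgres
      host: my-postgres-host   # taken from the Airflow connection
      user: my_user
      password: my_password
      port: 5432
      dbname: my_database
      schema: public           # from profile_args={"schema": "public"}
```

Keeping credentials in an Airflow connection instead of a checked-in profiles.yml is the main point of the profile mapping: secrets stay in Airflow's secrets backend rather than in the dbt project.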
