dbt-athena-community

1.10.0 · active · verified Sun Apr 12

dbt-athena-community is a community-maintained dbt adapter that enables dbt to connect to and transform data in AWS Athena. It allows users to leverage dbt's data transformation capabilities by querying data directly from S3 using Athena's serverless engine. The current version is 1.10.0, and its release cycle generally aligns with major `dbt-core` versions, ensuring compatibility and leveraging new dbt features. It is a popular alternative to the official `dbt-athena` adapter.

Warnings

Install

Imports

Quickstart

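This quickstart demonstrates how to configure `dbt-athena-community` using a `profiles.yml` file and run a sample dbt model. The script creates a temporary `profiles.yml` and a minimal dbt project (`dbt_project.yml` plus one model), executes `dbt run` via `subprocess`, and then deletes the temporary directories. Before running it, replace the placeholder S3 staging path (or set the corresponding environment variables) and make sure a `default` profile is configured in `~/.aws/credentials`; alternatively, uncomment the explicit credential settings in the profile. The script requires PyYAML (`import yaml`), and `dbt-core` must be installed in your environment for the `dbt` CLI command to be found.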

import os
import subprocess
import yaml
from pathlib import Path
import shutil

# Scratch directory that holds the generated profiles.yml; the dbt subprocess
# is pointed at it later through DBT_PROFILES_DIR.
temp_dbt_dir = Path("./temp_dbt_profiles")
profiles_path = temp_dbt_dir.joinpath("profiles.yml")
temp_dbt_dir.mkdir(exist_ok=True)

# Connection settings come from the environment so nothing sensitive has to be
# hard-coded; the fallback values are placeholders you are expected to replace.
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", "YOUR_ACCESS_KEY")  # For IAM user
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "YOUR_SECRET_KEY")  # For IAM user
aws_session_token = os.environ.get("AWS_SESSION_TOKEN", "")  # For temporary credentials
s3_staging_dir = os.environ.get("DBT_ATHENA_S3_STAGING_DIR", "s3://your-dbt-athena-bucket/staging/")
athena_workgroup = os.environ.get("DBT_ATHENA_WORKGROUP", "primary")
# dbt-athena naming: the profile's `database` is the Athena *data catalog*
# (the AWS Glue catalog is called "awsdatacatalog"), while the profile's
# `schema` is the Athena *database* that models are built into.
athena_data_catalog = os.environ.get("DBT_ATHENA_DATA_CATALOG", "awsdatacatalog")
athena_database = os.environ.get("DBT_ATHENA_DATABASE", "dbt_athena_db")
aws_region = os.environ.get("AWS_REGION", "us-east-1")

# In-memory representation of profiles.yml: one profile with a single "dev" target.
profiles_content = {
    "my_athena_project": {  # This name must match 'profile' in dbt_project.yml
        "target": "dev",
        "outputs": {
            "dev": {
                "type": "athena",
                "s3_staging_dir": s3_staging_dir,
                "database": athena_data_catalog,  # Data catalog, not the Athena database
                "schema": athena_database,  # Athena database that receives the models
                "region_name": aws_region,
                "work_group": athena_workgroup,
                # Authentication: Use one of the following methods
                "aws_profile_name": "default",  # Uses ~/.aws/credentials profile
                # OR directly provide credentials (less secure for production)
                # "aws_access_key_id": aws_access_key_id,
                # "aws_secret_access_key": aws_secret_access_key,
                # "aws_session_token": aws_session_token, # Optional
                # Other common optional settings
                "poll_interval": 5,  # Seconds between status checks
                "num_retries": 10,
                "threads": 4,
            }
        },
    }
}

# Serialize the profile mapping to profiles.yml in block style
# (default_flow_style=False), overwriting any file already there.
with profiles_path.open("w") as profiles_file:
    yaml.dump(profiles_content, profiles_file, default_flow_style=False)

print(f"Profiles file created at: {profiles_path}")

# Create a minimal dbt project structure: the project root plus its models/
# directory (parents=True creates both in one call).
project_dir = Path("./temp_dbt_project")
(project_dir / "models").mkdir(parents=True, exist_ok=True)

# Plain string literal on purpose: the YAML has no placeholders, and an
# f-string would treat any future "{" or "}" in it as an interpolation.
# The 'profile' value must match the top-level key written to profiles.yml.
dbt_project_yml_content = """
name: 'my_athena_project'
version: '1.0.0'
config-version: 2

profile: 'my_athena_project'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"
  - "logs"

models:
  my_athena_project:
    +materialized: view
"""
with open(project_dir / "dbt_project.yml", "w") as f:
    f.write(dbt_project_yml_content)

# Drop a single trivial model into models/ so that `dbt run` has something
# to build.
sample_model_path = project_dir / "models" / "my_first_model.sql"
sample_model_sql = """
-- models/my_first_model.sql
SELECT 1 AS id, 'hello from dbt-athena' AS message
"""
sample_model_path.write_text(sample_model_sql)

print(f"dbt project created at: {project_dir}")

# Invoke the dbt CLI against the generated project; the `dbt` executable must
# be on PATH. Whatever the outcome, the scratch directories are removed.
try:
    print("\nAttempting to run dbt...")
    # The child process gets a copy of the current environment with
    # DBT_PROFILES_DIR pointing at the directory holding our profiles.yml.
    child_env = {**os.environ, "DBT_PROFILES_DIR": str(temp_dbt_dir.resolve())}
    dbt_command = ["dbt", "run", "--project-dir", str(project_dir.resolve())]

    # check=True turns a non-zero exit status into CalledProcessError;
    # output is captured as text so it can be echoed below.
    completed = subprocess.run(
        dbt_command,
        env=child_env,
        text=True,
        capture_output=True,
        check=True,
    )
    print("dbt run successful!")
    print(completed.stdout)
except subprocess.CalledProcessError as err:
    print(f"dbt run failed with exit code {err.returncode}: {err}")
    print("Stdout:", err.stdout)
    print("Stderr:", err.stderr)
except FileNotFoundError:
    # Raised when the `dbt` executable itself cannot be located.
    print("Error: 'dbt' command not found. Ensure dbt-core is installed (pip install dbt-core).")
finally:
    # Remove the profiles directory first, then the project directory.
    for scratch_dir in (temp_dbt_dir, project_dir):
        if scratch_dir.exists():
            shutil.rmtree(scratch_dir)
            print(f"Cleaned up {scratch_dir}")

view raw JSON →