dbt-athena-community
dbt-athena-community is a community-maintained dbt adapter that enables dbt to connect and transform data in AWS Athena. It allows users to leverage dbt's data transformation capabilities by querying data directly from S3 using Athena's serverless engine. Currently at version 1.10.0, its release cycle generally aligns with major `dbt-core` versions, ensuring compatibility and leveraging new dbt features. It is a popular alternative to the official `dbt-athena` adapter.
Warnings
- breaking Adapter version must match `dbt-core` major version. `dbt-athena-community` versions are tightly coupled with `dbt-core`. A common footgun is upgrading `dbt-core` without upgrading the adapter.
- gotcha The `s3_staging_dir` in `profiles.yml` is a mandatory configuration. All queries executed by dbt-athena-community require this S3 path.
- gotcha The `table` materialization rebuilds the full dataset on every run, which can be slow and costly on large datasets; consider `incremental` materializations where appropriate.
- gotcha Confusion between `dbt-athena-community` and the official `dbt-athena` adapter.
- gotcha Potential `boto3` and `pyathena` version conflicts with other libraries.
Install
- `pip install dbt-athena-community`
Imports
- dbt CLI
Interact via dbt CLI commands like 'dbt run' after configuring profiles.yml
Quickstart
import os
import subprocess
import yaml
from pathlib import Path
import shutil
# --- Create a throwaway dbt profiles directory with an Athena target ---
temp_dbt_dir = Path("./temp_dbt_profiles")
temp_dbt_dir.mkdir(exist_ok=True)
profiles_path = temp_dbt_dir / "profiles.yml"

# Connection settings come from the environment where available; the
# fallbacks are placeholders that must be replaced before a real run.
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", "YOUR_ACCESS_KEY")  # IAM user key
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "YOUR_SECRET_KEY")  # IAM user secret
aws_session_token = os.environ.get("AWS_SESSION_TOKEN", "")  # only for temporary credentials
s3_staging_dir = os.environ.get("DBT_ATHENA_S3_STAGING_DIR", "s3://your-dbt-athena-bucket/staging/")
athena_workgroup = os.environ.get("DBT_ATHENA_WORKGROUP", "primary")
athena_database = os.environ.get("DBT_ATHENA_DATABASE", "dbt_athena_db")
aws_region = os.environ.get("AWS_REGION", "us-east-1")

# Assemble the 'dev' output first, then nest it under the profile name.
dev_output = {
    "type": "athena",
    "s3_staging_dir": s3_staging_dir,  # mandatory: Athena query results land here
    "database": athena_database,
    "schema": "dbt_schema",
    "region_name": aws_region,
    "work_group": athena_workgroup,
    # Authentication: pick exactly ONE mechanism.
    "aws_profile_name": "default",  # reads ~/.aws/credentials
    # OR inline credentials (avoid in production):
    # "aws_access_key_id": aws_access_key_id,
    # "aws_secret_access_key": aws_secret_access_key,
    # "aws_session_token": aws_session_token,  # optional
    # Misc tuning knobs:
    "poll_interval": 5,  # seconds between query status checks
    "num_retries": 10,
    "threads": 4,
}
profiles_content = {
    # Top-level key must match the 'profile' entry in dbt_project.yml.
    "my_athena_project": {
        "target": "dev",
        "outputs": {"dev": dev_output},
    },
}

profiles_path.write_text(yaml.dump(profiles_content, default_flow_style=False))
print(f"Profiles file created at: {profiles_path}")
# --- Scaffold a minimal dbt project on disk ---
project_dir = Path("./temp_dbt_project")
project_dir.mkdir(exist_ok=True)
models_dir = project_dir / "models"
models_dir.mkdir(exist_ok=True)

# dbt_project.yml: 'profile' must match the top-level key in profiles.yml.
dbt_project_yml_content = """
name: 'my_athena_project'
version: '1.0.0'
config-version: 2
profile: 'my_athena_project'
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"
  - "logs"
models:
  my_athena_project:
    +materialized: view
"""
(project_dir / "dbt_project.yml").write_text(dbt_project_yml_content)

# One trivial model so `dbt run` has something to build.
model_sql_content = """
-- models/my_first_model.sql
SELECT 1 AS id, 'hello from dbt-athena' AS message
"""
(models_dir / "my_first_model.sql").write_text(model_sql_content)
print(f"dbt project created at: {project_dir}")
# --- Invoke `dbt run` against the scaffolded project, then clean up ---
# Requires dbt-core to be installed in the current environment.
try:
    print("\nAttempting to run dbt...")
    # Point dbt at the temporary profiles.yml via DBT_PROFILES_DIR.
    child_env = os.environ.copy()
    child_env["DBT_PROFILES_DIR"] = str(temp_dbt_dir.resolve())
    completed = subprocess.run(
        ["dbt", "run", "--project-dir", str(project_dir.resolve())],
        check=True,
        capture_output=True,
        text=True,
        env=child_env,
    )
    print("dbt run successful!")
    print(completed.stdout)
except subprocess.CalledProcessError as e:
    # dbt exited non-zero; surface its captured output for debugging.
    print(f"dbt run failed with exit code {e.returncode}: {e}")
    print("Stdout:", e.stdout)
    print("Stderr:", e.stderr)
except FileNotFoundError:
    print("Error: 'dbt' command not found. Ensure dbt-core is installed (pip install dbt-core).")
finally:
    # Remove the scratch directories regardless of outcome.
    for scratch_dir in (temp_dbt_dir, project_dir):
        if scratch_dir.exists():
            shutil.rmtree(scratch_dir)
            print(f"Cleaned up {scratch_dir}")