Dagster AWS
`dagster-aws` provides a collection of integrations for common AWS services, enabling Dagster to orchestrate workloads involving S3, ECS, Lambda, EMR, and more. It offers resources, run launchers, and IO managers that connect Dagster assets and ops to your AWS infrastructure. The current version is 0.28.22, and it typically releases monthly, in conjunction with major Dagster core updates.
Warnings
- gotcha Dagster library versions (e.g., `dagster-aws` 0.x.y) are tightly coupled to specific `dagster` core versions (e.g., 1.x.y). Always ensure you install compatible versions to avoid runtime errors; mismatching minor versions is a common source of issues. For `dagster-aws` 0.28.x, ensure `dagster` core is 1.12.x.
- gotcha All interactions with AWS services (S3, ECS, Lambda, EMR, etc.) require valid AWS credentials and sufficient IAM permissions. Ensure the underlying compute environment (e.g., EC2 instance, ECS task, EKS pod) has an appropriate IAM role attached, or explicitly configure credentials via environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`) or `~/.aws/credentials` for local development. Missing credentials raise `NoCredentialsError`; credentials that lack the required permissions produce `AccessDenied` errors.
- gotcha AWS resources such as `S3Resource` fall back to boto3's region resolution when `region_name` is not set, which reads the `AWS_DEFAULT_REGION` environment variable or your `~/.aws/config` file. If running across multiple regions or in non-standard environments (e.g., localstack), always set the `region_name` parameter explicitly to avoid unexpected cross-region errors or latency.
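The credential and region warnings above can be turned into a small preflight check that runs before any resource is constructed. The sketch below uses only the standard library; the helper names `resolve_region` and `missing_static_credentials` are hypothetical, and the precedence order (explicit value, then `AWS_REGION`, then `AWS_DEFAULT_REGION`, then a default) is an assumption chosen for illustration, not boto3's own resolution logic.

```python
import os
from typing import List, Mapping, Optional


def resolve_region(
    explicit: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
    default: str = "us-east-1",
) -> str:
    """Pick a region explicitly instead of relying on implicit lookup.

    Assumed precedence (illustrative only): explicit argument,
    AWS_REGION, AWS_DEFAULT_REGION, then the default.
    """
    env = os.environ if env is None else env
    for candidate in (explicit, env.get("AWS_REGION"), env.get("AWS_DEFAULT_REGION")):
        if candidate:
            return candidate
    return default


def missing_static_credentials(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the static-credential variables that are unset or empty.

    An empty result does not prove the credentials are valid, and a
    non-empty result is fine when an IAM role supplies credentials.
    """
    env = os.environ if env is None else env
    required = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    print("region:", resolve_region())
    print("missing static credentials:", missing_static_credentials())
```

The resolved value can then be passed as `region_name` when constructing a resource, so the region in use is always visible in code rather than inherited from the environment.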
Install
pip install dagster-aws
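After installing, the version pairing described in the warnings can be sanity-checked. The following is a minimal sketch, assuming the offset implied by the pairing stated above (`dagster-aws` 0.28.x with `dagster` 1.12.x, i.e. a minor-version difference of 16); the function names and the offset constant are illustrative, not part of any Dagster API.

```python
from importlib.metadata import PackageNotFoundError, version

# Assumption: library minor = core minor + 16, inferred from 0.28.x <-> 1.12.x.
LIBRARY_MINOR_OFFSET = 16


def expected_core_series(library_version: str) -> str:
    """Map a 0.X.Y library version to the 1.N core series it should pair with."""
    parts = library_version.split(".")
    if len(parts) < 2 or parts[0] != "0":
        raise ValueError(f"expected a 0.X.Y library version, got {library_version!r}")
    return f"1.{int(parts[1]) - LIBRARY_MINOR_OFFSET}"


def check_installed() -> str:
    """Compare the installed dagster-aws and dagster versions, if present."""
    try:
        library = version("dagster-aws")
        core = version("dagster")
    except PackageNotFoundError as exc:
        return f"package not installed: {exc}"
    expected = expected_core_series(library)
    if core.startswith(expected + "."):
        return f"ok: dagster-aws {library} pairs with dagster {core}"
    return f"mismatch: dagster-aws {library} expects dagster {expected}.x, found {core}"


if __name__ == "__main__":
    print(check_installed())
```

Installing both packages in a single `pip install` command lets the resolver choose a compatible pair, which is usually simpler than pinning each one by hand.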
Imports
- S3Resource
from dagster_aws.s3 import S3Resource
- s3_pickle_io_manager
from dagster_aws.s3 import s3_pickle_io_manager
- EcsRunLauncher
from dagster_aws.ecs import EcsRunLauncher
- PipesLambdaClient
from dagster_aws.pipes import PipesLambdaClient
- emr_pyspark_step_launcher
from dagster_aws.emr import emr_pyspark_step_launcher
Quickstart
import os

from dagster import AssetExecutionContext, Config, Definitions, asset
from dagster_aws.s3 import S3Resource


class MyS3Config(Config):
    """Run-time configuration: where the object is written."""

    bucket: str
    key: str


@asset
def my_s3_asset(context: AssetExecutionContext, s3: S3Resource, config: MyS3Config) -> None:
    """Writes a simple string to an S3 object."""
    s3.get_client().put_object(
        Bucket=config.bucket,
        Key=config.key,
        Body="Hello from Dagster S3!",
    )
    context.log.info(f"Wrote to s3://{config.bucket}/{config.key}")


defs = Definitions(
    assets=[my_s3_asset],
    resources={
        "s3": S3Resource(
            region_name=os.environ.get("AWS_REGION", "us-east-1"),
            # Unset variables resolve to None, so boto3 falls back to its
            # default credential chain (IAM role, ~/.aws/credentials, ...).
            aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
            aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
        )
    },
)

# To run:
# 1. Ensure AWS credentials and AWS_REGION are set in your environment variables.
# 2. dagster dev -f your_file.py
# 3. In the UI, launch a run for 'my_s3_asset' with a config like:
#    {"ops": {"my_s3_asset": {"config": {"bucket": "your-bucket-name", "key": "my-dagster-object.txt"}}}}
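Run config for an asset that takes a `Config` argument is nested under `ops`, then the asset name, then `config`. Building that dictionary in code avoids brace-counting mistakes when pasting it into the UI. The helper below is a hypothetical convenience, not part of Dagster; it only produces the dictionary shape for the `bucket` and `key` fields used in the quickstart.

```python
import json
from typing import Any, Dict


def build_run_config(asset_name: str, bucket: str, key: str) -> Dict[str, Any]:
    """Return run config that supplies `bucket` and `key` to one asset's Config."""
    return {"ops": {asset_name: {"config": {"bucket": bucket, "key": key}}}}


if __name__ == "__main__":
    run_config = build_run_config(
        "my_s3_asset", "your-bucket-name", "my-dagster-object.txt"
    )
    # Paste the printed JSON into the launchpad in the Dagster UI.
    print(json.dumps(run_config, indent=2))
```

The same dictionary can also be passed as the `run_config` argument when materializing the asset from Python, for example in a test.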