Prefect AWS Integration
Prefect AWS (`prefect-aws`) is an integration library for Prefect, a data workflow orchestration and observability platform. It provides blocks, tasks, and infrastructure components for interacting with Amazon Web Services such as ECS, S3, Secrets Manager, Lambda, Batch, and Glue. The current version is 0.7.7; releases are frequent and often track the Prefect core release cycle.
Warnings
- breaking Prefect 2 (which `prefect-aws` was originally built for) introduced significant architectural changes from Prefect 1. Migrating from Prefect 1.x to 2.x requires updating flow definitions, deployment patterns (Agents vs. Workers), and infrastructure configuration; it is not a drop-in upgrade.
- deprecated The `ECSTask` infrastructure block is deprecated and replaced by the ECS worker. Migrate to the ECS worker for improved functionality, scalability, and performance.
- gotcha Directly embedding AWS access key IDs and secret access keys in flow code is insecure and not recommended for production; prefer environment variables, a saved credentials block, or the default boto3 credential chain.
- gotcha `prefect-aws` requires Python >=3.10 as of version 0.7.0. Older Prefect 2.x-era releases supported Python >=3.7, but the floor has since been raised.
- gotcha Dependency conflicts can arise when installing `prefect-aws` alongside other Prefect extras or libraries that pin their own `boto3`/`botocore` versions.
Install
- pip install prefect-aws
Imports
- AwsCredentials
from prefect_aws import AwsCredentials
- S3Bucket
from prefect_aws.s3 import S3Bucket
- s3_download
from prefect_aws.s3 import s3_download
- SecretsManager
from prefect_aws import SecretsManager
Quickstart
import os
from prefect import flow, task
from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket, s3_upload, s3_download
# NOTE: For a real application, create and save the AwsCredentials block in the Prefect UI
# or programmatically (as shown below, but typically saved once).
# os.environ.get is used here to prevent hardcoding sensitive credentials.
@flow
def create_and_save_aws_credentials_block():
    """Example flow to create and save an AwsCredentials block."""
    aws_credentials = AwsCredentials(
        aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID', 'YOUR_ACCESS_KEY_ID'),
        aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY', 'YOUR_SECRET_ACCESS_KEY'),
        region_name=os.environ.get('AWS_REGION', 'us-east-1')
    )
    # Replace 'my-aws-credentials' with your desired block name
    aws_credentials.save('my-aws-credentials', overwrite=True)
    print("AwsCredentials block 'my-aws-credentials' saved.")
@task
async def upload_file_to_s3(bucket_name: str, key: str, content: str):
    aws_credentials_block = await AwsCredentials.load('my-aws-credentials')
    # s3_upload expects a bytes payload, so encode the string content
    await s3_upload(
        bucket=bucket_name,
        key=key,
        data=content.encode(),
        aws_credentials=aws_credentials_block
    )
    print(f"Uploaded '{key}' to s3://{bucket_name}")
@task
async def download_file_from_s3(bucket_name: str, key: str) -> str:
    aws_credentials_block = await AwsCredentials.load('my-aws-credentials')
    downloaded_content = await s3_download(
        bucket=bucket_name,
        key=key,
        aws_credentials=aws_credentials_block
    )
    print(f"Downloaded '{key}' from s3://{bucket_name}")
    # s3_download returns bytes; decode to compare with the original string
    return downloaded_content.decode()
@flow
async def s3_interaction_flow(bucket_name: str = "your-test-bucket"):  # Replace with a real bucket name
    test_key = "prefect-test-file.txt"
    test_content = "Hello from Prefect AWS!"
    # Make sure the credentials block exists; Block.load raises ValueError if it is missing
    try:
        await AwsCredentials.load('my-aws-credentials')
    except ValueError:
        print("AwsCredentials block not found. Run create_and_save_aws_credentials_block() first.")
        return
    await upload_file_to_s3(bucket_name, test_key, test_content)
    downloaded = await download_file_from_s3(bucket_name, test_key)
    print(f"Content verification: {downloaded == test_content}")
if __name__ == "__main__":
    import asyncio
    # First, ensure your AWS credentials block is saved (run this once or use the Prefect UI)
    # os.environ['AWS_ACCESS_KEY_ID'] = '...'      # Set actual credentials
    # os.environ['AWS_SECRET_ACCESS_KEY'] = '...'  # Set actual credentials
    # os.environ['AWS_REGION'] = '...'             # Set actual region
    create_and_save_aws_credentials_block()
    # Then run the S3 interaction flow (replace 'your-test-bucket' with an actual S3 bucket name)
    asyncio.run(s3_interaction_flow(bucket_name='your-test-bucket'))