Prefect AWS Integration

0.7.7 · active · verified Thu Apr 09

Prefect AWS (`prefect-aws`) is an integration library for Prefect, a data workflow orchestration and observability platform. It provides blocks, tasks, and infrastructure components for working with Amazon Web Services, including ECS, S3, Secrets Manager, Lambda, Batch, and Glue. The current version is 0.7.7. Releases are frequent and often track the Prefect core library's release cycle. [1, 4, 24]

Warnings

Install

Imports

Quickstart

This quickstart creates and saves an `AwsCredentials` block, which the library uses to authenticate with AWS services. It then uses that block with the `prefect_aws.s3` tasks to upload a file to an S3 bucket and download it again within a Prefect flow. Replace the placeholder values with real AWS credentials and the name of an existing S3 bucket. In production, store credentials in a block saved to Prefect Cloud or a Prefect server, or rely on environment variables and IAM roles, rather than hardcoding them. [1, 4, 8, 13]

import os
from prefect import flow, task
from prefect_aws import AwsCredentials
from prefect_aws.s3 import s3_upload, s3_download

# NOTE: For a real application, create and save the AwsCredentials block in the Prefect UI
# or programmatically (as shown below, but typically saved once).
# os.environ.get is used here to prevent hardcoding sensitive credentials.

@flow
def create_and_save_aws_credentials_block():
    """Example flow to create and save an AwsCredentials block."""
    aws_credentials = AwsCredentials(
        aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID', 'YOUR_ACCESS_KEY_ID'),
        aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY', 'YOUR_SECRET_ACCESS_KEY'),
        region_name=os.environ.get('AWS_REGION', 'us-east-1')
    )
    # Replace 'my-aws-credentials' with your desired block name
    aws_credentials.save('my-aws-credentials', overwrite=True)
    print("AwsCredentials block 'my-aws-credentials' saved.")

@task
async def upload_file_to_s3(bucket_name: str, key: str, content: str):
    aws_credentials_block = await AwsCredentials.load('my-aws-credentials')
    await s3_upload(
        bucket=bucket_name,
        key=key,
        # s3_upload expects the payload as bytes via `data`, so encode the string
        data=content.encode("utf-8"),
        aws_credentials=aws_credentials_block
    )
    print(f"Uploaded '{key}' to s3://{bucket_name}")

@task
async def download_file_from_s3(bucket_name: str, key: str) -> str:
    aws_credentials_block = await AwsCredentials.load('my-aws-credentials')
    downloaded_content = await s3_download(
        bucket=bucket_name,
        key=key,
        aws_credentials=aws_credentials_block
    )
    print(f"Downloaded '{key}' from s3://{bucket_name}")
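    # s3_download returns the object body as bytes; decode to match the str return type
    return downloaded_content.decode("utf-8")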

@flow
async def s3_interaction_flow(bucket_name: str = "your-test-bucket"): # Replace with a real bucket name
    test_key = "prefect-test-file.txt"
    test_content = "Hello from Prefect AWS!"

    # Make sure the credentials block exists; Block.load raises ValueError
    # when no block document with that name is found.
    try:
        await AwsCredentials.load('my-aws-credentials')
    except ValueError:
        print("AwsCredentials block not found. Run create_and_save_aws_credentials_block() first.")
        return

    await upload_file_to_s3(bucket_name, test_key, test_content)
    downloaded = await download_file_from_s3(bucket_name, test_key)
    print(f"Content verification: {downloaded == test_content}")

if __name__ == "__main__":
    # First, ensure your AWS credentials block is saved (run this once or use Prefect UI)
    # os.environ['AWS_ACCESS_KEY_ID'] = '...' # Set actual credentials
    # os.environ['AWS_SECRET_ACCESS_KEY'] = '...' # Set actual credentials
    # os.environ['AWS_REGION'] = '...' # Set actual region
    create_and_save_aws_credentials_block()
    
    # Then run the S3 interaction flow (replace 'your-test-bucket' with an actual S3 bucket name)
    import asyncio
    asyncio.run(s3_interaction_flow(bucket_name='your-test-bucket'))
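
The quickstart text recommends relying on environment variables and IAM roles in production, while the example code falls back to placeholder strings when a variable is unset. The sketch below shows one way to avoid saving placeholders: it builds the keyword arguments for `AwsCredentials` only from variables that are actually set. The helper name `aws_credentials_kwargs` is hypothetical and not part of `prefect-aws`. The sketch assumes that the omitted fields are optional on `AwsCredentials`, and that the underlying boto3 session then falls back to its default credential chain (for example an attached IAM role).

```python
import os
from typing import Mapping, Optional


def aws_credentials_kwargs(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build AwsCredentials keyword arguments from environment variables.

    Hypothetical helper, not part of prefect-aws. Unset or empty variables
    are omitted instead of being replaced with placeholder strings.
    """
    env = os.environ if env is None else env
    # Field names mirror the arguments used in the quickstart above.
    field_to_var = {
        "aws_access_key_id": "AWS_ACCESS_KEY_ID",
        "aws_secret_access_key": "AWS_SECRET_ACCESS_KEY",
        "region_name": "AWS_REGION",
    }
    kwargs = {}
    for field, var in field_to_var.items():
        value = env.get(var, "").strip()
        if value:
            kwargs[field] = value

    # A key ID without its secret (or the reverse) is almost certainly a mistake.
    has_id = "aws_access_key_id" in kwargs
    has_secret = "aws_secret_access_key" in kwargs
    if has_id != has_secret:
        raise ValueError(
            "Set both AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, or neither."
        )
    return kwargs


# Intended usage (requires prefect-aws to be installed):
#   from prefect_aws import AwsCredentials
#   AwsCredentials(**aws_credentials_kwargs()).save("my-aws-credentials", overwrite=True)
```

With neither key variable set, the helper returns at most a `region_name`, so the saved block carries no static secrets at all.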
