MWAA Disaster Recovery Solution (mwaa-dr)

2.2.0 · active · verified Thu Apr 16

mwaa-dr is a Python library that provides a reusable framework for implementing disaster recovery solutions for Amazon Managed Workflows for Apache Airflow (MWAA). It simplifies the creation of Airflow DAGs for exporting and importing MWAA metadata, enabling backup and restore capabilities for critical Airflow components like variables, connections, and DAG run history. The library currently supports various MWAA versions, with the latest PyPI release being 2.2.0, and development is ongoing with updates to support newer Airflow versions.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to use `mwaa-dr` to create a daily metadata backup DAG and a manually triggered restore DAG for an MWAA environment running Apache Airflow 2.10.x. It also shows how to create a cleanup DAG. Before running, ensure you have an S3 bucket configured for backups and the `DR_BACKUP_BUCKET` Airflow variable is set in your MWAA environment. The MWAA execution role must have appropriate S3 permissions.

import os
from airflow import DAG
from airflow.utils.dates import days_ago
from mwaa_dr.v_2_10.dr_factory import DRFactory_2_10

# Ensure DR_BACKUP_BUCKET Airflow Variable is set in your MWAA environment
# and MWAA execution role has read/write permissions on it.
# Example: DR_BACKUP_BUCKET = 'your-mwaa-backup-bucket'

# Initialize the DRFactory for your MWAA/Airflow version
# For local testing with aws-mwaa-local-runner, use storage_type='LOCAL_FS'
# and create a 'data' folder in your dags directory.
factory = DRFactory_2_10(
    dag_id='backup_metadata_example',
    path_prefix='data', # Relative path within the S3 bucket or local_fs
    storage_type='S3' # Or 'LOCAL_FS' for local development
)

# Create a backup DAG
backup_dag: DAG = factory.create_backup_dag(
    schedule_interval='@daily', # Example schedule
    start_date=days_ago(1)
)

# Create a restore DAG (typically disabled by default, meant for manual trigger)
restore_dag: DAG = factory.create_restore_dag(
    dag_id='restore_metadata_example',
    start_date=days_ago(1),
    is_paused_upon_creation=True # Recommended for restore DAGs
)

# Create a cleanup DAG (for emptying metadata tables before restore, use with caution)
cleanup_dag: DAG = factory.create_cleanup_dag(
    dag_id='cleanup_metadata_example',
    start_date=days_ago(1),
    is_paused_upon_creation=True # Recommended for cleanup DAGs
)

view raw JSON →