Metaplane Airflow Provider
Version 0.0.6, verified Fri May 01, auth: no, python
Metaplane Airflow Provider integrates Metaplane monitoring into Apache Airflow so that data observability checks (field health, volume, freshness) run automatically as part of DAG runs. The current version is 0.0.6 (alpha); the API is unstable, documentation is sparse, and releases are irregular.
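Because the API is described as unstable, it can help to detect an unexpected installed version before a DAG relies on it. The following is a minimal sketch using the standard library's `importlib.metadata`; the helper `installed_matches_pin` and the constant `PINNED_VERSION` are illustrative names, not part of the provider.

```python
from importlib.metadata import PackageNotFoundError, version

# The version this page describes; adjust to whatever version you have tested.
PINNED_VERSION = "0.0.6"


def installed_matches_pin(package: str = "airflow-metaplane",
                          pinned: str = PINNED_VERSION) -> bool:
    """Return True only if `package` is installed at exactly `pinned`."""
    try:
        return version(package) == pinned
    except PackageNotFoundError:
        # The distribution is not installed in this environment.
        return False


if __name__ == "__main__":
    print(installed_matches_pin())
```

A check like this could run in CI or at deploy time, so that a version drift is noticed before the operator's parameters change underneath a DAG.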
pip install airflow-metaplane
Common errors
error ModuleNotFoundError: No module named 'airflow_metaplane'
cause The package is not installed, or the import used the wrong module name (hyphen instead of underscore).
fix Install the package with pip install airflow-metaplane, then import: from airflow_metaplane.operators.metaplane import MetaplaneOperator
error TypeError: the JSON object must be str, bytes or bytearray, not list
cause A Python list was passed to the `checks` parameter instead of a JSON string.
fix Wrap the list in json.dumps or use a string literal: checks='["field_health"]'
Warnings
breaking Operator parameters may change without notice; there is no stable release yet. The operator currently accepts `metaplane_api_key`, `metaplane_api_secret`, `connection_id`, `checks`, `timeout`, and `retry`.
fix Pin to an exact version and test after every upgrade.
deprecated The `checks` parameter expects a JSON string that encodes a list, but some users pass a Python list, which is not accepted. This behavior is not backward compatible.
fix Always pass a JSON string, e.g., `checks='["field_health", "row_count"]'`.
gotcha The package name is `airflow-metaplane` (hyphen), but imports use `airflow_metaplane` (underscore). Mixing up the two is common.
fix Install with `pip install airflow-metaplane`; import from `airflow_metaplane`.
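The JSON-string requirement for `checks` noted above can be handled in one place. This is a minimal sketch, assuming the operator only needs a string that decodes to a list; `to_checks_json` is a hypothetical helper, not part of the provider.

```python
import json
from typing import Sequence, Union


def to_checks_json(checks: Union[str, Sequence[str]]) -> str:
    """Return `checks` as a JSON string that encodes a list of check names."""
    if isinstance(checks, str):
        # Validate that an existing string really decodes to a JSON list.
        # json.loads raises json.JSONDecodeError (a ValueError) on bad input.
        parsed = json.loads(checks)
        if not isinstance(parsed, list):
            raise ValueError("checks must be a JSON-encoded list")
        return checks
    # Serialize a Python list or tuple into the JSON string form.
    return json.dumps(list(checks))


if __name__ == "__main__":
    print(to_checks_json(["field_health", "row_count"]))
```

With a helper like this, a DAG could pass `checks=to_checks_json(["field_health", "row_count"])` and avoid handing the operator a raw Python list.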
Imports
- MetaplaneOperator
wrong from airflow.providers.metaplane import MetaplaneOperator
correct from airflow_metaplane.operators.metaplane import MetaplaneOperator
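To confirm which module name resolves in a given environment without actually importing it, the standard library's `importlib.util.find_spec` can be used. This is a small sketch; `is_importable` is an illustrative helper name.

```python
import importlib.util


def is_importable(module_name: str) -> bool:
    """Return True if a top-level module with this name can be located."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # The underscore form is the import name; the hyphenated form is only
    # the pip distribution name and is not a valid module name.
    for name in ("airflow_metaplane", "airflow-metaplane"):
        print(name, is_importable(name))
```

The helper is meant for top-level names only, since looking up a dotted submodule with `find_spec` imports its parent package first.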
Quickstart
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow_metaplane.operators.metaplane import MetaplaneOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'metaplane_dag',
    default_args=default_args,
    description='A simple Metaplane DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    check = MetaplaneOperator(
        task_id='metaplane_check',
        metaplane_api_key=os.environ.get('METAPLANE_API_KEY', ''),
        metaplane_api_secret=os.environ.get('METAPLANE_API_SECRET', ''),
        connection_id=123,
        checks='["field_health", "row_count"]',
    )
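The Quickstart falls back to an empty string when a credential variable is unset. One possible alternative, sketched here under the assumption that a missing credential should stop the DAG from loading instead of sending empty values, is to fail fast with a clear message. `require_env` is a hypothetical helper, not part of the provider.

```python
import os


def require_env(name: str) -> str:
    """Return the value of environment variable `name`, or raise if unset or empty."""
    value = os.environ.get(name, "")
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value
```

In the Quickstart this would replace the `os.environ.get(..., '')` calls, e.g. `metaplane_api_key=require_env('METAPLANE_API_KEY')`. Whether raising at DAG parse time is acceptable depends on how your Airflow deployment handles import errors.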