# Microsoft Fabric Provider for Apache Airflow
A Python package that lets data and analytics engineers trigger on-demand Microsoft Fabric job items from Apache Airflow DAGs. It orchestrates Fabric items such as notebooks, pipelines, Spark job definitions, and semantic model refreshes. The current version is 0.0.9; as a newly developed provider, it is expected to receive frequent updates.
## Warnings
- gotcha Authenticating with the Microsoft Fabric APIs requires a 'Generic' connection type in Airflow with specific field mappings: the Client ID goes in 'Login', the Refresh Token in 'Password', and the Tenant ID, an optional Client Secret, and the Scopes in 'Extra'. This deviates from standard Airflow connection forms.
- breaking The provider is in early development (version 0.0.x); expect frequent API changes and renames of modules, operators, and connection parameters, without strict semantic versioning guarantees for minor releases.
- gotcha When running on Microsoft Fabric's hosted Apache Airflow, users may encounter difficulties with installing Python packages, debugging environment failures, and recovering from bad installs. The built-in requirements validator might not always detect dependency conflicts, leading to 'unhealthy' scheduler/triggerer states with unhelpful error messages.
- gotcha The Microsoft OAuth refresh tokens used for Service Principal authentication expire. The tokens stored in the generic Airflow connection must be refreshed periodically to maintain connectivity and prevent DAG failures.
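One way to supply the connection described above is Airflow's `AIRFLOW_CONN_<CONN_ID>` environment-variable convention, where the connection is encoded as a URI with the full extra as JSON under the `__extra__` query parameter. The sketch below builds such a URI; the extra key names (`tenantId`, `clientSecret`, `scopes`) and all credential values are illustrative assumptions, not documented provider field names:

```python
import json
from urllib.parse import quote

# Hypothetical placeholder credentials -- substitute your own values.
client_id = "my-app-client-id"      # maps to the 'Login' field
refresh_token = "my-refresh-token"  # maps to the 'Password' field
extra = json.dumps({
    "tenantId": "my-tenant-id",          # assumed extra key name
    "clientSecret": "my-client-secret",  # optional; assumed extra key name
    "scopes": "https://api.fabric.microsoft.com/.default",  # assumed scope
})

# Airflow reads a connection from an env var named AIRFLOW_CONN_<CONN_ID>;
# 'generic' is the connection type called out in the warning above.
conn_uri = (
    f"generic://{quote(client_id)}:{quote(refresh_token)}@"
    f"?__extra__={quote(extra)}"
)
print(conn_uri.split("?")[0])  # generic://my-app-client-id:my-refresh-token@
```

Exporting this as `AIRFLOW_CONN_FABRIC_DEFAULT` would make it available under the connection ID `fabric_default`; defining the same fields through the Airflow UI works equally well.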
## Install

```bash
pip install apache-airflow-providers-microsoft-fabric
```
## Imports

- MSFabricRunItemOperator

```python
from airflow.providers.microsoft.fabric.operators.run_item import MSFabricRunItemOperator
```
## Quickstart

```python
from __future__ import annotations

import os

import pendulum

from airflow.models.dag import DAG
from airflow.providers.microsoft.fabric.operators.run_item import MSFabricRunItemOperator

FABRIC_WORKSPACE_ID = os.environ.get("FABRIC_WORKSPACE_ID", "your_workspace_id")
FABRIC_ITEM_ID = os.environ.get("FABRIC_ITEM_ID", "your_item_id")  # e.g., Notebook or Pipeline ID
FABRIC_CONN_ID = os.environ.get("FABRIC_CONN_ID", "fabric_default")  # Airflow Connection ID

with DAG(
    dag_id="fabric_run_item_example",
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["microsoft", "fabric", "etl"],
) as dag:
    run_fabric_notebook = MSFabricRunItemOperator(
        task_id="run_fabric_notebook_task",
        workspace_id=FABRIC_WORKSPACE_ID,
        item_id=FABRIC_ITEM_ID,
        fabric_conn_id=FABRIC_CONN_ID,
        job_type="RunNotebook",  # or "Pipeline", "SparkJobDefinition", "SemanticModel", etc.
        wait_for_termination=True,
        timeout=60 * 60,  # 1 hour timeout
    )
```
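With `wait_for_termination=True`, the operator blocks until the Fabric job reaches a terminal state or the `timeout` elapses. Conceptually this is a poll-with-timeout loop, as in the sketch below; note that `wait_for_job`, `poll_status`, and the status names are illustrative stand-ins, not the provider's actual API:

```python
import time

# Illustrative terminal status names -- not taken from the Fabric API.
TERMINAL_STATES = {"Completed", "Failed", "Cancelled"}

def wait_for_job(poll_status, timeout: float, interval: float = 1.0) -> str:
    """Call poll_status() until it returns a terminal state or timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        status = poll_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout}s")
        time.sleep(interval)

# Simulated job: reports "InProgress" twice, then "Completed".
statuses = iter(["InProgress", "InProgress", "Completed"])
print(wait_for_job(lambda: next(statuses), timeout=60, interval=0.01))  # Completed
```

A task that exceeds the deadline fails with a timeout, which is why the quickstart sets `timeout=60 * 60` generously above the expected job duration.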