Airbyte Apache Airflow Provider
The `apache-airflow-providers-airbyte` package provides Apache Airflow operators and sensors to interact with Airbyte, an open-source data integration platform. It enables users to trigger and monitor Airbyte synchronization jobs directly from Airflow DAGs. The current version is 5.4.0, supporting Airflow >=2.11.0 and Python >=3.10, and it maintains a regular release cadence with ongoing development.
Warnings
- breaking The authentication mechanism for Airbyte connections changed in provider version 4.0.0: it now uses `client_id` and `client_secret` instead of `login` and `password`. The `host` parameter for the Airflow Airbyte connection must be a fully qualified domain name including the scheme (e.g., `https://my.company:8000/airbyte/v1/`). The `api_type` parameter was also removed.
- breaking The minimum required Apache Airflow version has increased over time. For provider version 5.4.0, Airflow 2.11.0+ is required. Additionally, provider version 5.4.0 removed the `polling_interval` parameter from `AirbyteJobSensor`, favoring `poke_interval`.
- breaking Provider version 2.0.0 (and subsequently 2.1.0+) requires Airflow 2.1.0+ due to the removal of the `apply_default` decorator. Upgrading the provider on an older Airflow installation can pull in an automatic Airflow package upgrade, after which `airflow db upgrade` must be run.
- gotcha The `AirbyteTriggerSyncOperator` is not idempotent by design. Re-triggering the operator may initiate a new sync job in Airbyte, depending on Airbyte's configuration for the connection. Users should be aware of the Airbyte source/destination sync mode.
- gotcha The Airbyte operator in this provider is primarily designed to work with Airbyte self-managed instances (using its internal Config API). For orchestrating Airbyte Cloud, it's generally recommended to use Airflow's generic HTTP operators to interact with the newer Airbyte API directly.
- gotcha To prevent conflicts and ensure Airflow maintains control, it's highly recommended to set the replication frequency for Airbyte connections triggered by Airflow to 'Manual' within the Airbyte UI.
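Since provider 4.0.0 the connection carries `client_id`/`client_secret` and a scheme-qualified host, one way to supply it is Airflow's `AIRFLOW_CONN_<CONN_ID>` environment-variable URI format, with every component percent-encoded. The snippet below is a sketch with placeholder credentials; the exact field placement (credentials in the URI's login/password slots) is an assumption to verify against the provider's connection documentation for your version.

```python
from urllib.parse import quote_plus

client_id = "my-client-id"        # hypothetical value
client_secret = "s3cret/value"    # hypothetical value
host = "https://my.company:8000/airbyte/v1/"  # FQDN including scheme, per the 4.0.0 change

# Airflow can read connections from AIRFLOW_CONN_<CONN_ID> environment variables
# in URI form; percent-encode each component (including the host URL itself) so
# special characters survive parsing. Credential placement here is an assumption.
uri = "airbyte://{}:{}@{}".format(
    quote_plus(client_id), quote_plus(client_secret), quote_plus(host)
)
print(uri)
```

Exporting the result as, e.g., `AIRFLOW_CONN_MY_AIRBYTE_CONN` makes the connection available under the `airbyte_conn_id` value `my_airbyte_conn`.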
Install
- pip install apache-airflow-providers-airbyte
- pip install apache-airflow-providers-airbyte[http]
Imports
- AirbyteTriggerSyncOperator
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
- AirbyteJobSensor
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
- AirbyteHook
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
Quickstart
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

AIRBYTE_CONNECTION_ID = os.environ.get('AIRBYTE_CONN_ID', 'your_airflow_airbyte_connection_id')
AIRBYTE_SYNC_CONNECTION_ID = os.environ.get('AIRBYTE_SYNC_CONN_ID', 'your_airbyte_workspace_connection_id')

with DAG(
    dag_id='example_airbyte_sync_dag',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    tags=['airbyte', 'example'],
    dagrun_timeout=timedelta(minutes=60),
    default_args={
        'owner': 'airflow',
    },
) as dag:
    trigger_airbyte_sync = AirbyteTriggerSyncOperator(
        task_id='trigger_airbyte_connection_sync',
        airbyte_conn_id=AIRBYTE_CONNECTION_ID,
        connection_id=AIRBYTE_SYNC_CONNECTION_ID,  # UUID of the Airbyte connection to trigger
        asynchronous=True,  # Recommended with a sensor for long-running jobs
    )
    monitor_airbyte_sync = AirbyteJobSensor(
        task_id='monitor_airbyte_connection_sync',
        airbyte_conn_id=AIRBYTE_CONNECTION_ID,
        airbyte_job_id=trigger_airbyte_sync.output,
        poke_interval=5,  # Check every 5 seconds
        timeout=3600,     # Give up after 1 hour
    )

    trigger_airbyte_sync >> monitor_airbyte_sync
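As noted in the warnings, Airbyte Cloud is typically orchestrated through the newer public Airbyte API rather than this operator. A minimal sketch of the request an HTTP operator (or plain Python) would send, assuming the public `POST https://api.airbyte.com/v1/jobs` endpoint with placeholder token and connection UUID (verify both against the Airbyte API reference):

```python
import json
import urllib.request

API_URL = "https://api.airbyte.com/v1/jobs"
token = "YOUR_API_TOKEN"  # placeholder -- supply a real Airbyte Cloud token
connection_id = "00000000-0000-0000-0000-000000000000"  # placeholder UUID

# The public API expects a JSON body naming the connection and the job type.
payload = json.dumps({"connectionId": connection_id, "jobType": "sync"}).encode()
req = urllib.request.Request(
    API_URL,
    data=payload,
    headers={
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    },
    method="POST",
)
# urllib.request.urlopen(req) would submit the job; the response JSON includes
# a job id that can then be polled via GET /v1/jobs/{jobId}.
```

In a DAG, the same request is usually expressed with the generic HTTP provider's operator against an HTTP connection pointing at the Airbyte Cloud API.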