Airbyte Apache Airflow Provider

5.4.0 · active · verified Sat Apr 11

The `apache-airflow-providers-airbyte` package provides Apache Airflow operators and sensors for Airbyte, an open-source data integration platform, so that Airbyte synchronization jobs can be triggered and monitored directly from Airflow DAGs. The current version, 5.4.0, requires Airflow >=2.11.0 and Python >=3.10, and the package is actively maintained.
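To confirm that an environment meets the minimums stated above before installing the provider, a small preflight check can compare the installed versions against them. This is a sketch using only the standard library. `release_tuple` and `check_requirements` are hypothetical helpers, and the parser deliberately ignores pre-release suffixes; `packaging.version.Version` implements full PEP 440 comparison if you need it.

```python
import re
import sys
from importlib.metadata import PackageNotFoundError, version

# Minimums stated for apache-airflow-providers-airbyte 5.4.0.
MIN_PYTHON = (3, 10)
MIN_AIRFLOW = (2, 11, 0)


def release_tuple(ver: str) -> tuple[int, ...]:
    """Parse the leading numeric release segment: '2.11.0' -> (2, 11, 0).

    Simplification: suffixes such as 'rc1' or '.dev0' are ignored, so
    '3.0.0rc1' is treated the same as '3.0.0'.
    """
    match = re.match(r"\d+(?:\.\d+)*", ver)
    if match is None:
        raise ValueError(f"unrecognised version string: {ver!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def check_requirements() -> list[str]:
    """Return human-readable problems; an empty list means the minimums are met."""
    problems = []
    if sys.version_info[:2] < MIN_PYTHON:
        problems.append(f"Python {sys.version.split()[0]} is older than 3.10")
    try:
        airflow_version = version("apache-airflow")
    except PackageNotFoundError:
        problems.append("apache-airflow is not installed")
    else:
        if release_tuple(airflow_version) < MIN_AIRFLOW:
            problems.append(f"Airflow {airflow_version} is older than 2.11.0")
    return problems


if __name__ == "__main__":
    for problem in check_requirements():
        print("WARNING:", problem)
```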


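Install

pip install apache-airflow-providers-airbyte

Imports

from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor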

Quickstart

This quickstart DAG triggers an Airbyte synchronization job with `AirbyteTriggerSyncOperator` and waits for it to finish with `AirbyteJobSensor`. Before running it:

1. Install the provider: `pip install apache-airflow-providers-airbyte`.
2. Create an Airflow connection of type Airbyte that points to your Airbyte instance's API (e.g., `http://localhost:8001`). The ID of this Airflow connection is what the DAG stores in `AIRBYTE_CONNECTION_ID`.
3. Copy the UUID of the Airbyte connection you want to sync from the Airbyte UI. The DAG stores it in `AIRBYTE_SYNC_CONNECTION_ID`.
4. Set the environment variables `AIRBYTE_CONN_ID` (the Airflow connection ID) and `AIRBYTE_SYNC_CONN_ID` (the Airbyte connection UUID), or replace the placeholder defaults directly in the DAG code.
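Steps 2 to 4 can also be scripted instead of done through the UI. Airflow reads connections from environment variables named `AIRFLOW_CONN_<CONN_ID>` (connection ID upper-cased), and since Airflow 2.3 the value may be JSON. The sketch below builds such a variable and checks that the sync ID is a well-formed UUID before the DAG ever runs. `airbyte_connection_env` and `validate_sync_connection_id` are hypothetical helpers, and `http://localhost` / `8001` are the illustrative values from step 2. Whether your Airbyte deployment also needs credentials (for example a client ID and secret) depends on the provider version and your Airbyte setup, so they are left out here as an explicit assumption.

```python
import json
import uuid


def airbyte_connection_env(conn_id: str, host: str, port: int) -> tuple[str, str]:
    """Return (name, value) for an AIRFLOW_CONN_* variable defining an Airbyte connection.

    Hypothetical helper: only conn_type, host and port are set. Add login,
    password or extra fields if your Airbyte deployment requires them.
    """
    name = "AIRFLOW_CONN_" + conn_id.upper()
    value = json.dumps({"conn_type": "airbyte", "host": host, "port": port})
    return name, value


def validate_sync_connection_id(raw: str) -> str:
    """Raise ValueError unless raw is a UUID; return it in canonical lower-case form."""
    return str(uuid.UUID(raw))


name, value = airbyte_connection_env("airbyte_default", "http://localhost", 8001)
# Prints a shell line such as:
# export AIRFLOW_CONN_AIRBYTE_DEFAULT='{"conn_type": "airbyte", "host": "http://localhost", "port": 8001}'
print(f"export {name}='{value}'")
```

The exported variable must be visible to the Airflow components that run the DAG, and `AIRBYTE_CONN_ID` would then be set to the matching ID (`airbyte_default` in this example).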

import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

AIRBYTE_CONNECTION_ID = os.environ.get('AIRBYTE_CONN_ID', 'your_airflow_airbyte_connection_id')
AIRBYTE_SYNC_CONNECTION_ID = os.environ.get('AIRBYTE_SYNC_CONN_ID', 'your_airbyte_workspace_connection_id')

with DAG(
    dag_id='example_airbyte_sync_dag',
    start_date=datetime(2024, 1, 1),  # fixed start date; days_ago() is deprecated and removed in Airflow 3
    schedule=None,  # run only when triggered; `schedule_interval` is deprecated and removed in Airflow 3
    catchup=False,
    tags=['airbyte', 'example'],
    dagrun_timeout=timedelta(minutes=60),
    default_args={
        'owner': 'airflow',
    }
) as dag:
    trigger_airbyte_sync = AirbyteTriggerSyncOperator(
        task_id='trigger_airbyte_connection_sync',
        airbyte_conn_id=AIRBYTE_CONNECTION_ID,
        connection_id=AIRBYTE_SYNC_CONNECTION_ID,  # UUID of the Airbyte connection to sync
        asynchronous=True,  # return the job ID immediately; the sensor below waits for completion
    )

    monitor_airbyte_sync = AirbyteJobSensor(
        task_id='monitor_airbyte_connection_sync',
        airbyte_conn_id=AIRBYTE_CONNECTION_ID,
        airbyte_job_id=trigger_airbyte_sync.output,  # XComArg: job ID returned by the trigger task
        poke_interval=5,  # check the job status every 5 seconds
        timeout=3600,  # fail the sensor after 1 hour
    )

    trigger_airbyte_sync >> monitor_airbyte_sync
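With `asynchronous=True`, the trigger task finishes as soon as the job is submitted, and the sensor re-checks the job status every `poke_interval` seconds until the job succeeds, fails, or `timeout` is reached. The following is a simplified, standard-library model of that loop, meant only for reasoning about the two parameters; it is not the provider's implementation. `wait_for_job`, `JobFailed`, and the lower-case status strings (`pending`, `running`, `succeeded`, `failed`, `cancelled`) are assumptions for illustration.

```python
import time
from collections.abc import Callable

# Assumed status strings; the real sensor queries the Airbyte API.
FAILURE_STATUSES = {"failed", "cancelled"}


class JobFailed(RuntimeError):
    """Raised when the (simulated) Airbyte job ends unsuccessfully."""


def wait_for_job(
    get_status: Callable[[], str],
    poke_interval: float = 5.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Poll get_status() until it reports success; return the number of polls."""
    deadline = clock() + timeout
    polls = 0
    while True:
        status = get_status().lower()
        polls += 1
        if status == "succeeded":
            return polls
        if status in FAILURE_STATUSES:
            raise JobFailed(f"Airbyte job ended with status {status!r}")
        if clock() >= deadline:
            raise TimeoutError(f"job did not finish within {timeout} seconds")
        sleep(poke_interval)  # any other status (e.g. pending, running) means keep waiting


# Usage with a fake status source standing in for the Airbyte API.
statuses = iter(["pending", "running", "running", "succeeded"])
polls = wait_for_job(lambda: next(statuses), poke_interval=0, sleep=lambda _: None)
print(polls)  # 4
```

Injecting `sleep` and `clock` keeps the loop testable without waiting in real time. With the quickstart's values (5-second interval, 3600-second timeout), the sensor makes roughly 720 status checks before giving up.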
