Fivetran Async Provider for Apache Airflow
Version 2.3.0 | verified Wed Apr 15 | auth: no | python
The `airflow-provider-fivetran-async` library provides asynchronous operators, sensors, and hooks for integrating Fivetran with Apache Airflow. It uses Airflow's deferrable tasks to orchestrate Fivetran data synchronization jobs, so a task that is waiting on a sync releases its worker slot during the I/O-bound wait instead of holding it. The provider is actively maintained by Astronomer and Fivetran. The current stable version is 2.3.0, and releases are regular, including alpha versions that preview upcoming features and breaking changes.
pip install airflow-provider-fivetran-async
Common errors
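error ModuleNotFoundError: No module named 'airflow_provider_fivetran_async'
cause The import uses the wrong module name. The distribution is installed as 'airflow-provider-fivetran-async', but the importable package is 'fivetran_provider_async'.
fix
Import from 'fivetran_provider_async', for example 'from fivetran_provider_async.operators import FivetranOperator'. If that import also fails, install the package: 'pip install airflow-provider-fivetran-async'.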
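error ImportError: cannot import name 'FivetranOperator' from 'fivetran_provider_async.operators'
cause The module path is correct, so the installed package most likely does not export a class with this name, for example because an outdated release is installed.
fix
Upgrade the package with 'pip install --upgrade airflow-provider-fivetran-async' and keep the import as 'from fivetran_provider_async.operators import FivetranOperator'.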
error AttributeError: module 'fivetran_provider_async.operators' has no attribute 'FivetranOperator'
cause The 'FivetranOperator' class is not present in the installed module, possibly due to a version mismatch or a broken installation.
fix
Ensure that the 'airflow-provider-fivetran-async' package is installed and up to date: 'pip install --upgrade airflow-provider-fivetran-async'.
error ValueError: Invalid 'conn_id' provided for Fivetran connection
cause The connection ID passed to the task (the 'fivetran_conn_id' argument) does not match any Fivetran connection configured in Airflow.
fix
Verify that the ID matches the one configured in the Airflow UI under Connections, and that the connection holds the Fivetran API key and secret.
error TypeError: 'NoneType' object is not subscriptable
cause A value that the code indexes into (a dictionary or list) is 'None', typically because a required setting is missing or incorrect.
fix
Check that every required parameter is provided and correctly set, starting with 'connector_id' and the API key and secret on the Fivetran connection.
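Several of the errors above come down to which distribution is installed and which module name is importable. The following is a minimal diagnostic sketch using only the Python standard library; the helper name `describe_install` is hypothetical and not part of the provider.

```python
from importlib import metadata, util


def describe_install(dist_name, module_name):
    """Return (installed version or None, whether the module can be found)."""
    try:
        version = metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        # The distribution is not installed in this environment.
        version = None
    # find_spec returns None when a top-level module cannot be located.
    importable = util.find_spec(module_name) is not None
    return version, importable


if __name__ == "__main__":
    # Distribution name (used by pip) and import name (used in code) differ.
    print(describe_install("airflow-provider-fivetran-async", "fivetran_provider_async"))
```

Run it with the same interpreter that Airflow uses. A result of `None` for the version points to a missing installation, while an installed version with a failing import points to a wrong module path in the DAG file.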
Warnings
breaking Version 2.3.0 dropped support for Python 3.9. Users on Python 3.9 must upgrade their Python environment or use an older provider version.
fix Upgrade Python to 3.10 or higher.
breaking Upcoming version 2.4.0a1 drops support for Apache Airflow versions older than 2.9. Users running older Airflow versions will need to upgrade.
fix Upgrade Apache Airflow to version 2.9 or higher.
breaking Version 2.2.0 dropped support for Apache Airflow 2.2 and 2.3.
fix Upgrade Apache Airflow to version 2.4 or higher (and ideally to >=2.9 for future compatibility).
gotcha The `FivetranOperator` from this async provider may fail when a Fivetran connector enters the 'RESCHEDULED' state instead of handling that state gracefully. This is a known bug (GitHub Issue #107).
fix Monitor the GitHub issue for a resolution, or implement custom retry logic for 'RESCHEDULED' states.
deprecated `airflow-provider-fivetran-async` is the recommended successor to the older, synchronous `airflow-provider-fivetran`, which is deprecated in favor of this asynchronous version.
fix Migrate from `airflow-provider-fivetran` to `airflow-provider-fivetran-async`, replacing the legacy `FivetranOperator` and `FivetranSensor` with the classes from this provider, or with this provider's `FivetranOperator` alone, which both triggers and monitors a sync.
gotcha The `FivetranOperator` in this async provider both triggers a Fivetran sync and asynchronously monitors its completion. The separate downstream `FivetranSensor` that was common practice with the legacy synchronous provider is no longer needed in most cases, and using both consumes resources unnecessarily.
fix Prefer `FivetranOperator` (deferrable) with `wait_for_completion=True` (the default) to handle triggering and monitoring in a single task. Use `FivetranSensor` only to monitor a Fivetran sync initiated outside of Airflow, or when the `FivetranOperator` runs with `wait_for_completion=False`.
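For the 'RESCHEDULED' gotcha above, one coarse form of custom retry logic is Airflow's own task-level retries. The sketch below only builds a dictionary of standard `BaseOperator` retry arguments; the name `RESCHEDULE_RETRY_ARGS` and all values are illustrative assumptions, and whether a retried task succeeds depends on how the connector behaves after it is rescheduled, which is not documented here.

```python
from datetime import timedelta

# Hypothetical values: tune them to how long a connector usually stays
# rescheduled in your account. The keys are standard BaseOperator arguments.
RESCHEDULE_RETRY_ARGS = {
    "retries": 3,                            # re-run the task up to 3 times
    "retry_delay": timedelta(minutes=10),    # wait before the first retry
    "retry_exponential_backoff": True,       # lengthen the wait on each retry
    "max_retry_delay": timedelta(hours=1),   # cap on the backoff
}

# Intended use, assuming the quickstart's operator arguments:
#   FivetranOperator(task_id=..., connector_id=..., **RESCHEDULE_RETRY_ARGS)
# or pass the dictionary as default_args to the DAG.
```

Retries re-run the whole task, so this does not distinguish a rescheduled sync from any other failure. It is a stopgap until the upstream issue is resolved, not a targeted fix.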
Imports
- FivetranOperatorAsync
from fivetran_provider_async.operators import FivetranOperator
- FivetranSensorAsync
from fivetran_provider_async.sensors import FivetranSensor
- ResyncOperator
from fivetran_provider_async.operators.resync import ResyncOperator
Quickstart
import os
from datetime import datetime

from airflow.decorators import dag
from fivetran_provider_async.operators import FivetranOperator

FIVETRAN_API_KEY = os.environ.get('FIVETRAN_API_KEY', 'your_fivetran_api_key')
FIVETRAN_API_SECRET = os.environ.get('FIVETRAN_API_SECRET', 'your_fivetran_api_secret')
FIVETRAN_CONNECTOR_ID = os.environ.get('FIVETRAN_CONNECTOR_ID', 'your_connector_id')


@dag(
    dag_id='fivetran_async_sync_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['fivetran', 'async', 'etl']
)
def fivetran_async_dag():
    # Configure Fivetran connection in Airflow UI (Admin -> Connections)
    # Conn Id: fivetran_default
    # Conn Type: Fivetran
    # Login (API Key): FIVETRAN_API_KEY
    # Password (API Secret): FIVETRAN_API_SECRET
    start_fivetran_sync = FivetranOperator(
        task_id='start_fivetran_sync',
        fivetran_conn_id='fivetran_default',
        connector_id=FIVETRAN_CONNECTOR_ID,
        deferrable=True,  # This is the default behavior
        wait_for_completion=True  # This is the default behavior
    )


fivetran_async_dag()
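The quickstart's comments describe creating the `fivetran_default` connection in the Airflow UI. Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` whose value is a JSON document. The sketch below builds that variable; the helper name `fivetran_conn_env` is hypothetical, and the `conn_type` value `"fivetran"` is an assumption based on the "Conn Type: Fivetran" line in the quickstart.

```python
import json
import os


def fivetran_conn_env(conn_id, api_key, api_secret):
    """Return the (name, value) pair of an Airflow connection env variable."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps({
        "conn_type": "fivetran",  # assumed type identifier, see lead-in
        "login": api_key,         # Fivetran API key
        "password": api_secret,   # Fivetran API secret
    })
    return name, value


if __name__ == "__main__":
    name, value = fivetran_conn_env(
        "fivetran_default",
        os.environ.get("FIVETRAN_API_KEY", "your_fivetran_api_key"),
        os.environ.get("FIVETRAN_API_SECRET", "your_fivetran_api_secret"),
    )
    # Print a shell line to add to the environment of the Airflow processes.
    print(f"export {name}='{value}'")
```

The variable has to be present in the environment of every Airflow component that runs or defers the task, so set it where those processes are started rather than inside the DAG file.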