Fivetran Async Provider for Apache Airflow
The `airflow-provider-fivetran-async` library provides asynchronous operators, sensors, and hooks for integrating Fivetran with Apache Airflow. It uses Airflow's deferrable tasks to orchestrate Fivetran data synchronization jobs, releasing worker slots while a task waits on I/O. The provider is maintained by Astronomer and Fivetran. The current stable version is 2.3.0; alpha releases preview upcoming features and breaking changes.
Common errors
- ModuleNotFoundError: No module named 'airflow_provider_fivetran_async'
cause The import uses a name derived from the pip distribution name. The 'airflow-provider-fivetran-async' distribution is imported as 'fivetran_provider_async'. A ModuleNotFoundError that names 'fivetran_provider_async' instead means the package is not installed in the active Python environment.
fix Install the package with 'pip install airflow-provider-fivetran-async' and import from 'fivetran_provider_async', for example 'from fivetran_provider_async.operators import FivetranOperator'.
- ImportError: cannot import name 'FivetranOperator' from 'fivetran_provider_async.operators'
cause The module path is correct, so the class is missing from the installed package, most likely because of an outdated or broken installation.
fix Upgrade the package with 'pip install --upgrade airflow-provider-fivetran-async', then use 'from fivetran_provider_async.operators import FivetranOperator'.
- AttributeError: module 'fivetran_provider_async.operators' has no attribute 'FivetranOperator'
cause The 'FivetranOperator' class is not present in the specified module, possibly due to a version mismatch or incorrect installation.
fix Ensure that the 'airflow-provider-fivetran-async' package is installed and up to date: 'pip install --upgrade airflow-provider-fivetran-async'.
- ValueError: Invalid 'conn_id' provided for Fivetran connection
cause The connection ID passed to the operator does not match any Fivetran connection configured in Airflow.
fix Verify that the value of 'fivetran_conn_id' matches the Conn Id shown in the Airflow UI under Connections, and that the connection holds the Fivetran API Key and Secret.
- TypeError: 'NoneType' object is not subscriptable
cause A value expected to be a dictionary or list is 'None', possibly due to a missing or incorrect configuration.
fix Check the configuration and ensure all required parameters are provided and correctly set.
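The first three errors usually come down to either a missing installation or a mismatch between the pip distribution name and the import name. The following is a minimal diagnostic sketch using only the standard library. It assumes the import name `fivetran_provider_async` used in the Imports section; the `diagnose` helper and its messages are hypothetical and not part of the provider.

```python
from importlib import metadata
from importlib.util import find_spec

DIST_NAME = "airflow-provider-fivetran-async"  # name used with pip
MODULE_NAME = "fivetran_provider_async"        # assumed import name (see Imports)


def diagnose(dist_name: str = DIST_NAME, module_name: str = MODULE_NAME) -> str:
    """Return a short message describing whether the provider can be imported."""
    try:
        # Looks up the installed distribution's metadata without importing it.
        version = metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return f"{dist_name} is not installed; run: pip install {dist_name}"
    # find_spec returns None when a top-level module cannot be located.
    if find_spec(module_name) is None:
        return f"{dist_name} {version} is installed but {module_name} is not importable"
    return f"{dist_name} {version} is installed; import from {module_name}"


if __name__ == "__main__":
    print(diagnose())
```

Running this with the same interpreter that Airflow uses helps rule out the case where the package was installed into a different environment.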
Warnings
- breaking Version 2.3.0 dropped support for Python 3.9. Users on Python 3.9 must upgrade their Python environment or use an older provider version.
- breaking Upcoming version 2.4.0a1 drops support for Apache Airflow versions older than 2.9. Users running older Airflow versions will need to upgrade.
- breaking Version 2.2.0 dropped support for Apache Airflow 2.2 and 2.3.
- gotcha The `FivetranOperator` (async provider) may fail if a Fivetran connector goes into a 'RESCHEDULED' state, rather than handling it gracefully. This is a known bug (GitHub Issue #107).
- deprecated This `airflow-provider-fivetran-async` provider is the recommended successor to the older, synchronous `airflow-provider-fivetran`. The legacy provider is deprecated in favor of this asynchronous version.
- gotcha The `FivetranOperator` in this async provider both triggers a Fivetran sync and asynchronously waits for it to complete. A separate downstream `FivetranSensor`, which was common practice with the legacy synchronous provider, is usually no longer needed. Using both consumes resources unnecessarily.
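The version constraints in the breaking-change warnings can be checked before upgrading. The sketch below is an illustration only: the minimums (Python 3.10 and Airflow 2.9) are assumptions inferred from the warnings above and should be confirmed against the release notes, and `parse_major_minor` and `is_supported` are hypothetical helpers.

```python
import sys

# Assumed minimums, inferred from the warnings above; confirm against release notes.
MIN_PYTHON = (3, 10)   # 2.3.0 dropped Python 3.9
MIN_AIRFLOW = (2, 9)   # 2.4.0a1 drops Airflow versions older than 2.9


def parse_major_minor(version: str) -> tuple[int, int]:
    """Turn a version string such as '2.9.3' into (2, 9)."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def is_supported(python_version: tuple[int, int], airflow_version: str) -> bool:
    """Check both versions against the assumed minimums."""
    return (
        python_version >= MIN_PYTHON
        and parse_major_minor(airflow_version) >= MIN_AIRFLOW
    )


if __name__ == "__main__":
    # Replace "2.9.0" with the Airflow version actually installed.
    print(is_supported(sys.version_info[:2], "2.9.0"))
```

Comparing integer tuples rather than raw strings avoids the trap where "2.10" sorts before "2.9".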
Install
-
pip install airflow-provider-fivetran-async
Imports
- FivetranOperator
from fivetran_provider_async.operators import FivetranOperator
- FivetranSensor
from fivetran_provider_async.sensors import FivetranSensor
- ResyncOperator
from fivetran_provider_async.operators.resync import ResyncOperator
Quickstart
import os
from datetime import datetime

from airflow.decorators import dag
from fivetran_provider_async.operators import FivetranOperator

# Placeholder defaults are used when the environment variables are not set.
FIVETRAN_API_KEY = os.environ.get('FIVETRAN_API_KEY', 'your_fivetran_api_key')
FIVETRAN_API_SECRET = os.environ.get('FIVETRAN_API_SECRET', 'your_fivetran_api_secret')
FIVETRAN_CONNECTOR_ID = os.environ.get('FIVETRAN_CONNECTOR_ID', 'your_connector_id')


@dag(
    dag_id='fivetran_async_sync_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['fivetran', 'async', 'etl'],
)
def fivetran_async_dag():
    # Configure Fivetran connection in Airflow UI (Admin -> Connections)
    # Conn Id: fivetran_default
    # Conn Type: Fivetran
    # Login (API Key): FIVETRAN_API_KEY
    # Password (API Secret): FIVETRAN_API_SECRET
    start_fivetran_sync = FivetranOperator(
        task_id='start_fivetran_sync',
        fivetran_conn_id='fivetran_default',
        connector_id=FIVETRAN_CONNECTOR_ID,
        deferrable=True,  # This is the default behavior
        wait_for_completion=True,  # This is the default behavior
    )


fivetran_async_dag()
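The quickstart comments describe creating the connection in the Airflow UI. Airflow can also read connections from `AIRFLOW_CONN_<CONN_ID>` environment variables, which accept a JSON value in Airflow 2.3 and later. The sketch below builds such a variable for `fivetran_default`. The `fivetran_conn_env` helper is hypothetical, and the `"fivetran"` connection type string is an assumption based on the "Conn Type: Fivetran" comment above.

```python
import json
import os


def fivetran_conn_env(conn_id: str, api_key: str, api_secret: str) -> tuple[str, str]:
    """Build the (name, value) pair for an Airflow connection environment variable."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(
        {
            "conn_type": "fivetran",  # assumed identifier for the Fivetran connection type
            "login": api_key,         # API key goes in the login field
            "password": api_secret,   # API secret goes in the password field
        }
    )
    return name, value


if __name__ == "__main__":
    name, value = fivetran_conn_env(
        "fivetran_default",
        os.environ.get("FIVETRAN_API_KEY", "your_fivetran_api_key"),
        os.environ.get("FIVETRAN_API_SECRET", "your_fivetran_api_secret"),
    )
    # Prints a shell line that can be added to the scheduler and worker environment.
    print(f"export {name}='{value}'")
```

Defining the connection this way keeps credentials out of the Airflow metadata database, at the cost of the connection not appearing in the UI.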