InfluxDB Provider for Apache Airflow
The `apache-airflow-providers-influxdb` package integrates Apache Airflow with InfluxDB, allowing users to write, query, and manage InfluxDB data from within Airflow DAGs. It ships an `InfluxDBHook` and an `InfluxDBOperator` for common InfluxDB operations. The provider is versioned and released independently of Airflow core, with new releases published for bug fixes and new features.
Common errors
- `ModuleNotFoundError: No module named 'airflow.providers.influxdb'`
  - cause: The `apache-airflow-providers-influxdb` package has not been installed in your Airflow environment.
  - fix: Run `pip install apache-airflow-providers-influxdb` in the same Python environment where Airflow is installed.
- `InfluxDBClientException: authentication failed`
  - cause: The Airflow connection details for InfluxDB (e.g., token, username, password, organization) are incorrect or do not match the target InfluxDB instance.
  - fix: Verify the `influxdb_conn_id` used in your operator/hook, and check the corresponding Airflow Connection details (Host, Port, Schema, Login, Password, and Extra fields such as `token` and `org_name`). Ensure the token/credentials have the necessary read/write permissions.
- `TypeError: InfluxDBOperator.__init__() got an unexpected keyword argument 'some_parameter'`
  - cause: You are using a parameter that is not recognized by the `InfluxDBOperator` in your installed provider version, or it has been renamed or removed.
  - fix: Consult the official documentation for your installed `apache-airflow-providers-influxdb` version to confirm parameter names; parameters can change between provider versions.
- `InfluxDBClientException: bucket not found`
  - cause: The bucket referenced in your Flux query or write call does not exist in your InfluxDB v2 instance, or the provided token does not have access to it.
  - fix: Verify that the bucket name matches an existing bucket in your InfluxDB instance, and confirm that the InfluxDB token used has permissions for that specific bucket.
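Several of the errors above come down to connection misconfiguration. A quick way to sanity-check what an `AIRFLOW_CONN_*` URI actually contains is to parse it yourself before handing it to Airflow. This is a minimal sketch using only the standard library; the `token` and `org_name` extra keys and the example URI are illustrative:

```python
from urllib.parse import urlsplit, parse_qs

def inspect_influxdb_conn_uri(uri: str) -> dict:
    """Split an AIRFLOW_CONN_* style URI into host, port, and extra fields."""
    parts = urlsplit(uri)
    # Query-string parameters become the connection's Extra fields.
    extras = {k: v[0] for k, v in parse_qs(parts.query).items()}
    return {"host": parts.hostname, "port": parts.port, "extras": extras}

conn = inspect_influxdb_conn_uri(
    "influxdb://localhost:8086/?token=my_secret_token&org_name=my_org"
)
print(conn["host"], conn["port"], conn["extras"])
# localhost 8086 {'token': 'my_secret_token', 'org_name': 'my_org'}
```

If the parsed host, port, or extras are not what you expect, fix the URI (or the connection in the Airflow UI) before debugging authentication or bucket errors.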
Warnings
- gotcha InfluxDB v1.x and v2.x use different authentication mechanisms and query languages. Ensure your Airflow connection configuration (connection string, extra fields) aligns with your InfluxDB version. V2.x uses tokens, organizations, and buckets, while V1.x uses username/password and databases/retention policies.
- breaking Airflow providers, including InfluxDB, are versioned independently but often align with Airflow core releases. Upgrading Airflow might introduce breaking changes in provider APIs if the provider version is also significantly updated, requiring adjustments to DAG code.
- gotcha The `InfluxDBOperator` takes a single `sql` parameter (a Flux query string for InfluxDB v2), which it passes to `InfluxDBHook.query()`. It is intended for running queries, not for writing data; to write points, use `InfluxDBHook.write()` or the underlying `influxdb-client` write API with line protocol.
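Writes to InfluxDB use line protocol while v2 reads use Flux, so it helps to see what a line protocol record actually looks like. A simplified sketch of formatting one point (a hypothetical helper: no escaping of spaces or commas in tag/field values, and integer fields get the `i` suffix the protocol requires):

```python
def to_line_protocol(measurement: str, tags: dict, fields: dict) -> str:
    """Format one point as InfluxDB line protocol: measurement,tags fields."""
    tag_part = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))

    def fmt(value):
        # Integers carry an 'i' suffix in line protocol; floats are bare.
        return f"{value}i" if isinstance(value, int) else str(value)

    field_part = ",".join(f"{k}={fmt(v)}" for k, v in sorted(fields.items()))
    return f"{measurement},{tag_part} {field_part}"

line = to_line_protocol("cpu_usage", {"host": "server01"},
                        {"cpu": 60, "usage_system": 20.5})
print(line)  # cpu_usage,host=server01 cpu=60i,usage_system=20.5
```

In contrast, a read of the same data in InfluxDB v2 would be a Flux string such as `from(bucket:"my_bucket") |> range(start: -1h)` passed to the operator's `sql` parameter.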
Install
-
pip install apache-airflow-providers-influxdb
Imports
- InfluxDBHook
from airflow.providers.influxdb.hooks.influxdb import InfluxDBHook
- InfluxDBOperator
from airflow.providers.influxdb.operators.influxdb import InfluxDBOperator
Quickstart
import os
from datetime import datetime

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.influxdb.hooks.influxdb import InfluxDBHook
from airflow.providers.influxdb.operators.influxdb import InfluxDBOperator

# Supply the Airflow connection (Connection ID: influxdb_default) via environment
# variable, or configure it in the Airflow UI instead. The hook reads the API
# token and organization from the connection's Extra fields (`token`, `org_name`).
os.environ.setdefault(
    'AIRFLOW_CONN_INFLUXDB_DEFAULT',
    'influxdb://localhost:8086/?token=my_secret_token&org_name=my_org',
)

with DAG(
    dag_id='influxdb_example_dag',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['influxdb', 'example'],
) as dag:

    @task
    def write_data():
        # InfluxDBHook.write() builds a single point from one tag and one
        # field and writes it to the given bucket.
        hook = InfluxDBHook(conn_id='influxdb_default')
        hook.write(
            bucket_name='my_bucket',
            point_name='cpu_usage',
            tag_name='host',
            tag_value='server01',
            field_name='usage_system',
            field_value=20.5,
            synchronous=True,
        )

    # InfluxDBOperator runs a single Flux query string via the hook.
    read_data_from_influxdb = InfluxDBOperator(
        task_id='read_data',
        influxdb_conn_id='influxdb_default',
        sql=(
            'from(bucket:"my_bucket") |> range(start: -1h) '
            '|> filter(fn: (r) => r._measurement == "cpu_usage")'
        ),
    )

    write_data() >> read_data_from_influxdb
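When you query through the hook directly, `InfluxDBHook.query()` returns a list of `FluxTable` objects from `influxdb-client`, where each table holds records whose `values` attribute is a plain dict. The following sketch flattens such results into rows; it uses lightweight stand-in classes (hypothetical, so the example runs without a live server) that mirror only the `records` and `values` attributes used here:

```python
def flux_tables_to_rows(tables):
    """Flatten FluxTable-like objects into a list of plain dicts."""
    return [dict(record.values) for table in tables for record in table.records]

# Stand-ins mirroring the attributes used above (for illustration only).
class FakeRecord:
    def __init__(self, values):
        self.values = values

class FakeTable:
    def __init__(self, records):
        self.records = records

tables = [FakeTable([FakeRecord({"_measurement": "cpu_usage", "_value": 20.5})])]
print(flux_tables_to_rows(tables))
# [{'_measurement': 'cpu_usage', '_value': 20.5}]
```

In a real task you would pass the list returned by `hook.query(...)` to `flux_tables_to_rows` instead of the fakes, or use `hook.query_to_df(...)` if you prefer a pandas DataFrame.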