InfluxDB Provider for Apache Airflow

2.10.4 · active · verified Fri Apr 17

The `apache-airflow-providers-influxdb` package integrates Apache Airflow with InfluxDB, letting DAGs write data to and query InfluxDB databases. It provides an `InfluxDBHook` for client access and an `InfluxDBOperator` for running queries. The current version is 2.10.4; releases typically track major Apache Airflow releases or ship as needed for bug fixes and new features.

Install

pip install apache-airflow-providers-influxdb

Imports

from airflow.providers.influxdb.hooks.influxdb import InfluxDBHook
from airflow.providers.influxdb.operators.influxdb import InfluxDBOperator

Quickstart

This quickstart demonstrates a basic Airflow DAG that writes points to InfluxDB with the provider's `InfluxDBHook` and reads them back with the `InfluxDBOperator`. The operator executes a single Flux query passed via its `sql` parameter; writes go through the hook, since the operator is query-only. The example requires an Airflow connection named `influxdb_default` configured with your InfluxDB endpoint and, for InfluxDB v2, a token and organization in the connection extras.

import os
from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.influxdb.hooks.influxdb import InfluxDBHook
from airflow.providers.influxdb.operators.influxdb import InfluxDBOperator

# Provide the InfluxDB connection via an environment variable (Airflow Connection ID: influxdb_default).
# The hook reads the token and organization from the connection's extra fields, so the
# equivalent UI setup is Host/Port plus Extra: {"token": "YOUR_TOKEN", "org_name": "YOUR_ORG"}.
os.environ.setdefault(
    'AIRFLOW_CONN_INFLUXDB_DEFAULT',
    'influxdb://localhost:8086/?token=my_secret_token&org_name=my_org',  # Replace with real details or configure in the Airflow UI
)


def write_points():
    """Write sample points with InfluxDBHook; each write() call stores one point
    (bucket, measurement, one tag, one field)."""
    hook = InfluxDBHook(conn_id='influxdb_default')
    hook.write('my_bucket', 'cpu_usage', 'host', 'server01', 'usage_system', 20.5, synchronous=True)
    hook.write('my_bucket', 'cpu_usage', 'host', 'server02', 'usage_system', 25.0, synchronous=True)


with DAG(
    dag_id='influxdb_example_dag',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['influxdb', 'example'],
) as dag:
    write_data_to_influxdb = PythonOperator(
        task_id='write_data',
        python_callable=write_points,
    )

    # InfluxDBOperator runs a single Flux query against the connection;
    # the response is logged by the task.
    read_data_from_influxdb = InfluxDBOperator(
        task_id='read_data',
        influxdb_conn_id='influxdb_default',
        sql='from(bucket:"my_bucket") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "cpu_usage")',
    )

    write_data_to_influxdb >> read_data_from_influxdb
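Flux queries like the one above are passed to the operator as plain strings, so a small helper can keep them readable when the bucket, measurement, or time range varies per DAG run. The `flux_query` function below is purely illustrative (it is not part of the provider):

```python
def flux_query(bucket: str, measurement: str, start: str = "-1h") -> str:
    """Compose a simple Flux pipeline: source bucket -> time range -> measurement filter."""
    return (
        f'from(bucket:"{bucket}") '
        f'|> range(start: {start}) '
        f'|> filter(fn: (r) => r._measurement == "{measurement}")'
    )

# Produces the same query string used in the read_data task above.
print(flux_query("my_bucket", "cpu_usage"))
```

Because the operator's `sql` field is templated, such a helper can also be combined with Jinja parameters if the range should come from the DAG run configuration.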