{"id":9469,"library":"apache-airflow-providers-influxdb","title":"InfluxDB Provider for Apache Airflow","description":"The `apache-airflow-providers-influxdb` package integrates Apache Airflow with InfluxDB, allowing users to interact with InfluxDB databases for data collection, storage, and querying within Airflow DAGs. It includes hooks, operators, and sensors to facilitate common InfluxDB operations. The current version is 2.10.4, with releases typically synchronized with major Apache Airflow releases or as needed for bug fixes and new features.","status":"active","version":"2.10.4","language":"en","source_language":"en","source_url":"https://github.com/apache/airflow/tree/main/airflow/providers/influxdb","tags":["airflow","influxdb","database","provider","data-pipeline","timeseries"],"install":[{"cmd":"pip install apache-airflow-providers-influxdb","lang":"bash","label":"Install InfluxDB provider"}],"dependencies":[],"imports":[{"symbol":"InfluxDBHook","correct":"from airflow.providers.influxdb.hooks.influxdb import InfluxDBHook"},{"symbol":"InfluxDBOperator","correct":"from airflow.providers.influxdb.operators.influxdb import InfluxDBOperator"},{"symbol":"InfluxDBSensor","correct":"from airflow.providers.influxdb.sensors.influxdb import InfluxDBSensor"}],"quickstart":{"code":"import os\nfrom datetime import datetime\n\nfrom airflow.models.dag import DAG\nfrom airflow.providers.influxdb.operators.influxdb import InfluxDBOperator\n\n# Set environment variable for InfluxDB connection (Airflow Connection ID: influxdb_default)\n# Example for InfluxDB v2: influxdb://user:password@localhost:8086/?token=YOUR_TOKEN&org=YOUR_ORG&bucket=YOUR_BUCKET\n# If using only token (no user/pass), it might be: influxdb://localhost:8086/?token=YOUR_TOKEN&org=YOUR_ORG&bucket=YOUR_BUCKET\nos.environ['AIRFLOW_CONN_INFLUXDB_DEFAULT'] = os.environ.get(\n    'AIRFLOW_CONN_INFLUXDB_DEFAULT',\n    'influxdb://localhost:8086/?token=my_secret_token&org=my_org&bucket=my_bucket' # Replace with actual connection details or use Airflow UI\n)\n\nwith DAG(\n    dag_id='influxdb_example_dag',\n    start_date=datetime(2023, 1, 1),\n    schedule_interval=None,\n    catchup=False,\n    tags=['influxdb', 'example'],\n) as dag:\n    write_data_to_influxdb = InfluxDBOperator(\n        task_id='write_data',\n        influxdb_conn_id='influxdb_default',\n        queries=[\n            \"_measurement='cpu_usage',host='server01' cpu=60i,usage_system=20.5\",\n            \"_measurement='cpu_usage',host='server02' cpu=70i,usage_system=25.0\"\n        ],\n        # The provider assumes these are InfluxDB line protocol strings by default.\n        # If you need to execute Flux queries, you would pass `queries=['from(bucket:\"my_bucket\") |> range(start: -1h)']`\n        # and set `query_type='flux'` if needed, though line protocol for writing is most common.\n    )\n\n    # Example of reading data (requires a Flux query)\n    read_data_from_influxdb = InfluxDBOperator(\n        task_id='read_data',\n        influxdb_conn_id='influxdb_default',\n        queries=[\n            'from(bucket:\"my_bucket\") |> range(start: -1h) |> filter(fn: (r) => r._measurement == \"cpu_usage\")'\n        ],\n        query_type='flux',\n        # results_processor=lambda results: print(f\"Query results: {results}\") # Uncomment to process results\n    )\n\n    write_data_to_influxdb >> read_data_from_influxdb\n","lang":"python","description":"This quickstart demonstrates a basic Airflow DAG that uses the `InfluxDBOperator` to write data to and read data from an InfluxDB instance. It requires an Airflow connection named `influxdb_default` configured with your InfluxDB endpoint and authentication details (e.g., token, organization, bucket for InfluxDB v2). The example uses line protocol for writing and a Flux query for reading."},"warnings":[{"fix":"For InfluxDB v2, use connection string parameters like `?token=YOUR_TOKEN&org=YOUR_ORG&bucket=YOUR_BUCKET`. For v1, ensure `login` and `password` fields are correctly set. Review the official provider documentation for specific connection string formats.","message":"InfluxDB v1.x and v2.x use different authentication mechanisms and query languages. Ensure your Airflow connection configuration (connection string, extra fields) aligns with your InfluxDB version. V2.x uses tokens, organizations, and buckets, while V1.x uses username/password and databases/retention policies.","severity":"gotcha","affected_versions":"All versions (user configuration dependent)"},{"fix":"Always check the release notes for `apache-airflow` and `apache-airflow-providers-influxdb` when planning upgrades. Test DAGs thoroughly in a staging environment before deploying to production.","message":"Airflow providers, including InfluxDB, are versioned independently but often align with Airflow core releases. Upgrading Airflow might introduce breaking changes in provider APIs if the provider version is also significantly updated, requiring adjustments to DAG code.","severity":"breaking","affected_versions":"Across major Airflow and provider versions (e.g., Airflow 1.x to 2.x, or provider 1.x to 2.x)"},{"fix":"When writing, provide Line Protocol strings directly. When reading with InfluxDB v2, use Flux queries and ensure your connection is configured for Flux (implicitly handled by InfluxDB v2 token/org/bucket setup). If reading with InfluxQL (for v1), ensure `query_type='influxql'`.","message":"The `InfluxDBOperator`'s `queries` parameter expects a list of query strings. For writing data, these are typically InfluxDB Line Protocol strings. For reading data, especially with InfluxDB v2, these should be Flux queries, and you might need to explicitly set `query_type='flux'`.","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-17T00:00:00.000Z","next_check":"2026-07-16T00:00:00.000Z","problems":[{"fix":"Run `pip install apache-airflow-providers-influxdb` in the same Python environment where Airflow is installed.","cause":"The `apache-airflow-providers-influxdb` package has not been installed in your Airflow environment.","error":"ModuleNotFoundError: No module named 'airflow.providers.influxdb'"},{"fix":"Verify the `influxdb_conn_id` used in your operator/hook, and check the corresponding Airflow Connection details (Host, Port, Schema, Login, Password, Extra fields like `token`, `org`, `bucket`). Ensure the token/credentials have the necessary read/write permissions.","cause":"The Airflow connection details for InfluxDB (e.g., token, username, password, organization) are incorrect or do not match the target InfluxDB instance.","error":"InfluxDBClient.exceptions.InfluxDBClientException: authentication failed"},{"fix":"Consult the official documentation for your installed `apache-airflow-providers-influxdb` version to ensure you are using the correct parameters and their names. Parameters often change between provider versions.","cause":"You are using a parameter that is not recognized by the `InfluxDBOperator` in your installed provider version, or it has been renamed/removed.","error":"TypeError: InfluxDBOperator.__init__() got an unexpected keyword argument 'some_parameter'"},{"fix":"Verify the `bucket` parameter in your Airflow connection's 'Extra' field (or connection string) matches an existing bucket. Also, confirm that the InfluxDB token used has permissions for that specific bucket.","cause":"The specified bucket in the Airflow connection or the Flux query does not exist in your InfluxDB v2 instance, or the provided token does not have access to it.","error":"InfluxDBClient.exceptions.InfluxDBClientException: bucket not found"}]}