{"id":6308,"library":"apache-airflow-providers-elasticsearch","title":"Apache Airflow Elasticsearch Provider","description":"This is a provider package for Apache Airflow, enabling integration with Elasticsearch. It includes Elasticsearch hooks (the primary interface for direct interaction) and support for shipping task logs to Elasticsearch, simplifying the orchestration of tasks that interact with Elasticsearch. The current version is 6.5.2, and the package follows a frequent release cadence, typically monthly or bi-weekly.","status":"active","version":"6.5.2","language":"en","source_language":"en","source_url":"https://github.com/apache/airflow/tree/main/airflow/providers/elasticsearch","tags":["airflow","provider","elasticsearch","hook","logging","data-pipeline"],"install":[{"cmd":"pip install apache-airflow-providers-elasticsearch","lang":"bash","label":"Install latest version"}],"dependencies":[{"reason":"Core Airflow dependency. Provider version 6.4.0+ requires Airflow >=2.11.0.","package":"apache-airflow","optional":false},{"reason":"Python client for Elasticsearch. Provider >=6.5.1 adds support for Elasticsearch 9.","package":"elasticsearch","optional":false},{"reason":"Used for Elasticsearch SQL API interaction.","package":"elasticsearch-dbapi","optional":false},{"reason":"DSL for building and running complex queries.","package":"elasticsearch-dsl","optional":false},{"reason":"Cross-provider dependency for SQL-related features, installed via `[common.sql]` extra.","package":"apache-airflow-providers-common-sql","optional":true}],"imports":[{"symbol":"ElasticsearchHook","correct":"from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchHook"},{"symbol":"ElasticsearchPythonHook","correct":"from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook"},{"symbol":"ElasticsearchHook","wrong":"from airflow.contrib.hooks.elasticsearch_hook import ElasticsearchHook","note":"Old import path for community/contrib hooks in Airflow 1.x. Always use `airflow.providers.*` for Airflow 2+."},{"symbol":"ElasticsearchTaskHandler","correct":"from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler"}],"quickstart":{"code":"import os\nfrom datetime import datetime\n\nfrom airflow import DAG\nfrom airflow.operators.python import PythonOperator\nfrom airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook\n\ndef _query_elasticsearch(**kwargs):\n    # ElasticsearchPythonHook takes hosts directly (it does not read an Airflow\n    # connection ID), so set the ELASTICSEARCH_HOST environment variable or\n    # fall back to a local cluster.\n    es_host = os.environ.get('ELASTICSEARCH_HOST', 'http://localhost:9200')\n    es_hook = ElasticsearchPythonHook(hosts=[es_host])\n\n    # Simple match_all query\n    query = {\"query\": {\"match_all\": {}}}\n    try:\n        result = es_hook.search(index='your_index_name', query=query)\n        print(f\"Successfully queried Elasticsearch: {result}\")\n        # Example: ingesting a document via the underlying client\n        # (the hook's `get_conn` is a cached property returning the\n        # Elasticsearch Python client).\n        doc = {\"timestamp\": datetime.now().isoformat(), \"message\": \"Hello from Airflow!\"}\n        es_hook.get_conn.index(index='airflow_logs', document=doc)\n        print(\"Successfully added a document to 'airflow_logs'.\")\n    except Exception as e:\n        print(f\"Error interacting with Elasticsearch: {e}\")\n        raise\n\nwith DAG(\n    dag_id='elasticsearch_quickstart_dag',\n    start_date=datetime(2023, 1, 1),\n    schedule=None,\n    catchup=False,\n    tags=['elasticsearch', 'example'],\n) as dag:\n    query_es_task = PythonOperator(\n        task_id='query_and_add_doc_to_elasticsearch',\n        python_callable=_query_elasticsearch,\n    )","lang":"python","description":"This quickstart demonstrates how to use the `ElasticsearchPythonHook` within an Airflow DAG. The hook takes hosts directly rather than an Airflow connection ID, so the example reads the `ELASTICSEARCH_HOST` environment variable (falling back to `http://localhost:9200`), performs a simple `match_all` search, and indexes a document via the hook's underlying Elasticsearch client."},"warnings":[{"fix":"Ensure your Airflow environment meets the minimum version requirement. Upgrade Airflow to 2.11.0 or newer if using provider >=6.4.0.","message":"Minimum Airflow version requirement for providers has increased. `apache-airflow-providers-elasticsearch` versions 6.4.0 and above require Apache Airflow >=2.11.0; older provider versions have different minimum Airflow requirements.","severity":"breaking","affected_versions":">=6.4.0"},{"fix":"Upgrade your Elasticsearch cluster to version 8 or 9 for full compatibility, especially if experiencing issues with client-server communication.","message":"Elasticsearch client compatibility changes. Provider version 5.0.0 upgraded to the Elasticsearch 8 Python client and recommends upgrading your Elasticsearch cluster to 8; provider 6.5.1 adds support for Elasticsearch 9.","severity":"breaking","affected_versions":">=5.0.0"},{"fix":"If you used Jinja templates for `log_id`, migrate to a template string format (e.g., `{dag_id}-{task_id}-{execution_date}-{try_number}`). Update existing Elasticsearch documents if `execution_date` was templated.","message":"Removal of Jinja templating support in `log_id` for Elasticsearch task logging. Previously undocumented Jinja templating was removed.","severity":"breaking","affected_versions":">=2.0.1"},{"fix":"Upgrade Airflow to at least version 2.1.0 before installing or upgrading this provider to avoid an automatic Airflow upgrade and the subsequent database migration.","message":"The `apply_default` decorator was removed in Airflow 2.1.0. Provider versions 2.0.1 and newer require Airflow 2.1.0+; installing them on an older Airflow can trigger an automatic upgrade of the Airflow package, after which a manual `airflow db upgrade` is required.","severity":"breaking","affected_versions":">=2.0.1 (with Airflow <2.1.0)"},{"fix":"Upgrade to `apache-airflow-providers-elasticsearch>=6.5.0` to ensure proper functionality of `write_to_es` with Airflow 3.","message":"The `write_to_es` Elasticsearch task logging feature was incompatible with Airflow 3 in provider versions prior to 6.5.0.","severity":"gotcha","affected_versions":"<6.5.0 (with Airflow 3)"},{"fix":"Ensure your Fluent Bit configuration appends an 'Offset_Key' to logs and set `AIRFLOW__ELASTICSEARCH__OFFSET_FIELD` to match this key (e.g., `AIRFLOW__ELASTICSEARCH__OFFSET_FIELD=\"custom_offset\"`).","message":"Incorrect log ordering in Elasticsearch can occur if the 'offset' field is not properly configured. This was a known issue requiring specific Fluent Bit configuration and matching Airflow settings.","severity":"gotcha","affected_versions":"<2.0.3 (and specific configurations)"}],"env_vars":null,"last_verified":"2026-04-15T00:00:00.000Z","next_check":"2026-07-14T00:00:00.000Z"}