Apache Airflow Elasticsearch Provider
This is a provider package for Apache Airflow that integrates Airflow with Elasticsearch. It includes Elasticsearch hooks for direct interaction and a task handler for shipping task logs to Elasticsearch, simplifying the orchestration of tasks that interact with Elasticsearch. The current version is 6.5.2; the provider follows Airflow's regular provider release cadence, with updates often shipping monthly or more frequently.
Warnings
- breaking Minimum Airflow version requirement for providers has increased. For `apache-airflow-providers-elasticsearch` versions 6.4.0 and above, `Apache Airflow >=2.11.0` is required. Older provider versions have different minimum Airflow requirements.
- breaking Elasticsearch client compatibility changes. Provider version 5.0.0 upgraded to the Elasticsearch 8 Python client; upgrading your Elasticsearch cluster to 8 is recommended. Provider 6.5.1 adds support for Elasticsearch 9.
- breaking Removal of Jinja templating support in `log_id` for Elasticsearch task logging. Previously undocumented Jinja templating was removed.
- breaking The `apply_default` decorator was removed in Airflow 2.1.0. Provider versions `2.0.1` and newer require `Airflow 2.1.0+`; installing them on an older Airflow can trigger an automatic upgrade of the Airflow package itself, after which a manual `airflow db upgrade` is required.
- gotcha The `write_to_es` Elasticsearch task logging feature was incompatible with Airflow 3 in provider versions prior to 6.5.0.
- gotcha Incorrect log ordering in Elasticsearch can occur if the 'offset' field is not properly configured. This was a known issue that required specific Fluent Bit configuration and Airflow settings.
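The logging warnings above correspond to settings in `airflow.cfg`. A minimal, illustrative sketch of Elasticsearch remote task logging configuration (option names come from the provider's `[elasticsearch]` config section; the values shown are placeholders to adapt):

```ini
[logging]
remote_logging = True

[elasticsearch]
host = http://localhost:9200
# Structured JSON log lines; the 'offset' field drives log ordering (see warning above)
json_format = True
json_fields = asctime, filename, lineno, levelname, message
offset_field = offset
# Writing task logs directly to Elasticsearch requires provider >= 6.5.0 on Airflow 3
write_to_es = False
```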
Install
pip install apache-airflow-providers-elasticsearch
Imports
- ElasticsearchHook
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchHook
- ElasticsearchPythonHook
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
- ElasticsearchTaskHandler
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
Quickstart
import os
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
def _query_elasticsearch(**kwargs):
    # Ensure your Airflow Connection 'elasticsearch_default' is configured
    # or set the ELASTICSEARCH_HOST environment variable.
    es_host = os.environ.get('ELASTICSEARCH_HOST', 'http://localhost:9200')
    es_hook = ElasticsearchPythonHook(hosts=[es_host])
    # Simple match_all query
    query = {"query": {"match_all": {}}}
    try:
        # ElasticsearchPythonHook.search returns the response's 'hits' object
        result = es_hook.search(index='your_index_name', query=query)
        print(f"Successfully queried Elasticsearch. Hits: {result['total']['value']}")
        # Example: ingesting a document via the underlying elasticsearch-py
        # client, which the hook exposes as get_conn
        doc = {"timestamp": datetime.now().isoformat(), "message": "Hello from Airflow!"}
        es_hook.get_conn.index(index='airflow_logs', document=doc)
        print("Successfully added a document to 'airflow_logs'.")
    except Exception as e:
        print(f"Error interacting with Elasticsearch: {e}")
        raise
with DAG(
    dag_id='elasticsearch_quickstart_dag',
    start_date=datetime(2023, 1, 1),
    schedule=None,  # 'schedule' replaces the deprecated 'schedule_interval'
    catchup=False,
    tags=['elasticsearch', 'example'],
) as dag:
    query_es_task = PythonOperator(
        task_id='query_and_add_doc_to_elasticsearch',
        python_callable=_query_elasticsearch,
    )
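When debugging the task-logging path, it helps to see how a `log_id` is rendered for a task instance. A minimal sketch, assuming a hypothetical `log_id_template` of `{dag_id}-{task_id}-{run_id}--{try_number}` (the actual default depends on your Airflow version, and Jinja templating in `log_id` is no longer supported, per the warning above):

```python
# Hypothetical illustration of log_id rendering. The real work is done by
# ElasticsearchTaskHandler inside Airflow; the template here is an assumption.
LOG_ID_TEMPLATE = "{dag_id}-{task_id}-{run_id}--{try_number}"

def render_log_id(dag_id: str, task_id: str, run_id: str, try_number: int) -> str:
    # Plain str.format substitution -- Jinja templating was removed (see Warnings)
    return LOG_ID_TEMPLATE.format(
        dag_id=dag_id, task_id=task_id, run_id=run_id, try_number=try_number
    )

print(render_log_id(
    "elasticsearch_quickstart_dag",
    "query_and_add_doc_to_elasticsearch",
    "manual__2023-01-01T00:00:00+00:00",
    1,
))
```

Documents indexed with a `log_id` matching this rendered value are what the task handler retrieves when you view logs in the Airflow UI.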