Apache Airflow Elasticsearch Provider

6.5.2 · active · verified Wed Apr 15

This is a provider package for Apache Airflow that integrates it with Elasticsearch. It provides Elasticsearch hooks, which are the main way to interact with Elasticsearch directly from tasks, and support for sending Airflow task logs to Elasticsearch. The current version is 6.5.2, and the provider is released frequently, often on a monthly or bi-weekly cadence.
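Task-log shipping is switched on through Airflow configuration rather than DAG code. As a minimal sketch, assuming Airflow's standard `AIRFLOW__<SECTION>__<KEY>` environment-variable convention and the `[logging] remote_logging` and `[elasticsearch] host`, `write_stdout`, `json_format` options (check the provider's configuration reference for your version), the hypothetical helper `es_logging_env` below builds the variables you might export for the scheduler and workers. With `write_stdout` enabled, logs go to stdout and are expected to be shipped into Elasticsearch by a separate log collector.

```python
def es_logging_env(host: str, write_stdout: bool = True, json_format: bool = True) -> dict[str, str]:
    """Build AIRFLOW__SECTION__KEY overrides for Elasticsearch task logging.

    Hypothetical helper for illustration; the option names are assumptions
    to verify against the provider's configuration reference.
    """
    settings = {
        ("logging", "remote_logging"): True,
        ("elasticsearch", "host"): host,
        ("elasticsearch", "write_stdout"): write_stdout,
        ("elasticsearch", "json_format"): json_format,
    }
    env = {}
    for (section, key), value in settings.items():
        # Airflow reads config overrides from upper-cased AIRFLOW__<SECTION>__<KEY> variables.
        name = f"AIRFLOW__{section.upper()}__{key.upper()}"
        # str() renders booleans as "True"/"False" and leaves the host string unchanged.
        env[name] = str(value)
    return env


if __name__ == "__main__":
    for name, value in es_logging_env("http://localhost:9200").items():
        print(f"export {name}={value}")
```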

Quickstart

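This quickstart demonstrates how to use `ElasticsearchPythonHook` within an Airflow DAG. It connects to an Elasticsearch instance, runs a simple search query, and adds a document to an index. Note that `ElasticsearchPythonHook` does not read an Airflow Connection: it takes a list of hosts directly (plus optional client arguments such as credentials or TLS settings via `es_conn_args`), and the example reads the host from the `ELASTICSEARCH_HOST` environment variable, defaulting to `http://localhost:9200`. If you want to configure Elasticsearch through the Airflow UI instead, the provider's `ElasticsearchSQLHook` is the hook that uses an Airflow Connection, with `elasticsearch_default` as its default connection ID.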

import os
from datetime import datetime, timezone

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook


def _query_elasticsearch():
    # ElasticsearchPythonHook takes the host list directly instead of an Airflow
    # Connection; read it from ELASTICSEARCH_HOST, falling back to a local node.
    es_host = os.environ.get('ELASTICSEARCH_HOST', 'http://localhost:9200')
    es_hook = ElasticsearchPythonHook(hosts=[es_host])

    # Simple match_all query; replace 'your_index_name' with an existing index.
    query = {"query": {"match_all": {}}}
    try:
        # search() returns the response's 'hits' object, not the full response.
        hits = es_hook.search(index='your_index_name', query=query)
        print(f"Successfully queried Elasticsearch. Hits: {hits['total']['value']}")

        # The hook has no indexing helper, so use the underlying elasticsearch-py
        # client exposed by the hook's cached get_conn property (no parentheses).
        doc = {"timestamp": datetime.now(timezone.utc).isoformat(), "message": "Hello from Airflow!"}
        es_hook.get_conn.index(index='airflow_logs', document=doc)
        print("Successfully added a document to 'airflow_logs'.")
    except Exception as e:
        print(f"Error interacting with Elasticsearch: {e}")
        raise


with DAG(
    dag_id='elasticsearch_quickstart_dag',
    start_date=datetime(2023, 1, 1),
    schedule=None,  # replaces 'schedule_interval', deprecated since Airflow 2.4
    catchup=False,
    tags=['elasticsearch', 'example'],
) as dag:
    query_es_task = PythonOperator(
        task_id='query_and_add_doc_to_elasticsearch',
        python_callable=_query_elasticsearch,
    )
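`ElasticsearchPythonHook.search` returns only the `hits` object of the Elasticsearch response, so the hit count and documents live at its top level. The sketch below works on plain dictionaries shaped like that object (a hand-written assumption, so it runs without Airflow or Elasticsearch) and shows how to read the total, which Elasticsearch 7 and later report as `{"value": N, "relation": "eq"}` by default, and how to extract each hit's `_source`. The helper names `total_hits` and `sources` are hypothetical.

```python
from typing import Any


def total_hits(hits: dict[str, Any]) -> int:
    """Return the hit count from a search 'hits' object (hypothetical helper)."""
    total = hits["total"]
    # Elasticsearch 7+ returns {"value": N, "relation": "eq" | "gte"};
    # with rest_total_hits_as_int=true it is a plain integer instead.
    if isinstance(total, dict):
        return int(total["value"])
    return int(total)


def sources(hits: dict[str, Any]) -> list[dict[str, Any]]:
    """Extract the stored documents (_source) from each hit."""
    return [hit["_source"] for hit in hits.get("hits", [])]


if __name__ == "__main__":
    # Hand-written sample shaped like the 'hits' object; not real output.
    sample = {
        "total": {"value": 2, "relation": "eq"},
        "hits": [
            {"_index": "airflow_logs", "_id": "1", "_source": {"message": "Hello from Airflow!"}},
            {"_index": "airflow_logs", "_id": "2", "_source": {"message": "Second run"}},
        ],
    }
    print(total_hits(sample))  # 2
    print([doc["message"] for doc in sources(sample)])
```

If `relation` is `gte`, the value is only a lower bound, because Elasticsearch stops counting exactly at 10,000 hits by default; adding `"track_total_hits": True` to the query body requests an exact count.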
