OpenSearch Airflow Provider

1.9.0 · active · verified Thu Apr 16

The `apache-airflow-providers-opensearch` library is the official Apache Airflow provider package for interacting with OpenSearch. It includes a hook and operators that bring OpenSearch into Airflow DAGs for tasks such as data ingestion, search queries, and cluster monitoring. As part of the Apache Airflow ecosystem, it is released on a regular cadence alongside the other provider packages. The current version is 1.9.0.

Quickstart

This quickstart DAG uses `OpenSearchHook` to query the cluster's `_cluster/health` endpoint. The provider's operators (`OpenSearchQueryOperator`, `OpenSearchCreateIndexOperator`, `OpenSearchAddDocumentOperator`) cover search and indexing, so the health check goes through the hook's underlying `opensearch-py` client instead. Before running, configure an OpenSearch connection in your Airflow environment with the connection ID `opensearch_default`.

from __future__ import annotations

import pendulum

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.opensearch.hooks.opensearch import OpenSearchHook

with DAG(
    dag_id="opensearch_example_dag",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["opensearch", "example"],
) as dag:
    # Configure an OpenSearch connection in the Airflow UI (Admin -> Connections)
    # with conn_id='opensearch_default'. Example:
    # Conn Id: opensearch_default
    # Conn Type: OpenSearch
    # Host: localhost (host name only, no scheme)
    # Port: 9200
    # Extra: {"use_ssl": false, "verify_certs": false}
    #   (local dev only; enable both in production)

    @task(task_id="check_opensearch_health")
    def check_opensearch_health() -> dict:
        # The hook exposes the opensearch-py client, which wraps GET _cluster/health.
        hook = OpenSearchHook(open_search_conn_id="opensearch_default", log_query=True)
        health = hook.client.cluster.health()
        print(health)  # Shows up in the task log
        return health

    health = check_opensearch_health()

    # You can also perform data ingestion with the provider's operator:
    # from airflow.providers.opensearch.operators.opensearch import (
    #     OpenSearchAddDocumentOperator,
    # )
    # ingest_document = OpenSearchAddDocumentOperator(
    #     task_id="ingest_example_document",
    #     opensearch_conn_id="opensearch_default",
    #     index_name="my_index",
    #     doc_id=1,
    #     document={"field": "value"},
    # )
    # health >> ingest_document
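
A health check is most useful when something acts on the result. The sketch below, in plain Python with no Airflow dependency, shows one way to turn a `_cluster/health` response body into a pass/fail decision. The function name `is_cluster_healthy` and its `allow_yellow` parameter are hypothetical and not part of the provider. The `status` field and its `green`/`yellow`/`red` values come from the OpenSearch cluster health API. The sample payload is made up for illustration.

```python
from __future__ import annotations

from typing import Any, Mapping

# Ordered from best to worst; these are the status values the health API reports.
_STATUS_RANK = {"green": 0, "yellow": 1, "red": 2}


def is_cluster_healthy(health: Mapping[str, Any], allow_yellow: bool = True) -> bool:
    """Return True if the reported status is acceptable (hypothetical helper)."""
    status = str(health.get("status", "")).lower()
    if status not in _STATUS_RANK:
        raise ValueError(f"unexpected cluster status: {status!r}")
    worst_allowed = "yellow" if allow_yellow else "green"
    return _STATUS_RANK[status] <= _STATUS_RANK[worst_allowed]


# Trimmed, invented sample payload; not output from a real cluster.
sample = {"cluster_name": "docker-cluster", "status": "yellow", "number_of_nodes": 1}
print(is_cluster_healthy(sample))                      # True
print(is_cluster_healthy(sample, allow_yellow=False))  # False
```

Inside a DAG, a task could raise an exception when this returns `False` so that downstream ingestion does not run. Accepting `yellow` by default is a design choice aimed at local development, where a single-node cluster often reports `yellow` because replica shards cannot be assigned. A production check would likely pass `allow_yellow=False`.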
