OpenSearch Airflow Provider
The `apache-airflow-providers-opensearch` library is the official Apache Airflow provider package for OpenSearch. It provides a hook and operators that integrate OpenSearch into Airflow DAGs for tasks such as creating indexes, ingesting documents, and running search queries. Like other Airflow providers, it is released independently of Airflow core, in regular provider release waves; at the time of writing it is at version 1.9.0.
Common errors
- ModuleNotFoundError: No module named 'airflow.providers.opensearch'
  cause: The `apache-airflow-providers-opensearch` package is not installed in the Python environment that runs your Airflow scheduler and workers.
  fix: Run `pip install apache-airflow-providers-opensearch` in that environment (for container deployments, add it to the image), then restart the Airflow components.
- airflow.exceptions.AirflowNotFoundException: The conn_id `opensearch_default` isn't defined
  cause: Airflow cannot find a connection named `opensearch_default` (or whichever conn_id you passed) in its metadata database, environment variables, or secrets backend.
  fix: In the Airflow UI, go to Admin -> Connections and create a connection with Conn Id `opensearch_default` and Conn Type `OpenSearch`, then fill in the host, port, and credentials for your OpenSearch instance.
- opensearchpy.exceptions.ConnectionError: Connection refused
  cause: The Airflow worker cannot open a network connection to the configured OpenSearch host and port. Typical reasons are a wrong host or port, OpenSearch not running, or a firewall blocking the traffic.
  fix: Verify that the OpenSearch service is running and reachable from the Airflow worker. Check the Host and Port of your OpenSearch connection, and make sure no firewall blocks the connection.
- opensearchpy.exceptions.AuthorizationException(403, 'security_exception', 'no permissions for [cluster:monitor/health] and User [name=airflow, backend_roles=[], requestedTenant=null]')
  cause: The OpenSearch user configured in your connection authenticated successfully but lacks permission for the requested action. (Bad credentials produce a 401 `AuthenticationException` instead.)
  fix: In the OpenSearch Security configuration, grant the role mapped to that user the permissions it needs, for example the `cluster_monitor` action group for `_cluster/health` and the `write` or `crud` action group (which include `indices:data/write/index`) on the target indices for ingestion.
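Before digging into credentials or permissions, it can help to confirm plain TCP reachability from the Airflow worker, which separates "Connection refused" problems from authentication (401) and authorization (403) ones. The sketch below uses only the standard library; `can_reach` is a hypothetical helper, not part of the provider, and `localhost`/`9200` are placeholders for the Host and Port of your connection.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host name and tries each resolved address in turn.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers ConnectionRefusedError, timeouts, and DNS resolution failures (socket.gaierror).
        return False


if __name__ == "__main__":
    # Hypothetical values: replace with the Host and Port of your opensearch_default connection.
    print(can_reach("localhost", 9200))
```

Run it on the worker itself (for example inside the worker container), since reachability from your laptop says little about the worker's network path.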
Warnings
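- gotcha The connection referenced by `conn_id` (by default `opensearch_default`) must exist in Airflow: create it in the UI (Admin -> Connections), with the `airflow connections add` CLI command, through an `AIRFLOW_CONN_<CONN_ID>` environment variable, or in a secrets backend. Connections are not read from `airflow.cfg`. Note that the operators take the ID as `opensearch_conn_id`, while `OpenSearchHook` takes it as `open_search_conn_id`. Missing or incorrect connection details are a frequent cause of "conn_id isn't defined" and network errors.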
- breaking Changes in Airflow core or in the `opensearch-py` client library can introduce incompatibilities. Before upgrading, check the minimum `apache-airflow` and `opensearch-py` versions listed in the provider's requirements, and test against the OpenSearch version your cluster runs.
- gotcha SSL/TLS certificate verification errors are common when the cluster uses self-signed certificates or a certificate chain that the Airflow worker environment does not trust. TLS behavior is controlled by the `use_ssl` and `verify_certs` connection extras; turn off certificate verification only for local development.
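- gotcha OpenSearch itself was forked from Elasticsearch 7.10, so many query and index APIs look alike, but `apache-airflow-providers-opensearch` is a separate package with its own connection type, hook, and operators. Migrating DAGs from `apache-airflow-providers-elasticsearch` requires changing imports, connection IDs and types, and operator or hook calls, and checking for OpenSearch-specific API differences.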
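Since Airflow 2.3, a connection can also be supplied as JSON in an `AIRFLOW_CONN_<CONN_ID>` environment variable, which avoids the "conn_id isn't defined" error in containerized or CI setups and keeps the TLS settings explicit. The sketch below builds that variable with the standard library; `opensearch_conn_env` is a hypothetical helper, and the `use_ssl`/`verify_certs` extra keys are assumed to be the ones the provider's hook reads, as described in its connection documentation.

```python
from __future__ import annotations

import json


def opensearch_conn_env(
    conn_id: str,
    host: str,
    port: int,
    login: str | None = None,
    password: str | None = None,
    use_ssl: bool = True,
    verify_certs: bool = True,
) -> tuple[str, str]:
    """Return an (env var name, JSON value) pair describing an OpenSearch connection."""
    # Airflow looks up AIRFLOW_CONN_<CONN_ID in upper case> before the metadata database.
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value: dict = {
        "conn_type": "opensearch",
        "host": host,  # host name only, without an http:// or https:// prefix
        "port": port,
        # Assumed extra keys for TLS; check the provider's connection docs for your version.
        "extra": {"use_ssl": use_ssl, "verify_certs": verify_certs},
    }
    # Only include credentials that were actually given.
    if login is not None:
        value["login"] = login
    if password is not None:
        value["password"] = password
    return name, json.dumps(value)


if __name__ == "__main__":
    # Local-development example: plain HTTP, no certificate verification.
    env_name, env_value = opensearch_conn_env(
        "opensearch_default", "localhost", 9200, use_ssl=False, verify_certs=False
    )
    print(f"export {env_name}='{env_value}'")
```

The printed `export` line can be placed in the environment of the scheduler and workers; for production, pass real credentials through a secrets backend rather than a shell variable.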
Install
-
pip install apache-airflow-providers-opensearch
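A `ModuleNotFoundError` after installing usually means the package went into a different Python environment than the one Airflow runs in. A quick standard-library check, run with the same interpreter as the scheduler and workers, is sketched below; `installed_version` is a hypothetical helper built on `importlib.metadata`, and it reports roughly what `pip show apache-airflow-providers-opensearch` would.

```python
from __future__ import annotations

from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name: str = "apache-airflow-providers-opensearch") -> str | None:
    """Return the installed version of a distribution, or None if it is not installed."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        # In this environment, importing airflow.providers.opensearch would also fail.
        return None


if __name__ == "__main__":
    print(installed_version() or "provider not installed in this environment")
```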
Imports
- OpenSearchHook
from airflow.providers.opensearch.hooks.opensearch import OpenSearchHook
- OpenSearchQueryOperator
from airflow.providers.opensearch.operators.opensearch import OpenSearchQueryOperator
- OpenSearchCreateIndexOperator
from airflow.providers.opensearch.operators.opensearch import OpenSearchCreateIndexOperator
- OpenSearchAddDocumentOperator
from airflow.providers.opensearch.operators.opensearch import OpenSearchAddDocumentOperator
Quickstart
from __future__ import annotations

import pendulum

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.opensearch.hooks.opensearch import OpenSearchHook
from airflow.providers.opensearch.operators.opensearch import (
    OpenSearchAddDocumentOperator,
    OpenSearchQueryOperator,
)

with DAG(
    dag_id="opensearch_example_dag",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["opensearch", "example"],
) as dag:
    # Configure an OpenSearch connection in the Airflow UI (Admin -> Connections), e.g.:
    #   Conn Id:   opensearch_default
    #   Conn Type: OpenSearch
    #   Host:      localhost   (host name only, no http:// prefix)
    #   Port:      9200
    #   Extra:     {"use_ssl": false, "verify_certs": false}   (local dev only; use true in prod)

    @task
    def check_opensearch_health() -> dict:
        # OpenSearchHook.client is an opensearch-py client; cluster.health() calls GET _cluster/health.
        hook = OpenSearchHook(open_search_conn_id="opensearch_default", log_query=True)
        return hook.client.cluster.health()

    # Index a single document into my_index with document ID 1.
    ingest_document = OpenSearchAddDocumentOperator(
        task_id="ingest_example_document",
        opensearch_conn_id="opensearch_default",
        index_name="my_index",
        document={"field": "value"},
        doc_id=1,
    )

    # Run a search; the query dict is sent as the body of the _search request.
    search_documents = OpenSearchQueryOperator(
        task_id="search_example_documents",
        opensearch_conn_id="opensearch_default",
        index_name="my_index",
        query={"query": {"match": {"field": "value"}}},
    )

    check_opensearch_health() >> ingest_document >> search_documents
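A health-check task is only useful if it fails when the cluster is unhealthy, and simply fetching `_cluster/health` succeeds even when the status is `red`. A small sketch of a guard that could be applied to the dict returned by the opensearch-py client's `cluster.health()` call is shown below; `assert_cluster_healthy` is a hypothetical helper, and it raises a plain `ValueError` so that it runs without Airflow installed (any exception raised inside a task marks that task as failed).

```python
from __future__ import annotations


def assert_cluster_healthy(health: dict, allowed: tuple[str, ...] = ("green", "yellow")) -> str:
    """Return the cluster status from a _cluster/health response, or raise if it is not allowed."""
    status = health.get("status")
    if status not in allowed:
        cluster = health.get("cluster_name", "<unknown>")
        raise ValueError(
            f"OpenSearch cluster {cluster!r} has status {status!r}; expected one of {allowed}"
        )
    return status


if __name__ == "__main__":
    # Example payload shaped like a _cluster/health response (values are illustrative).
    print(assert_cluster_healthy({"cluster_name": "dev", "status": "yellow"}))
```

`yellow` is allowed by default because single-node development clusters typically report it (replica shards cannot be assigned); pass `allowed=("green",)` to be stricter in production.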