OpenSearch Airflow Provider
1.9.0 verified Thu Apr 16 auth: no python
The `apache-airflow-providers-opensearch` library is the official Apache Airflow provider package for interacting with OpenSearch. It includes operators, hooks, and sensors that integrate OpenSearch into Airflow DAGs for tasks such as data ingestion, search queries, and cluster monitoring. Like other Airflow providers, it is versioned and released separately from Airflow core; the current version is 1.9.0.
pip install apache-airflow-providers-opensearch
cli airflow
Common errors
error ModuleNotFoundError: No module named 'airflow.providers.opensearch'
cause The `apache-airflow-providers-opensearch` package has not been installed in your Airflow environment.
fix Run `pip install apache-airflow-providers-opensearch` to install the provider.
error airflow.exceptions.AirflowException: The conn_id 'opensearch_default' is not defined
cause Airflow cannot find a connection named `opensearch_default` (or whichever `conn_id` you specified) in its metadata database or other configured connection sources.
fix In the Airflow UI, go to Admin -> Connections and create a new connection. Set 'Conn Id' to `opensearch_default` and 'Conn Type' to 'OpenSearch', then fill in the host, port, and other details for your OpenSearch instance.
error opensearchpy.exceptions.ConnectionError: Connection refused
cause The Airflow worker cannot open a network connection to the configured OpenSearch host and port. Typical reasons are an incorrect host or port, OpenSearch not running, or a firewall blocking the traffic.
fix Verify that the OpenSearch service is running and reachable from your Airflow worker. Check the Host and Port in your OpenSearch connection configuration, and ensure no firewall is blocking the connection.
error opensearchpy.exceptions.TransportError(403, 'security_exception', 'no permissions for [cluster:monitor/health] and User [name=airflow, backend_roles=[], requestedTenant=null]')
cause The OpenSearch user configured in your connection lacks the permissions required for the requested operation.
fix Ensure the user or role associated with your OpenSearch connection has the necessary permissions in your OpenSearch Security configuration (e.g., `cluster_monitor` for `_cluster/health`, or index write permissions such as `indices:data/write/index` for ingestion).
Warnings
gotcha The `conn_id` for OpenSearch connections (commonly `opensearch_default`) must be defined correctly, whether in the Airflow UI, with the `airflow connections` CLI, or through an `AIRFLOW_CONN_<CONN_ID>` environment variable. Missing or incorrect connection details are a frequent cause of 'Connection not found' or network errors.
fix Ensure a connection with the specified `conn_id` exists, uses 'OpenSearch' as its connection type, and has the correct host, port, scheme, and credentials. Test the connection in the Airflow UI.
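Besides the UI, Airflow can read a connection from an environment variable named `AIRFLOW_CONN_` followed by the upper-cased connection id. The following is a minimal sketch of building such a variable, assuming Airflow 2.3 or later (which accepts a JSON-serialized connection) and assuming the provider's connection type string is `opensearch`. The helper name `connection_env_var` and the host and port values are illustrative placeholders.

```python
import json
from typing import Optional, Tuple


def connection_env_var(
    conn_id: str,
    host: str,
    port: int,
    login: str = "",
    password: str = "",
    extra: Optional[dict] = None,
) -> Tuple[str, str]:
    """Return the (name, value) pair for an Airflow connection env var."""
    payload = {
        "conn_type": "opensearch",  # assumption: the provider's type string
        "host": host,               # bare hostname, without a scheme
        "port": port,
    }
    # Only include optional fields that were actually supplied.
    if login:
        payload["login"] = login
    if password:
        payload["password"] = password
    if extra:
        payload["extra"] = extra
    return f"AIRFLOW_CONN_{conn_id.upper()}", json.dumps(payload)


name, value = connection_env_var("opensearch_default", "localhost", 9200)
print(f"export {name}='{value}'")
```

The printed line can be pasted into the shell that starts the Airflow scheduler and workers; connections defined this way do not appear in the Admin -> Connections list.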
breaking Changes in Airflow core or in the `opensearch-py` client library can introduce incompatibilities. Always check the provider's `min_airflow_version` and test against your specific OpenSearch cluster version.
fix Refer to the official Airflow provider documentation for compatibility matrices. Upgrade `apache-airflow` and `apache-airflow-providers-opensearch` together to compatible versions. Pin `opensearch-py` if your OpenSearch cluster requires a specific version.
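Before comparing against the provider's compatibility notes, it helps to know which versions are actually installed. A small sketch using only the standard library; the helper name `installed_versions` is illustrative, and the distribution names are the ones published on PyPI.

```python
from importlib import metadata
from typing import Dict, Optional, Sequence

PACKAGES = (
    "apache-airflow",
    "apache-airflow-providers-opensearch",
    "opensearch-py",
)


def installed_versions(packages: Sequence[str] = PACKAGES) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


for name, version in installed_versions().items():
    print(f"{name}: {version or 'not installed'}")
```

Run it with the same interpreter the Airflow workers use, since a different virtual environment may report different versions.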
gotcha SSL/TLS certificate verification errors are common when connecting to OpenSearch clusters that use self-signed certificates, or when the certificate chain is not trusted by the Airflow worker environment.
fix For development, you can disable certificate verification in the connection's extra settings, for example by setting `verify_certs` to `false` (not recommended for production). For production, install the required CA certificates on the Airflow worker machines or provide the certificate path in the connection configuration.
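To find out whether a worker already trusts the cluster's certificate, a TLS handshake can be attempted outside Airflow. This is a sketch using the standard `ssl` module, not the provider's own connection logic; the function names and the `ca_bundle` path argument are illustrative.

```python
import socket
import ssl
from typing import Optional


def build_context(ca_bundle: Optional[str] = None) -> ssl.SSLContext:
    """Create a verifying TLS context, optionally trusting a private CA bundle."""
    # With cafile=None the system trust store is used.
    return ssl.create_default_context(cafile=ca_bundle)


def certificate_is_trusted(
    host: str,
    port: int,
    ca_bundle: Optional[str] = None,
    timeout: float = 5.0,
) -> bool:
    """Return True if the TLS handshake with host:port passes verification.

    Returns False on a verification failure and also when the host cannot
    be reached at all, so rule out basic connectivity problems first.
    """
    context = build_context(ca_bundle)
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with context.wrap_socket(sock, server_hostname=host):
                return True
    except (ssl.SSLError, OSError):
        return False
```

For example, `certificate_is_trusted("opensearch.internal", 9200, ca_bundle="/etc/ssl/my-ca.pem")` (hypothetical host and path) returning `True` suggests the same CA bundle will work for the Airflow connection.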
gotcha OpenSearch began as a fork of Elasticsearch, and the two remain largely compatible, but be aware of OpenSearch-specific features and API differences. Migrating directly from `apache-airflow-providers-elasticsearch` may require minor adjustments.
fix Thoroughly review your DAGs and connection configurations when migrating. Consult OpenSearch documentation for any API changes compared to Elasticsearch.
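When reviewing DAGs for a migration, a first step is to locate every import of the Elasticsearch provider. The sketch below scans Python source with the standard `ast` module; the function name `find_elasticsearch_imports` is illustrative, and the scan only catches direct `import` and `from ... import` statements that name the provider package.

```python
import ast
from typing import List, Tuple

OLD_PREFIX = "airflow.providers.elasticsearch"


def find_elasticsearch_imports(source: str) -> List[Tuple[int, str]]:
    """Return (line number, module) for each import of the Elasticsearch provider."""
    hits: List[Tuple[int, str]] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.module:
            modules = [node.module]
        elif isinstance(node, ast.Import):
            modules = [alias.name for alias in node.names]
        else:
            continue
        for module in modules:
            if module == OLD_PREFIX or module.startswith(OLD_PREFIX + "."):
                hits.append((node.lineno, module))
    return sorted(hits)


sample = (
    "from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook\n"
    "import pendulum\n"
)
print(find_elasticsearch_imports(sample))
```

Applying the function to the contents of each file in the DAGs folder gives a list of lines to review by hand.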
Imports
- OpenSearchHook
from airflow.providers.opensearch.hooks.opensearch import OpenSearchHook
- OpenSearchOperator
from airflow.providers.opensearch.operators.opensearch import OpenSearchOperator
- OpenSearchIngestOperator
wrong from airflow.providers.opensearch.operators.opensearch import OpenSearchIngestOperator
correct from airflow.providers.opensearch.operators.opensearch_ingest import OpenSearchIngestOperator
- OpenSearchIndexSensor
from airflow.providers.opensearch.sensors.opensearch import OpenSearchIndexSensor
Quickstart
from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.providers.opensearch.operators.opensearch import OpenSearchOperator

with DAG(
    dag_id="opensearch_example_dag",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["opensearch", "example"],
) as dag:
    # Configure an OpenSearch connection in Airflow UI (Admin -> Connections)
    # with conn_id='opensearch_default'. Example:
    # Conn Id: opensearch_default
    # Conn Type: OpenSearch
    # Host: localhost
    # Port: 9200
    # Scheme: http (or https)
    # Verify SSL: False (for local dev, use True in prod)
    check_opensearch_health = OpenSearchOperator(
        task_id="check_opensearch_health",
        conn_id="opensearch_default",
        endpoint="_cluster/health",
        method="GET",
        log_response=True,
    )

    # You can also perform data ingestion
    # ingest_document = OpenSearchOperator(
    #     task_id="ingest_example_document",
    #     conn_id="opensearch_default",
    #     endpoint="my_index/_doc/1",
    #     method="POST",
    #     data={"field": "value"},
    #     headers={"Content-Type": "application/json"},
    #     log_response=True,
    # )
    # check_opensearch_health >> ingest_document
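
The DAG above assumes OpenSearch is reachable at the host and port stored in the connection. Before triggering it, a quick TCP check from the worker machine can rule out the 'Connection refused' case. This is a standard-library sketch, independent of the provider; `can_reach` is an illustrative helper, and `localhost` and `9200` are the placeholder values from the connection comments.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    # Placeholder values matching the connection described in the Quickstart.
    print("reachable" if can_reach("localhost", 9200) else "unreachable")
```

A successful TCP connection only shows that something is listening on the port; authentication and TLS problems surface later, when the task runs.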