OpenAI Provider for Apache Airflow
raw JSON → 1.7.4 verified Thu Apr 16 auth: no python
The `apache-airflow-providers-openai` package enables seamless integration between Apache Airflow and OpenAI APIs. It provides a set of hooks, operators, and triggers to orchestrate AI-powered workflows such as text generation, embeddings, and batch API calls directly within Airflow DAGs. The current version is 1.7.4 and it typically follows the release cadence of Apache Airflow providers, with frequent updates to support new OpenAI features and Airflow versions.
pip install apache-airflow-providers-openai Common errors
error TypeError: 'OpenAIHook' object is not callable ↓
cause Attempting to call the `OpenAIHook` instance directly as a function, or using an outdated API method from the `openai` library.
fix
Ensure you are calling the correct method on the
OpenAIHook instance (e.g., hook.create_chat_completion, hook.create_embeddings). If updating from an older openai client, method signatures may have changed. error airflow.exceptions.AirflowException: The conn_id `openai_default` isn't defined. ↓
cause The Airflow connection named `openai_default` (or whichever `conn_id` is specified) has not been configured in your Airflow environment.
fix
Create an Airflow connection with
conn_id='openai_default' (or your chosen ID) via the Airflow UI, environment variables (e.g., AIRFLOW_CONN_OPENAI_DEFAULT='{"conn_type": "openai", "password": "<YOUR_API_KEY>"}'), or a local connections.py file if applicable. The conn_type should be openai and the API key should be stored in the 'Password' field or as a JSON extra. error ImportError: cannot import name 'BaseHook' from 'airflow.hooks.base' ↓
cause This typically occurs in Airflow 3.x environments where `BaseHook` and `BaseOperator` have been moved to `airflow.sdk` instead of `airflow.hooks.base` and `airflow.models` respectively.
fix
If writing custom operators/hooks, update your imports to use
from airflow.sdk import BaseHook, BaseOperator. For existing provider code, ensure you are using a provider version compatible with your Airflow version, as the provider itself handles these compatibility imports internally. Warnings
breaking The `openai` Python client library underwent a major rewrite with version 1.0.0. Provider versions prior to 1.1.0 (specifically from version 1.1.0 onwards) are updated to be compatible with `openai>=1.0.0`. If you are using an older provider version with `openai>=1.0.0`, you will encounter import and API errors. ↓
fix Upgrade `apache-airflow-providers-openai` to version 1.1.0 or newer. Ensure your `openai` library version is compatible with your provider version (e.g., `openai>=1.66.0` for provider 1.7.4).
breaking Airflow 3.0 introduced significant changes to core components, including `BaseHook` and `BaseOperator` imports, moving them to `airflow.sdk`. Examples and code snippets designed for Airflow 2.x may require updates to import paths for Airflow 3.x compatibility. ↓
fix Refer to the provider's documentation and Airflow's migration guides for specific import path adjustments for Airflow 3.x. The provider aims for compatibility across versions using internal shims (e.g., `airflow.providers.common.compat.sdk`), but direct usage of `airflow.hooks.base` or `airflow.models` might break.
gotcha A new provider, `apache-airflow-providers-common-ai`, was released (version 0.1.0, requiring Airflow 3.0+), offering a unified interface for various LLMs, including OpenAI, with dedicated LLM and agent operators. For Airflow 3.x users, this might become the preferred method for OpenAI integration, potentially leading to a deprecation or reduced feature development in `apache-airflow-providers-openai` for new AI functionalities. ↓
fix For new projects on Airflow 3.x, consider evaluating `apache-airflow-providers-common-ai` as an alternative for general LLM and AI agent integration. For existing `apache-airflow-providers-openai` users on Airflow 3.x, be aware of potential future shifts in recommended practices.
Install
pip install 'apache-airflow-providers-openai[common.compat]' # For cross-provider compatibility Imports
- OpenAIHook
from airflow.providers.openai.hooks.openai import OpenAIHook - OpenAIEmbeddingOperator
from airflow.providers.openai.operators.openai import OpenAIEmbeddingOperator - OpenAITriggerBatchOperator
from airflow.providers.openai.operators.openai import OpenAITriggerBatchOperator - OpenAIBatchTrigger
from airflow.providers.openai.triggers.openai import OpenAIBatchTrigger
Quickstart
import os
from pendulum import datetime
from airflow.decorators import dag, task
from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.models.connection import Connection
# Set OpenAI API key in Airflow Connection 'openai_default' or as an environment variable OPENAI_API_KEY
# For local testing, you can set it as an environment variable for the run command:
# export AIRFLOW_CONN_OPENAI_DEFAULT='{"conn_type": "openai", "password": "<your-openai-api-key>"}'
@dag(
start_date=datetime(2023, 10, 26),
schedule=None,
catchup=False,
tags=['openai', 'example'],
)
def openai_chat_completion_dag():
@task
def ask_chatgpt(prompt: str, model: str = 'gpt-3.5-turbo'):
hook = OpenAIHook(conn_id='openai_default')
response = hook.create_chat_completion(
messages=[{"role": "user", "content": prompt}],
model=model
)
return response.choices[0].message.content
@task
def print_response(response: str):
print(f"ChatGPT's response: {response}")
question = ask_chatgpt(prompt="What is the capital of France?")
print_response(response=question)
openai_chat_completion_dag()