OpenAI Provider for Apache Airflow
The `apache-airflow-providers-openai` package integrates Apache Airflow with the OpenAI API. It provides hooks, operators, and triggers for orchestrating AI-powered workflows such as text generation, embeddings, and Batch API jobs directly within Airflow DAGs. At the time of writing, the current version is 1.7.4. The package follows the release cadence of Apache Airflow providers and is updated regularly to support new OpenAI features and Airflow versions.
Common errors
- TypeError: 'OpenAIHook' object is not callable
  cause: Calling the `OpenAIHook` instance directly as if it were a function, or using an outdated API method from the `openai` library.
  fix: Call a specific method on the `OpenAIHook` instance (e.g., `hook.create_chat_completion`, `hook.create_embeddings`), or use `hook.get_conn()` to obtain the underlying `openai.OpenAI` client. If you are upgrading from an older `openai` client, method signatures may have changed.
- airflow.exceptions.AirflowNotFoundException: The conn_id `openai_default` isn't defined
  cause: The Airflow connection named `openai_default` (or whichever `conn_id` you specified) has not been configured in your Airflow environment.
  fix: Create a connection with `conn_id='openai_default'` (or your chosen ID) via the Airflow UI, the `airflow connections add` CLI command, an environment variable (e.g., `AIRFLOW_CONN_OPENAI_DEFAULT='{"conn_type": "openai", "password": "<YOUR_API_KEY>"}'`), or a secrets backend. Set `conn_type` to `openai` and store the API key in the Password field.
- ImportError: cannot import name 'BaseHook' from 'airflow.hooks.base'
  cause: This typically occurs in Airflow 3.x environments, where `BaseHook` and `BaseOperator` have moved to `airflow.sdk` from `airflow.hooks.base` and `airflow.models` respectively.
  fix: If you write custom operators or hooks, update your imports to `from airflow.sdk import BaseHook, BaseOperator`. For existing provider code, use a provider version compatible with your Airflow version; the provider handles these compatibility imports internally.
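The environment-variable fix for the missing-connection error relies on Airflow reading a connection named `conn_id` from `AIRFLOW_CONN_<CONN_ID in upper case>`, with the value given as a JSON object. A minimal sketch that builds that name and value with the standard library; the helper names `conn_env_var_name` and `openai_conn_json` are hypothetical, and the key is a placeholder.

```python
import json
import os


def conn_env_var_name(conn_id: str) -> str:
    # Airflow looks up connection `conn_id` in AIRFLOW_CONN_<CONN_ID upper-cased>.
    return f"AIRFLOW_CONN_{conn_id.upper()}"


def openai_conn_json(api_key: str) -> str:
    # JSON form of the connection: conn_type "openai", API key in the password field.
    return json.dumps({"conn_type": "openai", "password": api_key})


if __name__ == "__main__":
    # Placeholder key for illustration; never hard-code a real key in source.
    name = conn_env_var_name("openai_default")
    os.environ[name] = openai_conn_json("sk-placeholder")
    print(name, "is set")
```

Setting `os.environ` only affects the current process and the processes it launches, so this pattern suits launch scripts and test harnesses; for a long-running deployment, export the variable in the scheduler and worker environments instead.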
Warnings
- breaking The `openai` Python client library underwent a major rewrite in version 1.0.0. Provider versions 1.1.0 and later are compatible with `openai>=1.0.0`; combining an older provider version with `openai>=1.0.0` causes import and API errors.
- breaking Airflow 3.0 introduced significant changes to core components, including `BaseHook` and `BaseOperator` imports, moving them to `airflow.sdk`. Examples and code snippets designed for Airflow 2.x may require updates to import paths for Airflow 3.x compatibility.
- gotcha A new provider, `apache-airflow-providers-common-ai`, was released (version 0.1.0, requiring Airflow 3.0+), offering a unified interface for various LLMs, including OpenAI, with dedicated LLM and agent operators. For Airflow 3.x users, this might become the preferred method for OpenAI integration, potentially leading to a deprecation or reduced feature development in `apache-airflow-providers-openai` for new AI functionalities.
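To keep custom hooks or operators importable on both Airflow 2.x and 3.x despite the moved import paths, one option is to try the Airflow 3.x location first and fall back to the 2.x one. A minimal sketch of that fallback as a reusable helper; `import_first` and the candidate lists are hypothetical names, and the module paths are the ones named in the warnings above.

```python
import importlib
from typing import Any, List, Sequence, Tuple


def import_first(candidates: Sequence[Tuple[str, str]]) -> Any:
    """Return the first attribute importable from the (module, attribute) pairs."""
    errors: List[str] = []
    for module_name, attr in candidates:
        try:
            module = importlib.import_module(module_name)
        except ImportError as exc:
            errors.append(f"{module_name}: {exc}")
            continue
        # A module can exist without exporting the name yet, so check explicitly.
        if hasattr(module, attr):
            return getattr(module, attr)
        errors.append(f"{module_name}: no attribute {attr!r}")
    raise ImportError("no candidate could be imported: " + "; ".join(errors))


# Airflow 3.x location first, Airflow 2.x location as the fallback.
BASE_OPERATOR_CANDIDATES = [
    ("airflow.sdk", "BaseOperator"),
    ("airflow.models", "BaseOperator"),
]
BASE_HOOK_CANDIDATES = [
    ("airflow.sdk", "BaseHook"),
    ("airflow.hooks.base", "BaseHook"),
]
```

In a DAG or plugin module you would then write `BaseOperator = import_first(BASE_OPERATOR_CANDIDATES)`. A plain `try: from airflow.sdk import BaseOperator` / `except ImportError: from airflow.models import BaseOperator` achieves the same for a single class; the helper only pays off when several classes need the same treatment.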
Install
- pip install apache-airflow-providers-openai
- pip install 'apache-airflow-providers-openai[common.compat]'  # For cross-provider compatibility
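Because the provider only works with `openai>=1.0.0` from version 1.1.0 onwards, it can help to confirm which versions pip actually resolved. A minimal sketch using `importlib.metadata` from the standard library; the helper names are hypothetical, and `major_version` assumes a plain numeric leading version component.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


def major_version(ver: str) -> int:
    # "1.7.4" -> 1; assumes the first dot-separated component is an integer.
    return int(ver.split(".")[0])


if __name__ == "__main__":
    for dist in ("apache-airflow", "apache-airflow-providers-openai", "openai"):
        print(f"{dist}: {installed_version(dist) or 'not installed'}")

    openai_ver = installed_version("openai")
    if openai_ver is not None and major_version(openai_ver) < 1:
        print("openai<1.0.0 is installed; provider 1.1.0+ expects openai>=1.0.0")
```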
Imports
- OpenAIHook
from airflow.providers.openai.hooks.openai import OpenAIHook
- OpenAIEmbeddingOperator
from airflow.providers.openai.operators.openai import OpenAIEmbeddingOperator
- OpenAITriggerBatchOperator
from airflow.providers.openai.operators.openai import OpenAITriggerBatchOperator
- OpenAIBatchTrigger
from airflow.providers.openai.triggers.openai import OpenAIBatchTrigger
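`OpenAITriggerBatchOperator` submits an OpenAI batch job and, in deferrable mode, hands the waiting off to `OpenAIBatchTrigger`. Conceptually, that waiting is status polling until the batch reaches a terminal state. The stdlib-only sketch below shows such a loop; it is not the provider's implementation (which runs asynchronously in the triggerer), and `fetch_status` is a hypothetical callable standing in for a real API call. The terminal statuses are those of the OpenAI Batch API.

```python
import time
from typing import Callable

# Batch statuses after which the job will not change any further.
TERMINAL_STATUSES = {"completed", "failed", "expired", "cancelled"}


def wait_for_batch(
    fetch_status: Callable[[], str],
    poll_interval: float = 60.0,
    timeout: float = 24 * 3600,
) -> str:
    """Poll fetch_status() until it returns a terminal status or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"batch still {status!r} after {timeout} seconds")
        time.sleep(poll_interval)


if __name__ == "__main__":
    # Simulated status sequence in place of real API responses.
    statuses = iter(["validating", "in_progress", "finalizing", "completed"])
    print(wait_for_batch(lambda: next(statuses), poll_interval=0))
```

Returning the terminal status, instead of raising on `failed` or `expired`, leaves the caller to decide which outcomes should fail the task.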
Quickstart
from pendulum import datetime

from airflow.decorators import dag, task
from airflow.providers.openai.hooks.openai import OpenAIHook

# The hook reads the API key from the Password field of the Airflow connection
# 'openai_default'. For local testing, define the connection through an
# environment variable before running:
# export AIRFLOW_CONN_OPENAI_DEFAULT='{"conn_type": "openai", "password": "<your-openai-api-key>"}'


@dag(
    start_date=datetime(2023, 10, 26),
    schedule=None,
    catchup=False,
    tags=['openai', 'example'],
)
def openai_chat_completion_dag():
    @task
    def ask_chatgpt(prompt: str, model: str = 'gpt-3.5-turbo') -> str:
        hook = OpenAIHook(conn_id='openai_default')
        # create_chat_completion returns the list of choices (response.choices),
        # not the full response object.
        choices = hook.create_chat_completion(
            messages=[{"role": "user", "content": prompt}],
            model=model,
        )
        return choices[0].message.content

    @task
    def print_response(response: str):
        print(f"ChatGPT's response: {response}")

    answer = ask_chatgpt(prompt="What is the capital of France?")
    print_response(response=answer)


openai_chat_completion_dag()
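Because the hook hides the network call, the task logic can be unit-tested without an API key by injecting a stand-in hook. The sketch below moves the body of `ask_chatgpt` into a plain function, here called `ask` (a hypothetical name), and substitutes a `MagicMock` whose return value mimics a list of chat-completion choices. It checks only the response-parsing logic, not the real API.

```python
from types import SimpleNamespace
from unittest.mock import MagicMock


def ask(hook, prompt: str, model: str = "gpt-3.5-turbo") -> str:
    # Same logic as the ask_chatgpt task body, with the hook passed in.
    choices = hook.create_chat_completion(
        messages=[{"role": "user", "content": prompt}],
        model=model,
    )
    return choices[0].message.content


# Stand-in for OpenAIHook: a list of choices, each exposing .message.content.
fake_hook = MagicMock()
fake_hook.create_chat_completion.return_value = [
    SimpleNamespace(message=SimpleNamespace(content="Paris"))
]

if __name__ == "__main__":
    print(ask(fake_hook, "What is the capital of France?"))  # Paris
```

Injecting the hook as a parameter keeps the function free of Airflow imports, so it runs in an ordinary test suite; inside the DAG, the task simply constructs `OpenAIHook(conn_id='openai_default')` and passes it in.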