OpenAI Provider for Apache Airflow

1.7.4 · active · verified Thu Apr 16

The `apache-airflow-providers-openai` package integrates Apache Airflow with the OpenAI API. It provides hooks, operators, and triggers for orchestrating AI workflows such as text generation, embeddings, and batch API calls from within Airflow DAGs. The current version is 1.7.4; releases follow the general Apache Airflow provider cadence, with frequent updates for new OpenAI features and Airflow versions.

Quickstart

This DAG demonstrates a simple chat completion using the `OpenAIHook`. The hook reads the OpenAI API key from an Airflow connection named `openai_default`. The `ask_chatgpt` task sends a prompt to the specified OpenAI model (`gpt-3.5-turbo` by default) and returns the text of the reply. The `print_response` task then prints that text.

from pendulum import datetime
from airflow.decorators import dag, task
from airflow.providers.openai.hooks.openai import OpenAIHook

# The hook reads the OpenAI API key from the password field of the Airflow connection 'openai_default'.
# For local testing, you can define that connection through an environment variable before starting Airflow:
# export AIRFLOW_CONN_OPENAI_DEFAULT='{"conn_type": "openai", "password": "<your-openai-api-key>"}'

@dag(
    start_date=datetime(2023, 10, 26),
    schedule=None,
    catchup=False,
    tags=['openai', 'example'],
)
def openai_chat_completion_dag():
    @task
    def ask_chatgpt(prompt: str, model: str = 'gpt-3.5-turbo'):
        hook = OpenAIHook(conn_id='openai_default')
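        # create_chat_completion returns the completion's list of choices,
        # not the full response object (check this against your provider version).
        choices = hook.create_chat_completion(
            messages=[{"role": "user", "content": prompt}],
            model=model,
        )
        return choices[0].message.content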

    @task
    def print_response(response: str):
        print(f"ChatGPT's response: {response}")

    answer = ask_chatgpt(prompt="What is the capital of France?")
    print_response(response=answer)

openai_chat_completion_dag()
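
The connection environment variable mentioned in the quickstart comments can also be built in Python, which avoids hand-quoting the JSON in a shell. The following is a minimal sketch, not part of the provider: `build_openai_conn_env` is a hypothetical helper name, and it assumes the same JSON shape as the `export` line above together with Airflow's `AIRFLOW_CONN_<CONN_ID>` naming convention for connection variables.

```python
import json
import os


def build_openai_conn_env(api_key: str, conn_id: str = "openai_default") -> tuple[str, str]:
    """Return the (variable name, JSON value) pair that describes the connection.

    Hypothetical helper for illustration; it is not part of the provider package.
    """
    # Airflow resolves connections from variables named AIRFLOW_CONN_<CONN_ID>,
    # with the connection id in upper case.
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    # Same JSON shape as the export line in the quickstart comments.
    value = json.dumps({"conn_type": "openai", "password": api_key})
    return name, value


# Usage: set the variable in the current process (placeholder key, replace with your own).
name, value = build_openai_conn_env("<your-openai-api-key>")
os.environ[name] = value
```

The variable has to be present in the environment of the process that runs the task, so for a real deployment it is usually set in the scheduler and worker environment rather than inside a DAG file.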
