Airflow Provider for DuckDB

0.2.0 verified Sat May 09 auth: no python

An Apache Airflow provider that integrates DuckDB, enabling SQL execution, data loading, and transformations within Airflow DAGs. The current version, 0.2.0, targets DuckDB 0.3.x or later and Airflow 2.x. Releases are irregular.

pip install airflow-provider-duckdb
error ModuleNotFoundError: No module named 'airflow_provider_duckdb'
cause The import uses the underscore package name instead of the dotted provider path.
fix Use `from airflow.providers.duckdb.operators.duckdb import DuckDBOperator`.
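error AirflowException: The conn_id `duckdb_default` isn't defined
cause The DuckDB connection is not configured in Airflow.
fix Add the connection in the Airflow UI (Admin -> Connections) or set the environment variable AIRFLOW_CONN_DUCKDB_DEFAULT='duckdb://localhost:5432/mydb'. DuckDB runs in-process rather than as a network server, so treat the host, port, and database name in this URI as placeholders and confirm the expected format against the provider's documentation.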
error duckdb.CatalogException: Catalog Error: Table with name test already exists
cause The DAG ran more than once and its CREATE TABLE statement is not idempotent.
fix Use CREATE TABLE IF NOT EXISTS, or run DROP TABLE IF EXISTS before creating the table.
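The two fixes for the "already exists" error can be sketched outside Airflow. This is a minimal illustration, not the provider's code: it uses Python's built-in sqlite3 module as a stand-in engine so that it runs without DuckDB installed, and `run_twice` is a hypothetical helper that simulates a DAG re-run. The DDL strings are the part meant to carry over; DuckDB reports the failure as a CatalogException rather than sqlite3's OperationalError.

```python
import sqlite3

# Stand-in engine: sqlite3 ships with Python, so this runs without DuckDB.
# The DDL strings are the part intended to carry over to DuckDB.
CREATE_STRICT = "CREATE TABLE test (id INTEGER, name VARCHAR)"
CREATE_IDEMPOTENT = "CREATE TABLE IF NOT EXISTS test (id INTEGER, name VARCHAR)"
DROP_IF_EXISTS = "DROP TABLE IF EXISTS test"


def run_twice(statements):
    """Run the statements twice on one in-memory database, simulating a re-run.

    Returns True if every statement succeeds, False if any statement fails.
    """
    con = sqlite3.connect(":memory:")
    try:
        for _ in range(2):
            for stmt in statements:
                con.execute(stmt)
        return True
    except sqlite3.OperationalError:
        # sqlite3 raises this for "table test already exists"
        return False
    finally:
        con.close()


strict_ok = run_twice([CREATE_STRICT])                            # plain CREATE
idempotent_ok = run_twice([CREATE_IDEMPOTENT])                    # fix 1
drop_then_create_ok = run_twice([DROP_IF_EXISTS, CREATE_STRICT])  # fix 2
```

Note the trade-off between the two fixes: IF NOT EXISTS keeps existing rows across runs, while dropping first discards them on every run.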
breaking The operator's `sql` parameter expects a single SQL string. To run several statements in one task, separate them with semicolons inside that string; note that DuckDB multi-statement support is limited.
fix Combine the statements in one semicolon-separated string, or split them across multiple tasks.
gotcha The connection ID 'duckdb_default' is not created automatically; you must configure it in the Airflow UI or through an environment variable. A missing connection surfaces as AirflowException: The conn_id `duckdb_default` isn't defined.
fix Set environment variable AIRFLOW_CONN_DUCKDB_DEFAULT='duckdb://localhost:5432/mydb' or configure via UI.
deprecated Old import path `from airflow_provider_duckdb import ...` is deprecated; always use `from airflow.providers.duckdb import ...`
fix Use the dotted provider import path, as shown in the quickstart DAG.
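The note about the `sql` parameter suggests combining statements into one semicolon-separated string. A minimal sketch of that step follows. `join_statements` is a hypothetical helper, not part of the provider, and whether the operator runs every statement in the combined string is an assumption to verify against the provider version you have installed.

```python
def join_statements(statements):
    """Join SQL statements into one semicolon-separated string.

    Surrounding whitespace and trailing semicolons are stripped first and
    blank entries are skipped, so the result never contains empty
    statements such as ';;'.
    """
    cleaned = [s.strip().rstrip(";").rstrip() for s in statements]
    non_empty = [s for s in cleaned if s]
    return ";\n".join(non_empty) + ";" if non_empty else ""


combined_sql = join_statements([
    "CREATE TABLE IF NOT EXISTS test (id INTEGER, name VARCHAR);",
    "   ",  # blank entry, skipped
    "INSERT INTO test VALUES (1, 'Alice')",
])
```

The resulting string could then be passed as the `sql` argument of a single task. Splitting the statements across separate tasks instead gives per-statement retries and clearer logs, at the cost of a longer DAG.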

Minimal DAG creating a table, inserting rows, and selecting data with DuckDBOperator.

from datetime import datetime
from airflow import DAG
from airflow.providers.duckdb.operators.duckdb import DuckDBOperator

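# Defaults applied to every task in the DAG
default_args = {'owner': 'airflow', 'start_date': datetime(2023, 1, 1)}

# schedule_interval=None: the DAG runs only when triggered manually
with DAG('duckdb_example', schedule_interval=None, default_args=default_args, catchup=False) as dag:
    # IF NOT EXISTS keeps this task safe to re-run
    create_table = DuckDBOperator(
        task_id='create_table',
        sql="CREATE TABLE IF NOT EXISTS test (id INTEGER, name VARCHAR)",
        duckdb_conn_id='duckdb_default',
    )
    # Not idempotent: if the database persists between runs, each run appends the same two rows
    insert_data = DuckDBOperator(
        task_id='insert_data',
        sql="INSERT INTO test VALUES (1, 'Alice'), (2, 'Bob')",
        duckdb_conn_id='duckdb_default',
    )
    # do_xcom_push=True pushes whatever the operator returns to XCom
    select_data = DuckDBOperator(
        task_id='select_data',
        sql="SELECT * FROM test",
        duckdb_conn_id='duckdb_default',
        do_xcom_push=True,
    )
    # Run the three tasks in sequence
    create_table >> insert_data >> select_data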
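The DAG above depends on a connection named `duckdb_default`. Airflow looks up environment-defined connections under `AIRFLOW_CONN_` followed by the upper-cased connection ID, and the sketch below shows that mapping. `conn_env_var` is a hypothetical helper, and the URI is copied from the troubleshooting entry in this document rather than verified against the provider.

```python
import os


def conn_env_var(conn_id):
    """Return the environment variable name Airflow checks for a connection ID."""
    return "AIRFLOW_CONN_" + conn_id.upper()


env_name = conn_env_var("duckdb_default")

# Placeholder URI taken from this document; adjust it to your setup.
# Setting os.environ only affects the current process, so in practice the
# variable is exported in the environment that starts the scheduler and workers.
os.environ[env_name] = "duckdb://localhost:5432/mydb"
```

A connection defined this way does not appear in the Airflow UI's connection list, which is worth keeping in mind when debugging a missing `conn_id`.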