Apache Airflow Provider for Apache Iceberg


Apache Airflow provider for Apache Iceberg, enabling interaction with Iceberg tables via Airflow. Current version 2.0.2 requires Python >=3.10 and Airflow >=2.10.0. Released monthly alongside other providers.

pip install apache-airflow-providers-apache-iceberg
error Broken DAG: No module named 'airflow.providers.apache.iceberg.operators'
cause Provider not installed or wrong import path.
fix Install the provider: pip install apache-airflow-providers-apache-iceberg. Use the correct import: from airflow.providers.apache.iceberg.operators.iceberg import IcebergOperator.
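A quick way to tell the two causes apart is to check whether the provider package is importable at all before Airflow parses the DAG. A minimal diagnostic sketch (the helper name is ours, not part of the provider):

```python
import importlib.util

def provider_installed(module: str = "airflow.providers.apache.iceberg") -> bool:
    """Return True if the module can be located without importing DAG code."""
    try:
        return importlib.util.find_spec(module) is not None
    except ModuleNotFoundError:
        # A parent package (e.g. airflow itself) is missing entirely.
        return False

if not provider_installed():
    print("Missing: pip install apache-airflow-providers-apache-iceberg")
```

If the package is present but the error persists, the import path in the DAG file is the problem, not the installation.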
error AirflowException: The conn_id `iceberg_default` is not defined
cause Missing Airflow connection for Iceberg.
fix Create an Iceberg connection in the Airflow UI (Admin -> Connections) with Conn Id = iceberg_default, Conn Type = Iceberg, and appropriate extras (e.g., URI, catalog).
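As an alternative to the UI, Airflow also reads connections from environment variables named AIRFLOW_CONN_<CONN_ID> (JSON-serialized values are supported since Airflow 2.3). A sketch with placeholder URI and catalog values:

```python
import json
import os

# Sketch: supply iceberg_default via an environment variable instead of the UI.
# The extras (uri, catalog) are placeholder values, not defaults of the provider.
conn = {
    "conn_type": "iceberg",
    "extra": {"uri": "http://localhost:8181", "catalog": "default"},
}
os.environ["AIRFLOW_CONN_ICEBERG_DEFAULT"] = json.dumps(conn)
```

In deployments this variable is usually set in the scheduler/worker environment (e.g., via docker-compose or a Kubernetes secret), not from Python.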
breaking In version 2.0.0, the `conn_id` parameter is now required and must reference a connection configured for Iceberg. Previously, some operators defaulted to a generic connection.
fix Ensure an Iceberg connection (e.g., type 'Iceberg') is created in Airflow and reference its `conn_id` in all operators.
gotcha The operator `IcebergOperator` is not a direct replacement for Spark or Flink SQL on Iceberg. It sends SQL queries to a configured catalog (e.g., via PyIceberg), not to a compute engine.
fix Use `IcebergOperator` only for catalog-level operations (DDL, simple DML). For heavy transformations, use SparkSubmitOperator or similar.
deprecated The `IcebergHook` class name is unchanged, but the old import path `airflow.providers.apache.iceberg.hooks.iceberg_hook` is deprecated since 1.1.0.
fix Use `from airflow.providers.apache.iceberg.hooks.iceberg import IcebergHook`.

Define a DAG that appends data to an Iceberg table using an IcebergOperator.

from datetime import datetime
from airflow import DAG
from airflow.providers.apache.iceberg.operators.iceberg import IcebergOperator

with DAG(
    'iceberg_quickstart',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    append_data = IcebergOperator(
        task_id='append_to_table',
        conn_id='iceberg_default',
        catalog_name='default',
        namespace='default',
        table='my_table',
        sql="INSERT INTO my_table VALUES (1, 'test')",
    )