Apache Airflow Impala Provider

1.9.1 · active · verified Sat Apr 11

The Apache Airflow Impala Provider integrates Apache Airflow with Apache Impala, so that workflows which query Impala can be authored, scheduled, and monitored programmatically. It provides `ImpalaHook`, which opens connections to Impala through the impyla client, and it works with the generic `SQLExecuteQueryOperator` from the common SQL provider to run SQL inside Airflow DAGs. The current version is 1.9.1, released on 2026-03-28. Like other Airflow providers, it is typically updated every few months.

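Install

pip install apache-airflow-providers-apache-impala

Imports

from airflow.providers.apache.impala.hooks.impala import ImpalaHook
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator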

Quickstart

This quickstart demonstrates a simple Airflow DAG that uses the `SQLExecuteQueryOperator` to interact with an Apache Impala database. It includes tasks for creating a table, inserting data, selecting data, and dropping the table. Before running, ensure you have configured an Impala connection in Airflow with the ID `my_impala_conn`.
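One way to provide the `my_impala_conn` connection without the UI is an environment variable. Airflow reads `AIRFLOW_CONN_<CONN_ID>` variables (connection ID upper-cased), and since Airflow 2.3 the value may be JSON. The sketch below only builds that JSON value; the host, login, and schema are hypothetical placeholders, and it assumes (as in current `ImpalaHook` versions) that Extra fields such as `auth_mechanism` are forwarded to impyla's `connect()`.

```python
import json
import os

# Hypothetical connection details: replace them with your own Impala daemon's values.
impala_conn = {
    "conn_type": "impala",                  # connection type registered by ImpalaHook
    "host": "impala.example.com",           # hypothetical hostname
    "port": 21050,                          # default port used by impyla
    "login": "airflow",                     # hypothetical user
    "schema": "default",                    # Impala database to connect to
    "extra": {"auth_mechanism": "NOSASL"},  # assumed to be passed through to impyla
}

# Connection ID "my_impala_conn" maps to this environment variable name.
env_var = "AIRFLOW_CONN_MY_IMPALA_CONN"
os.environ[env_var] = json.dumps(impala_conn)

print(env_var, "=", os.environ[env_var])
```

Setting the variable from Python only affects the current process. In a real deployment, export the printed value in the environment of the scheduler and workers instead.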

from __future__ import annotations
import datetime
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Ensure you have an Airflow connection named 'my_impala_conn' of type
# "impala" with the Impala host, port (default 21050), and credentials.
# Example Extra JSON: {"auth_mechanism": "NOSASL"}

with DAG(
    dag_id="example_impala_dag",
    start_date=datetime.datetime(2023, 1, 1),
    default_args={
        "conn_id": "my_impala_conn", # Airflow connection ID for Impala
        "owner": "airflow"
    },
    schedule="@once",
    catchup=False,
    tags=["impala", "sql"],
) as dag:
    create_table_task = SQLExecuteQueryOperator(
        task_id="create_impala_table",
        sql="""CREATE TABLE IF NOT EXISTS my_impala_table (id INT, name STRING)"""
    )

    insert_data_task = SQLExecuteQueryOperator(
        task_id="insert_impala_data",
        sql="""INSERT INTO my_impala_table VALUES (1, 'Alice'), (2, 'Bob')"""
    )

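    def log_row_count(cursor):
        # The handler receives the DB-API cursor after execution, not the fetched rows.
        count = cursor.fetchone()[0]
        print(f"Row count: {count}")
        return count  # the returned value is pushed to XCom

    select_data_task = SQLExecuteQueryOperator(
        task_id="select_impala_data",
        sql="""SELECT COUNT(*) FROM my_impala_table""",
        handler=log_row_count,
    )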

    drop_table_task = SQLExecuteQueryOperator(
        task_id="drop_impala_table",
        sql="""DROP TABLE IF EXISTS my_impala_table"""
    )

    (create_table_task >> insert_data_task >> select_data_task >> drop_table_task)
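`SQLExecuteQueryOperator` hands its `handler` the DB-API cursor after the statement runs: the common SQL hook calls `handler(cursor)`, and the default handler, `fetch_all_handler`, returns `cursor.fetchall()` for queries that produce rows. The sketch below exercises a handler of that shape locally, using Python's built-in `sqlite3` purely as a stand-in for an Impala cursor (both follow DB-API 2.0). The function name `log_row_count` is hypothetical.

```python
import sqlite3


def log_row_count(cursor):
    """Handler shape expected by SQLExecuteQueryOperator: it receives the
    cursor after execution and returns the value to keep."""
    count = cursor.fetchone()[0]
    print(f"Row count: {count}")
    return count


# sqlite3 stands in for Impala here; only the DB-API cursor interface matters.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE my_impala_table (id INTEGER, name TEXT)")
cur.executemany(
    "INSERT INTO my_impala_table VALUES (?, ?)", [(1, "Alice"), (2, "Bob")]
)
cur.execute("SELECT COUNT(*) FROM my_impala_table")

# Mimic what the SQL hook does after executing the query: call handler(cursor).
result = log_row_count(cur)  # prints "Row count: 2"
conn.close()
```

Indexing the argument as if it were a list of rows fails, because a cursor is not subscriptable; rows must be fetched from it first.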
