Airflow ClickHouse Plugin

1.6.0 · active · verified Thu Apr 16

The `airflow-clickhouse-plugin` provides Apache Airflow operators, hooks, and sensors for interacting with ClickHouse databases. It supports executing DDL/DML commands and queries. The current version is `1.6.0`, and releases are typically made to align with new Apache Airflow major and minor versions.
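
ClickHouse's native protocol, which clickhouse-driver uses, executes one statement per query. A script containing several DDL/DML statements therefore cannot be sent as one string, and the quickstart below uses one task per statement. The plugin's README also shows the operator's `sql` argument accepting a sequence of statements, which run in order. The sketch below shows a hypothetical helper, `split_statements` (not part of the plugin), that turns a script into such a list. It is deliberately naive: it tracks single-quoted literals but not comments or quoted identifiers, so review its output before relying on it.

```python
from typing import List


def split_statements(script: str) -> List[str]:
    """Hypothetical helper: split a SQL script on semicolons outside '...' literals.

    Deliberately naive: it tracks single-quoted string literals (including
    backslash escapes and doubled '' quotes) but ignores comments and quoted
    identifiers, so a ';' inside a comment would still split the script.
    """
    statements: List[str] = []
    current: List[str] = []
    in_string = False
    i = 0
    while i < len(script):
        ch = script[i]
        if in_string:
            current.append(ch)
            if ch == "\\" and i + 1 < len(script):
                # Copy the escaped character verbatim so \' does not end the literal.
                current.append(script[i + 1])
                i += 1
            elif ch == "'":
                # Closes the literal; a doubled '' reopens it on the next character.
                in_string = False
        elif ch == "'":
            in_string = True
            current.append(ch)
        elif ch == ";":
            # Statement boundary outside any literal.
            statements.append("".join(current))
            current = []
        else:
            current.append(ch)
        i += 1
    statements.append("".join(current))
    # Drop empty fragments, e.g. the text after a trailing semicolon.
    return [s.strip() for s in statements if s.strip()]


script = """
CREATE TABLE IF NOT EXISTS my_test_table (id UInt64, name String)
ENGINE = MergeTree() ORDER BY id;
INSERT INTO my_test_table VALUES (1, 'Alice'), (2, 'Bob; Jr.');
"""
for statement in split_statements(script):
    print(statement)
```

Passing `sql=split_statements(script)` to a single operator could replace several one-statement tasks. The trade-off is coarser retries: a failure re-runs every statement in the list.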

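Install

Install the package from PyPI with `pip install airflow-clickhouse-plugin`.

Imports

The operator used in the quickstart is imported with `from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator`.
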
Quickstart

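This quickstart demonstrates a basic Airflow DAG that uses `ClickHouseOperator` to create a table, insert rows, and query them. Before running it, configure an Airflow connection for your ClickHouse instance under the plugin's default connection id, `clickhouse_default`. The plugin connects through clickhouse-driver over ClickHouse's native TCP protocol. Set the port to the native port (9000 by default), not the HTTP port 8123. Enter the ClickHouse user as Login and the database as Schema, and fill in Host and Password as usual.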
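
Instead of using the UI, you can supply the connection through an `AIRFLOW_CONN_<CONN_ID>` environment variable, which Airflow 2.3 and later accepts in JSON form. The sketch below builds that JSON from the settings above. It maps the ClickHouse user to Airflow's `login` field and the database to `schema`. The helper name `clickhouse_conn_json` is hypothetical. The `conn_type` value `generic` is an assumption, based on the belief that the plugin does not check the connection type. Keys placed in `extra` are likewise assumed to be forwarded to the ClickHouse client.

```python
import json
import shlex
from typing import Any, Dict, Optional


def clickhouse_conn_json(
    host: str = "localhost",
    port: int = 9000,            # native TCP port; 8123 is ClickHouse's HTTP port
    user: str = "default",
    password: str = "",
    database: str = "default",
    conn_type: str = "generic",  # assumption: the plugin does not check the type
    extra: Optional[Dict[str, Any]] = None,
) -> str:
    """Hypothetical helper: serialize ClickHouse settings in Airflow's JSON
    connection format, mapping the user to `login` and the database to `schema`."""
    conn: Dict[str, Any] = {
        "conn_type": conn_type,
        "host": host,
        "port": port,
        "login": user,
        "password": password,
        "schema": database,
    }
    if extra:
        # Assumed to be forwarded to the ClickHouse client as keyword arguments.
        conn["extra"] = extra
    return json.dumps(conn)


if __name__ == "__main__":
    # Set this in the environment of the scheduler and workers; Airflow resolves
    # conn_id 'clickhouse_default' from AIRFLOW_CONN_CLICKHOUSE_DEFAULT.
    print("export AIRFLOW_CONN_CLICKHOUSE_DEFAULT=" + shlex.quote(clickhouse_conn_json()))
```

`shlex.quote` keeps the exported value intact even if the password contains quotes or spaces.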

```python
import pendulum
from airflow.models.dag import DAG
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator

# Assumes an Airflow connection with id 'clickhouse_default' (the plugin's default).
# The plugin uses clickhouse-driver (native TCP protocol), so set the standard fields:
#   Host: localhost   Port: 9000 (not the HTTP port 8123)
#   Login: default    Password: (empty)   Schema: default (the database)

with DAG(
    dag_id='clickhouse_quickstart_dag',
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),  # days_ago() was removed in Airflow 3
    schedule=None,  # `schedule` (Airflow 2.4+) replaces the deprecated `schedule_interval`
    catchup=False,
    tags=['clickhouse', 'example'],
) as dag:
    create_table = ClickHouseOperator(
        task_id='create_example_table',
        database='default',  # overrides the connection's Schema; change as needed
        sql="""
            CREATE TABLE IF NOT EXISTS my_test_table (
                id UInt64,
                name String
            ) ENGINE = MergeTree()
            ORDER BY id
        """,
        clickhouse_conn_id='clickhouse_default',
    )

    insert_data = ClickHouseOperator(
        task_id='insert_example_data',
        database='default',
        sql="INSERT INTO my_test_table VALUES (1, 'Alice'), (2, 'Bob')",
        clickhouse_conn_id='clickhouse_default',
    )

    query_data = ClickHouseOperator(
        task_id='select_example_data',
        database='default',
        sql='SELECT * FROM my_test_table',
        clickhouse_conn_id='clickhouse_default',
        # The operator returns the query result, which Airflow pushes to XCom
        # (key 'return_value'); read it downstream with
        # ti.xcom_pull(task_ids='select_example_data').
    )

    create_table >> insert_data >> query_data
```
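
By default, the value pushed to XCom is the list of rows. With `with_column_types=True` (a clickhouse-driver option that the operator also accepts), it becomes a `(rows, columns)` pair, where `columns` lists `(name, type)` tuples. A downstream task could turn that pair into dictionaries as sketched below. `rows_to_dicts` is a hypothetical helper, and the sample value only stands in for what `ti.xcom_pull(task_ids='select_example_data')` might return from a task configured like `query_data` but with `with_column_types=True`.

```python
from typing import Any, Dict, List, Sequence


def rows_to_dicts(result: Sequence[Any]) -> List[Dict[str, Any]]:
    """Hypothetical helper: turn a (rows, columns) query result into dicts.

    Expects the shape clickhouse-driver returns with with_column_types=True:
    `rows` is a sequence of row tuples and `columns` a sequence of
    (name, type) pairs. Lists are accepted too, since XCom serialization
    may not preserve tuples.
    """
    rows, columns = result
    names = [name for name, _type in columns]
    return [dict(zip(names, row)) for row in rows]


# Stand-in for an XCom value after JSON round-tripping; the values are illustrative only.
sample = [[[1, "Alice"], [2, "Bob"]], [["id", "UInt64"], ["name", "String"]]]
print(rows_to_dicts(sample))
# → [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}]
```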
