Airflow ClickHouse Plugin
The `airflow-clickhouse-plugin` provides Apache Airflow operators, hooks, and sensors for interacting with ClickHouse databases. It supports executing DDL/DML commands and queries. The current version is `1.6.0`, and releases are typically made to align with new Apache Airflow major and minor versions.
Common errors
- `ModuleNotFoundError: No module named 'airflow_clickhouse_plugin'`
  - Cause: the `airflow-clickhouse-plugin` package is not installed in the Python environment Airflow runs in, or an Airflow scheduler/worker restarted without the plugin being loaded.
  - Fix: install the plugin with `pip install airflow-clickhouse-plugin` in the same Python environment where Airflow is running, then restart the Airflow components (scheduler, webserver, workers) so the plugin is loaded.
- `airflow.exceptions.AirflowException: The hook for connection type 'clickhouse' is not available.`
  - Cause: Airflow failed to register the ClickHouse connection type, usually because the plugin was not loaded correctly or the referenced connection type contains a typo.
  - Fix: ensure `airflow-clickhouse-plugin` is correctly installed and that Airflow components were restarted after installation. Double-check the connection type selected in the Airflow UI ('ClickHouse' vs. 'clickhouse'; capitalization may matter depending on the Airflow version).
- `Code: 210, e.displayText() = DB::Exception: Connection refused`
  - Cause: the ClickHouse server is not running, is not reachable from the Airflow worker, or the host/port in the Airflow connection are incorrect.
  - Fix: verify that the ClickHouse server is running and reachable from the machine where the Airflow worker executes tasks, and check the host, port, and security settings of the connection referenced by `clickhouse_conn_id`.
- `airflow.exceptions.AirflowException: Missing connection id hook_type: clickhouse_conn_id, task_id: my_task`
  - Cause: the `clickhouse_conn_id` parameter was not passed to the `ClickHouseOperator` or `ClickHouseSqlOperator`, or its value is empty.
  - Fix: pass `clickhouse_conn_id='your_connection_id'` explicitly to the operator, and make sure that id matches a valid ClickHouse connection configured in Airflow.
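When triaging the `Connection refused` error above, it can help to take Airflow out of the picture and probe the server directly with `clickhouse-driver`, the library the plugin is built on. A minimal sketch; the host, port, and credential defaults are placeholders for your own values, and `ping_clickhouse` is a hypothetical helper, not part of the plugin:

```python
def ping_clickhouse(host='localhost', port=9000, user='default', password=''):
    """Run SELECT 1 against ClickHouse; raises the driver's error on failure."""
    # Imported lazily so this module parses even where the driver is absent.
    # Note: clickhouse-driver uses the native TCP port (default 9000),
    # not the HTTP port 8123.
    from clickhouse_driver import Client
    client = Client(host=host, port=port, user=user, password=password)
    return client.execute('SELECT 1')
```

If this call fails with the same `Code: 210` error, the problem is network or server configuration rather than the Airflow connection.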
Warnings
- breaking: version 1.0.0 introduced significant refactoring and two distinct operator families (`ClickHouseOperator` and `ClickHouseSqlOperator`). Code written for pre-1.0.0 versions will likely need updated import paths and operator parameters.
- gotcha: the plugin declares compatibility with specific Airflow versions. Using it with an unsupported or significantly different Airflow version (e.g., a newer major version not yet listed as supported) can lead to runtime errors or unexpected behavior.
- gotcha: the two operator families have different foundations: `ClickHouseOperator` wraps `clickhouse_driver.Client.execute`, while `ClickHouseSqlOperator` builds on `airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`. They may differ subtly in behavior or supported SQL features.
- gotcha: an incorrect Airflow connection configuration (wrong host, port, credentials, or database) will prevent the operators from connecting to ClickHouse and cause task failures.
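To reduce the chance of connection misconfiguration noted above, the connection can be defined declaratively via Airflow's `AIRFLOW_CONN_<CONN_ID>` environment-variable convention instead of the UI. A sketch; the connection id `clickhouse_default`, the credentials, and the native-protocol port 9000 are placeholder assumptions for your environment:

```shell
# Airflow reads connections from AIRFLOW_CONN_* env vars in URI form;
# the URI scheme is the connection type ('clickhouse' here).
# Format: <type>://<user>:<password>@<host>:<port>/<database>
export AIRFLOW_CONN_CLICKHOUSE_DEFAULT='clickhouse://default:@localhost:9000/default'
```

The variable must be visible to every Airflow component that runs tasks (scheduler and workers), not just the webserver.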
Install
pip install airflow-clickhouse-plugin
Imports
- ClickHouseHook
from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook
- ClickHouseOperator
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator
- ClickHouseSqlOperator
from airflow_clickhouse_plugin.operators.clickhouse_sql import ClickHouseSqlOperator
- ClickHouseSensor
from airflow_clickhouse_plugin.sensors.clickhouse import ClickHouseSensor
Quickstart
import datetime

from airflow.models.dag import DAG
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator

# Ensure a ClickHouse connection named 'clickhouse_default' is configured in Airflow.
# Host, port, login, password, and schema go in the standard connection fields.
# The plugin's underlying clickhouse-driver uses the native TCP protocol
# (default port 9000), not the HTTP port 8123.

with DAG(
    dag_id='clickhouse_quickstart_dag',
    start_date=datetime.datetime(2024, 1, 1),
    schedule=None,  # Airflow 2.4+; use schedule_interval on older versions
    tags=['clickhouse', 'example'],
    catchup=False,
) as dag:
    create_table = ClickHouseOperator(
        task_id='create_example_table',
        database='default',  # or specify a different database
        sql="""
            CREATE TABLE IF NOT EXISTS my_test_table (
                id UInt64,
                name String
            ) ENGINE = MergeTree()
            ORDER BY id
        """,
        clickhouse_conn_id='clickhouse_default',
    )
    insert_data = ClickHouseOperator(
        task_id='insert_example_data',
        database='default',
        sql="INSERT INTO my_test_table VALUES (1, 'Alice'), (2, 'Bob')",
        clickhouse_conn_id='clickhouse_default',
    )
    query_data = ClickHouseOperator(
        task_id='select_example_data',
        database='default',
        sql="SELECT * FROM my_test_table",
        clickhouse_conn_id='clickhouse_default',
        # Note: the last query's result is pushed to XCom; to work with rows
        # in Python code, use ClickHouseHook inside a PythonOperator instead.
    )

    create_table >> insert_data >> query_data
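To actually process query results in Python, as the last task's comment suggests, the hook can be called from a PythonOperator (or `@task`) callable. A minimal sketch, assuming the `clickhouse_default` connection and `my_test_table` from the quickstart; `fetch_rows` is a hypothetical helper and the import is deferred so the file parses without Airflow installed:

```python
# SQL reused from the quickstart; table and columns are assumptions.
SELECT_SQL = "SELECT id, name FROM my_test_table ORDER BY id"

def fetch_rows(conn_id='clickhouse_default'):
    """Return the query result as a list of tuples, e.g. [(1, 'Alice'), ...]."""
    # Imported lazily so this helper can be defined without Airflow present.
    from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook
    hook = ClickHouseHook(clickhouse_conn_id=conn_id)
    return hook.execute(SELECT_SQL)
```

Wire it into the DAG with `PythonOperator(task_id='fetch_rows', python_callable=fetch_rows)` and place it downstream of `insert_example_data`.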