Airflow ClickHouse Plugin
Version 1.6.0 | verified Thu Apr 16 | auth: no | Python
The `airflow-clickhouse-plugin` provides Apache Airflow operators, hooks, and sensors for interacting with ClickHouse databases. It supports executing DDL/DML commands and queries. The current version is `1.6.0`, and releases are typically made to align with new Apache Airflow major and minor versions.
pip install airflow-clickhouse-plugin
Common errors
error ModuleNotFoundError: No module named 'airflow_clickhouse_plugin'
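cause The `airflow-clickhouse-plugin` package is not installed in the Python environment that runs the failing Airflow component (often because Airflow runs in a different virtualenv or container image than the one where the package was installed), or a component that was started before the installation has not been restarted.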
fix
Install the plugin with `pip install airflow-clickhouse-plugin` in the same Python environment (or container image) that runs Airflow, then restart the Airflow components (scheduler, webserver, workers) so the plugin is loaded.
error airflow.exceptions.AirflowException: The hook for connection type 'clickhouse' is not available.
cause Airflow failed to register the ClickHouse connection type, usually because the plugin was not loaded correctly or there's a typo in the connection type being referenced.
fix
Ensure `airflow-clickhouse-plugin` is installed in every Airflow environment and that all Airflow components were restarted after installation. Double-check that the connection type selected in the Airflow UI is 'ClickHouse' and not a misspelled or differently cased value such as 'clickhouse' (whether case matters can depend on the Airflow version).
error Code: 210, e.displayText() = DB::Exception: Connection refused
cause The ClickHouse server is not running, is not accessible from the Airflow worker, or the host/port in the Airflow connection are incorrect.
fix
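Verify that your ClickHouse server is running and reachable from the machine where your Airflow worker executes tasks. Check the `clickhouse_conn_id` connection in Airflow for the correct host, port, and security settings. `ClickHouseOperator` and `ClickHouseHook` use clickhouse-driver, which speaks ClickHouse's native TCP protocol (default port 9000, or 9440 with TLS), not the HTTP interface on port 8123.
error airflow.exceptions.AirflowException: Missing connection id hook_type: clickhouse_conn_id, task_id: my_task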
cause The `clickhouse_conn_id` parameter was not provided to the `ClickHouseOperator` or `ClickHouseSqlOperator`, or its value is empty.
fix
Ensure that `clickhouse_conn_id='your_connection_id'` is passed explicitly to the operator and that 'your_connection_id' matches a ClickHouse connection configured in Airflow.
Warnings
breaking Version 1.0.0 introduced significant refactoring and two distinct operator families (`ClickHouseOperator` and `ClickHouseSqlOperator`). Code written for pre-1.0.0 versions will likely require updates to import paths and operator parameters.
fix Review the changelog for v1.0.0 and update import paths, operator names, and parameters to align with the new structure. Decide which operator (`ClickHouseOperator` or `ClickHouseSqlOperator`) is best suited for your use case.
gotcha The plugin has explicit compatibility with specific Airflow versions. Using it with an unsupported or significantly different Airflow version (e.g., a newer major version not yet listed as supported) can lead to runtime errors or unexpected behavior.
fix Always check the plugin's GitHub releases or PyPI page for the list of supported Apache Airflow versions. Ensure your Airflow installation matches one of the supported versions or upgrade the plugin if a newer Airflow version is supported.
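As a rough pre-flight check, the installed Airflow version can be compared against a supported range before deploying the plugin. The sketch below uses only the standard library (`importlib.metadata`, `re`); the helper names and the bounds in it are hypothetical placeholders, not the plugin's real support range, so copy the actual versions from the plugin's release notes. It compares only the numeric release segment and ignores suffixes such as `rc1`.

```python
import re
from importlib import metadata
from typing import Optional, Tuple


def parse_release(version: str) -> Tuple[int, ...]:
    """Return the leading numeric release segment, e.g. '2.10.3rc1' -> (2, 10, 3)."""
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def in_range(version: str, minimum: str, maximum_exclusive: str) -> bool:
    """True if minimum <= version < maximum_exclusive, comparing release segments only."""
    parts = [parse_release(v) for v in (version, minimum, maximum_exclusive)]
    width = max(len(p) for p in parts)
    # Pad with zeros so that e.g. '2.4' and '2.4.0' compare as equal.
    current, low, high = (p + (0,) * (width - len(p)) for p in parts)
    return low <= current < high


def installed_airflow_version() -> Optional[str]:
    """Return the installed apache-airflow version, or None if it is not installed."""
    try:
        return metadata.version("apache-airflow")
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Placeholder bounds (hypothetical): replace with the range listed for your plugin release.
    SUPPORTED_MIN, SUPPORTED_MAX_EXCLUSIVE = "2.0", "3.0"
    found = installed_airflow_version()
    if found is None:
        print("apache-airflow is not installed in this interpreter")
    else:
        print(found, "in supported range:", in_range(found, SUPPORTED_MIN, SUPPORTED_MAX_EXCLUSIVE))
```

Run it with the same interpreter that runs the scheduler and workers, since that is the installation the plugin is loaded into.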
gotcha There are two main operator families: `ClickHouseOperator` (based on `clickhouse_driver.Client.execute`) and `ClickHouseSqlOperator` (based on `airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`). They may differ subtly in behavior and in the SQL features they support.
fix Understand the distinction between the two operators. `ClickHouseOperator` offers more direct integration with `clickhouse-driver`, while `ClickHouseSqlOperator` leverages Airflow's common SQL provider, which might be more familiar to users of other SQL database providers. Choose the one that best fits your specific requirements or existing patterns.
gotcha Incorrect Airflow connection configuration (wrong host, port, credentials, or database) will prevent the operators from connecting to ClickHouse, leading to task failures.
fix Verify the Airflow connection details for the ClickHouse connection ID used by your operators (e.g., `clickhouse_default`). Ensure the connection type is 'ClickHouse' and that all fields (host, port, login, password, schema/database, and any security settings) are correct and reachable from the Airflow worker.
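Before digging into the connection fields, it can help to confirm that the ClickHouse port is reachable from the worker at all, which separates network problems (the "Connection refused" error above) from credential or database errors. A minimal standard-library sketch follows; the helper name `can_reach` is hypothetical. It only checks that a TCP connection opens, not that the peer speaks the ClickHouse protocol or accepts the credentials.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        # create_connection resolves the name and tries each returned address in turn.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, DNS failures (socket.gaierror) and timeouts are all OSError subclasses.
        return False
```

For example, run `can_reach("clickhouse.internal", 9000)` on the worker host, where `clickhouse.internal` is a placeholder for the host in your Airflow connection and 9000 is clickhouse-driver's default native port. Note that the timeout applies to the connection attempt, not to DNS resolution.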
Imports
- ClickHouseHook
  wrong: from airflow.providers.clickhouse.hooks.clickhouse import ClickHouseHook
  correct: from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook
- ClickHouseOperator
  wrong: from airflow.providers.clickhouse.operators.clickhouse import ClickHouseOperator
  correct: from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator
- ClickHouseSqlOperator
  from airflow_clickhouse_plugin.operators.clickhouse_sql import ClickHouseSqlOperator
- ClickHouseSensor
  wrong: from airflow.providers.clickhouse.sensors.clickhouse import ClickHouseSensor
  correct: from airflow_clickhouse_plugin.sensors.clickhouse import ClickHouseSensor
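A `ModuleNotFoundError` usually means the interpreter running Airflow cannot see the package. One way to check the plugin module paths listed above from that same interpreter is a standard-library lookup; the helper `is_importable` is hypothetical. Note that `importlib.util.find_spec` imports the parent packages of a dotted name, so checking a plugin submodule also imports `airflow_clickhouse_plugin` (and possibly Airflow) as a side effect.

```python
import importlib.util


def is_importable(module_name: str) -> bool:
    """Return True if `module_name` can be located by the current interpreter."""
    try:
        # For dotted names, find_spec imports the parent packages first.
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # ModuleNotFoundError (an ImportError subclass) is raised when a parent package is missing.
        return False


if __name__ == "__main__":
    for name in (
        "airflow_clickhouse_plugin.hooks.clickhouse",
        "airflow_clickhouse_plugin.operators.clickhouse",
        "airflow_clickhouse_plugin.operators.clickhouse_sql",
        "airflow_clickhouse_plugin.sensors.clickhouse",
    ):
        print(f"{name}: {'ok' if is_importable(name) else 'NOT FOUND'}")
```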
Quickstart
from datetime import datetime

from airflow.models.dag import DAG
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator

# Ensure a ClickHouse connection named 'clickhouse_default' is configured in Airflow.
# ClickHouseOperator uses clickhouse-driver (native TCP protocol), so the connection
# port should be the native port (9000 by default), not the HTTP port 8123.
# Host, port, login, password and schema (database) go in the standard connection
# fields; Extra may hold a JSON object of additional clickhouse_driver.Client
# arguments, for example: {"secure": false, "connect_timeout": 10}

with DAG(
    dag_id='clickhouse_quickstart_dag',
    start_date=datetime(2024, 1, 1),  # days_ago() is deprecated and removed in Airflow 3
    schedule=None,  # replaces the deprecated schedule_interval (Airflow 2.4+)
    tags=['clickhouse', 'example'],
    catchup=False,
) as dag:
    create_table = ClickHouseOperator(
        task_id='create_example_table',
        database='default',  # Or specify a different database
        sql="""
            CREATE TABLE IF NOT EXISTS my_test_table (
                id UInt64,
                name String
            ) ENGINE = MergeTree()
            ORDER BY id
        """,
        clickhouse_conn_id='clickhouse_default',
    )

    insert_data = ClickHouseOperator(
        task_id='insert_example_data',
        database='default',
        sql="INSERT INTO my_test_table VALUES (1, 'Alice'), (2, 'Bob')",
        clickhouse_conn_id='clickhouse_default',
    )

    # By default the result of the last query is pushed to XCom
    # (disable with do_xcom_push=False), so a downstream task can read
    # the rows with ti.xcom_pull(task_ids='select_example_data').
    query_data = ClickHouseOperator(
        task_id='select_example_data',
        database='default',
        sql="SELECT * FROM my_test_table",
        clickhouse_conn_id='clickhouse_default',
    )

    create_table >> insert_data >> query_data
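For reference, the plugin's README describes how the Airflow connection is turned into a `clickhouse_driver.Client`: host maps to `host`, port to `port`, schema to `database`, login to `user`, password to `password`; the operator's `database` argument overrides schema; unset fields are left to clickhouse-driver's own defaults (except host, which falls back to `localhost`); and the Extra JSON object is passed through as additional keyword arguments. The sketch below restates that mapping in plain Python to make it easier to reason about. It is not the plugin's code: `ConnectionFields` is a hypothetical stand-in for Airflow's `Connection`, and applying Extra keys last is an assumption of this sketch. Check the README of your plugin version if the details matter.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class ConnectionFields:
    """Hypothetical stand-in for the Airflow Connection attributes the plugin reads."""
    host: Optional[str] = None
    port: Optional[int] = None
    schema: Optional[str] = None
    login: Optional[str] = None
    password: Optional[str] = None
    extra: Optional[str] = None  # JSON object stored as text, as in Airflow's Extra field


def client_kwargs(conn: ConnectionFields, database: Optional[str] = None) -> Dict[str, Any]:
    """Build keyword arguments for clickhouse_driver.Client from connection fields.

    Unset fields are omitted so clickhouse-driver's own defaults apply; only
    host falls back to 'localhost'. A non-empty `database` overrides `schema`.
    """
    kwargs: Dict[str, Any] = {"host": conn.host or "localhost"}
    if conn.port:
        kwargs["port"] = int(conn.port)
    db = database or conn.schema
    if db:
        kwargs["database"] = db
    if conn.login:
        kwargs["user"] = conn.login
    if conn.password:
        kwargs["password"] = conn.password
    if conn.extra:
        # Assumption of this sketch: Extra keys are applied last and win on conflicts.
        kwargs.update(json.loads(conn.extra))
    return kwargs


if __name__ == "__main__":
    conn = ConnectionFields(port=9000, schema="default", login="default", extra='{"secure": false}')
    print(client_kwargs(conn))
    print(client_kwargs(conn, database="analytics"))
```

Leaving a field empty rather than filling in a guessed value keeps clickhouse-driver's defaults in effect, for example user 'default' and port 9000 (or 9440 when `secure` is true).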