Apache Airflow Druid Provider
The Apache Airflow Druid Provider (`apache-airflow-providers-apache-druid`) lets Airflow interact with Apache Druid, a high-performance, real-time analytics database. It ships hooks for loading data into Druid and querying it over SQL, plus an operator for submitting ingestion tasks. At the time of writing the current version is 4.5.2; providers are versioned and released independently of Airflow core, with regular updates for new features and bug fixes.
Common errors
- ModuleNotFoundError: No module named 'airflow.contrib.operators.druid_operator'
  cause: an Airflow 1.x import path is being used, from before the provider was split out of core.
  fix: install the provider (`pip install apache-airflow-providers-apache-druid`) and update the import to `from airflow.providers.apache.druid.operators.druid import DruidOperator`.
- AirflowException: The Druid connection with conn_id 'my_druid_conn' is not found.
  cause: the connection ID passed to the operator (`druid_ingest_conn_id`) does not exist in your Airflow connections.
  fix: create an Airflow connection with the matching conn ID (e.g. 'my_druid_conn') and 'Conn Type' set to 'Druid', via the Airflow UI (Admin -> Connections) or the CLI.
- json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
  cause: the ingestion spec handed to `DruidOperator` via `json_index_file` is not valid JSON, or the templated file resolved to empty content.
  fix: ensure `json_index_file` points at a file containing a valid Druid ingestion spec, and validate the JSON locally before submitting it.
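One way to catch the `JSONDecodeError` class of problems early is to parse the spec locally before handing it to Airflow. A minimal stdlib-only sketch (the helper name `validate_json_spec` is illustrative, not part of the provider):

```python
import json

def validate_json_spec(text: str) -> dict:
    """Parse a Druid spec as JSON, raising a clearer error if it is malformed."""
    try:
        return json.loads(text)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Not valid JSON: {exc}") from exc

# A well-formed fragment parses cleanly...
spec = validate_json_spec('{"type": "index_parallel"}')
print(spec["type"])  # index_parallel

# ...while an empty string reproduces the classic "Expecting value" error.
try:
    validate_json_spec("")
except ValueError as exc:
    print(exc)
```

Running this check in CI, or in a pre-deploy hook, surfaces malformed specs before a DAG run ever reaches Druid.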
Warnings
- breaking: migrating from Airflow 1.x to 2.x requires installing providers separately and updating import paths.
- gotcha: `DruidOperator` submits ingestion tasks to the Druid Overlord; it does not execute queries. To run Druid SQL, use `DruidDbApiHook` against a broker connection (or a generic SQL operator).
- gotcha: `druid_ingest_conn_id` must reference a correctly configured Airflow connection of type 'Druid'; a misconfigured connection only fails at task runtime, not at DAG parse time.
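Connections can also be supplied without touching the UI, via `AIRFLOW_CONN_<CONN_ID>` environment variables that Airflow reads at runtime. A minimal sketch of building such a URI (the host, port, and conn ID are assumptions for a local Overlord):

```python
import os
from urllib.parse import quote, urlsplit

# Airflow resolves a connection from AIRFLOW_CONN_<CONN_ID> when it is set;
# the URI scheme is the connection type, and query params become 'extra' fields.
# The 'endpoint' extra shown here is the Overlord's task-submission path.
endpoint = quote("druid/indexer/v1/task", safe="")
conn_uri = f"druid://localhost:8081/?endpoint={endpoint}"
os.environ["AIRFLOW_CONN_DRUID_INGEST_DEFAULT"] = conn_uri

parts = urlsplit(conn_uri)
print(parts.scheme, parts.hostname, parts.port)  # druid localhost 8081
```

Environment-variable connections are convenient for containerized deployments, where they can be injected as secrets instead of stored in the metadata database.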
Install
- pip install apache-airflow-providers-apache-druid
Imports
- DruidHook (first import is the deprecated Airflow 1.x path; the second is the current one)
from airflow.contrib.hooks.druid_hook import DruidHook
from airflow.providers.apache.druid.hooks.druid import DruidHook
- DruidOperator
from airflow.contrib.operators.druid_operator import DruidOperator
from airflow.providers.apache.druid.operators.druid import DruidOperator
- DruidDbApiHook (runs Druid SQL through the broker; current path only)
from airflow.providers.apache.druid.hooks.druid import DruidDbApiHook
- HiveToDruidOperator (transfers Hive query results into Druid)
from airflow.providers.apache.druid.transfers.hive_to_druid import HiveToDruidOperator
Quickstart
import os
from datetime import datetime
from airflow.models.dag import DAG
from airflow.providers.apache.druid.operators.druid import DruidOperator
# Ensure you have an Airflow connection named 'druid_ingest_default' pointing
# at the Druid Overlord (port 8081 in a default local deployment), e.g.:
# airflow connections add druid_ingest_default --conn-type druid \
#     --conn-host localhost --conn-port 8081 \
#     --conn-extra '{"endpoint": "druid/indexer/v1/task"}'
DRUID_CONN_ID = os.environ.get('AIRFLOW_DRUID_CONN_ID', 'druid_ingest_default')
with DAG(
dag_id='druid_example_dag',
start_date=datetime(2023, 1, 1),
schedule=None,
catchup=False,
tags=['druid', 'example'],
) as dag:
    # DruidOperator submits a native batch ingestion task to the Druid Overlord.
    # 'wikipedia_index.json' must be resolvable via the DAG's template search
    # path and contain a valid Druid ingestion spec; because the parameter is
    # templated with a '.json' extension, Airflow loads the file's contents.
    submit_ingestion_task = DruidOperator(
        task_id='submit_ingestion_task',
        druid_ingest_conn_id=DRUID_CONN_ID,
        json_index_file='wikipedia_index.json',
    )
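Since `json_index_file` expects a file containing a Druid ingestion spec, it can help to generate the spec from Python and write it next to the DAG. A minimal sketch of a native batch (`index_parallel`) spec for a 'wikipedia' datasource; the input path, timestamp column, and granularities are assumptions to adapt to your data:

```python
import json

# Skeleton of a Druid native batch ingestion spec. The datasource name,
# input location, and field names below are placeholders, not requirements.
ingestion_spec = {
    "type": "index_parallel",
    "spec": {
        "ioConfig": {
            "type": "index_parallel",
            "inputSource": {"type": "local", "baseDir": "/tmp/druid",
                            "filter": "wikipedia*.json"},
            "inputFormat": {"type": "json"},
        },
        "dataSchema": {
            "dataSource": "wikipedia",
            "timestampSpec": {"column": "timestamp", "format": "iso"},
            "dimensionsSpec": {"useSchemaDiscovery": True},
            "granularitySpec": {"segmentGranularity": "day",
                                "queryGranularity": "none"},
        },
        "tuningConfig": {"type": "index_parallel"},
    },
}

# Write the file that DruidOperator's json_index_file parameter will load.
with open("wikipedia_index.json", "w") as fh:
    json.dump(ingestion_spec, fh, indent=2)

print(ingestion_spec["spec"]["dataSchema"]["dataSource"])  # wikipedia
```

Generating the spec this way keeps it under version control alongside the DAG and makes it easy to validate in tests before a deployment.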