Apache Airflow Druid Provider
4.5.2 · verified Thu Apr 16 · auth: no · python
The Apache Airflow Druid Provider lets Airflow interact with Apache Druid, a high-performance, real-time analytics database. It ships hooks and operators for submitting ingestion (indexing) tasks to Druid and for querying it over SQL. The current version is 4.5.2. Like other Airflow providers, it is versioned and released separately from Airflow core, with updates for new features and bug fixes.
pip install apache-airflow-providers-apache-druid
Common errors
error ModuleNotFoundError: No module named 'airflow.contrib.operators.druid_operator'
cause Using an old import path from Airflow 1.x, from before providers were split into separate packages.
fix Install the provider (`pip install apache-airflow-providers-apache-druid`) and update your import statement to `from airflow.providers.apache.druid.operators.druid import DruidOperator`.
error AirflowNotFoundException: The conn_id `my_druid_conn` isn't defined
cause The connection ID passed to the operator or hook (for `DruidOperator` this is `druid_ingest_conn_id`, default `druid_ingest_default`) does not exist in your Airflow connections.
fix Create an Airflow connection with the matching `conn_id` (e.g., 'my_druid_conn') via the Airflow UI (Admin -> Connections) or CLI, with the host and port of the Druid service it should reach.
error json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
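cause Python's JSON parser was given empty or non-JSON input. `DruidOperator` has no `json_query` parameter; it submits the ingestion spec named by `json_index_file` and parses Druid's reply, so a likely source is a non-JSON response from the endpoint the connection points at (for example an HTML page from the wrong host or port).
fix Confirm that the connection's host, port, and endpoint target the Druid task API, and that the file passed as `json_index_file` contains a valid Druid ingestion spec. Do not pass `sql_query` or `json_query` to `DruidOperator`.
Warnings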
breaking Migration from Airflow 1.x to 2.x requires installing providers separately and updating import paths.
fix Install `apache-airflow-providers-apache-druid` explicitly (`pip install ...`) and update all imports from `airflow.contrib.*` to `airflow.providers.apache.druid.*`.
gotcha DruidOperator submits ingestion (indexing) tasks to Druid. It does not accept `sql_query` or `json_query`; passing either is rejected as an invalid argument.
fix Pass the ingestion spec through `json_index_file='path/to/spec.json'` and validate the spec against Druid's ingestion documentation. To run SQL queries, use `DruidDbApiHook`, which talks to the Druid Broker.
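gotcha Each class reads its own connection: `DruidOperator` and `DruidHook` use `druid_ingest_conn_id` (default `druid_ingest_default`), while `DruidDbApiHook` uses `druid_broker_conn_id` (default `druid_broker_default`). Misconfigured connections lead to errors.
fix Verify your Airflow connection: `Admin -> Connections`. Ensure 'Conn Id' matches the ID you pass and that 'Host' and 'Port' are correct: the Overlord (or Router) for ingestion, the Broker (or Router) for SQL. 'Conn Type' 'Druid' applies to the SQL hook; for ingestion, `DruidHook` builds the task URL from the connection type, host, port, and the `endpoint` extra, so use `http` or `https` as the type.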
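Connections can also be supplied without the UI. As a minimal sketch, assuming Airflow 2.3 or later (which accepts JSON-serialized connections in `AIRFLOW_CONN_<CONN_ID>` environment variables), the helper below formats such values with the standard library. The function name, hosts, and ports are hypothetical placeholders; `druid_ingest_default` and `druid_broker_default` are the provider's default connection IDs.

```python
import json


def connection_env_var(conn_id, conn_type, host, port, extra=None):
    """Return (variable name, JSON value) for an Airflow connection.

    Hypothetical helper: it only formats strings and never contacts Druid.
    """
    name = "AIRFLOW_CONN_" + conn_id.upper()
    value = {"conn_type": conn_type, "host": host, "port": port}
    if extra:
        value["extra"] = extra
    return name, json.dumps(value)


# Placeholder hosts and ports; adjust them to your cluster.
ingest_name, ingest_value = connection_env_var(
    "druid_ingest_default", "http", "localhost", 8081,
    extra={"endpoint": "druid/indexer/v1/task"},
)
broker_name, broker_value = connection_env_var(
    "druid_broker_default", "druid", "localhost", 8082,
)

print(f"export {ingest_name}='{ingest_value}'")
print(f"export {broker_name}='{broker_value}'")
```

Exporting the printed lines in the scheduler and worker environment makes both connections available without storing them in the metadata database.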
Imports
- DruidHook
wrong: from airflow.contrib.hooks.druid_hook import DruidHook
correct: from airflow.providers.apache.druid.hooks.druid import DruidHook
- DruidOperator
wrong: from airflow.contrib.operators.druid_operator import DruidOperator
correct: from airflow.providers.apache.druid.operators.druid import DruidOperator
- HiveToDruidOperator
from airflow.providers.apache.druid.transfers.hive_to_druid import HiveToDruidOperator
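Before changing import paths, it can help to confirm that the provider distribution is actually installed in the environment Airflow runs in. The sketch below uses only the standard library's `importlib.metadata`; the helper name is hypothetical, and it reports the installed version rather than importing Airflow itself.

```python
from importlib import metadata

PROVIDER_DIST = "apache-airflow-providers-apache-druid"


def installed_provider_version(dist_name=PROVIDER_DIST):
    """Return the installed version string, or None if the package is missing."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


version = installed_provider_version()
if version is None:
    print(f"{PROVIDER_DIST} is not installed; run: pip install {PROVIDER_DIST}")
else:
    print(f"{PROVIDER_DIST} {version} is installed")
```

Running this with the same interpreter as the scheduler and workers avoids the common case where the package is installed in a different virtual environment.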
Quickstart
import os
from datetime import datetime

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.apache.druid.hooks.druid import DruidDbApiHook
from airflow.providers.apache.druid.operators.druid import DruidOperator

# DruidOperator submits ingestion tasks through 'druid_ingest_default';
# DruidDbApiHook runs SQL through 'druid_broker_default'.
# To create them (placeholder hosts and ports for a local single-server Druid):
#   airflow connections add druid_ingest_default --conn-type http --conn-host localhost --conn-port 8081 --conn-extra '{"endpoint": "druid/indexer/v1/task"}'
#   airflow connections add druid_broker_default --conn-type druid --conn-host localhost --conn-port 8082
DRUID_INGEST_CONN_ID = os.environ.get('AIRFLOW_DRUID_CONN_ID', 'druid_ingest_default')
DRUID_BROKER_CONN_ID = 'druid_broker_default'

with DAG(
    dag_id='druid_example_dag',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['druid', 'example'],
) as dag:
    # Submit an ingestion spec to Druid.
    # 'wikipedia_index.json' is a placeholder: a Druid ingestion spec for a
    # 'wikipedia' datasource, stored where the DAG's template search path finds it.
    ingest_wikipedia = DruidOperator(
        task_id='ingest_wikipedia',
        druid_ingest_conn_id=DRUID_INGEST_CONN_ID,
        json_index_file='wikipedia_index.json',
    )

    # Run a simple Druid SQL query through the Broker.
    # This assumes the 'wikipedia' datasource exists.
    @task
    def count_recent_events():
        hook = DruidDbApiHook(druid_broker_conn_id=DRUID_BROKER_CONN_ID)
        row = hook.get_first(
            "SELECT COUNT(*) FROM wikipedia WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY"
        )
        return int(row[0])

    ingest_wikipedia >> count_recent_events()
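
`DruidOperator` takes its ingestion spec from the file named in `json_index_file`. As a rough sketch, the snippet below writes a minimal native batch (`index_parallel`) spec and checks that it round-trips as JSON before a DAG would use it. The helper names, file name, input directory, and column names are placeholders modeled on Druid's wikipedia tutorial data, not values defined by the provider; the Druid ingestion spec reference remains the authority on the full schema.

```python
import json
import tempfile
from pathlib import Path


def build_index_spec(datasource, base_dir, file_filter):
    """Build a minimal native batch ingestion spec as a Python dict."""
    return {
        "type": "index_parallel",
        "spec": {
            "dataSchema": {
                "dataSource": datasource,
                "timestampSpec": {"column": "time", "format": "iso"},
                "dimensionsSpec": {"dimensions": ["channel", "page", "user"]},
                "granularitySpec": {
                    "segmentGranularity": "day",
                    "queryGranularity": "none",
                    "rollup": False,
                },
            },
            "ioConfig": {
                "type": "index_parallel",
                "inputSource": {
                    "type": "local",
                    "baseDir": base_dir,
                    "filter": file_filter,
                },
                "inputFormat": {"type": "json"},
            },
            "tuningConfig": {"type": "index_parallel"},
        },
    }


def write_spec(spec, path):
    """Write the spec to disk and re-read it to confirm it is valid JSON."""
    path = Path(path)
    path.write_text(json.dumps(spec, indent=2), encoding="utf-8")
    return json.loads(path.read_text(encoding="utf-8"))


# Written to a temporary directory here; in a real project the file would sit
# where the DAG can find it.
spec_path = Path(tempfile.mkdtemp()) / "wikipedia_index.json"
spec = build_index_spec(
    "wikipedia", "quickstart/tutorial/", "wikiticker-2015-09-12-sampled.json.gz"
)
reloaded = write_spec(spec, spec_path)
print(reloaded["spec"]["dataSchema"]["dataSource"])
```

Generating the spec from a dict keeps it syntactically valid by construction, which removes one source of JSON errors before the task ever reaches Druid.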