Databricks Provider for Apache Airflow
The `apache-airflow-providers-databricks` package provides operators, hooks, and sensors to interact with Databricks, a unified data and analytics platform. It allows users to orchestrate Databricks notebooks, jobs, and SQL queries from Apache Airflow. The provider is actively maintained by the Apache Airflow community and is currently on version 7.12.0, with regular updates to support new Databricks features and Airflow versions.
Warnings
- breaking Minimum Airflow version requirement for the provider package increases with new releases. For example, provider version 3.0.0 required Airflow 2.2.0+, and provider 7.8.0+ requires Airflow 2.11.0+. Installing a newer provider version on an older Airflow installation may lead to Airflow core being auto-upgraded by pip, causing dependency conflicts.
- deprecated The `astro-provider-databricks` (Astronomer's Databricks provider) has been deprecated. Its features were integrated into the official `apache-airflow-providers-databricks` package from version 6.8.0. Using the deprecated provider may lead to missing features, bugs, or security vulnerabilities.
- breaking The behavior of `DatabricksSqlHook.run()` changed between pre-4.x and 4.x releases. Previously, it returned a tuple of `(cursor description, results)`. In 4.x and later, it returns only `results`, to conform with other `DBApiHook` implementations. Custom hooks or TaskFlow code relying on the old tuple return value will break.
- gotcha Authentication using Databricks username and password is discouraged and not supported for some operators like `DatabricksSqlOperator`. This method is less secure and flexible than using Personal Access Tokens (PATs) or OAuth with Service Principals.
- gotcha When configuring a Databricks connection in Airflow, ensure the 'Host' field contains the full Databricks workspace URL (e.g., `https://adb-xxxxxxxx.xx.databricks.com/`). Missing or incorrect host can lead to connection errors.
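Instead of the UI, the connection can also be supplied through an `AIRFLOW_CONN_<CONN_ID>` environment variable, which Airflow (2.3+) accepts in JSON form. A minimal sketch, assuming placeholder host and token values:

```python
import json
import os

# Placeholder workspace URL and PAT -- substitute your own values.
host = "https://adb-xxxxxxxx.xx.databricks.com"
pat = "dapiXXXXXXXXXXXXXXXX"

# Airflow resolves the connection id 'databricks_default' from this
# environment variable; the JSON connection format is supported since 2.3.
os.environ["AIRFLOW_CONN_DATABRICKS_DEFAULT"] = json.dumps(
    {"conn_type": "databricks", "host": host, "password": pat}
)
```

This keeps the PAT out of the Airflow metadata database, though a secrets backend is the more robust option for production.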
Install
-
pip install apache-airflow-providers-databricks
Imports
- DatabricksRunNowOperator
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
- DatabricksSubmitRunOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
- DatabricksSqlOperator
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator
- DatabricksNotebookOperator
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator
- DatabricksHook
from airflow.providers.databricks.hooks.databricks import DatabricksHook
Quickstart
from __future__ import annotations
import pendulum
from airflow.models.dag import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
# Configure your Databricks connection in Airflow UI (Admin -> Connections)
# Conn Id: 'databricks_default'
# Conn Type: 'Databricks'
# Host: Your Databricks workspace URL (e.g., https://adb-xxxxxxxx.xx.databricks.com)
# Password: Your Databricks Personal Access Token (PAT) (recommended over username/password)
# See https://airflow.apache.org/docs/apache-airflow-providers-databricks/stable/connections/databricks.html
with DAG(
    dag_id="databricks_run_notebook_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=["databricks", "example"],
    doc_md="""
    ### Databricks Run Notebook Example DAG
    This DAG demonstrates how to use `DatabricksRunNowOperator` to execute
    an existing Databricks job (which might run a notebook or JAR).
    **Prerequisites:**
    1. An Airflow connection named `databricks_default` configured with your Databricks workspace details and PAT.
    2. An existing Databricks job. Replace `YOUR_DATABRICKS_JOB_ID` with your actual Job ID.
    """,
) as dag:
    run_databricks_job = DatabricksRunNowOperator(
        task_id="run_existing_databricks_job",
        databricks_conn_id="databricks_default",
        job_id="YOUR_DATABRICKS_JOB_ID",  # Replace with your Databricks Job ID
        # Optional parameters for the job run, depending on the job's task type:
        # notebook_params={"input_param": "value_from_airflow"},
        # or jar_params, python_params, spark_submit_params, etc.
    )
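Under the hood, `DatabricksRunNowOperator` submits a request to the Databricks Jobs `run-now` REST endpoint. A minimal sketch of the kind of request body involved (the job ID and parameter values are placeholders, not taken from a real workspace):

```python
import json

# Hypothetical payload mirroring what a run-now call to the Jobs API 2.1
# carries: the target job_id plus any per-run parameter overrides.
payload = {
    "job_id": 12345,
    "notebook_params": {"input_param": "value_from_airflow"},
}

# The operator's hook serializes and POSTs a body like this for you;
# you never construct it by hand when using the provider.
body = json.dumps(payload)
```

Seeing the payload shape clarifies why `notebook_params` and friends are plain dicts/lists on the operator: they pass through to the API essentially unchanged.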