Dagster Databricks Integration
The `dagster-databricks` library provides Dagster ops and resources for interacting with Databricks. This includes resources for connecting to Databricks workspaces and ops for running Databricks jobs (notebooks, JARs, Python scripts) or PySpark jobs. As a Dagster library, its version (currently 0.29.0) is released in lockstep with the core `dagster` library (1.13.0), typically on a weekly or bi-weekly cadence.
Warnings
- breaking The `dagster-databricks` library versions are tied to the core `dagster` library versions. For example, `dagster-databricks==0.29.0` is compatible with `dagster==1.13.0`. Upgrading one without the other can lead to import errors or runtime issues due to API changes.
- gotcha Databricks authentication requires correctly configuring `DATABRICKS_HOST` and `DATABRICKS_TOKEN` (or equivalent secrets). Incorrect host URLs (e.g., omitting the `https://` scheme, or supplying only the workspace name instead of the full URL such as `https://<workspace>.cloud.databricks.com`) and invalid personal access tokens are common setup errors.
- deprecated The `databricks_pyspark_step_launcher` is deprecated in favor of Dagster Pipes (`PipesDatabricksClient`). If you are upgrading from older `dagster-databricks` versions that used the step launcher, your PySpark job definitions will need to be migrated.
- gotcha The `databricks_job_configuration` passed to `create_databricks_submit_run_op` (e.g., `notebook_task`, `new_cluster`, `spark_jar_task`, `python_wheel_task`) must conform to the Databricks Jobs API specification. Small discrepancies in key names or structure lead to job submission failures.
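Because such payload mistakes only surface at submission time, a stand-alone sanity check can catch them earlier. This helper is purely illustrative (it is not part of `dagster-databricks`); the key names mirror the Jobs API `runs/submit` payload:

```python
# Illustrative sanity check for a Jobs API `runs/submit`-style payload.
# Not part of dagster-databricks; key names follow the Databricks Jobs API.
TASK_KEYS = {"notebook_task", "spark_jar_task", "spark_python_task", "python_wheel_task"}

def validate_submit_payload(payload: dict) -> list[str]:
    """Return a list of human-readable problems (empty if none found)."""
    problems = []
    tasks = TASK_KEYS & payload.keys()
    if len(tasks) != 1:
        problems.append(f"expected exactly one task key from {sorted(TASK_KEYS)}, got {sorted(tasks)}")
    if "new_cluster" not in payload and "existing_cluster_id" not in payload:
        problems.append("payload needs either 'new_cluster' or 'existing_cluster_id'")
    if "notebook_task" in payload and "notebook_path" not in payload.get("notebook_task", {}):
        problems.append("'notebook_task' requires a 'notebook_path'")
    return problems
```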
Install
-
pip install dagster-databricks
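Since library versions track the core (see the warnings above), it is safest to pin both together, e.g. `pip install "dagster==1.13.0" "dagster-databricks==0.29.0"`. The pairing rule can be sketched as follows; the `+16` minor-version offset is the convention Dagster has used since 1.0 (e.g., `dagster` 1.13.x pairs with libraries at 0.29.x):

```python
# Sketch of Dagster's version-pairing convention: core `dagster` 1.X.Y
# pairs with integration libraries at 0.(X+16).Y (e.g., 1.13.0 <-> 0.29.0).
def versions_in_lockstep(core_version: str, lib_version: str) -> bool:
    core = core_version.split(".")
    lib = lib_version.split(".")
    return (
        core[0] == "1"
        and lib[0] == "0"
        and int(lib[1]) == int(core[1]) + 16
        and lib[2] == core[2]
    )
```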
Imports
- DatabricksClientResource
from dagster_databricks import DatabricksClientResource
- create_databricks_submit_run_op
from dagster_databricks import create_databricks_submit_run_op
- create_databricks_run_now_op
from dagster_databricks import create_databricks_run_now_op
- PipesDatabricksClient
from dagster_databricks import PipesDatabricksClient
- DatabricksClient
from dagster_databricks import DatabricksClient
Quickstart
from dagster import Definitions, EnvVar, job
from dagster_databricks import DatabricksClientResource, create_databricks_submit_run_op

# Build an op whose configuration is passed directly to the Databricks Jobs
# API `runs/submit` endpoint. Replace the notebook path and cluster settings
# with values valid for your workspace.
run_databricks_notebook = create_databricks_submit_run_op(
    databricks_job_configuration={
        "new_cluster": {
            "spark_version": "12.2.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 1,
        },
        "notebook_task": {
            "notebook_path": "/Users/your_user@example.com/my_dagster_notebook",
        },
    },
    name="run_example_databricks_notebook",
)

@job
def databricks_example_job():
    run_databricks_notebook()

defs = Definitions(
    jobs=[databricks_example_job],
    resources={
        # The op reads its client from the "databricks" resource key.
        # DATABRICKS_HOST should be 'https://<workspace-url>.cloud.databricks.com'
        # DATABRICKS_TOKEN is your personal access token
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    },
)
# To run this example:
# 1. Set DATABRICKS_HOST and DATABRICKS_TOKEN environment variables.
# 2. Ensure '/Users/your_user@example.com/my_dagster_notebook' exists in your Databricks workspace.
# 3. Save this code as a Python file (e.g., `databricks_repo.py`).
# 4. Run `dagster dev -f databricks_repo.py` and launch the job from the Dagster UI.
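Since missing or malformed credentials only surface when a run is launched, a small pre-flight check can be run before `dagster dev`. This helper is illustrative (not part of `dagster-databricks`):

```python
import os
from urllib.parse import urlparse

# Illustrative pre-flight check for the environment variables used above.
# Not part of dagster-databricks.
def check_databricks_env(env=None) -> list[str]:
    """Return a list of configuration problems (empty if none found)."""
    env = os.environ if env is None else env
    problems = []
    parsed = urlparse(env.get("DATABRICKS_HOST", ""))
    if parsed.scheme != "https" or not parsed.netloc:
        problems.append("DATABRICKS_HOST must be a full https:// workspace URL")
    if not env.get("DATABRICKS_TOKEN"):
        problems.append("DATABRICKS_TOKEN is not set")
    return problems
```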