Databricks SQLAlchemy plugin for Python
Databricks SQLAlchemy is a Python library that provides a SQLAlchemy dialect for connecting to Databricks SQL warehouses and clusters. It serves as a bridge between SQLAlchemy's ORM capabilities and the Databricks SQL Python driver, enabling Python applications to interact with Databricks data using standard SQLAlchemy patterns. The library is currently at version 2.0.9 and regularly receives updates, including recent support for complex data types like VARIANT, ARRAY, and MAP.
Warnings
- breaking The `databricks-sqlalchemy` library has distinct major versions for SQLAlchemy 1.x and SQLAlchemy 2.x: version `1.x` of `databricks-sqlalchemy` supports SQLAlchemy 1.x, while `2.x` supports SQLAlchemy 2.x. Upgrading `databricks-sqlalchemy` to `2.x` requires a corresponding upgrade of SQLAlchemy to `2.x`, and vice versa.
- gotcha There are two similarly named, but distinct, Python packages: `databricks-sqlalchemy` (the official dialect, this package) and `sqlalchemy-databricks` (an older, separate community-driven package). Always ensure you install and use `databricks-sqlalchemy` for official and up-to-date support.
- gotcha The SQLAlchemy 2.0 dialect for Databricks *always* uses native parameterized queries with the 'named' paramstyle (`:param`) and requires Databricks Runtime 14.2 or above. Using a different paramstyle or older DBR versions will result in query failures.
- gotcha SQLAlchemy's `DateTime()` type is mapped to Databricks' timezone-agnostic `TIMESTAMP_NTZ()`. If you declare `DateTime(timezone=True)`, the `timezone` argument will be ignored by the Databricks dialect. For timezone-aware timestamps, you must explicitly import and use `databricks.sqlalchemy.TIMESTAMP()`.
- gotcha Older versions of `databricks-sqlalchemy` (e.g., 1.0.5) had issues with unquoted identifiers in Unity Catalog, particularly for catalog or schema names containing hyphens. This could lead to `[INVALID_IDENTIFIER]` errors during metadata operations.
- gotcha The `autoincrement` argument on SQLAlchemy type declarations (e.g., `Integer(autoincrement=True)`) is currently ignored by the Databricks dialect. To create an auto-incrementing field, you must explicitly use `sqlalchemy.Identity()` and combine it with the `databricks.sqlalchemy.BigInteger()` type, as only BIGINT fields support auto-increment in Databricks Runtime.
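To make the timestamp and auto-increment gotchas concrete, here is a sketch of a table definition. The table name (`events`) and its columns are hypothetical, and the snippet deliberately uses SQLAlchemy's generic types so it runs without a Databricks connection; in a real Databricks project you would import `TIMESTAMP` (timezone-aware) and `BigInteger` from `databricks.sqlalchemy` as described above.

```python
from sqlalchemy import (
    BigInteger, Column, DateTime, Identity, MetaData, String, Table,
)

metadata = MetaData()

# Hypothetical table illustrating the warnings above.
events = Table(
    "events",
    metadata,
    # Auto-increment requires Identity() on a BIGINT column; the
    # `autoincrement` type argument alone is ignored by the dialect.
    Column("id", BigInteger, Identity(), primary_key=True),
    Column("name", String(100)),
    # DateTime() maps to TIMESTAMP_NTZ, and DateTime(timezone=True) is
    # ignored -- use databricks.sqlalchemy.TIMESTAMP for tz-aware values.
    Column("created_at", DateTime()),
)
```

Passing `metadata.create_all(engine)` against a Databricks engine would then emit the corresponding `CREATE TABLE` with a `BIGINT GENERATED BY DEFAULT AS IDENTITY` column.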
Install
- pip
pip install databricks-sqlalchemy
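Because the dialect's major version must match SQLAlchemy's (see the breaking-change warning above), pinning both together is a reasonable precaution. A sketch of a `requirements.txt` fragment, assuming you target SQLAlchemy 2.x:

```text
databricks-sqlalchemy>=2.0,<3.0
sqlalchemy>=2.0,<3.0
```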
Imports
- create_engine
from sqlalchemy import create_engine
- DatabricksDialect (registered implicitly via the `databricks://` URL scheme)
engine = create_engine('databricks://...')
- TIMESTAMP
from databricks.sqlalchemy import TIMESTAMP
- TIMESTAMP_NTZ
from databricks.sqlalchemy import TIMESTAMP_NTZ
- TINYINT
from databricks.sqlalchemy import TINYINT
Quickstart
import os
from sqlalchemy import create_engine, text
# Ensure these environment variables are set:
# DATABRICKS_SERVER_HOSTNAME: Hostname of your Databricks workspace or SQL warehouse
# DATABRICKS_HTTP_PATH: HTTP path of your SQL warehouse or cluster
# DATABRICKS_TOKEN: Your Databricks Personal Access Token
# DATABRICKS_CATALOG (optional): Initial Unity Catalog catalog
# DATABRICKS_SCHEMA (optional): Initial Unity Catalog schema
host = os.environ.get('DATABRICKS_SERVER_HOSTNAME', 'your_databricks_host.cloud.databricks.com')
http_path = os.environ.get('DATABRICKS_HTTP_PATH', '/sql/1.0/endpoints/your_http_path')
access_token = os.environ.get('DATABRICKS_TOKEN', 'dapi...')
catalog = os.environ.get('DATABRICKS_CATALOG', 'default')
schema = os.environ.get('DATABRICKS_SCHEMA', 'default')
try:
    connection_string = (
        f"databricks://token:{access_token}@{host}?"
        f"http_path={http_path}&catalog={catalog}&schema={schema}"
    )
    engine = create_engine(connection_string)
    with engine.connect() as connection:
        # Example: execute a simple query
        result = connection.execute(text("SELECT 1 AS one, 'hello' AS two"))
        for row in result:
            print(f"Row: {row.one}, {row.two}")

        # Example: parameterized query (requires DBR 14.2+)
        # Note: the Databricks dialect uses the 'named' paramstyle (:param)
        param_value = "world"
        result = connection.execute(text("SELECT :val AS greeting"), {"val": param_value})
        for row in result:
            print(f"Greeting: {row.greeting}")
    print("Successfully connected and executed queries.")
except Exception as e:
    print(f"Error connecting to Databricks: {e}")
    print("Please ensure environment variables are correctly set and Databricks resources are accessible.")
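Interpolating the token directly into the URL string (as above) works, but tokens or paths containing special characters must be URL-escaped. A safer sketch uses SQLAlchemy's `URL.create`, which escapes credentials for you; the host, token, and `http_path` values below are placeholders:

```python
from sqlalchemy.engine import URL

# Placeholder values -- substitute your own workspace details.
url = URL.create(
    drivername="databricks",
    username="token",
    password="dapi-example-token",   # URL.create escapes special characters
    host="example.cloud.databricks.com",
    query={
        "http_path": "/sql/1.0/warehouses/abc123",
        "catalog": "main",
        "schema": "default",
    },
)

# Render the full URL (including the password) to inspect its shape;
# in real code, pass the URL object itself to create_engine(url).
print(url.render_as_string(hide_password=False))
```

Passing the `URL` object to `create_engine()` avoids manual f-string assembly entirely.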