Databricks Delta Live Tables (DLT) Python Stubs

0.3.0 · active · verified Sat Apr 11

The `databricks-dlt` library provides Python stubs to support local development of Databricks Delta Live Tables (DLT) pipelines. It offers API specifications, docstring references for IDE autocompletion, and type hints for static type checking. The library is purely development-time tooling and *does not contain functional implementations*; DLT pipelines must be executed on a Databricks workspace. Databricks has since rebranded DLT as Lakeflow Spark Declarative Pipelines (SDP), which continues to receive updates on the platform.
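Because the package ships stubs rather than implementations, a decorator such as `dlt.table` can be thought of as a typed pass-through. The following is a hypothetical sketch (not the actual `databricks-dlt` source) of why decorated pipeline code type-checks and autocompletes locally yet only executes on Databricks:

```python
# Hypothetical sketch of a development-time stub. The decorator accepts
# the documented arguments but performs no registration and no execution.
from typing import Any, Callable, Optional

def table(
    func: Optional[Callable[..., Any]] = None,
    *,
    name: Optional[str] = None,
    comment: Optional[str] = None,
) -> Callable[..., Any]:
    """Pass-through stand-in for @dlt.table; supports bare and called forms."""
    def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
        return f  # stubs only: nothing is registered or executed here
    return decorator(func) if func is not None else decorator
```

The two-form dispatch (`@table` vs. `@table(name=...)`) mirrors how the real decorator is used in the quickstart below.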

Warnings

These are stubs only: every function and decorator in this package is a non-functional placeholder for IDE assistance and static type checking. Pipelines written against it will not execute locally; they must be deployed to and run on a Databricks workspace.

Install
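Assuming the distribution is published under the package name shown above, installation is the usual pip invocation:

```shell
# Install the DLT stubs into a local development environment.
pip install databricks-dlt
```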

Imports

The stubs expose the same top-level module used on Databricks, so pipeline code is imported unchanged: `import dlt`.

Quickstart

This quickstart demonstrates a typical Delta Live Tables pipeline structure using Python decorators within a Databricks notebook. It outlines a medallion architecture (bronze, silver, gold) for data processing. Note that while the `databricks-dlt` library provides IDE support for such code, actual execution and data processing occur only when deployed and run as a DLT pipeline on a Databricks workspace.

# This code snippet is designed to run within a Databricks DLT Notebook environment.
# The 'databricks-dlt' library provides stubs for local development, 
# but actual execution requires a Databricks workspace.

import dlt
from pyspark.sql.functions import col, count

# Define a streaming table (Bronze layer)
@dlt.table
def raw_data():
    # In a real scenario, this would read from a source like Auto Loader
    # e.g., spark.readStream.format('cloudFiles').option('cloudFiles.format', 'json').load('/databricks-datasets/retail-org/sales_orders/')
    # For demonstration, we'll simulate a static DataFrame read as this is a stub example
    return spark.read.format('json').load('/databricks-datasets/retail-org/sales_orders/')

# Define a cleansed table (Silver layer) with expectations
@dlt.table(comment='Cleansed sales orders with valid order numbers')
@dlt.expect_or_drop('valid_order_number', 'order_number IS NOT NULL')
def cleansed_data():
    return dlt.read('raw_data').select(col('customer_id'), col('order_number'), col('order_date'))

# Define an aggregated table (Gold layer)
@dlt.table(name='daily_sales_summary')
def daily_sales():
    return (
        dlt.read('cleansed_data')
        .groupBy('order_date')
        .agg(count('order_number').alias('total_orders'))
    )
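For intuition, the silver expectation above drops rows with a NULL `order_number`, and the gold step is an ordinary group-by-count. A plain-Python equivalent of that combined logic (illustrative only; the real pipeline operates on pyspark DataFrames):

```python
from collections import Counter

def daily_sales_summary(orders):
    """Drop rows with a missing order_number (the silver expectation),
    then count orders per order_date (the gold aggregation)."""
    counts = Counter(
        row["order_date"] for row in orders if row.get("order_number") is not None
    )
    return [{"order_date": d, "total_orders": n} for d, n in sorted(counts.items())]
```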
