DLT-META Framework

0.0.10 · active · verified Thu Apr 16

DLT-META is a metadata-driven framework for Databricks Lakeflow Declarative Pipelines (formerly Delta Live Tables), designed to automate the creation and management of bronze and silver data pipelines. It generates pipeline code dynamically from metadata defined in JSON or YAML files, streamlining data engineering workflows. The library is currently at version 0.0.10; releases are active but irregular.
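To make the metadata-driven approach concrete, below is a minimal sketch of what a single onboarding entry could look like, written as a Python dict that would be serialized to JSON. The field names (`data_flow_id`, `bronze_table`, etc.) are illustrative assumptions, not the framework's exact onboarding schema; consult the project's onboarding documentation for the authoritative fields.

```python
import json

# Hypothetical onboarding entry for one bronze dataflow.
# Field names are illustrative assumptions, not dlt-meta's exact schema.
onboarding_entry = {
    "data_flow_id": "100",            # unique id for this dataflow
    "data_flow_group": "A1",          # pipelines are typically generated per group
    "source_format": "cloudFiles",    # e.g. Auto Loader ingestion
    "source_details": {
        "source_path_dev": "/demo/resources/customers/",  # per-environment source path
    },
    "bronze_database_dev": "bronze",  # per-environment target database
    "bronze_table": "customers",
    "bronze_reader_options": {
        "cloudFiles.format": "json",
    },
}

# In practice this entry would live in a JSON (or YAML) onboarding file:
print(json.dumps(onboarding_entry, indent=2))
```

Note the `_dev` suffixes: per-environment values let the same metadata drive `dev`, `qa`, and `prod` pipelines, matching the `env` parameter used in the quickstart below.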

Common errors

`ImportError` when importing `DataflowPipeline` from `dlt_meta`: the `dlt-meta` library is not installed on the cluster or in the notebook environment.

Warnings

Install

Install with `%pip install dlt-meta` in a notebook, or attach `dlt-meta` as a cluster library.

Imports

`import dlt` and `from dlt_meta import DataflowPipeline`, as used in the quickstart below.

Quickstart

This quickstart demonstrates how to programmatically invoke the `dlt-meta` framework within a Databricks environment (typically a notebook or job). It assumes `dlt-meta` is installed and metadata has been onboarded. The `DataflowPipeline.invoke_dlt_pipeline` method orchestrates the creation and execution of DLT pipelines based on the provided layer and environment, reading from pre-configured metadata.

# This code typically runs within a Databricks Notebook or job after metadata onboarding.
# Ensure 'dlt-meta' is installed via %pip install dlt-meta in the notebook or as a cluster library.

import os

import dlt  # DLT runtime module; makes the pipeline decorators available

try:
    from dlt_meta import DataflowPipeline
except ImportError:
    print("ERROR: Could not import DataflowPipeline from dlt_meta. Ensure the 'dlt-meta' library is installed and available.")
    raise

# These parameters would typically be passed as job parameters in Databricks
# For local testing, you might set environment variables or hardcode.
layer = os.environ.get('DLT_META_LAYER', 'bronze').lower() # e.g., 'bronze' or 'silver'
env = os.environ.get('DLT_META_ENV', 'dev').lower() # e.g., 'dev', 'qa', 'prod'

# In a Databricks environment, 'spark' session is implicitly available.
# For local testing outside Databricks, you would need to initialize a SparkSession.
# Example placeholder for local SparkSession (not typically done in DLT-META's primary use-case):
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("dlt-meta-local").getOrCreate()

try:
    print(f"Attempting to invoke DLT-META for layer: {layer} (env: {env}).")
    # The 'spark' object is expected to be the Databricks SparkSession
    DataflowPipeline.invoke_dlt_pipeline(spark=spark, layer=layer, env=env)
    print(f"DLT-META successfully invoked for layer: {layer} (env: {env}).")
except Exception as e:
    print(f"ERROR: An exception occurred during DLT-META pipeline invocation for layer '{layer}' in env '{env}': {e}")
    raise
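Beyond environment variables, Databricks jobs and pipelines usually supply parameters through their configuration (for example via `spark.conf.get` inside a pipeline). The helper below is a small sketch of that resolution logic under those assumptions; `resolve_params` and its defaults are illustrative and not part of the `dlt-meta` API.

```python
def resolve_params(conf_get, default_layer="bronze", default_env="dev"):
    """Resolve layer/env from any config getter, e.g. lambda k: spark.conf.get(k, None)."""
    layer = (conf_get("layer") or default_layer).lower()
    env = (conf_get("env") or default_env).lower()
    return layer, env

# In a Databricks pipeline you might pass: lambda k: spark.conf.get(k, None)
# Standalone demonstration with a stub configuration:
stub_conf = {"layer": "Bronze"}.get
layer, env = resolve_params(stub_conf)
print(layer, env)  # prints: bronze dev
```

Keeping the getter injectable means the same resolution code works in a notebook (environment variables), a job (job parameters), or a pipeline (pipeline configuration).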
