MetricFlow

0.209.0 · active · verified Thu Apr 16

MetricFlow is a Python library (version 0.209.0) developed by dbt Labs that serves as the engine for defining, querying, and serving metrics from dbt projects. It translates high-level metric definitions into reusable SQL and executes them against various data platforms. While it powers the dbt Semantic Layer, direct programmatic API usage is primarily for advanced scenarios like custom metric servers or programmatic SQL generation, rather than typical application-level querying. Releases typically align with dbt Core release cycles or critical updates for MetricFlow's standalone capabilities.
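
To make the idea of translating a metric definition into SQL concrete, here is a toy sketch that does not use MetricFlow at all. `SimpleMetric`, `render_sql`, and the `transactions` table are hypothetical names chosen for illustration; the SQL that MetricFlow itself generates is more elaborate and will differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SimpleMetric:
    """Toy stand-in for a metric definition; not a MetricFlow class."""
    name: str   # name of the output column
    agg: str    # SQL aggregate function, e.g. "sum"
    expr: str   # column or expression being aggregated
    table: str  # hypothetical source table


def render_sql(metric: SimpleMetric, group_by: list[str]) -> str:
    """Render one aggregate query for the metric, grouped by the given columns."""
    select_items = [*group_by, f"{metric.agg.upper()}({metric.expr}) AS {metric.name}"]
    lines = ["SELECT", "  " + ",\n  ".join(select_items), f"FROM {metric.table}"]
    if group_by:
        lines.append("GROUP BY " + ", ".join(group_by))
    return "\n".join(lines)


total_revenue = SimpleMetric(name="total_revenue", agg="sum", expr="amount", table="transactions")
sql = render_sql(total_revenue, group_by=["user_id"])
print(sql)
```

The point of the sketch is that the metric is defined once and the same definition can be rendered with different groupings, which is the sense in which the generated SQL is reusable.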

Quickstart

This quickstart shows how to use MetricFlow programmatically to parse a semantic model definition and generate the corresponding SQL query. It writes a minimal dbt project to a temporary directory and configures the MetricFlow engine with a `NullMetricFlowConnection`, so the engine only produces a query plan (SQL) and never executes it. To retrieve actual data, replace `NullMetricFlowConnection` with a real database connection (e.g., `PostgresMetricFlowConnection`) and call `engine.query(query_spec)`.

import shutil
import tempfile
from pathlib import Path

from metricflow.engine.metricflow_engine import MetricFlowEngine
from metricflow.config.metricflow_config import MetricFlowConfig
from metricflow.connection.null_metricflow_connection import NullMetricFlowConnection
from metricflow.specs.query_specs import MetricFlowQuerySpec, MetricTimeDimension
from metricflow.specs.metric_request import MetricRequest
from metricflow.specs.dimension_spec import DimensionSpec

# This quickstart demonstrates programmatic SQL generation for a minimal semantic model.
# Actual execution requires a configured database connection.

# 1. Create a dummy dbt project directory with a semantic model YAML file
temp_dir = Path(tempfile.mkdtemp())
dbt_project_dir = temp_dir / "my_dbt_project"
dbt_project_dir.mkdir()
(dbt_project_dir / "dbt_project.yml").write_text("""
name: 'my_dbt_project'
version: '1.0.0'
config-version: 2
profile: 'default'
""")

models_dir = dbt_project_dir / "models"
models_dir.mkdir()
(models_dir / "my_semantic_model.yml").write_text("""
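semantic_models:
  - name: my_transactions_semantic_model
    description: "Transactions data"
    # Assumption: 'transactions' is a placeholder dbt model name; point this at a real model.
    model: ref('transactions')
    defaults:
      agg_time_dimension: transaction_date
    entities:
      - name: user_id
        type: primary
    measures:
      - name: sum_amount
        agg: sum
        expr: amount
    dimensions:
      - name: transaction_date
        type: time
        expr: created_at
        type_params:
          time_granularity: day

# Metrics are declared at the top level of the file, not inside a semantic model.
metrics:
  - name: total_revenue
    label: Total Revenue
    type: simple
    type_params:
      measure: sum_amount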
""")

# 2. Configure MetricFlow by pointing to the dbt project
mf_config = MetricFlowConfig(
    dbt_project_path=str(dbt_project_dir)
)

# 3. Use NullMetricFlowConnection for generating SQL plans without executing against a real DB
mf_connection = NullMetricFlowConnection()

# 4. Initialize MetricFlowEngine
engine = MetricFlowEngine(mf_config, mf_connection)

# 5. Define a query for the 'total_revenue' metric, grouped by user_id and day
metric_requests = [
    MetricRequest(metric_name="total_revenue")
]
time_dimension = MetricTimeDimension(
    grain="day",
    date_spec=('2023-01-01', '2023-01-31')
)
query_spec = MetricFlowQuerySpec(
    metric_requests=metric_requests,
    group_by_dimensions=[DimensionSpec(element_name="user_id")],
    time_dimension=time_dimension
)

print("\n--- Generating SQL Query Plan ---")
try:
    # plan_query returns a result object; its sql_query attribute holds the generated SQL
    query_plan_result = engine.plan_query(query_spec)
    print("Generated SQL Query:")
    print(query_plan_result.sql_query)

except Exception as e:
    print(f"An error occurred: {e}")
finally:
    # Clean up the temporary dbt project directory
    shutil.rmtree(temp_dir)
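
The project scaffolding and cleanup above can also be wrapped in a context manager, so the temporary directory is removed even if a later step raises before the `try` block is reached. This is a minimal sketch using only the standard library; `temporary_dbt_project` is a hypothetical helper name, and the placeholder YAML passed to it stands in for the semantic model file contents.

```python
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator

PROJECT_YML = """\
name: 'my_dbt_project'
version: '1.0.0'
config-version: 2
profile: 'default'
"""


@contextmanager
def temporary_dbt_project(semantic_model_yaml: str) -> Iterator[Path]:
    """Create a throwaway dbt project and delete it when the with-block exits."""
    with tempfile.TemporaryDirectory() as tmp:
        project_dir = Path(tmp) / "my_dbt_project"
        models_dir = project_dir / "models"
        models_dir.mkdir(parents=True)  # also creates project_dir
        (project_dir / "dbt_project.yml").write_text(PROJECT_YML)
        (models_dir / "my_semantic_model.yml").write_text(semantic_model_yaml)
        yield project_dir


# Placeholder YAML; in practice pass the semantic model definition here.
with temporary_dbt_project("semantic_models: []\n") as project_dir:
    created = sorted(p.relative_to(project_dir).as_posix() for p in project_dir.rglob("*.yml"))
    print(created)
    saved_path = project_dir

# The directory no longer exists once the with-block has exited.
print(saved_path.exists())
```

Engine construction and SQL generation would then go inside the `with` block, which replaces the explicit `shutil.rmtree` call.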
