dagster-mlflow

Version 0.29.3 · verified Fri May 01

Dagster integration with MLflow that enables tracking of ML experiments, models, and parameters from within Dagster pipelines. Current version: 0.29.3; supports Python >=3.10,<3.15. Releases follow the Dagster core release cadence (approximately bi-weekly).

pip install dagster-mlflow
error ModuleNotFoundError: No module named 'dagster_mlflow'
cause The package is not installed, or it was installed into a different Python environment than the one running your code.
fix
Run `pip install dagster-mlflow` in the correct environment.
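When the error persists after installing, the interpreter running your code is often not the one pip installed into. A minimal stdlib-only check (the printed path and presence flag are whatever your environment reports):

```python
# Print which interpreter is running and whether dagster_mlflow is importable
# from it -- useful for spotting a pip/interpreter mismatch.
import sys
import importlib.util

print("interpreter:", sys.executable)
spec = importlib.util.find_spec("dagster_mlflow")
print("dagster_mlflow found:", spec is not None)
```

If it prints False, install with `python -m pip install dagster-mlflow` using that same interpreter, so pip and the runtime agree.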
error AttributeError: 'mlflow_tracking' object has no attribute 'log_param'
cause Using the deprecated `mlflow_tracking` resource incorrectly or mixing old and new APIs.
fix
Define your own MLflow resource as shown in the quickstart, or use `from dagster_mlflow.resources import mlflow_resource`.
gotcha The `mlflow_tracking` resource is deprecated in favor of manually creating an MLflow resource using `dagster_mlflow.resources.mlflow_resource`. Do not use `mlflow_tracking` in new code.
fix Use `from dagster_mlflow.resources import mlflow_resource` and configure as a resource.
breaking In dagster-mlflow 0.28.0+, the `mlflow_run` context manager's signature changed. Old usage `with mlflow_run(context) as run:` no longer works; use `EndTimeLoggedRun`.
fix Replace with `from dagster_mlflow import EndTimeLoggedRun` and use `with EndTimeLoggedRun(context, mlflow_run=context.resources.mlflow):`
deprecated The `MLflowRunContext` type alias may be removed in future versions. Consider using `context.resources.mlflow` directly.
fix Directly access `context.resources.mlflow` instead of relying on `MLflowRunContext`.

Minimal working example: an op that logs params/metrics, a resource that wraps MLflow tracking, and a job that runs them together.

from dagster import job, op, resource

@op(required_resource_keys={'mlflow'})
def train_model(context):
    # The resource yields the mlflow module with a run already started.
    mlflow = context.resources.mlflow
    mlflow.log_param('epochs', 10)
    mlflow.log_metric('accuracy', 0.95)

@resource(config_schema={'experiment_name': str})
def mlflow_resource(init_context):
    import mlflow
    mlflow.set_experiment(init_context.resource_config['experiment_name'])
    mlflow.start_run()
    try:
        yield mlflow  # ops run while the MLflow run is active
    finally:
        mlflow.end_run()  # teardown fires even if an op raises

@job(resource_defs={'mlflow': mlflow_resource})
def my_ml_job():
    train_model()

if __name__ == '__main__':
    my_ml_job.execute_in_process(
        run_config={
            'resources': {
                'mlflow': {
                    'config': {'experiment_name': 'demo'}
                }
            }
        }
    )
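The generator-style resource above relies on Python's setup/teardown lifecycle: code before `yield` runs when the resource is acquired, code after it when the job finishes. A stdlib-only sketch of the same mechanics, with list appends standing in for the MLflow calls (no Dagster or MLflow required):

```python
# Demonstrate the setup/teardown lifecycle a generator-based resource uses.
from contextlib import contextmanager

events = []

@contextmanager
def tracking_session(experiment_name):
    # Setup, analogous to mlflow.set_experiment + mlflow.start_run:
    events.append(f"set_experiment:{experiment_name}")
    events.append("start_run")
    try:
        yield events  # the "op" runs here
    finally:
        # Teardown, analogous to mlflow.end_run:
        events.append("end_run")

with tracking_session("demo") as log:
    log.append("log_param:epochs=10")

print(events)
# → ['set_experiment:demo', 'start_run', 'log_param:epochs=10', 'end_run']
```

The `finally` block is why a resource should wrap its `yield` in `try/finally`: the `end_run` step still fires if the body raises.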