Azure Machine Learning Pipeline Core

1.62.0 · active · verified Wed Apr 15

This package contains core functionality for Azure Machine Learning pipelines, enabling the definition and execution of configurable machine learning workflows. As of version 1.62.0, it is a key component of the Azure ML SDK v1, providing building blocks for complex ML workflows. It follows a regular release cadence alongside other v1 SDK components.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to define a basic Azure ML Pipeline using `azureml-pipeline-core`. It sets up a mocked workspace for local execution, defines a pipeline parameter, an output, and a `PythonScriptStep`. The pipeline includes a simple script that performs a calculation and saves an output file. Note that `azureml-core` is typically required for full functionality and interaction with an actual Azure ML Workspace.

import os
from azureml.core import Workspace, Experiment, Environment
from azureml.core.runconfig import RunConfiguration
from azureml.pipeline.core import Pipeline, PipelineParameter, PipelineData
from azureml.pipeline.steps import PythonScriptStep

# NOTE: For an actual Azure ML run, ensure you have 'azureml-core' installed
# and configured your workspace (e.g., via 'az login' and 'ws.write_config()').
# This example mocks the Workspace for local execution without live Azure setup.

# --- Mock Workspace for local execution (replace with actual Workspace.from_config() for Azure) ---
try:
    # Attempt to load the real workspace; falls through to the mock when
    # azureml-core is not installed or no config.json is present.
    ws = Workspace.from_config()
    print(f"Loaded Workspace: {ws.name}")
except Exception:
    print("Could not load workspace from config. Using dummy for example execution.")

    class MockDatastore:
        """Minimal stand-in for an Azure ML Datastore used by this example."""

        def __init__(self):
            # Mirrors the default datastore name of a real workspace.
            self.name = "workspaceblobstore"

        def path(self, path_on_datastore):  # Mimics the path() method
            return f"azureml://datastores/workspaceblobstore/paths/{path_on_datastore}"

    class MockWorkspace:
        """Minimal stand-in for azureml.core.Workspace used by this example."""

        def __init__(self):
            self.name = "mock_ws"
            self.resource_group = "mock_rg"
            self.subscription_id = "mock_sub_id"

        def get_default_datastore(self):
            return MockDatastore()

        def compute_targets(self):
            # Return an empty mapping so the downstream
            # `.get("cpu-cluster", "local")` lookup falls back to 'local',
            # as the original comment intended. (The previous value
            # {"cpu-cluster": None} made .get() return None — the key was
            # present, so the 'local' default never applied.)
            return {}

    ws = MockWorkspace()
# -------------------------------------------------------------------------------------------------

# Environment for the pipeline step — for a real Azure run, swap in a
# curated or custom environment instead of this hand-built one.
myenv = Environment("my-python-env")
myenv.python.user_managed_dependencies = False
myenv.docker.enabled = True
myenv.docker.base_image = (
    "mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04:20210707.v1"
)

# Run configuration carrying the environment into the step.
run_config = RunConfiguration()
run_config.environment = myenv

# Intermediate output for the step, backed by the default datastore.
output_data = PipelineData(name="multiplied_output", datastore=ws.get_default_datastore())

# Pipeline parameter — overridable per submission, defaults to 5.
pipeline_param = PipelineParameter(name="input_multiplier", default_value=5)

# Create a dummy Python script for the pipeline step.
# The script is written to disk below so PythonScriptStep can pick it up
# from source_directory="."; it is removed again at the end of this example.
script_content = """
import argparse
import os
from azureml.core import Run

parser = argparse.ArgumentParser()
parser.add_argument("--input_multiplier", type=int)
parser.add_argument("--output_path", type=str)
args = parser.parse_args()

print(f"Received input_multiplier: {args.input_multiplier}")

# Get the run context
run = Run.get_context() # This works even with a mocked workspace, but won't interact with Azure.

# Create a dummy output directory and file
os.makedirs(args.output_path, exist_ok=True)
result = args.input_multiplier * 10
output_file_path = os.path.join(args.output_path, "result.txt")
with open(output_file_path, "w") as f:
    f.write(f"Calculation result: {result}")

print(f"Outputting data to: {output_file_path}")
run.upload_file(name="outputs/result.txt", path_or_stream=output_file_path)
"""
# NOTE(review): when run outside an Azure submission, Run.get_context()
# returns an offline run; whether upload_file is a no-op or raises there
# should be confirmed against the azureml-core version in use.
script_file = "my_pipeline_script.py"
with open(script_file, "w") as f:
    f.write(script_content)

# Resolve a compute target. On the real SDK, Workspace.compute_targets is a
# property returning a dict; the mock above exposes it as a method. Support
# both shapes, and fall back to 'local' when no 'cpu-cluster' is available
# (the original `.get("cpu-cluster", "local")` returned None whenever the
# key existed with a None value).
compute_targets = ws.compute_targets
if callable(compute_targets):
    compute_targets = compute_targets()
compute_target = compute_targets.get("cpu-cluster") or "local"

# Create a PythonScriptStep that runs the generated script, feeding it the
# pipeline parameter and the PipelineData output location as CLI arguments.
step1 = PythonScriptStep(
    name="multiply_step",
    script_name=script_file,
    arguments=[
        "--input_multiplier", pipeline_param,
        "--output_path", output_data
    ],
    outputs=[output_data],
    compute_target=compute_target,
    runconfig=run_config,
    source_directory="."
)

# Create the pipeline from the single step.
pipeline = Pipeline(workspace=ws, steps=[step1])

# NOTE(review): azureml.pipeline.core.Pipeline does not document a `name`
# attribute — the original f"Pipeline '{pipeline.name}' ..." could raise
# AttributeError — so report success without it.
print("Pipeline created successfully.")
print("To run this pipeline on Azure, ensure your workspace is configured and uncomment the submission code below.")

# Example of how to submit the pipeline to Azure (requires actual Workspace & Experiment):
# experiment = Experiment(ws, "my_pipeline_experiment")
# pipeline_run = experiment.submit(pipeline, pipeline_parameters={"input_multiplier": 7})
# pipeline_run.wait_for_completion(show_output=True)

# Clean up the generated script file so the example leaves no artifacts behind.
os.remove(script_file)

view raw JSON →