{"id":9936,"library":"metaflow-torchrun","title":"Metaflow Torchrun Integration","description":"Metaflow-torchrun is a Python library that provides a `@torchrun` decorator to enable distributed PyTorch training within Metaflow steps. It abstracts away the complexities of launching and managing `torchrun` processes, allowing users to integrate distributed ML workflows seamlessly into their Metaflow flows. The current version is 0.2.1, with relatively frequent updates since its initial release.","status":"active","version":"0.2.1","language":"en","source_language":"en","source_url":"https://github.com/outerbounds/metaflow-torchrun","tags":["metaflow","pytorch","distributed-training","mlops","decorator"],"install":[{"cmd":"pip install metaflow-torchrun","lang":"bash","label":"Install metaflow-torchrun"},{"cmd":"pip install metaflow torch","lang":"bash","label":"Install core dependencies"}],"dependencies":[{"reason":"Core MLOps framework that this library extends.","package":"metaflow","optional":false},{"reason":"PyTorch library for distributed training, which includes `torchrun`.","package":"torch","optional":false}],"imports":[{"symbol":"torchrun","correct":"from metaflow_torchrun import torchrun"}],"quickstart":{"code":"import os\nimport logging\nfrom metaflow import FlowSpec, step, current\nfrom metaflow_torchrun import torchrun\n\n# Set up basic logging to see output from torchrun\nlogging.basicConfig(level=logging.INFO)\n\nclass MyDistributedFlow(FlowSpec):\n\n    @step\n    def start(self):\n        print(f\"Starting flow {current.flow_name}...\")\n        self.next(self.train_distributed)\n\n    # Decorate a step with @torchrun to enable distributed execution\n    @torchrun(nproc_per_node=2) # Use 2 processes for illustration on a single node\n    @step\n    def train_distributed(self):\n        # Inside a torchrun decorated step, the script is executed by multiple processes.\n        # Each process will have environment variables like LOCAL_RANK, RANK, 
WORLD_SIZE.\n        local_rank = int(os.environ.get(\"LOCAL_RANK\", -1))\n        global_rank = int(os.environ.get(\"RANK\", -1)) # RANK is the global rank\n        world_size = int(os.environ.get(\"WORLD_SIZE\", -1))\n\n        print(f\"Hello from process {global_rank}/{world_size} (local rank {local_rank})\\n\" +\n              f\" in Metaflow run {current.run_id} step {current.step_name}.\")\n\n        # In a real scenario, you'd initialize a process group and run DDP here.\n        # import torch\n        # import torch.distributed as dist\n        # if local_rank != -1:\n        #     dist.init_process_group(\"nccl\" if torch.cuda.is_available() else \"gloo\", rank=global_rank, world_size=world_size)\n        # Your PyTorch model training code...\n        # if local_rank != -1:\n        #     dist.destroy_process_group()\n\n        self.next(self.end)\n\n    @step\n    def end(self):\n        print(\"Distributed training flow completed!\")\n\nif __name__ == \"__main__\":\n    # To run this flow:\n    # 1. Save it as a Python file, e.g., `my_flow.py`\n    # 2. Run from your terminal: `python my_flow.py run`\n    # Ensure `metaflow-torchrun`, `metaflow`, and `torch` are installed.\n    MyDistributedFlow()","lang":"python","description":"This example demonstrates a basic Metaflow flow using the `@torchrun` decorator. Save this code as a Python file (e.g., `my_flow.py`) and run it from your terminal using `python my_flow.py run`. It will launch 2 parallel processes on the local machine within the `train_distributed` step, each printing its rank information. 
Ensure `metaflow-torchrun`, `metaflow`, and `torch` are installed in your environment."},"warnings":[{"fix":"Run Metaflow flows decorated with `@torchrun` as a standalone Python script from your terminal: `python your_flow.py run`.","message":"The `@torchrun` decorator is incompatible with interactive notebooks (e.g., Jupyter), requiring Metaflow flows to be run from a standalone Python file.","severity":"gotcha","affected_versions":">=0.1.0"},{"fix":"Install `torch` (which includes `torchrun`) in your environment: `pip install torch` (or `pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118` for CUDA-enabled versions).","message":"Ensure `torch` is installed and `torchrun` is available in the Python environment where the Metaflow step executes.","severity":"gotcha","affected_versions":">=0.1.0"},{"fix":"Pass arguments as `args=['--epochs', '10', '--lr', '0.01']` instead of a single string, dictionary, or other types. For example: `@torchrun(args=['--model-name', 'resnet50'])`.","message":"Arguments intended for your PyTorch script, when passed to the `args` parameter of the `@torchrun` decorator, must be a list of strings.","severity":"gotcha","affected_versions":">=0.1.0"},{"fix":"Understand that `metaflow-torchrun` simplifies single-node distributed training. 
For multi-node setups, more advanced Metaflow features or other orchestrators would be required.","message":"The current implementation of `@torchrun` distributes processes across multiple GPUs/CPUs *on a single node*, not across multiple machines.","severity":"gotcha","affected_versions":">=0.1.0"}],"env_vars":null,"last_verified":"2026-04-17T00:00:00.000Z","next_check":"2026-07-16T00:00:00.000Z","problems":[{"fix":"Install the PyTorch library: `pip install torch` (or the appropriate command for your system/CUDA version, e.g., `pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118`).","cause":"`torch` (which bundles `torchrun`) is not installed or not accessible in the Python environment where the Metaflow step runs.","error":"ModuleNotFoundError: No module named 'torchrun'"},{"fix":"Ensure only valid decorator arguments like `nproc_per_node`, `args`, `module`, etc., are used. For arguments to your PyTorch script, use the `args` parameter: `@torchrun(args=['--my-script-arg', 'value'])`.","cause":"An unknown or misspelled argument was passed directly to the `@torchrun` decorator, or arguments intended for the PyTorch script were passed incorrectly.","error":"TypeError: torchrun() got an unexpected keyword argument 'my_arg'"},{"fix":"Save your Metaflow flow as a Python file (e.g., `my_flow.py`) and execute it from the command line: `python my_flow.py run`.","cause":"Attempting to run a Metaflow flow with the `@torchrun` decorator from an interactive environment (e.g., Jupyter notebook, IPython console), which is not supported.","error":"MetaflowException: The @torchrun decorator is only supported when running a flow from a Python file, not interactively."}]}