{"library":"metaflow-torchrun","title":"Metaflow Torchrun Integration","description":"Metaflow-torchrun is a Python library that provides a `@torchrun` decorator for running distributed PyTorch training inside Metaflow steps. Each parallel Metaflow task acts as one node of a `torchrun` job. The decorator selects `torchrun` arguments from the resources requested in Metaflow compute decorators (such as the number of GPUs) and discovers node network addresses automatically, so existing PyTorch distributed scripts can run inside a flow without changes. The current version is 0.2.1, with relatively frequent updates since its initial release.","language":"python","status":"active","last_verified":"Fri Apr 17","install":{"commands":["pip install metaflow-torchrun","pip install metaflow torch"],"cli":null},"imports":["from metaflow import torchrun"],"auth":{"required":false,"env_vars":[]},"quickstart":{"code":"# Save this flow as my_flow.py. Next to it, save train.py, the script that\n# torchrun starts in every process. A minimal hypothetical train.py:\n#\n#   import os\n#   rank = int(os.environ[\"RANK\"])              # global rank\n#   local_rank = int(os.environ[\"LOCAL_RANK\"])  # rank within this node\n#   world_size = int(os.environ[\"WORLD_SIZE\"])  # total number of processes\n#   print(f\"Hello from process {rank}/{world_size} (local rank {local_rank})\")\n#   # A real script would call torch.distributed.init_process_group(...),\n#   # train a DDP model, and end with torch.distributed.destroy_process_group().\nfrom metaflow import FlowSpec, step, current, torchrun\n\nN_NODES = 2         # parallel Metaflow tasks, each acting as one torchrun node\nNPROC_PER_NODE = 1  # processes torchrun starts on each node\n\n\nclass MyDistributedFlow(FlowSpec):\n\n    @step\n    def start(self):\n        print(f\"Starting flow {current.flow_name}...\")\n        # @torchrun steps are parallel steps, so fan out with num_parallel.\n        self.next(self.train_distributed, num_parallel=N_NODES)\n\n    @torchrun\n    @step\n    def train_distributed(self):\n        # This body runs once per node. current.torch.run invokes torchrun,\n        # which launches NPROC_PER_NODE copies of train.py on this node.\n        current.torch.run(\n            entrypoint=\"train.py\",\n            nproc_per_node=NPROC_PER_NODE,\n        )\n        self.next(self.join)\n\n    @step\n    def join(self, inputs):\n        # A parallel step must be followed by a join step.\n        self.next(self.end)\n\n    @step\n    def end(self):\n        print(\"Distributed training flow completed!\")\n\n\nif __name__ == \"__main__\":\n    # Run from your terminal: python my_flow.py run\n    # Requires metaflow-torchrun, metaflow, and torch to be installed.\n    MyDistributedFlow()","lang":"python","description":"This example shows the basic `@torchrun` pattern. `@torchrun` decorates a parallel step: the preceding step fans out with `num_parallel`, each resulting task acts as one torchrun node, and the step body calls `current.torch.run` to launch an unmodified training script (here `train.py`) with `nproc_per_node` processes per node. The step body itself runs once per node, not once per process, so rank environment variables such as `RANK`, `LOCAL_RANK`, and `WORLD_SIZE` are read inside the entrypoint script. A join step must follow the parallel step. Save the flow as `my_flow.py`, save the minimal `train.py` shown in the leading comment in the same directory, and run `python my_flow.py run`. With `N_NODES = 2` and `NPROC_PER_NODE = 1`, the job should start two processes in total, each printing its global rank, local rank, and world size. Ensure `metaflow-torchrun`, `metaflow`, and `torch` are installed in your environment.","tag":null,"tag_description":null,"last_tested":null,"results":[]},"compatibility":null}