{"id":9105,"library":"metaflow-checkpoint","title":"Metaflow Checkpoint","description":"Metaflow-checkpoint is an experimental extension for Metaflow that provides in-task checkpointing capabilities. It allows users to periodically save the progress of long-running Metaflow steps, such as machine learning model training, ensuring recovery from failures without losing significant work. The library, currently at version 0.2.10, is released as an independent extension to core Metaflow.","status":"active","version":"0.2.10","language":"en","source_language":"en","source_url":"https://docs.metaflow.org/api/metaflow-extensions/checkpoint","tags":["metaflow","checkpointing","machine learning","fault tolerance","mlops","python"],"install":[{"cmd":"pip install metaflow-checkpoint","lang":"bash","label":"Install latest version"}],"dependencies":[{"reason":"metaflow-checkpoint is an extension for the Metaflow framework.","package":"metaflow"}],"imports":[{"note":"The `checkpoint` decorator is designed to be imported directly from `metaflow` after `metaflow-checkpoint` is installed, not from a separate module path.","wrong":"from metaflow_checkpoint import checkpoint","symbol":"checkpoint","correct":"from metaflow import FlowSpec, step, current, checkpoint"},{"note":"`save()` is a method on the `current.checkpoint` object within a Metaflow step, not a standalone function or method on the `checkpoint` decorator itself.","wrong":"checkpoint.save()","symbol":"current.checkpoint.save","correct":"current.checkpoint.save()"}],"quickstart":{"code":"import os\nimport random\nfrom metaflow import FlowSpec, step, current, checkpoint, retry\n\nclass CheckpointCounterFlow(FlowSpec):\n    @retry(times=2, minutes_between_retries=1)\n    @checkpoint(load_policy='eager') # Use 'eager' for development across runs\n    @step\n    def start(self):\n        self.counter = 0\n        if current.checkpoint.is_loaded:\n            print(f\"Resuming from checkpoint. 
Reading the saved counter.\")\n            with open(os.path.join(current.checkpoint.directory, 'counter'), 'r') as f:\n                self.counter = int(f.read())\n            print(f\"Successfully loaded counter: {self.counter}\")\n        else:\n            print(\"Starting from scratch.\")\n\n        # Continue from the restored counter instead of repeating finished iterations\n        for i in range(self.counter, 5):\n            self.counter += 1\n            print(f\"Processing iteration {i+1}, counter is {self.counter}\")\n            # Save progress periodically\n            with open(os.path.join(current.checkpoint.directory, 'counter'), 'w') as f:\n                f.write(str(self.counter))\n            current.checkpoint.save()\n\n            # Simulate a flaky operation\n            if random.random() < 0.3:\n                raise Exception(\"Simulated failure!\")\n\n        self.next(self.end)\n\n    @step\n    def end(self):\n        print(f\"Flow finished. Final counter value: {self.counter}\")\n\nif __name__ == '__main__':\n    CheckpointCounterFlow()","lang":"python","description":"This quickstart demonstrates a Metaflow flow using the `@checkpoint` decorator. The `start` step simulates a long-running, flaky process that increments a counter. After each iteration it writes the counter value to a file in `current.checkpoint.directory` and calls `current.checkpoint.save()`. On a retry (triggered by `@retry`) or a `resume`, it checks `current.checkpoint.is_loaded`, reads the last saved counter from `current.checkpoint.directory`, and continues from that value. Setting `load_policy='eager'` allows checkpoints to be reused across different runs, which helps during iterative development. Run with `python your_flow.py run` and try `python your_flow.py resume start` after an interruption."},"warnings":[{"fix":"Always review the latest Metaflow documentation and `metaflow-checkpoint` release notes when upgrading to new versions. Be prepared for potential code adjustments.","message":"The `metaflow-checkpoint` library is explicitly labeled as EXPERIMENTAL. 
Its APIs may change in future versions, and it does not offer the same backwards compatibility guarantees as core Metaflow APIs.","severity":"breaking","affected_versions":"0.2.x and earlier"},{"fix":"For iterative development and resuming across different runs (e.g., stopping a flow and restarting later), use `@checkpoint(load_policy='eager')`. For custom loading logic, use `@checkpoint(load_policy=None)` and manually call `current.checkpoint.load()`.","message":"The default `load_policy='fresh'` for `@checkpoint` only loads task-specific checkpoints for retries within the same run. It explicitly disregards existing checkpoints when a *new* run is initiated.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Ensure that your checkpointing logic overwrites existing files within `current.checkpoint.directory` or explicitly cleans up the directory between checkpoint saves to prevent excessive file accumulation. For example, always use the same filename for your latest model state.","message":"Files saved to `current.checkpoint.directory` can accumulate across invocations if not managed, potentially leading to performance degradation over time as `current.checkpoint.save()` processes more data.","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-16T00:00:00.000Z","next_check":"2026-07-15T00:00:00.000Z","problems":[{"fix":"First, ensure the library is installed: `pip install metaflow-checkpoint`. Then, import it alongside other Metaflow components: `from metaflow import FlowSpec, step, current, checkpoint`.","cause":"The `metaflow-checkpoint` extension was not installed, or the `checkpoint` decorator was not imported correctly from `metaflow`.","error":"NameError: name 'checkpoint' is not defined"},{"fix":"If you intend to resume from a previous run, particularly during development, change the decorator to `@checkpoint(load_policy='eager')`. 
For more complex scenarios, consider `load_policy=None` and implement custom loading via `current.checkpoint.load()`.","cause":"The default `load_policy='fresh'` for the `@checkpoint` decorator is designed for recovery within the *same* run (e.g., after a `@retry`). It deliberately does not load checkpoints from previous runs.","error":"My flow didn't resume from the last saved state when I restarted it with `python my_flow.py run`."},{"fix":"The `checkpoint` symbol imported from `metaflow` is the decorator (`@checkpoint`). `current.checkpoint` is an object available *inside* a decorated step, providing methods like `current.checkpoint.save()` and properties like `current.checkpoint.directory`.","cause":"Attempting to call `current.checkpoint` as a function, e.g., `current.checkpoint()` or incorrectly assuming `current.checkpoint` itself is the decorator.","error":"TypeError: 'MetaflowCheckpoint' object is not callable"}]}