Metaflow Checkpoint
Metaflow-checkpoint is an experimental extension for Metaflow that provides in-task checkpointing capabilities. It allows users to periodically save the progress of long-running Metaflow steps, such as machine learning model training, ensuring recovery from failures without losing significant work. The library, currently at version 0.2.10, is released as an independent extension to core Metaflow.
Common errors
- NameError: name 'checkpoint' is not defined
  cause: The `metaflow-checkpoint` extension was not installed, or the `checkpoint` decorator was not imported correctly from `metaflow`.
  fix: First, make sure the library is installed: `pip install metaflow-checkpoint`. Then import it alongside the other Metaflow components: `from metaflow import FlowSpec, step, current, checkpoint`.
- My flow didn't resume from the last saved state when I restarted it with `python my_flow.py run`.
  cause: The default `load_policy='fresh'` for the `@checkpoint` decorator is designed for recovery within the *same* run (e.g., after a `@retry`). It deliberately does not load checkpoints from previous runs.
  fix: If you intend to resume from a previous run, particularly during development, change the decorator to `@checkpoint(load_policy='eager')`. For more complex scenarios, consider `load_policy=None` and implement custom loading via `current.checkpoint.load()`.
- TypeError: 'MetaflowCheckpoint' object is not callable
  cause: Calling `current.checkpoint` as a function (e.g., `current.checkpoint()`), or assuming that `current.checkpoint` itself is the decorator.
  fix: The `checkpoint` symbol imported from `metaflow` is the decorator (`@checkpoint`). `current.checkpoint` is an object available *inside* a decorated step. It provides methods like `current.checkpoint.save()` and properties like `current.checkpoint.directory`.
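The `load_policy` values mentioned above differ mainly in which saved checkpoint, if any, a task starts from. The following is a toy model in plain Python, not the library's implementation. The `SavedCheckpoint` record and the `pick_checkpoint` function are hypothetical. The model assumes only what this page states: `'fresh'` restores from checkpoints written earlier in the same run, `'eager'` also considers earlier runs (modeled here as "most recent from any run"), and `None` loads nothing automatically.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class SavedCheckpoint:
    """Hypothetical record of one saved checkpoint (toy model only)."""
    run_id: str   # run that wrote the checkpoint
    attempt: int  # retry attempt within that run
    created: int  # save order; larger means more recent


def pick_checkpoint(policy: Optional[str],
                    saved: List[SavedCheckpoint],
                    current_run: str) -> Optional[SavedCheckpoint]:
    """Return the checkpoint a task would start from under `policy` (toy model)."""
    if policy is None:
        # No automatic loading; user code decides what to restore.
        return None
    if policy == "fresh":
        # Only checkpoints from the same run, e.g. written by an earlier retry.
        candidates = [c for c in saved if c.run_id == current_run]
    elif policy == "eager":
        # Any run's checkpoint is eligible; take the most recent one.
        candidates = list(saved)
    else:
        raise ValueError(f"unknown policy: {policy!r}")
    return max(candidates, key=lambda c: c.created, default=None)
```

With two checkpoints from `run-1`, a new run `run-2` starts from scratch under `'fresh'` but picks up `run-1`'s latest checkpoint under `'eager'`. This matches the "didn't resume" error described above.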
Warnings
- breaking The `metaflow-checkpoint` library is explicitly labeled as EXPERIMENTAL. Its APIs may change in future versions, and it does not offer the same backwards compatibility guarantees as core Metaflow APIs.
- gotcha The default `load_policy='fresh'` for `@checkpoint` only loads task-specific checkpoints for retries within the same run. It explicitly disregards existing checkpoints when a *new* run is initiated.
- gotcha Files saved to `current.checkpoint.directory` can accumulate across invocations if not managed, potentially leading to performance degradation over time as `current.checkpoint.save()` processes more data.
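One way to keep the checkpoint directory from growing is sketched here under assumptions: before calling `current.checkpoint.save()`, delete whatever the step no longer needs. `clear_stale_files` is a hypothetical helper, not part of metaflow-checkpoint. It assumes that anything left in `current.checkpoint.directory` is what the next save picks up.

```python
import os
import shutil
from typing import List, Set


def clear_stale_files(directory: str, keep: Set[str]) -> List[str]:
    """Delete every entry in `directory` whose name is not in `keep`.

    Hypothetical helper: returns the removed names in sorted order.
    """
    removed = []
    for name in sorted(os.listdir(directory)):
        if name in keep:
            continue
        path = os.path.join(directory, name)
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path)  # stale sub-directory
        else:
            os.remove(path)      # stale file or symlink
        removed.append(name)
    return removed
```

Inside a step, this would be called as `clear_stale_files(current.checkpoint.directory, {'counter'})` right before `current.checkpoint.save()`. The file names are illustrative.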
Install
- pip install metaflow-checkpoint
Imports
- checkpoint
  incorrect: from metaflow_checkpoint import checkpoint
  correct: from metaflow import FlowSpec, step, current, checkpoint
- current.checkpoint.save
  incorrect: checkpoint.save()
  correct: current.checkpoint.save()
Quickstart
import os
import random

from metaflow import FlowSpec, step, current, checkpoint, retry

TOTAL_ITERATIONS = 5


class CheckpointCounterFlow(FlowSpec):

    @retry(times=2, minutes_between_retries=1)
    @checkpoint(load_policy='eager')  # 'eager' also loads checkpoints from earlier runs; useful during development
    @step
    def start(self):
        self.counter = 0
        if current.checkpoint.is_loaded:
            # The loaded checkpoint's files are available in current.checkpoint.directory
            with open(os.path.join(current.checkpoint.directory, 'counter'), 'r') as f:
                self.counter = int(f.read())
            print(f"Resuming from checkpoint. Loaded counter: {self.counter}")
        else:
            print("Starting from scratch.")

        # Continue from the restored counter instead of redoing finished iterations
        for i in range(self.counter, TOTAL_ITERATIONS):
            self.counter += 1
            print(f"Processing iteration {i + 1}, counter is {self.counter}")

            # Save progress: write the state into the checkpoint directory, then persist it
            with open(os.path.join(current.checkpoint.directory, 'counter'), 'w') as f:
                f.write(str(self.counter))
            current.checkpoint.save()

            # Simulate a flaky operation; @retry re-runs the step, which resumes from the checkpoint
            if random.random() < 0.3:
                raise Exception("Simulated failure!")

        self.next(self.end)

    @step
    def end(self):
        print(f"Flow finished. Final counter value: {self.counter}")


if __name__ == '__main__':
    CheckpointCounterFlow()
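To see the resume logic without installing Metaflow, the sketch below replays it in plain Python. It is an analogy, not how Metaflow runs steps. A temporary directory stands in for `current.checkpoint.directory`, and the file on disk stands in for a saved checkpoint. The hypothetical `run_with_retries` loosely mimics `@retry(times=2)`. The hypothetical `fail_points` argument replaces the random failure with deterministic ones so the behavior is reproducible.

```python
import os
from typing import List, Optional, Tuple

TOTAL_ITERATIONS = 5


def run_attempt(checkpoint_dir: str, log: List[int], fail_at: Optional[int]) -> int:
    """One attempt of the step: restore the counter if saved, then keep counting."""
    path = os.path.join(checkpoint_dir, "counter")
    counter = 0
    if os.path.exists(path):  # analogous to current.checkpoint.is_loaded
        with open(path) as f:
            counter = int(f.read())
    while counter < TOTAL_ITERATIONS:
        counter += 1
        log.append(counter)  # record each unit of work actually performed
        with open(path, "w") as f:  # analogous to writing the file + current.checkpoint.save()
            f.write(str(counter))
        if counter == fail_at:
            raise RuntimeError(f"simulated failure at counter={counter}")
    return counter


def run_with_retries(checkpoint_dir: str,
                     fail_points: List[Optional[int]],
                     times: int = 2) -> Tuple[int, List[int]]:
    """Run up to `times` extra attempts after a failure (hypothetical @retry stand-in)."""
    log: List[int] = []
    for attempt in range(times + 1):
        fail_at = fail_points[attempt] if attempt < len(fail_points) else None
        try:
            return run_attempt(checkpoint_dir, log, fail_at), log
        except RuntimeError:
            if attempt == times:
                raise  # out of retries: the step fails
    raise ValueError("times must be non-negative")
```

With `fail_points=[2, 4]`, the first attempt fails after the counter reaches 2. The second attempt resumes at 2 and fails after 4. The third resumes at 4 and finishes. Each iteration therefore runs exactly once across the three attempts.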