Luigi Workflow Management
Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, failure handling, and command-line integration. It is developed by Spotify; the current version is 3.8.0, and minor releases typically appear every few months.
Warnings
- breaking The Luigi 3.x series requires Python >= 3.10 and < 3.14. Projects on older Python 3 versions (e.g., 3.6-3.9) or on newer versions (3.14+) are not compatible.
- gotcha Luigi tasks require a scheduler to run. By default, it looks for a running `luigid` daemon. For simple local execution or development, remember to use the `--local-scheduler` flag when running tasks from the command line, or `local_scheduler=True` when using `luigi.build()` programmatically.
- deprecated Luigi 3.7.3 removed its use of the `pkg_resources` library, which it had historically relied on for introspection and entry points. This is primarily an internal change, but it may affect custom plugins or integrations that directly or indirectly depended on Luigi's usage of `pkg_resources`.
- gotcha Luigi determines task completion solely based on whether the `output()` targets `exist()`. If a task's `run()` method completes but fails to create its defined `output()`, Luigi will consider it incomplete and re-run it in subsequent invocations. Similarly, manually deleting an output file will cause Luigi to re-run the producing task.
Install
- pip install luigi
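Because the Warnings section limits the 3.x series to a specific Python range, it can help to check the interpreter before installing. A minimal sketch using only the standard library; the bounds are copied from that warning rather than read from package metadata, and `python_supported` is a hypothetical helper name.

```python
import sys

# Bounds copied from the warning in this document; they are an assumption
# about the release being installed, not values read from the package.
MIN_SUPPORTED = (3, 10)  # inclusive
MAX_EXCLUSIVE = (3, 14)  # exclusive


def python_supported(version_info=None):
    """Return True if the (major, minor) version falls in the supported range."""
    if version_info is None:
        version_info = sys.version_info
    major_minor = tuple(version_info[:2])
    return MIN_SUPPORTED <= major_minor < MAX_EXCLUSIVE


if __name__ == "__main__":
    status = "inside" if python_supported() else "outside"
    print(f"Running interpreter is {status} the supported range")
```

Passing an explicit tuple, for example `python_supported((3, 9))`, makes it possible to check a target environment other than the one currently running.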
Imports
- Task
    import luigi
    class MyTask(luigi.Task): ...
- Parameter
    import luigi
    class MyTask(luigi.Task):
        param = luigi.Parameter()
- LocalTarget
    import luigi
    luigi.LocalTarget('path/to/file')
- build
    import luigi
    luigi.build([my_task_instance], local_scheduler=True)
Quickstart
import luigi
import datetime
import os


class GenerateReport(luigi.Task):
    date = luigi.DateParameter(default=datetime.date.today())

    def run(self):
        # Simulate some data processing
        report_content = f"Daily Report for {self.date.isoformat()}\n" \
                         f"Generated by Luigi.\n"
        # Write the report to the target file
        with self.output().open('w') as f:
            f.write(report_content)

    def output(self):
        # Define where the output of this task will be stored.
        # The LUIGI_OUTPUT_DIR environment variable overrides the directory;
        # the default is the current directory.
        output_dir = os.environ.get('LUIGI_OUTPUT_DIR', '.')
        return luigi.LocalTarget(os.path.join(output_dir, f'report_{self.date.isoformat()}.txt'))


if __name__ == '__main__':
    # To run this task from the command line (most common), go through the
    # luigi entry point; this script itself calls luigi.build() with a fixed
    # date and does not parse command-line arguments:
    #   1. Start the Luigi scheduler daemon in a separate terminal: luigid --port 8082
    #   2. Run the task: python -m luigi --module your_script_name GenerateReport --date 2023-10-26
    #      (omit --date to use the default, today's date)
    #   3. If you don't want to run luigid, use the --local-scheduler flag:
    #      python -m luigi --module your_script_name GenerateReport --local-scheduler
    # For programmatic execution (e.g., in a wrapper script or test):
    # local_scheduler=True runs the task without an external luigid daemon.
    luigi.build([GenerateReport(date=datetime.date(2023, 10, 26))], local_scheduler=True)