b2luigi - Belle II Luigi Extensions
b2luigi extends the Luigi workflow management system, primarily to integrate it with the Belle II software framework (basf2) for batch processing. It provides specialized tasks, runners, and target classes for managing data in the Belle II environment and on various remote file systems. The library is actively maintained, with releases typically every few months that fix bugs and add features such as WebDAV support.
Warnings
- gotcha Versions before 1.2.6 declared a strict `Python < 3.12` requirement. Using one of those older b2luigi versions with Python 3.12 or newer leads to installation or runtime errors.
- breaking Version 1.2.0 introduced new `b2luigi.LocalTarget` and `b2luigi.FileSystemTarget` classes, and v1.2.8 added `WebDAVTarget`. While designed to be compatible, code directly manipulating `luigi.LocalTarget` instances, custom target implementations, or relying on prior internal target structures might require adjustments.
- gotcha The `runner.remove_outputs` method had an incorrect keyword argument in versions prior to 1.2.7, leading to unexpected behavior or errors when attempting to remove task outputs programmatically.
- gotcha While `b2luigi` builds on `luigi`, its primary purpose is integration with the Belle II `basf2` framework. Many advanced features (e.g., specialized runners for batch systems) are tailored for this environment. Users outside the Belle II collaboration might find some functionalities less relevant or require custom setup.
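The Python 3.12 warning above can be checked before a workflow starts. The following is a minimal sketch that uses only the standard library; `parse_version`, `supports_python_312`, and `check_environment` are hypothetical helper names and not part of b2luigi, and the only version boundary assumed is the 1.2.6 threshold stated in the warning.

```python
import re
import sys
from importlib import metadata

# Matches the leading "major.minor[.patch]" part of a version string.
_VERSION_RE = re.compile(r"(\d+)\.(\d+)(?:\.(\d+))?")


def parse_version(text):
    """Return (major, minor, patch) as integers; a missing patch counts as 0."""
    match = _VERSION_RE.match(text)
    if match is None:
        raise ValueError(f"unrecognised version string: {text!r}")
    major, minor, patch = match.groups()
    return (int(major), int(minor), int(patch or 0))


def supports_python_312(b2luigi_version):
    """True if the given b2luigi version is at least 1.2.6 (see the warning above)."""
    return parse_version(b2luigi_version) >= (1, 2, 6)


def check_environment(python_version=sys.version_info[:2]):
    """Return a warning string if a known conflict is detected, else None."""
    try:
        installed = metadata.version("b2luigi")
    except metadata.PackageNotFoundError:
        return "b2luigi is not installed"
    if python_version >= (3, 12) and not supports_python_312(installed):
        return f"b2luigi {installed} predates 1.2.6; upgrade before using Python 3.12+"
    return None


if __name__ == "__main__":
    problem = check_environment()
    print(problem or "no known Python-version conflict detected")
```

The check is kept separate from the version lookup so that `supports_python_312` can be exercised without b2luigi being installed.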
Install
pip install b2luigi
Imports
- Task
from b2luigi import Task
- LocalTarget
from b2luigi import LocalTarget
- Basf2Task
from b2luigi.basf2 import Basf2Task
- run
from b2luigi import run
Quickstart
import os

import b2luigi
import luigi


class GenerateData(b2luigi.Task):
    """Write a small text file named after the `filename` parameter."""

    filename = luigi.Parameter()

    def output(self):
        return b2luigi.LocalTarget(f'data/{self.filename}.txt')

    def run(self):
        os.makedirs(os.path.dirname(self.output().path), exist_ok=True)
        with self.output().open('w') as f:
            f.write(f'Generated data for {self.filename}')


class ProcessData(b2luigi.Task):
    """Read the output of GenerateData and write an upper-cased copy."""

    filename = luigi.Parameter()

    def requires(self):
        return GenerateData(filename=self.filename)

    def output(self):
        return b2luigi.LocalTarget(f'processed_data/{self.filename}_processed.txt')

    def run(self):
        os.makedirs(os.path.dirname(self.output().path), exist_ok=True)
        # The backslash continues the `with` statement onto the next line.
        with self.input().open('r') as infile, \
                self.output().open('w') as outfile:
            content = infile.read()
            outfile.write(f'Processed: {content.upper()}')


if __name__ == '__main__':
    # Use luigi.build for programmatic execution within a script.
    # For command-line parsing and batch submission, the b2luigi
    # documentation uses b2luigi.process() as the entry point.
    luigi.build([
        ProcessData(filename='example_file')
    ], local_scheduler=True)

    print("\n--- Task completed ---")
    print("Check 'data/example_file.txt' and 'processed_data/example_file_processed.txt'")

    # Clean up generated files for repeated execution
    # os.remove('data/example_file.txt')
    # os.remove('processed_data/example_file_processed.txt')
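
Luigi treats a task as complete once its output target exists, so rerunning the quickstart does nothing until the two output files are removed. The following is a minimal sketch of that clean-up step using only the standard library; `clean_outputs` is a hypothetical helper for this example, not a b2luigi or Luigi function, and the paths are the ones written by the quickstart.

```python
from pathlib import Path


def clean_outputs(paths):
    """Delete each path that is an existing file; return the paths actually removed."""
    removed = []
    for path in map(Path, paths):
        if path.is_file():
            path.unlink()
            removed.append(path)
    return removed


if __name__ == "__main__":
    # Output files produced by the quickstart tasks.
    removed = clean_outputs([
        "data/example_file.txt",
        "processed_data/example_file_processed.txt",
    ])
    for path in removed:
        print(f"removed {path}")
```

Files that do not exist are skipped silently, so the helper is safe to call before the first run as well.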