b2luigi - Belle II Luigi Extensions

1.2.8 · active · verified Sun Apr 12

b2luigi extends the Luigi workflow management system, primarily to integrate it with the Belle II software framework (basf2) for batch processing. It provides specialized tasks, runners, and target classes for managing data within the Belle II environment and on various remote file systems. The library is actively maintained, with releases typically every few months that fix bugs and add features such as WebDAV support.

Quickstart

This quickstart demonstrates how to define and run a simple workflow using `b2luigi.Task` and `b2luigi.LocalTarget`. It involves two tasks: one to generate a data file and another to process it, showcasing task dependencies and ensuring output directories are created.

import b2luigi
import luigi
import os

class GenerateData(b2luigi.Task):
    filename = luigi.Parameter()

    def output(self):
        return b2luigi.LocalTarget(f'data/{self.filename}.txt')

    def run(self):
        os.makedirs(os.path.dirname(self.output().path), exist_ok=True)
        with self.output().open('w') as f:
            f.write(f'Generated data for {self.filename}')

class ProcessData(b2luigi.Task):
    filename = luigi.Parameter()

    def requires(self):
        return GenerateData(filename=self.filename)

    def output(self):
        return b2luigi.LocalTarget(f'processed_data/{self.filename}_processed.txt')

    def run(self):
        os.makedirs(os.path.dirname(self.output().path), exist_ok=True)
        # The trailing backslash continues the `with` statement onto the next line.
        with self.input().open('r') as infile, \
             self.output().open('w') as outfile:
            content = infile.read()
            outfile.write(f'Processed: {content.upper()}')

if __name__ == '__main__':
    # Use luigi.build for programmatic execution within a script.
    # b2luigi's own entry point, b2luigi.process(), additionally handles
    # command-line options and batch submission.
    luigi.build([
        ProcessData(filename='example_file')
    ], local_scheduler=True)

    print("\n--- Task completed ---")
    print("Check 'data/example_file.txt' and 'processed_data/example_file_processed.txt'")
    # Clean up generated files for repeated execution
    # os.remove('data/example_file.txt')
    # os.remove('processed_data/example_file_processed.txt')
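
To make the dependency logic in the quickstart more concrete, the following is a minimal stand-alone sketch of how a Luigi-style scheduler decides what to run: a task counts as complete when its output exists, and its requirements are built before the task itself. It uses only the standard library; `SketchTask`, `Generate`, `Process`, and `build` are hypothetical names for illustration and are not part of b2luigi or Luigi, whose real scheduler is considerably more involved.

```python
import os
import tempfile


class SketchTask:
    """Hypothetical stand-in for a Luigi-style task (not a b2luigi class)."""

    def __init__(self, workdir, filename):
        self.workdir = workdir
        self.filename = filename

    def requires(self):
        return []  # no dependencies by default

    def output(self):
        raise NotImplementedError

    def complete(self):
        # A task counts as done when its output file already exists.
        return os.path.exists(self.output())

    def run(self):
        raise NotImplementedError


class Generate(SketchTask):
    def output(self):
        return os.path.join(self.workdir, "data", f"{self.filename}.txt")

    def run(self):
        os.makedirs(os.path.dirname(self.output()), exist_ok=True)
        with open(self.output(), "w") as f:
            f.write(f"Generated data for {self.filename}")


class Process(SketchTask):
    def requires(self):
        return [Generate(self.workdir, self.filename)]

    def output(self):
        return os.path.join(
            self.workdir, "processed_data", f"{self.filename}_processed.txt"
        )

    def run(self):
        os.makedirs(os.path.dirname(self.output()), exist_ok=True)
        source = self.requires()[0].output()
        with open(source) as infile, open(self.output(), "w") as outfile:
            outfile.write(f"Processed: {infile.read().upper()}")


def build(task, executed=None):
    """Run missing dependencies depth-first, then the task itself."""
    executed = [] if executed is None else executed
    if task.complete():
        return executed  # output exists, so nothing is re-run
    for dep in task.requires():
        build(dep, executed)
    task.run()
    executed.append(type(task).__name__)
    return executed


workdir = tempfile.mkdtemp()
first_pass = build(Process(workdir, "example_file"))   # runs both tasks
second_pass = build(Process(workdir, "example_file"))  # outputs exist already
print(first_pass, second_pass)
```

The second call does no work because both output files already exist. The same rule is why the quickstart suggests deleting the generated files before running it again.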
