Pypiper

0.15.1 · active · verified Fri Apr 17

Pypiper is a lightweight Python toolkit for building robust, restartable command-line pipelines. It simplifies complex data processing workflows by handling logging, error recovery, and status tracking. The current version is 0.15.1, and the project is actively maintained.

Quickstart

This quickstart initializes a `PipelineManager`, runs two shell commands with `pm.run()`, and reports a result. Each command names a target file; on a rerun, Pypiper skips any command whose target already exists, which is what makes the pipeline restartable. The example creates an output directory, writes a three-line file, counts its lines, and reports a simple metric.

import pypiper
import os

# Define pipeline name and output directory
PIPELINE_NAME = "my_pypiper_example"
OUTDIR = "pypiper_output"
os.makedirs(OUTDIR, exist_ok=True)

# Initialize PipelineManager (the output directory argument is named `outfolder`)
pm = pypiper.PipelineManager(name=PIPELINE_NAME, outfolder=OUTDIR)

print(f"\n--- Starting Pypiper Pipeline: {PIPELINE_NAME} ---")

# Stage 1: Create an initial file
input_file = os.path.join(OUTDIR, "raw_data.txt")
cmd1 = f"echo 'Line 1\nLine 2\nLine 3' > {input_file}"
pm.run(cmd1, target=input_file)

# Stage 2: Process the file (e.g., count lines)
output_file = os.path.join(OUTDIR, "processed_data.txt")
cmd2 = f"wc -l {input_file} > {output_file}"
pm.run(cmd2, target=output_file)

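# Report a result to pipestat (requires pipestat to be configured or just report to log)
# wc -l writes "<count> <path>", so the line count is the first field
with open(output_file) as f:
    line_count = int(f.read().split()[0])
pm.report_result("lines_counted", line_count)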

# Stop the pipeline (marks it as completed and finalizes the log)
pm.stop_pipeline()

print(f"--- Pipeline Finished! Check '{OUTDIR}' for results. ---")
print(f"Content of {output_file}:")
with open(output_file, 'r') as f:
    print(f.read().strip())

# Clean up (optional for quickstart demonstration)
# import shutil
# shutil.rmtree(OUTDIR)
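
The restartability idea behind the `target` argument can be illustrated without Pypiper at all. The sketch below is not Pypiper's implementation; `run_if_missing` is a hypothetical helper written only for this example, and it assumes a POSIX-style shell is available. It runs a command only when its target file is absent, so a second call with the same target is skipped.

```python
import os
import shlex
import subprocess
import tempfile


def run_if_missing(cmd, target):
    """Run a shell command only if its target file does not exist yet.

    Hypothetical helper for illustration; not part of Pypiper.
    Returns True if the command ran, False if it was skipped.
    """
    if os.path.exists(target):
        return False
    subprocess.run(cmd, shell=True, check=True)
    return True


# Work in a throwaway directory so the example leaves nothing behind
workdir = tempfile.mkdtemp()
target = os.path.join(workdir, "raw_data.txt")
cmd = f"echo hello > {shlex.quote(target)}"

first = run_if_missing(cmd, target)   # target is missing, so the command runs
second = run_if_missing(cmd, target)  # target now exists, so the command is skipped
print(first, second)
```

A real pipeline manager layers more on top of this check, such as logging and status tracking, but the existence test on the target is what lets an interrupted run resume without redoing completed steps.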
