PyPeln: Concurrent Data Pipelines

0.4.9 · active · verified Sun Apr 12

PyPeln (pronounced as "pypeline") is a Python library for building concurrent data pipelines with ease. It offers a simple, functional API that supports multiprocessing (processes), multithreading (threads), and asynchronous programming (asyncio tasks) through the same interface, letting developers build multi-stage pipelines while keeping fine-grained control over computational resources. The library is currently at version 0.4.9 and targets medium-scale data tasks where heavyweight frameworks such as Spark or Dask would be overkill.
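The "same interface across backends" design has a familiar precedent in the standard library's `concurrent.futures`, where `ThreadPoolExecutor` and `ProcessPoolExecutor` are drop-in replacements for each other. A rough stdlib analogy of that idea (this is not PyPeln itself, just an illustration of swapping the concurrency backend without touching the pipeline logic):

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(x):
    return x * x

def run_pipeline(executor_cls):
    # The pipeline body is identical regardless of which executor runs it.
    with executor_cls(max_workers=4) as pool:
        squared = pool.map(square, range(10))
        return [x for x in squared if x % 2 == 0]

# Threads and processes expose the same Executor interface:
print(run_pipeline(ThreadPoolExecutor))  # → [0, 4, 16, 36, 64]
# run_pipeline(ProcessPoolExecutor) yields the same result
# (call it under `if __name__ == "__main__":` for portability).
```

PyPeln applies the same principle one level higher: `pl.process`, `pl.thread`, and `pl.task` expose the same stage functions, so switching backends is usually a one-word change.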

Warnings

Install

pip install pypeln

Imports

import pypeln as pl

Quickstart

This quickstart demonstrates a basic multiprocessing pipeline. It defines a source iterable (`data`) and pipes it through `pl.process.map` and `pl.process.filter` stages with the `|` operator. The pipeline squares numbers, keeps only the even ones, and prints each item as it passes through. The `workers` and `maxsize` parameters control concurrency and backpressure for each stage. Stages are lazy: the `list()` call at the end triggers execution and collects the results.

import pypeln as pl
import time
import random

def slow_square(x):
    time.sleep(random.uniform(0.1, 0.5)) # Simulate work
    return x * x

def is_even(x):
    return x % 2 == 0

def print_item(x):
    print(f"Processing: {x}")
    return x

data = range(10) # [0, 1, ..., 9]

# Build a multiprocessing pipeline with the pipe (|) operator
# 1. Map: square each number (4 workers)
# 2. Filter: keep only even numbers (2 workers)
# 3. Map: print each surviving item (1 worker)
# 4. Ordered: restore source order (worker results may arrive out of order)
stage = (
    data
    | pl.process.map(slow_square, workers=4, maxsize=4)
    | pl.process.filter(is_even, workers=2, maxsize=2)
    | pl.process.map(print_item, workers=1, maxsize=1)
    | pl.process.ordered()
)

# Consume the stage (this starts the pipeline execution)
final_list = list(stage)
print(f"Final result: {final_list}")
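What `maxsize` buys you is backpressure: each stage feeds the next through a bounded queue, so a fast producer blocks instead of buffering items without limit. A minimal single-worker sketch of that mechanism using only the standard library (a simplification for illustration, not PyPeln's actual internals):

```python
import threading
import queue

SENTINEL = object()  # marks the end of the stream

def producer(out_q):
    for x in range(10):
        out_q.put(x)  # blocks when the queue is full (backpressure)
    out_q.put(SENTINEL)

def square_stage(in_q, out_q):
    while True:
        x = in_q.get()
        if x is SENTINEL:
            out_q.put(SENTINEL)
            return
        out_q.put(x * x)

# maxsize bounds how far each stage can run ahead of the next.
q1 = queue.Queue(maxsize=4)
q2 = queue.Queue(maxsize=4)
threading.Thread(target=producer, args=(q1,)).start()
threading.Thread(target=square_stage, args=(q1, q2)).start()

results = []
while True:
    item = q2.get()
    if item is SENTINEL:
        break
    results.append(item)
print(results)  # → [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

With a single worker per stage the order is preserved; with several workers per stage (as in the quickstart above), results can arrive out of order, which is what an ordering stage exists to fix.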
