PyPeln: Concurrent Data Pipelines
PyPeln (pronounced "pypeline") is a Python library designed for building concurrent data pipelines with ease. It offers a simple, functional API that supports multiprocessing (processes), multithreading (threads), and asynchronous programming (asyncio tasks) with the same interface. This allows developers to create multi-stage pipelines and maintain fine-grained control over computational resources. The library is currently at version 0.4.9 and provides solutions for medium-scale data tasks where frameworks like Spark or Dask might be considered overkill.
Warnings
- breaking The `maxsize` argument was removed from all `from_iterable` functions in version 0.4.0.
- gotcha Library versions 0.4.0 through 0.4.1 failed to import on Python 3.6 because they relied on `typing.Protocol`, which was only introduced in Python 3.8. Although the library generally targets Python 3.6+, this dependency broke compatibility in those specific releases.
- gotcha The `task` module, which provides `asyncio` based pipelines, is only available and fully supported for Python versions 3.7 and above, despite the overall library supporting Python >= 3.6.2.
- gotcha The `maxsize` parameter in `pl.process.map` and `pl.thread.map` was not correctly respected, potentially leading to unbounded queues and out-of-memory issues in earlier versions.
- gotcha Process workers using multiprocessing start method 'spawn' could raise an `AttributeError` on certain systems.
Install
- pip install pypeln
Imports
- pypeln
import pypeln as pl
- pypeln.process
from pypeln import process as pl_process
- pypeln.thread
from pypeln import thread as pl_thread
- pypeln.task
from pypeln import task as pl_task
Quickstart
import pypeln as pl
import time
import random
def slow_square(x):
    time.sleep(random.uniform(0.1, 0.5))  # Simulate work
    return x * x

def is_even(x):
    return x % 2 == 0

def print_item(x):
    print(f"Processing: {x}")
    return x

data = range(10)  # [0, 1, ..., 9]

# Build a multiprocessing pipeline with the pipe operator:
# 1. Map: square each number (4 workers)
# 2. Filter: keep only even numbers (2 workers)
# 3. Map: print each item (1 worker)
# Note: on platforms that use the 'spawn' start method, guard this code
# with `if __name__ == "__main__":`.
stage = (
    data
    | pl.process.map(slow_square, workers=4, maxsize=4)
    | pl.process.filter(is_even, workers=2, maxsize=2)
    | pl.process.map(print_item, workers=1, maxsize=1)
)
stage = pl.process.ordered(stage)  # restore source order

# Consuming the stage starts the pipeline execution
final_list = list(stage)
print(f"Final result: {final_list}")  # [0, 4, 16, 36, 64]