{"id":4714,"library":"pypeln","title":"PyPeln: Concurrent Data Pipelines","description":"PyPeln (pronounced \"pypeline\") is a Python library designed for building concurrent data pipelines with ease. It offers a simple, functional API that supports multiprocessing (processes), multithreading (threads), and asynchronous programming (asyncio tasks) with the same interface. This allows developers to create multi-stage pipelines and maintain fine-grained control over computational resources. The library is currently at version 0.4.9 and provides solutions for medium-scale data tasks where frameworks like Spark or Dask might be considered overkill.","status":"active","version":"0.4.9","language":"en","source_language":"en","source_url":"https://github.com/cgarciae/pypeln","tags":["concurrency","asyncio","multiprocessing","threading","pipeline","functional-programming"],"install":[{"cmd":"pip install pypeln","lang":"bash","label":"Install stable version"}],"dependencies":[{"reason":"Required Python version compatibility.","package":"python","optional":false},{"reason":"Runtime dependency for managing worker lifecycle and timeouts.","package":"stopit","optional":false},{"reason":"Provides backports of typing features for older Python versions, especially important for Python < 3.8.","package":"typing_extensions","optional":false}],"imports":[{"note":"Standard convention for importing the library.","symbol":"pypeln","correct":"import pypeln as pl"},{"note":"While 'pr' was seen in older discussions, 'pl_process' or directly using 'pl.process' is clearer and more idiomatic.","wrong":"from pypeln import pr","symbol":"pypeln.process","correct":"from pypeln import process as pl_process"},{"note":"Similar to 'process', prefer a more descriptive alias or direct access via 'pl.thread'.","wrong":"from pypeln import th","symbol":"pypeln.thread","correct":"from pypeln import thread as pl_thread"},{"note":"Similar to 'process', prefer a more descriptive alias or direct access via 
'pl.task'. The alias 'io' was considered but 'task' is the module's name.","wrong":"from pypeln import io","symbol":"pypeln.task","correct":"from pypeln import task as pl_task"}],"quickstart":{"code":"import random\nimport time\n\nimport pypeln as pl\n\n\ndef slow_square(x):\n    time.sleep(random.uniform(0.1, 0.5))  # Simulate work\n    return x * x\n\n\ndef is_even(x):\n    return x % 2 == 0\n\n\ndef print_item(x):\n    print(f\"Processing: {x}\")\n    return x\n\n\n# The guard is required for multiprocessing start methods such as 'spawn'\nif __name__ == \"__main__\":\n    data = range(10)  # 0, 1, ..., 9\n\n    # Build a multiprocessing pipeline by passing each stage to the next call\n    # 1. Map: square each number (4 workers)\n    stage = pl.process.map(slow_square, data, workers=4, maxsize=4)\n    # 2. Filter: keep only even squares (2 workers)\n    stage = pl.process.filter(is_even, stage, workers=2, maxsize=2)\n    # 3. Map: print each item (1 worker)\n    stage = pl.process.map(print_item, stage, workers=1, maxsize=1)\n    # 4. Restore the order of the source iterable\n    stage = pl.process.ordered(stage)\n\n    # Consuming the stage starts the pipeline execution\n    final_list = list(stage)\n    print(f\"Final result: {final_list}\")","lang":"python","description":"This quickstart demonstrates a basic multiprocessing pipeline. It defines a source iterable (`data`), then builds the pipeline by passing each stage to the next `pl.process` call (`map`, `filter`, `map`, `ordered`). The pipeline squares the numbers, keeps the even squares, and prints each item; `pl.process.ordered` then restores the order of the source iterable. The `workers` and `maxsize` parameters control concurrency and backpressure for each stage. The `list()` call at the end triggers execution and collects the results."},"warnings":[{"fix":"Review `from_iterable` calls and remove the `maxsize` argument if present. The functionality for `maxsize` was later introduced for `to_stage` and `to_iterable`, and `ordered` functions in 0.4.6.","message":"The `maxsize` argument was removed from all `from_iterable` functions in version 0.4.0.","severity":"breaking","affected_versions":">=0.4.0"},{"fix":"Upgrade to PyPeln version 0.4.2 or newer, which includes conditional dependencies and imports to correctly support Python >= 3.6.2. 
Ensure `typing_extensions` is installed.","message":"On Python 3.6, PyPeln versions 0.4.0 and 0.4.1 failed at import time because they relied on `typing.Protocol` (introduced in Python 3.8). Although the library targets Python 3.6+, this dependency broke compatibility in those two releases.","severity":"gotcha","affected_versions":"0.4.0 - 0.4.1"},{"fix":"When using `pl.task` for asynchronous operations, ensure your Python environment is 3.7 or newer. For older Python 3.6 environments, use `pl.process` or `pl.thread` instead.","message":"The `task` module, which provides `asyncio`-based pipelines, is only available and fully supported on Python 3.7 and above, even though the library as a whole supports Python >= 3.6.2.","severity":"gotcha","affected_versions":"<3.7 for `task` module"},{"fix":"Upgrade to PyPeln version 0.4.7 or newer to ensure `maxsize` properly functions as a backpressure mechanism, preventing upstream stages from overwhelming downstream ones.","message":"In earlier versions, the `maxsize` parameter in `pl.process.map` and `pl.thread.map` was not correctly respected, potentially leading to unbounded queues and out-of-memory issues.","severity":"gotcha","affected_versions":"<0.4.7"},{"fix":"Upgrade to PyPeln version 0.4.9 or newer, which addresses this issue.","message":"Process workers using the multiprocessing start method 'spawn' could raise an `AttributeError` on certain systems.","severity":"gotcha","affected_versions":"<0.4.9"}],"env_vars":null,"last_verified":"2026-04-12T00:00:00.000Z","next_check":"2026-07-11T00:00:00.000Z"}