Prefect-Ray
Prefect-Ray integrates the Prefect workflow orchestration framework with the Ray distributed execution framework. It lets Prefect flows run tasks in parallel on Ray, either by starting a temporary local Ray instance or by connecting to an existing remote cluster. The library is actively maintained, is currently at version 0.4.5, and is versioned separately from the main Prefect library.
Common errors
- ModuleNotFoundError: No module named 'prefect_ray'
  Cause: `prefect-ray` is not installed, or not importable, in the Python environment of the Ray head node or workers when connecting to a remote Ray cluster.
  Fix: Install `prefect-ray` on every Ray cluster node. If you use `RayTaskRunner(address=...)`, you can also pass `init_kwargs={"runtime_env": {"pip": ["prefect-ray"]}}` so that Ray installs it on the workers.
- RuntimeError: Failed to reach API at http://127.0.0.1:4200/api/
  Cause: Ray tasks, typically on a remote cluster, try to reach the default local Prefect API URL instead of the configured one, usually because environment variables were not propagated to the workers.
  Fix: Set `PREFECT_API_URL` explicitly in the `runtime_env` passed through the `RayTaskRunner`'s `init_kwargs`, for example: `RayTaskRunner(address="ray://your-remote-ray-head:10001", init_kwargs={"runtime_env": {"env_vars": {"PREFECT_API_URL": "http://your-prefect-api-url:4200/api"}}})`.
- RuntimeError: There is no current event loop in thread 'ray_client_server_1'
  Cause: This can occur with Ray Client (`ray://` addresses) because of how event loops are managed across threads, particularly when resources are specified for tasks or tasks run concurrently.
  Fix: There is no single fix; the error usually reflects an interaction between Ray and Prefect's async execution. Use compatible versions of Ray and Prefect, try simplifying resource specifications or the task execution model, and search the Prefect and Ray GitHub issue trackers for similar reports and workarounds.
- Local server randomly not working with `prefect-ray`
  Cause: The local Prefect server intermittently becomes unreachable while `prefect-ray` is installed, sometimes after OS upgrades or reinstallation attempts. This is often caused by a corrupted local Prefect configuration or cache.
  Fix: Clear the Prefect data directory (`~/.prefect`), uninstall all Prefect-related packages (`prefect`, `prefect-ray`, etc.), and reinstall them. Restarting the system can also help. By default `~/.prefect` also holds the local SQLite database and profiles, so clearing it discards local run history and settings.
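The two remote-cluster fixes above (shipping `prefect-ray` to the workers and setting `PREFECT_API_URL`) can be combined in a single `init_kwargs` dictionary. The following is a minimal sketch under that assumption: `build_ray_init_kwargs` and `make_remote_task_runner` are hypothetical helpers, not part of prefect-ray, and the addresses are placeholders. Only `RayTaskRunner(address=..., init_kwargs=...)` and Ray's `runtime_env` keys `pip` and `env_vars` come from the fixes listed above.

```python
from typing import Any, Dict, List, Optional


def build_ray_init_kwargs(
    api_url: str,
    pip_packages: Optional[List[str]] = None,
    extra_env: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Build the init_kwargs dict that RayTaskRunner forwards to ray.init.

    Hypothetical helper: not part of prefect-ray.
    """
    # Forward the Prefect API URL explicitly; it is not propagated automatically.
    env_vars = {"PREFECT_API_URL": api_url}
    if extra_env:
        env_vars.update(extra_env)

    # Always include prefect-ray so workers can import it; keep order, drop duplicates.
    packages = ["prefect-ray"]
    for pkg in pip_packages or []:
        if pkg not in packages:
            packages.append(pkg)

    return {"runtime_env": {"pip": packages, "env_vars": env_vars}}


def make_remote_task_runner(address: str, api_url: str):
    """Create a RayTaskRunner for an existing cluster (requires prefect-ray)."""
    # Imported lazily so build_ray_init_kwargs works without prefect-ray installed.
    from prefect_ray.task_runners import RayTaskRunner

    return RayTaskRunner(
        address=address,
        init_kwargs=build_ray_init_kwargs(api_url),
    )
```

A runner built this way, for example `make_remote_task_runner("ray://your-remote-ray-head:10001", "http://your-prefect-api-url:4200/api")`, can then be attached to a flow with `my_flow.with_options(task_runner=runner)`, where `my_flow` stands for any of your own flows.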
Warnings
- breaking: Python 3.13 is not supported, because Ray's support for it is still experimental and Prefect does not yet support it. Use Python 3.9 through 3.12.
- gotcha: On Apple Silicon (M1 and other ARM processors), installing `ray` with `pip` may fail. Installing the problematic dependencies (such as `grpcio`) manually with `conda` is often required.
- gotcha: When `RayTaskRunner` connects to a remote Ray cluster (that is, when the `address` argument is set), `prefect-ray` and every package your tasks use must be installed on the Ray head node and workers. A `runtime_env` can supply worker-side dependencies, but it does not cover the imports needed by the driver process.
- gotcha: Prefect's default SQLite database is not suited to concurrent access, so using it with distributed task runners such as `RayTaskRunner` can lead to database corruption or unexpected behavior. For concurrent workloads, consider running the Prefect server against PostgreSQL instead.
- gotcha: Environment variables such as `PREFECT_API_URL` that are set in the Prefect deployment or execution environment may not propagate to Ray tasks running on a remote cluster via `RayTaskRunner`, so those tasks can fail to connect to the Prefect API. Set such variables explicitly in the `runtime_env` passed through `init_kwargs`.
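Some of the warnings above can be checked before a flow starts. The following is a minimal sketch of a hypothetical preflight check, `preflight_problems` (not part of Prefect or prefect-ray), that flags a Python version outside 3.9 through 3.12 and, when a remote Ray address is given, a missing or loopback `PREFECT_API_URL`. It inspects only the driver's environment; the value still has to be forwarded to the workers through `runtime_env`. The loopback test is a heuristic: if the Ray cluster runs on the same machine as the Prefect server, a loopback URL can be perfectly valid.

```python
import os
import sys
from typing import List, Mapping, Optional, Tuple
from urllib.parse import urlparse

# Supported range taken from the Python version warning above.
SUPPORTED_PYTHON = ((3, 9), (3, 12))
LOOPBACK_HOSTS = {"127.0.0.1", "localhost", "::1"}


def preflight_problems(
    ray_address: Optional[str],
    env: Mapping[str, str] = os.environ,
    version: Tuple[int, int] = sys.version_info[:2],
) -> List[str]:
    """Return human-readable problems; an empty list means nothing was flagged."""
    problems: List[str] = []

    low, high = SUPPORTED_PYTHON
    if not (low <= tuple(version) <= high):
        problems.append(f"Python {version[0]}.{version[1]} is outside 3.9-3.12")

    # The API URL only matters when tasks run on a remote cluster.
    if ray_address:
        api_url = env.get("PREFECT_API_URL")
        if not api_url:
            problems.append("PREFECT_API_URL is not set; remote workers cannot be told where the API is")
        elif urlparse(api_url).hostname in LOOPBACK_HOSTS:
            problems.append(f"PREFECT_API_URL points at a loopback address ({api_url})")

    return problems
```

Calling, for example, `preflight_problems("ray://your-remote-ray-head:10001")` before starting a flow and raising if the list is non-empty turns these gotchas into an early, explicit error instead of a failure inside a Ray worker.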
Install
Either of the following installs the integration:
- pip install "prefect[ray]"
- pip install prefect-ray
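To confirm which versions the current interpreter actually sees after installation (useful for the `ModuleNotFoundError` and version-compatibility errors listed above), a standard-library check is enough. `installed_versions` is a hypothetical helper that relies only on `importlib.metadata`; it reports `None` for any distribution that is not installed.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional

# Distribution names as published on PyPI.
DEFAULT_PACKAGES = ("prefect", "prefect-ray", "ray")


def installed_versions(packages: Iterable[str] = DEFAULT_PACKAGES) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if missing."""
    found: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions().items():
        print(f"{name}: {ver or 'NOT INSTALLED'}")
```

This reports only the driver's environment. One way to check a remote cluster is to run the same function as a Ray task (for example by wrapping it with `ray.remote`) and compare the results with the driver's.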
Imports
- RayTaskRunner (importable from either path):
  from prefect_ray import RayTaskRunner
  from prefect_ray.task_runners import RayTaskRunner
Quickstart
import time

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner


@task
def shout(number: int):
    time.sleep(0.5)
    print(f"#{number}")


@flow(task_runner=RayTaskRunner())
def count_to(highest_number: int):
    # .map() submits one task run per number to Ray for parallel execution;
    # .wait() blocks until all of them have finished.
    shout.map(range(highest_number)).wait()


if __name__ == "__main__":
    count_to(10)
    # Example of connecting to an existing Ray instance (the task runner is
    # configured on the flow, not passed as a flow argument):
    # count_to.with_options(task_runner=RayTaskRunner(address="ray://localhost:10001"))(10)