Prefect Dask Integration
Prefect Dask is an integration library that allows Prefect 2.x flows to execute tasks on Dask clusters using the `DaskTaskRunner`. It simplifies the orchestration of Dask workloads within Prefect workflows, supporting both local Dask setups and connections to existing distributed Dask schedulers. The current version is 0.3.6, with updates typically coinciding with Prefect core releases or as needed for Dask compatibility.
Common errors
- TypeError: DaskTaskRunner() got an unexpected keyword argument 'cluster_class'
  cause: Prefect 1.x `DaskExecutor` configuration is being carried over, or the arguments are reaching a different object than prefect-dask's `DaskTaskRunner`. `DaskTaskRunner` itself accepts `cluster_class` (a class or an import-path string) along with `cluster_kwargs`, `adapt_kwargs`, `client_kwargs`, and `address`.
  fix: Make sure the class you call is `prefect_dask.task_runners.DaskTaskRunner`, not a Prefect 1.x executor or a wrapper, and compare the call against the `DaskTaskRunner` documentation. To use an existing cluster, pass its scheduler `address`; to get a temporary local cluster, omit both `address` and `cluster_class`.
- ConnectionRefusedError: [Errno 111] Connection refused
  cause: `DaskTaskRunner` tried to connect to a Dask scheduler at the given `address`, but nothing accepted the connection (the scheduler is not running, the address or port is wrong, or a firewall blocks it).
  fix: Verify that the scheduler is running and reachable at the `address` passed to `DaskTaskRunner` (for example `tcp://<host>:8786`; 8786 is the default scheduler port). Check network configuration and firewalls when connecting to a remote scheduler. To use a temporary local cluster instead, omit `address` or pass `address=None`.
- ModuleNotFoundError: No module named 'prefect_dask'
  cause: The `prefect-dask` library is not installed in the Python environment that runs the flow.
  fix: Install it with `pip install prefect-dask` (the import name is `prefect_dask`). If you use a virtual environment, activate it first.
- RuntimeError: Task 'my_task' failed: The Dask client for this task runner could not connect to a Dask scheduler at 'tcp://<ip>:<port>'.
  cause: The Dask client created by `DaskTaskRunner` could not connect to the scheduler, for example because a local cluster had not finished starting or the remote address is invalid.
  fix: With a local cluster, allow time for startup or rerun the flow. With a remote scheduler, double-check the `address` parameter and confirm the scheduler is stable and reachable. For more control, start the cluster yourself (for example with `distributed.LocalCluster` or the `dask scheduler` command) and pass its scheduler address to `DaskTaskRunner(address=...)`; `DaskTaskRunner` takes an address, not a `Client` object.
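Several of the errors above can be caught before the flow starts. The sketch below is a stdlib-only preflight check; the helper names `missing_modules`, `parse_scheduler_address`, and `wait_for_scheduler` are hypothetical, not part of prefect-dask. It assumes a plain `tcp://host:port` address (Dask also supports other schemes such as `tls://`, which this sketch rejects), and a successful TCP connection only shows that something is listening on that port, not that it is a Dask scheduler.

```python
import importlib.util
import socket
import time
from urllib.parse import urlsplit


def missing_modules(names):
    """Return the top-level module names in `names` that cannot be imported here."""
    return [name for name in names if importlib.util.find_spec(name) is None]


def parse_scheduler_address(address):
    """Split 'tcp://host:port' into (host, port); raise ValueError for anything else."""
    parts = urlsplit(address)
    if parts.scheme != "tcp" or not parts.hostname or parts.port is None:
        raise ValueError(f"expected 'tcp://host:port', got {address!r}")
    return parts.hostname, parts.port


def wait_for_scheduler(address, timeout=10.0, interval=0.5):
    """Poll until a TCP connection to `address` succeeds or `timeout` seconds pass.

    Returns True as soon as a connection is accepted, False once the deadline
    passes. Retrying covers a local cluster that is still starting up.
    """
    host, port = parse_scheduler_address(address)
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Refused or timed-out connections both raise OSError subclasses.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, calling `wait_for_scheduler(os.environ["DASK_SCHEDULER_ADDRESS"], timeout=30)` before running the flow turns a connection error raised inside the task runner into an explicit early failure, and `missing_modules(["prefect_dask", "dask", "distributed"])` reports the `ModuleNotFoundError` case up front.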
Warnings
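- breaking: Prefect 1.x's `DaskExecutor` (`prefect.executors.DaskExecutor`) is incompatible with Prefect 2.x and `prefect-dask`. Use `DaskTaskRunner`, attached to a flow with `@flow(task_runner=...)`, instead.
- gotcha: An incorrect or unreachable scheduler `address` makes the flow fail when the task runner tries to connect. If `address` is `None` (the default) and no `cluster_class` is given, a temporary local Dask cluster is created for the flow run, which is often not what you want in production deployments.
- gotcha: Mismatched `dask` and `distributed` versions, or different versions on the client, scheduler, and workers, can cause subtle runtime errors inside the Dask cluster. `distributed`'s `Client.get_versions(check=True)` raises an error when the client, scheduler, and workers disagree.
- gotcha: Task arguments and results must be serializable. Dask serializes them with pickle and falls back to `cloudpickle`, which also handles lambdas and locally defined functions, but objects tied to process-local resources (open files, locks, database connections, generators) cannot be serialized by either.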
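The two environment gotchas above can be checked locally. In this sketch, `dask_versions_match` and `is_serializable` are hypothetical helpers. The version check assumes that recent `dask` and `distributed` releases share the same version string and only inspects the local environment, so use `Client.get_versions(check=True)` for a running cluster. The serialization check uses `cloudpickle` when it is installed and otherwise falls back to the stricter stdlib `pickle`, so it approximates, rather than reproduces, what Dask does.

```python
import pickle
from importlib.metadata import PackageNotFoundError, version

try:
    # cloudpickle is a dask dependency; it handles lambdas and closures
    # that plain pickle rejects.
    import cloudpickle as _pickler
except ImportError:
    # Stricter fallback: may report lambdas and closures as unserializable.
    _pickler = pickle


def dask_versions_match(get_version=version):
    """Return (matches, versions) for the installed dask and distributed packages.

    A missing package counts as a mismatch. `get_version` is injectable so the
    check can be exercised without dask installed.
    """
    versions = {}
    for name in ("dask", "distributed"):
        try:
            versions[name] = get_version(name)
        except PackageNotFoundError:
            versions[name] = None
    matches = None not in versions.values() and versions["dask"] == versions["distributed"]
    return matches, versions


def is_serializable(obj):
    """Return True if `obj` can be pickled, i.e. could be shipped to a Dask worker."""
    try:
        _pickler.dumps(obj)
    except Exception:
        return False
    return True
```

Running `is_serializable` on a task's arguments before calling `.submit()` moves a serialization failure from a remote worker to a local, easier-to-read check.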
Install
pip install prefect-dask
Imports
- DaskTaskRunner
from prefect_dask.task_runners import DaskTaskRunner
Quickstart
```python
import os

from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def my_dask_task(x):
    print(f"Executing task on Dask: {x}")
    return x * 2


# The address is read once, when this module is imported. If
# DASK_SCHEDULER_ADDRESS is unset, address=None and DaskTaskRunner starts a
# temporary local Dask cluster for each flow run; otherwise it connects to
# the existing scheduler at that address.
@flow(task_runner=DaskTaskRunner(address=os.environ.get("DASK_SCHEDULER_ADDRESS")))
def my_dask_flow(num: int):
    """A Prefect flow that runs tasks on a Dask cluster."""
    future = my_dask_task.submit(num)  # runs on Dask, returns a future
    result = future.result()           # blocks until the task finishes
    print(f"Task result: {result}")
    return result


if __name__ == "__main__":  # guard required: a local cluster spawns processes
    # Example 1: default runner (local cluster unless DASK_SCHEDULER_ADDRESS
    # was already set when this script started)
    print("\n--- Running flow with the default task runner ---")
    output_local = my_dask_flow(5)
    print(f"Flow completed, output: {output_local}")

    # Example 2: connect to an external scheduler. Setting os.environ here has
    # no effect because the decorator above has already run, so override the
    # task runner instead. Uncomment once a scheduler listens at this address.
    # remote_flow = my_dask_flow.with_options(
    #     task_runner=DaskTaskRunner(address="tcp://127.0.0.1:8786")
    # )
    # print("\n--- Running flow with external Dask cluster ---")
    # output_remote = remote_flow(10)
    # print(f"Flow with external Dask completed, output: {output_remote}")
```