{"id":8440,"library":"prefect-dask","title":"Prefect Dask Integration","description":"Prefect Dask is an integration library that lets Prefect flows execute tasks on Dask clusters through the `DaskTaskRunner`. It simplifies orchestrating Dask workloads within Prefect workflows, supporting temporary local clusters, temporary clusters created from a `cluster_class`, and connections to existing distributed Dask schedulers. The current version is 0.3.6, with updates typically coinciding with Prefect core releases or as needed for Dask compatibility.","status":"active","version":"0.3.6","language":"en","source_language":"en","source_url":"https://github.com/PrefectHQ/prefect/tree/main/src/integrations/prefect-dask","tags":["prefect","dask","workflow","orchestration","executor","distributed-computing"],"install":[{"cmd":"pip install prefect-dask","lang":"bash","label":"Install prefect-dask"}],"dependencies":[{"reason":"Core Prefect orchestration library (the 0.3.x release line targets Prefect 3.x).","package":"prefect","optional":false},{"reason":"Distributed computing framework.","package":"dask","optional":false},{"reason":"Dask's distributed scheduler and workers library.","package":"distributed","optional":false}],"imports":[{"note":"DaskExecutor was for Prefect 1.x. 
For Prefect 2.x and later, use DaskTaskRunner from prefect-dask.","wrong":"from prefect.executors import DaskExecutor","symbol":"DaskTaskRunner","correct":"from prefect_dask.runners import DaskTaskRunner"}],"quickstart":{"code":"import os\nfrom prefect import flow, task\nfrom prefect_dask.runners import DaskTaskRunner\n\n@task\ndef my_dask_task(x):\n    print(f\"Executing task on Dask: {x}\")\n    return x * 2\n\n# The scheduler address is read once, when this decorator runs at import time.\n# If DASK_SCHEDULER_ADDRESS is unset, DaskTaskRunner starts a temporary local\n# Dask cluster for each flow run; otherwise it connects to that scheduler.\n@flow(task_runner=DaskTaskRunner(address=os.environ.get(\"DASK_SCHEDULER_ADDRESS\")))\ndef my_dask_flow(num: int):\n    \"\"\"A Prefect flow that runs tasks on a Dask cluster.\"\"\"\n    future = my_dask_task.submit(num)\n    result = future.result()\n    print(f\"Task result: {result}\")\n    return result\n\nif __name__ == \"__main__\":\n    # Example 1: local Dask cluster (default if DASK_SCHEDULER_ADDRESS was not set before import)\n    print(\"\\n--- Running flow with local Dask cluster ---\")\n    output_local = my_dask_flow(5)\n    print(f\"Flow with local Dask completed, output: {output_local}\")\n\n    # Example 2: connect to an existing Dask scheduler.\n    # Setting os.environ here would have no effect (the decorator above has already\n    # run), so override the task runner explicitly. Uncomment and adjust the address.\n    # remote_flow = my_dask_flow.with_options(\n    #     task_runner=DaskTaskRunner(address=\"tcp://127.0.0.1:8786\")\n    # )\n    # print(\"\\n--- Running flow with external Dask scheduler ---\")\n    # output_remote = remote_flow(10)\n    # print(f\"Flow with external Dask completed, output: {output_remote}\")\n","lang":"python","description":"This quickstart defines a simple Prefect flow that uses `DaskTaskRunner` to execute tasks. 
If the `DASK_SCHEDULER_ADDRESS` environment variable is not set when the flow is defined (the decorator reads it at import time), `DaskTaskRunner` spins up a temporary local Dask cluster. Otherwise, it connects to the Dask scheduler at that address, showing how to use Dask locally or with an existing distributed setup."},"warnings":[{"fix":"Rewrite flows to use `DaskTaskRunner` as the flow's `task_runner` in place of `DaskExecutor`. Many `DaskExecutor` settings have direct counterparts on `DaskTaskRunner` (`address`, `cluster_class`, `cluster_kwargs`, `adapt_kwargs`, `client_kwargs`). Consult the Prefect 2.x migration guides.","message":"The Prefect 1.x Dask executor (`DaskExecutor`) is incompatible with Prefect 2.x and later and with `prefect-dask`. Its replacement is `DaskTaskRunner`.","severity":"breaking","affected_versions":"Prefect 1.x users migrating to Prefect 2.x or later"},{"fix":"Set `address` explicitly when connecting to an existing distributed Dask cluster, and confirm that the scheduler is running and reachable from wherever the flow runs. For local development, `address=None` is usually sufficient.","message":"An incorrect or unreachable Dask scheduler `address` causes connection errors (for example `ConnectionRefusedError`) when the flow run starts. If `address` is `None` (the default) and no `cluster_class` is given, a temporary local Dask cluster is created for each flow run, which is often not what you want in production deployments.","severity":"gotcha","affected_versions":"All versions of prefect-dask"},{"fix":"Keep `dask` and `distributed` at the same release version: they are released together, and installing them together (for example `pip install \"dask[distributed]\"`) keeps them aligned. Upgrade both at once, and run matching versions on the client, scheduler, and workers.","message":"Version mismatches between `dask` and `distributed`, or between the Dask client, scheduler, and workers, can cause subtle runtime errors or unexpected behavior within the Dask cluster.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Ensure custom classes are pickleable. 
If passing large data, consider using Dask's distributed collections (e.g., Dask DataFrames, Bags) or storing the data in a location every worker can reach (e.g., S3 or a shared network filesystem) and passing paths instead of raw objects.","message":"Task arguments and return values must be serialized to move between the flow process and Dask workers. Dask's serialization relies on `pickle` and `cloudpickle`, but objects that hold open connections, locks, or file handles, and some complex custom types, can still fail to serialize.","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-16T00:00:00.000Z","next_check":"2026-07-15T00:00:00.000Z","problems":[{"fix":"Import the runner with `from prefect_dask.runners import DaskTaskRunner`. It accepts `cluster_class` (a class, callable, or import-path string) together with `cluster_kwargs` to create a temporary cluster, or `address` to connect to an existing scheduler; `client_kwargs` and `adapt_kwargs` are also supported. See the `DaskTaskRunner` documentation for the full parameter list.","cause":"`cluster_class` is a supported parameter of `prefect_dask.runners.DaskTaskRunner`, so this error indicates that the `DaskTaskRunner` being called is not that class (for example, a locally defined class or wrapper with the same name).","error":"TypeError: DaskTaskRunner() got an unexpected keyword argument 'cluster_class'"},{"fix":"Verify that your Dask scheduler is running and accessible at the `address` provided to `DaskTaskRunner`. Check network configurations and firewalls if connecting to a remote scheduler. If you intend to use a local cluster, set `address=None` or omit the argument.","cause":"The DaskTaskRunner tried to connect to a Dask scheduler at the specified address, but no scheduler was listening there or the connection was refused (e.g., the scheduler is not running, the address is wrong, or a firewall blocks it).","error":"ConnectionRefusedError: [Errno 111] Connection refused"},{"fix":"Install the library using `pip install prefect-dask`. Ensure your virtual environment is activated if applicable.","cause":"The `prefect-dask` library has not been installed in the Python environment.","error":"ModuleNotFoundError: No module named 'prefect_dask'"},{"fix":"If using a local Dask cluster, ensure sufficient time for startup, or try running the flow again. 
If connecting to a remote scheduler, double-check the `address` parameter and ensure the scheduler is stable and reachable. `DaskTaskRunner` does not take a pre-built Dask `Client`; for more control over the connection, pass options such as `timeout` via `client_kwargs`, which are forwarded to the `distributed.Client` the runner creates.","cause":"The Dask client internal to the DaskTaskRunner failed to establish a connection with the Dask scheduler, possibly due to a race condition during local cluster startup or an invalid remote address.","error":"RuntimeError: Task 'my_task' failed: The Dask client for this task runner could not connect to a Dask scheduler at 'tcp://<ip>:<port>'."}]}