Prefect Dask Integration

0.3.6 · active · verified Thu Apr 16

Prefect Dask is an integration library that allows Prefect 2.x flows to execute tasks on Dask clusters using the `DaskTaskRunner`. It simplifies the orchestration of Dask workloads within Prefect workflows, supporting both local Dask setups and connections to existing distributed Dask schedulers. The current version is 0.3.6, with updates typically coinciding with Prefect core releases or as needed for Dask compatibility.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart defines a simple Prefect flow that runs its tasks through `DaskTaskRunner`. The scheduler address is read from the `DASK_SCHEDULER_ADDRESS` environment variable once, when the flow is defined at import time. If the variable is unset, the runner starts a temporary local Dask cluster for the flow run; otherwise it connects to the Dask scheduler at that address. The same flow therefore works both locally and against an existing distributed cluster.

import os
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def my_dask_task(x):
    print(f"Executing task on Dask: {x}")
    return x * 2

# The address is read once, when this module is imported. With address=None,
# DaskTaskRunner creates a temporary local Dask cluster for the flow run;
# otherwise it connects to the scheduler at that address.
@flow(task_runner=DaskTaskRunner(address=os.environ.get("DASK_SCHEDULER_ADDRESS")))
def my_dask_flow(num: int):
    """A Prefect flow that runs tasks on a Dask cluster."""
    future = my_dask_task.submit(num)
    result = future.result()
    print(f"Task result: {result}")
    return result

if __name__ == "__main__":
    # Example 1: Run with a local Dask cluster (default if DASK_SCHEDULER_ADDRESS not set)
    print("\n--- Running flow with local Dask cluster ---")
    output_local = my_dask_flow(5)
    print(f"Flow with local Dask completed, output: {output_local}")

    # Example 2: Run against an external Dask scheduler.
    # Setting os.environ["DASK_SCHEDULER_ADDRESS"] at this point has no effect,
    # because the @flow decorator already read the variable at import time.
    # Either export the variable before starting Python, or build a copy of the
    # flow with a different task runner via with_options():
    # remote_flow = my_dask_flow.with_options(
    #     task_runner=DaskTaskRunner(address="tcp://127.0.0.1:8786")
    # )
    # print("\n--- Running flow with external Dask cluster ---")
    # output_remote = remote_flow(10)
    # print(f"Flow with external Dask completed, output: {output_remote}")
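
The choice between a local cluster and an existing scheduler can also be isolated in a small helper, which keeps the decision out of the decorator and makes it easy to test. The sketch below is illustrative only: `runner_kwargs` is a hypothetical helper name, not part of prefect-dask, and the local cluster sizing (`n_workers=2`, `threads_per_worker=1`) is an assumed example that would be forwarded to Dask's `LocalCluster`. It relies on `DaskTaskRunner` accepting either `address` or `cluster_kwargs`, but not both together.

```python
import os


def runner_kwargs(env=None):
    """Build keyword arguments for DaskTaskRunner (hypothetical helper).

    Returns {"address": ...} when DASK_SCHEDULER_ADDRESS is set and non-empty,
    otherwise {"cluster_kwargs": ...} describing a small local cluster.
    """
    env = os.environ if env is None else env
    address = env.get("DASK_SCHEDULER_ADDRESS")
    if address:
        # Connect to an existing scheduler; no cluster options may be combined
        # with an explicit address.
        return {"address": address}
    # Assumed sizing for illustration; these keys are passed to the local cluster.
    return {"cluster_kwargs": {"n_workers": 2, "threads_per_worker": 1}}


if __name__ == "__main__":
    # Show which configuration would be used in the current environment.
    print(runner_kwargs())
```

Under these assumptions, the result could be passed as `DaskTaskRunner(**runner_kwargs())`, either in the `@flow` decorator or through `my_dask_flow.with_options(task_runner=...)` when the address is only known at run time.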
