Dask Distributed
Distributed is the scheduler for Dask, providing a distributed computation engine for parallel and out-of-core analytics. It manages computations across a cluster of machines, featuring dynamic task scheduling, fault tolerance, and diagnostics. The library is actively maintained with frequent releases, typically on a monthly or bi-monthly schedule, following a `YYYY.MM.patch` versioning scheme. The current version is 2026.3.0.
Warnings
- breaking: Starting with version 2026.1.2, `distributed` requires `pyarrow>=16`. Older versions of `pyarrow` may lead to `ImportError` or serialization issues, especially with Dask DataFrames.
- breaking: `distributed` now requires Python 3.10 or newer. Support for Python 3.9 and older has been dropped.
- breaking: Starting with version 2025.12.0, `distributed` requires `toolz>=0.12.0`. Older versions may cause compatibility errors.
- gotcha: The Dask diagnostics dashboard requires `bokeh` to be installed. Without it, the dashboard link will still be provided but will not function correctly, often leading to a blank page or errors.
- gotcha: When using `Client` or `LocalCluster`, always use them as context managers (`with Client(...) as client:`) or explicitly call their `.close()` methods to ensure proper cleanup of resources, especially in scripts or long-running applications. Failing to do so can leave orphaned processes or open ports.
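Where a `with` block does not fit the structure of the code, the explicit `.close()` route mentioned in the last warning might look like the sketch below. The helper names `square` and `run_with_explicit_close` are hypothetical, and `processes=False` with `dashboard_address=None` is an assumption made only to keep the example lightweight (in-process workers, no dashboard).

```python
from distributed import Client, LocalCluster


def square(x):
    return x * x


def run_with_explicit_close():
    # Assumption: a small in-process cluster with the dashboard disabled.
    cluster = LocalCluster(
        n_workers=1,
        threads_per_worker=1,
        processes=False,
        dashboard_address=None,
    )
    client = None
    try:
        client = Client(cluster)
        return client.submit(square, 7).result()
    finally:
        # Close the client first, then the cluster it is attached to,
        # so cleanup happens even if the task raised an exception.
        if client is not None:
            client.close()
        cluster.close()


if __name__ == "__main__":
    print(run_with_explicit_close())
```

The `try`/`finally` gives the same guarantee as the context-manager form: both objects are closed whether or not the submitted work succeeds.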
Install
- pip install "distributed[complete]"
- pip install distributed
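After installing, a quick preflight check can confirm that the environment meets the minimums listed under Warnings. The following is a minimal sketch using only the standard library; the helper names (`parse_version`, `meets_minimum`, `describe`) are hypothetical, and the version parsing is deliberately simple (leading numeric components only), so it is not a full replacement for a proper version parser.

```python
import sys
from importlib import metadata

# Minimums taken from the Warnings section; bokeh has no stated minimum there.
MINIMUMS = {"pyarrow": (16,), "toolz": (0, 12, 0), "bokeh": None}


def parse_version(text):
    """Return the leading numeric components of a version string as a tuple."""
    parts = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
        if digits != piece:  # stop at suffixes such as "0rc1"
            break
    return tuple(parts)


def meets_minimum(installed, minimum):
    """Compare versions after padding both tuples with zeros to equal length."""
    a, b = parse_version(installed), tuple(minimum)
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a >= b


def describe(name, minimum):
    """Return a one-line status string for an optional or required package."""
    try:
        installed = metadata.version(name)
    except metadata.PackageNotFoundError:
        return f"{name}: not installed"
    if minimum is not None and not meets_minimum(installed, minimum):
        wanted = ".".join(str(n) for n in minimum)
        return f"{name}: {installed} is older than {wanted}"
    return f"{name}: {installed} ok"


python_ok = sys.version_info[:2] >= (3, 10)
print(f"python: {sys.version_info.major}.{sys.version_info.minor} "
      f"{'ok' if python_ok else 'is older than 3.10'}")
for package, minimum in MINIMUMS.items():
    print(describe(package, minimum))
```

A missing package is reported rather than raised, so the script can run before every dependency is in place.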
Imports
- Client
from distributed import Client
- LocalCluster
from distributed import LocalCluster
- fire_and_forget
from distributed import fire_and_forget
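Of the three imports, `fire_and_forget` is the least self-explanatory: it asks the scheduler to run a task to completion even if no future for it is kept alive. The sketch below illustrates this with a task whose only effect is writing a file. The helpers `write_marker` and `run_demo` are hypothetical, and the in-process cluster (`processes=False`, `dashboard_address=None`) is an assumption chosen to keep the example small.

```python
import os
import tempfile
import time

from distributed import Client, LocalCluster, fire_and_forget


def write_marker(path):
    # Write to a temporary name, then rename, so readers never see a partial file.
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        f.write("done")
    os.replace(tmp, path)


def run_demo():
    path = os.path.join(tempfile.mkdtemp(), "marker.txt")
    with LocalCluster(
        n_workers=1,
        threads_per_worker=1,
        processes=False,
        dashboard_address=None,
    ) as cluster:
        with Client(cluster) as client:
            future = client.submit(write_marker, path)
            fire_and_forget(future)
            del future  # no reference is held; the task should still run

            # Poll for the side effect instead of waiting on a future.
            deadline = time.time() + 10
            while not os.path.exists(path) and time.time() < deadline:
                time.sleep(0.05)
    with open(path) as f:
        return f.read()


if __name__ == "__main__":
    print(run_demo())
```

Because the result is never gathered, this pattern suits tasks that matter only for their side effects, such as writing output or sending a notification.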
Quickstart
from distributed import Client, LocalCluster
import time


def inc(x):
    time.sleep(0.1)  # Simulate work
    return x + 1


# The __main__ guard keeps worker processes from re-running this block
# if they import the script when they start.
if __name__ == "__main__":
    # Start a local Dask cluster
    # Using 'with' statement ensures proper cleanup
    with LocalCluster(n_workers=2, threads_per_worker=2, dashboard_address=':8787') as cluster:
        print(f"Dashboard available at: {cluster.dashboard_link}")

        # Connect a client to the cluster
        with Client(cluster) as client:
            print(f"Client connected to: {client.scheduler.address}")

            # Submit tasks
            futures = client.map(inc, range(10))

            # Gather results
            results = client.gather(futures)
            print(f"Results: {results}")
            assert results == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        print("Client disconnected.")
    print("LocalCluster closed.")