Dask-Jobqueue
Dask-Jobqueue simplifies the deployment of Dask distributed clusters on traditional high-performance computing (HPC) job queuing systems such as PBS, Slurm, LSF, SGE, MOAB, OAR, and HTCondor. It dynamically launches Dask workers as jobs on the cluster, integrating Dask's parallel computing capabilities with existing HPC infrastructure. The library is actively maintained; the current release at the time of writing is 0.9.0. [1, 5, 15, 16]
Common errors
- Jobs are cancelled or fail to start with no clear error in the Dask logs, or with 'slurmstepd: error: *** JOB CANCELLED ***'.
  - Cause: incompatible resource requests (cores, memory, walltime), or directives in the generated job script that the HPC scheduler does not recognize. [18, 19]
  - Fix: inspect the generated job script with `print(cluster.job_script())` and compare it against your system's job submission requirements. Adjust cluster parameters such as `job_extra_directives` or `job_directives_skip` as needed, or verify resource limits with your system administrator. [14, 17, 18]
- `ModuleNotFoundError` on worker nodes, even though the module is available on the client.
  - Cause: the Python environment or executable on the worker nodes differs from the one on the client, or necessary modules are not loaded. [14]
  - Fix: ensure the `python` parameter in your cluster constructor points to the correct Python executable on the compute nodes. Use `job_script_prologue` to load the necessary environment modules (e.g., `['module load my_conda_env']`) before the Dask workers start. [14, 18]
- The Dask dashboard is not accessible or shows 'connection refused'.
  - Cause: an incorrect `dashboard_address`, or firewall/network restrictions blocking access to the scheduler's dashboard port. [9]
  - Fix: set `scheduler_options={'dashboard_address': ':YOUR_PORT'}` to specify a free port. For remote access, you may need SSH port forwarding (e.g., `ssh -L YOUR_PORT:localhost:YOUR_PORT user@head_node`) if the scheduler runs on an HPC login node. [9, 10]
- Workers are unexpectedly killed by the job queuing system before completing their tasks.
  - Cause: worker jobs hit the walltime limit imposed by the HPC scheduler. [17]
  - Fix: increase the `walltime` parameter in your cluster configuration. For long-running or indefinite workloads, use the Dask worker options `--lifetime` and `--lifetime-stagger` to shut workers down gracefully and restart them before walltime is reached, allowing Dask to rebalance tasks. [17]
Warnings
- breaking The `cluster.start_workers()` and `cluster.stop_all_jobs()` methods have been removed. [2]
- breaking Passing `port` or `dashboard_address` directly to cluster constructors is no longer allowed; use `scheduler_options` instead. [2]
- deprecated The parameters `project`, `extra`, `env_extra`, `job_extra`, and `header_skip` are deprecated in favor of `account`, `worker_extra_args`, `job_script_prologue`, `job_extra_directives`, and `job_directives_skip`, respectively. [10, 11, 17, 20]
- gotcha The default for the `processes` parameter changed from 1 (only threads) to approximately `sqrt(cores)` (processes and threads). This can affect performance or cause memory issues if not anticipated. [2]
- gotcha Memory specifications (e.g., '20GB') may be interpreted differently by Dask and the job scheduler (Gigabytes vs. Gibibytes). [3]
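To see why the unit gotcha matters: Dask parses "GB" as decimal gigabytes, while schedulers often account memory in binary units (GiB). A quick check of the discrepancy for a 20GB request:

```python
# Dask's memory strings use decimal units: "20GB" means 20 * 10**9 bytes.
dask_20gb = 20 * 10**9

# Many schedulers account in binary units: 20 GiB is 20 * 2**30 bytes.
scheduler_20gib = 20 * 2**30

# The gap is roughly 1.5 GB, which can be enough to trigger an unexpected
# out-of-memory kill when the two sides interpret the request differently.
gap = scheduler_20gib - dask_20gb
print(f"20 GiB - 20 GB = {gap} bytes ({gap / 10**9:.2f} GB)")
```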
Install
- pip install dask-jobqueue
- conda install conda-forge::dask-jobqueue
Imports
- PBSCluster
from dask_jobqueue import PBSCluster
- SLURMCluster
from dask_jobqueue import SLURMCluster
- SGECluster
from dask_jobqueue import SGECluster
- LSFCluster
from dask_jobqueue import LSFCluster
- HTCondorCluster
from dask_jobqueue import HTCondorCluster
- OARCluster
from dask_jobqueue import OARCluster
- MoabCluster
from dask_jobqueue import MoabCluster
- Client
from distributed import Client
Quickstart
import os

import dask.array as da
from dask_jobqueue import SLURMCluster
from distributed import Client

# NOTE: configure these parameters for your specific HPC system.
# In a real setup they would come from a Dask config file or environment
# variables; the defaults here are placeholders for the quickstart.
cluster_kwargs = {
    'queue': os.environ.get('SLURM_QUEUE', 'debug'),      # e.g. 'regular', 'debug'
    'account': os.environ.get('SLURM_ACCOUNT', 'my_project_account'),
    'cores': 8,                                           # CPU cores per job
    'memory': '16GB',                                     # memory per job
    'walltime': '00:30:00',                               # wall time for each worker job
    'local_directory': os.environ.get('TMPDIR', '/tmp'),  # local scratch for workers
}

cluster = None
try:
    cluster = SLURMCluster(**cluster_kwargs)

    # Scale the cluster to request 2 jobs (each with 8 cores and 16GB of memory)
    cluster.scale(jobs=2)
    print(f"Dashboard link: {cluster.dashboard_link}")

    # Connect a Dask client to the cluster
    client = Client(cluster)
    print("Dask client connected.")

    # Run a Dask computation
    x = da.random.random((10000, 10000), chunks=(1000, 1000))
    y = x.mean().compute()
    print(f"Computed mean: {y}")

    # Clean up resources
    client.close()
    cluster.close()
    print("Cluster and client closed.")
except Exception as e:
    print(f"An error occurred: {e}")
    if cluster is not None:
        cluster.close()