{"id":8928,"library":"dask-jobqueue","title":"Dask-Jobqueue","description":"Dask-Jobqueue simplifies the deployment of Dask distributed clusters on traditional high-performance computing (HPC) job queuing systems such as PBS, Slurm, LSF, SGE, MOAB, OAR, and HTCondor. It allows users to dynamically launch Dask workers as jobs on a cluster, integrating Dask's parallel computing capabilities with existing HPC infrastructure. The library is actively maintained, with the current version being 0.9.0, and typically follows a regular release cadence aligned with Dask's ecosystem updates. [1, 5, 15, 16]","status":"active","version":"0.9.0","language":"en","source_language":"en","source_url":"https://github.com/dask/dask-jobqueue","tags":["dask","hpc","cluster","job queue","slurm","pbs","lsf","sge","moab","oar","htcondor","distributed computing"],"install":[{"cmd":"pip install dask-jobqueue","lang":"bash","label":"Using pip"},{"cmd":"conda install conda-forge::dask-jobqueue","lang":"bash","label":"Using Conda"}],"dependencies":[{"reason":"Core Dask library for parallel computing.","package":"dask"},{"reason":"Dask's distributed scheduler and worker components.","package":"distributed"},{"reason":"Common scientific computing dependency, often used with Dask. [12]","package":"numpy","optional":true},{"reason":"Common data manipulation dependency, often used with Dask. 
[12]","package":"pandas","optional":true}],"imports":[{"symbol":"PBSCluster","correct":"from dask_jobqueue import PBSCluster"},{"symbol":"SLURMCluster","correct":"from dask_jobqueue import SLURMCluster"},{"symbol":"SGECluster","correct":"from dask_jobqueue import SGECluster"},{"symbol":"LSFCluster","correct":"from dask_jobqueue import LSFCluster"},{"symbol":"HTCondorCluster","correct":"from dask_jobqueue import HTCondorCluster"},{"symbol":"OARCluster","correct":"from dask_jobqueue import OARCluster"},{"symbol":"MoabCluster","correct":"from dask_jobqueue import MoabCluster"},{"symbol":"Client","correct":"from distributed import Client"}],"quickstart":{"code":"import os\nfrom dask_jobqueue import SLURMCluster\nfrom distributed import Client\nimport dask.array as da\n\n# NOTE: Configure these parameters for your specific HPC system.\n# Values are read from environment variables where available and fall\n# back to placeholder defaults; in real deployments they would typically\n# come from Dask config files or environment variables.\ncluster_kwargs = {\n    'queue': os.environ.get('SLURM_QUEUE', 'debug'), # e.g., 'regular', 'debug'\n    'account': os.environ.get('SLURM_ACCOUNT', 'my_project_account'),\n    'cores': 8, # Number of CPU cores per job\n    'memory': '16GB', # Memory per job\n    'walltime': '00:30:00', # Wall time for each worker job\n    'local_directory': os.environ.get('TMPDIR', '/tmp') # Local scratch for workers\n}\n\n# Initialize a SLURM cluster\ntry:\n    cluster = SLURMCluster(**cluster_kwargs)\n    # Scale the cluster to request 2 jobs (each with 8 cores, 16GB memory)\n    cluster.scale(jobs=2)\n    print(f\"Dashboard link: {cluster.dashboard_link}\")\n\n    # Connect Dask client to the cluster\n    client = Client(cluster)\n    print(\"Dask Client connected.\")\n\n    # Perform a Dask computation\n    x = da.random.random((10000, 10000), chunks=(1000, 1000))\n    y = x.mean().compute()\n    print(f\"Computed mean: {y}\")\n\n    # Clean up resources\n    client.close()\n    cluster.close()\n    print(\"Cluster and client closed.\")\nexcept Exception as e:\n    print(f\"An error occurred: {e}\")\n    if 'cluster' in locals() and cluster is not None:\n        cluster.close()","lang":"python","description":"This quickstart demonstrates how to create a Dask SLURMCluster, scale it, connect a Dask client, perform a simple distributed computation, and then shut down the cluster. Users should customize the `cluster_kwargs` dictionary with values appropriate for their specific HPC environment and job scheduler. [4, 9, 10, 16]"},"warnings":[{"fix":"Use `cluster.scale(n)` to request n workers (or `cluster.scale(jobs=n)` to request n jobs) and `cluster.scale(0)` to stop all jobs/workers. [2]","message":"The `cluster.start_workers()` and `cluster.stop_all_jobs()` methods have been removed. [2]","severity":"breaking","affected_versions":"0.7.0+"},{"fix":"Pass these arguments via the `scheduler_options` dictionary, e.g., `scheduler_options={'dashboard_address': ':12435'}`. [2, 10]","message":"Direct arguments `port` or `dashboard_addresses` in cluster constructors are no longer allowed. [2]","severity":"breaking","affected_versions":"0.7.0+"},{"fix":"Use their replacements: `account` (for `project`), `worker_extra_args` (for `extra`), `job_script_prologue` (for `env_extra`), `job_extra_directives` (for `job_extra`), and `job_directives_skip` (for `header_skip`). [10, 11, 17, 20]","message":"The parameters `project`, `extra`, `env_extra`, `job_extra`, and `header_skip` are deprecated. [10, 11, 17, 20]","severity":"deprecated","affected_versions":"0.7.4+, 0.8.0+"},{"fix":"Explicitly set `processes` in your cluster constructor if you require a specific worker configuration; the number of threads per process is then derived from `cores` and `processes`. For example, use `processes=1` for a purely threaded setup, or `processes=cores` for a purely multiprocess setup. [2, 3]","message":"The default for the `processes` parameter changed from 1 (only threads) to approximately `sqrt(cores)` (processes and threads). 
This can affect performance or cause memory issues if not anticipated. [2]","severity":"gotcha","affected_versions":"0.7.0+"},{"fix":"Always use 'GiB' (Gibibytes) in Dask-Jobqueue memory configuration (e.g., `memory='20GiB'`) to ensure consistent requests across Dask and the scheduler. [3]","message":"Memory specifications (e.g., '20GB') may be interpreted differently by Dask and the job scheduler (Gigabytes vs. Gibibytes). [3]","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-16T00:00:00.000Z","next_check":"2026-07-15T00:00:00.000Z","problems":[{"fix":"Inspect the generated job script with `print(cluster.job_script())` and compare it against your system's job submission requirements. Adjust cluster parameters like `job_extra_directives` or `job_directives_skip` as needed, or verify resource limits with your system administrator. [14, 17, 18]","cause":"The HPC scheduler rejects the requested resources (cores, memory, walltime) or does not recognize a directive in the generated job script. [18, 19]","error":"Jobs cancelled or fail to start without clear error in Dask logs, or 'slurmstepd: error: *** JOB CANCELLED ***'"},{"fix":"Ensure the `python` parameter in your cluster constructor points to the correct Python executable on the compute nodes. Use `job_script_prologue` to load necessary environment modules (e.g., `['module load my_conda_env']`) before Dask workers start. [14, 18]","cause":"The Python environment or executable on the worker nodes differs from the one on the client, or necessary modules are not loaded. [14]","error":"ModuleNotFoundError on worker nodes, even if the module is available on the client."},{"fix":"Set `scheduler_options={'dashboard_address': ':YOUR_PORT'}` to specify a free port. For remote access, you may need to set up SSH port forwarding (e.g., `ssh -L YOUR_PORT:localhost:YOUR_PORT user@head_node`) if the scheduler runs on an HPC login node. 
[9, 10]","cause":"Incorrect `dashboard_address` or firewall/network restrictions prevent access to the scheduler's dashboard port. [9]","error":"Dask dashboard is not accessible or displays 'connection refused'."},{"fix":"Increase the `walltime` parameter in your cluster configuration. For long-running or indefinite workloads, pass the Dask worker options `--lifetime` and `--lifetime-stagger` through `worker_extra_args` (e.g., `worker_extra_args=['--lifetime', '55m', '--lifetime-stagger', '4m']` for a one-hour walltime) and enable adaptive scaling with `cluster.adapt()`, so that workers shut down gracefully before the walltime is reached and replacement workers are started, allowing Dask to rebalance tasks. [17]","cause":"Worker jobs hit the walltime limit imposed by the HPC scheduler. [17]","error":"Workers are unexpectedly killed by the job queuing system before completing tasks."}]}