Dask-Jobqueue

0.9.0 · active · verified Thu Apr 16

Dask-Jobqueue simplifies the deployment of Dask distributed clusters on traditional high-performance computing (HPC) job queuing systems such as PBS, Slurm, LSF, SGE, MOAB, OAR, and HTCondor. It lets users dynamically launch Dask workers as batch jobs on a cluster, integrating Dask's parallel computing capabilities with existing HPC infrastructure. The library is actively maintained, with the current version being 0.9.0, and typically follows a release cadence aligned with updates to the wider Dask ecosystem.

Common errors

Warnings

Install
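Dask-Jobqueue is distributed on both PyPI and conda-forge; either of the standard commands below installs the library along with its Dask and Distributed dependencies.

```shell
# Install from PyPI
pip install dask-jobqueue

# Or install from conda-forge
conda install -c conda-forge dask-jobqueue
```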

Imports
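The names used in the quickstart below come from three packages: scheduler-specific cluster classes live in `dask_jobqueue`, the client in `distributed`, and the array API in `dask.array`.

```python
# Cluster classes for specific schedulers (SLURMCluster, PBSCluster, ...)
from dask_jobqueue import SLURMCluster

# Client connects user code to the cluster's scheduler
from distributed import Client

# Dask collections used in the example computation
import dask.array as da
```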

Quickstart

This quickstart demonstrates how to create a Dask SLURMCluster, scale it, connect a Dask client, perform a simple distributed computation, and then shut down the cluster. Customize the `cluster_kwargs` dictionary with values appropriate for your specific HPC environment and job scheduler.

import os
from dask_jobqueue import SLURMCluster
from distributed import Client
import dask.array as da

# NOTE: Configure these parameters for your specific HPC system.
# Values are read from environment variables where available, with
# fallbacks for quickstart simplicity; in real deployments they
# usually come from a Dask configuration file (e.g. jobqueue.yaml).
cluster_kwargs = {
    'queue': os.environ.get('SLURM_QUEUE', 'debug'), # e.g., 'regular', 'debug'
    'account': os.environ.get('SLURM_ACCOUNT', 'my_project_account'),
    'cores': 8, # Number of CPU cores per job
    'memory': '16GB', # Memory per job
    'walltime': '00:30:00', # Wall time for each worker job
    'local_directory': os.environ.get('TMPDIR', '/tmp') # Local scratch for workers
}

# Initialize a SLURM cluster
cluster = None
client = None
try:
    cluster = SLURMCluster(**cluster_kwargs)
    # Scale the cluster to request 2 jobs (each with 8 cores and 16GB memory)
    cluster.scale(jobs=2)
    print(f"Dashboard link: {cluster.dashboard_link}")

    # Connect a Dask client to the cluster
    client = Client(cluster)
    print("Dask client connected.")

    # Perform a Dask computation
    x = da.random.random((10000, 10000), chunks=(1000, 1000))
    y = x.mean().compute()
    print(f"Computed mean: {y}")
finally:
    # Always release scheduler and worker jobs, even if an error occurred
    if client is not None:
        client.close()
    if cluster is not None:
        cluster.close()
    print("Cluster and client closed.")
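Beyond fixed scaling, dask-jobqueue clusters also support adaptive scaling, which submits and cancels worker jobs in response to load, and `cluster.job_script()` is useful for previewing the batch script before anything is submitted. A minimal sketch, assuming the same `cluster_kwargs` as above and that your dask-jobqueue version supports the `minimum_jobs`/`maximum_jobs` keywords:

```python
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(**cluster_kwargs)

# Print the generated sbatch script without submitting anything;
# handy for debugging queue, account, and resource settings
print(cluster.job_script())

# Let the scheduler grow and shrink the worker pool between
# 0 and 10 jobs based on outstanding work
cluster.adapt(minimum_jobs=0, maximum_jobs=10)
```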
