Module OPTIMA.hardware_configs.common

Expand source code
from typing import Optional, Union

import os
import subprocess
from abc import ABC, abstractmethod


class Cluster(ABC):
    type = None

    cpus_per_node = None
    gpus_per_node = None
    mem_per_cpu = None
    threads_per_cpu_core = None  # number of threads per cpu core (SMT)
    SMT_included_in_reservation = None  # if requesting N cores, do we get N logical (SMT_included_in_reservation=True) or N physical cores (SMT_included_in_reservation=False)

    # str; path to pickle file containing a list of type [[job_id, head_node_name, instance_num], [...]] marking which
    # node already runs a Ray head node and which instance_num is used for it; if it does not exist, it will be created
    # by helpers/manage_ray_nodes.py
    # Importantly, this file needs to be writable from the worker nodes!
    ray_headnodes_path = None
    ray_temp_path = "/tmp/ray"  # path where Ray's temporary session files can be saved to; this should preferably be a local directory on each worker node.

    def submit_job(self, job: "ClusterJob", job_file_path: str, dry_run: bool = False) -> None:
        """Writes the job file for the provided job and submits it to the batch system."""
        if os.path.dirname(job_file_path) != "" and not os.path.exists(os.path.dirname(job_file_path)):
            os.makedirs(os.path.dirname(job_file_path))
        with open(job_file_path, "w") as f:
            f.write(self._write_job_config(job) + "\n")
            f.write(job.payload)

        if not dry_run:
            self._submit(job_file_path)

    @abstractmethod
    def get_job_nodes_list(self) -> list:
        """Returns a list of nodes for the current job."""
        raise NotImplementedError

    @abstractmethod
    def get_job_nodes_list_bash(self) -> str:
        """Returns a string containing a bash command to fetch the list of node names for the current job."""
        raise NotImplementedError

    @abstractmethod
    def get_node_ip_bash(self, node: str) -> str:
        """Returns a string containing a bash command to fetch the ip address of the provided node."""
        raise NotImplementedError

    @abstractmethod
    def get_job_id(self) -> str:
        """Returns a string containing the id of the current job."""
        raise NotImplementedError

    @abstractmethod
    def get_job_id_bash(self) -> str:
        """Returns a string containing a bash command to fetch the id of the current job."""
        raise NotImplementedError

    @abstractmethod
    def check_job_running(self, job_id) -> bool:
        """Checks if a job for a given job_id is still running."""
        raise NotImplementedError

    @abstractmethod
    def get_cpus_per_nodes(self, include_SMT: Optional[bool] = False) -> dict[str, int]:
        """Returns a dict of type {'node_name': num_cpus, ...} containing the number of cpus reserved on each node in the job."""
        raise NotImplementedError

    @abstractmethod
    def get_ports(self) -> dict[str, tuple[int, int]]:
        """
        Returns a dict of port numbers that can be used for the communication between the Ray nodes.

        See https://docs.ray.io/en/releases-2.5.1/ray-core/configure.html#ports-configurations for explanations of the
        available ports.

        The keys must be:

        - `'port'`
        - `'node_manager_port'`
        - object_manager_port
        - ray_client_server_port
        - redis_shard_ports
        - min_worker_port
        - max_worker_port

        In order to allow multiple parallel Ray clusters, the corresponding ports must be different for each job. Thus,
        instead of only a single port number, each value must be a tuple of two integers. The first value is the port
        to be used for the first job, and the second number if the offset that is added for each additional job. For example,
        a value of `(6700, 10)` would mean that the first job uses port 6700, the second job port 6710, etc.
        """
        raise NotImplementedError

    def start_ray_node(
            self,
            node: str,
            head_ip: str,
            num_cpus: Union[int, str],
            num_gpus: Union[int, str],
            head: bool = False,
            port: Optional[Union[int, str]] = None,
            node_manager_port: Optional[Union[int, str]] = None,
            object_manager_port: Optional[Union[int, str]] = None,
            ray_client_server_port: Optional[Union[int, str]] = None,
            redis_shard_ports: Optional[Union[int, str]] = None,
            min_worker_port: Optional[Union[int, str]] = None,
            max_worker_port: Optional[Union[int, str]] = None,
    ) -> str:
        """Returns a string containing a bash command to start a ray node on the provided node."""
        if head:
            cmd = f" \\\n\tray start --head \\\n"
            cmd += f"\t\t--node-ip-address=\"{head_ip}\" \\\n"
            cmd += f"\t\t--port={port} \\\n"
            cmd += f"\t\t--node-manager-port={node_manager_port} \\\n"
            cmd += f"\t\t--object-manager-port={object_manager_port} \\\n"
            cmd += f"\t\t--ray-client-server-port={ray_client_server_port} \\\n"
            cmd += f"\t\t--redis-shard-ports={redis_shard_ports} \\\n"
            cmd += f"\t\t--min-worker-port={min_worker_port} \\\n"
            cmd += f"\t\t--max-worker-port={max_worker_port} \\\n"
            cmd += f"\t\t--num-cpus {num_cpus} \\\n"
            cmd += f"\t\t--num-gpus {num_gpus} \\\n"
            cmd += f"\t\t--temp-dir {self.ray_temp_path} \\\n"
            cmd += f"\t\t--include-dashboard false \\\n"
            cmd += f"\t\t--block --disable-usage-stats &"
        else:
            cmd = f" \\\n\tray start \\\n"
            cmd += f"\t\t--address {head_ip} \\\n"
            cmd += f"\t\t--num-cpus {num_cpus} \\\n"
            cmd += f"\t\t--num-gpus {num_gpus} \\\n"
            cmd += f"\t\t--block --disable-usage-stats &"

        cmd = self._execute_single_cmd(cmd, num_cpus, node)
        return cmd

    @abstractmethod
    def _write_job_config(self, job: "ClusterJob") -> str:
        """Helper function that returns the header of the job file defining the job config, i.e. number of cpus etc."""
        raise NotImplementedError

    @abstractmethod
    def _submit(self, job_file_path: str) -> None:
        """Helper function that submits a job to the batch system."""
        raise NotImplementedError

    @abstractmethod
    def _execute_single_cmd(self, cmd: str, cpus: Optional[int] = None, node: Optional[str] = None) -> str:
        """Helper function that returns a bash command that executes a single command on a specific node of the cluster."""
        raise NotImplementedError


class ClusterJob(ABC):
    def __init__(
            self,
            name: Optional[str] = None,
            cpus: Optional[int] = None,
            gpus: Optional[int] = None,
            nodes: Optional[int] = None,
            tasks: Optional[int] = None,
            tasks_per_node: Optional[int] = None,
            gpus_per_node: Optional[int] = None,
            cpus_per_task: Optional[int] = None,
            mem_per_cpu: Optional[Union[int, float]] = None,
            use_SMT: Optional[bool] = None,
            runtime: Optional[Union[int, float]] = None,
            log_path_out: Optional[str] = None,
            log_path_error: Optional[str] = None,
            payload: Optional[str] = None,
    ):
        self.name = name
        self.cpus = cpus
        self.gpus = gpus
        self.nodes = nodes
        self.tasks = tasks
        self.tasks_per_node = tasks_per_node
        self.gpus_per_node = gpus_per_node
        self.cpus_per_task = cpus_per_task
        self.mem_per_cpu = mem_per_cpu
        self.use_SMT = use_SMT
        self.runtime = runtime
        self.log_path_out = log_path_out
        self.log_path_error = log_path_error
        self.payload = payload

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @property
    def cpus(self):
        return self._cpus

    @cpus.setter
    def cpus(self, value):
        self._cpus = value

    @property
    def gpus(self):
        return self._gpus

    @gpus.setter
    def gpus(self, value):
        self._gpus = value

    @property
    def nodes(self):
        return self._nodes

    @nodes.setter
    def nodes(self, value):
        self._nodes = value

    @property
    def tasks(self):
        return self._tasks

    @tasks.setter
    def tasks(self, value):
        self._tasks = value

    @property
    def tasks_per_node(self):
        return self._tasks_per_node

    @tasks_per_node.setter
    def tasks_per_node(self, value):
        self._tasks_per_node = value

    @property
    def gpus_per_node(self):
        return self._gpus_per_node

    @gpus_per_node.setter
    def gpus_per_node(self, value):
        self._gpus_per_node = value

    @property
    def cpus_per_task(self):
        return self._cpus_per_task

    @cpus_per_task.setter
    def cpus_per_task(self, value):
        self._cpus_per_task = value

    @property
    def mem_per_cpu(self):
        return self._mem_per_cpu

    @mem_per_cpu.setter
    def mem_per_cpu(self, value):
        self._mem_per_cpu = value

    @property
    def use_SMT(self):
        return self._use_SMT

    @use_SMT.setter
    def use_SMT(self, value):
        self._use_SMT = value

    @property
    def runtime(self):
        return self._runtime

    @runtime.setter
    def runtime(self, value):
        self._runtime = value

    @property
    def log_path_out(self):
        return self._log_path_out

    @log_path_out.setter
    def log_path_out(self, value):
        self._log_path_out = value

    @property
    def log_path_error(self):
        return self._log_path_error

    @log_path_error.setter
    def log_path_error(self, value):
        self._log_path_error = value

    @property
    def payload(self):
        return self._payload

    @payload.setter
    def payload(self, value):
        self._payload = value


class SLURMCluster(Cluster):
    type = "SLURM"

    def get_job_nodes_list(self) -> list:
        # get the list of nodes
        nodes = os.environ["SLURM_JOB_NODELIST"]

        # expand the possible list of nodes that may be present here, e. g. taurusi[7041-7042,7045-7046]
        if "[" in nodes:  # assuming sensible node names
            nodes_range = nodes.split("[")[1].split("]")[0]
            nodes_list_int = self._rangeexpand(nodes_range)
            nodes_list = [nodes.split("[")[0] + str(node_num) for node_num in nodes_list_int]
        else:
            nodes_list = [nodes]

        return nodes_list

    def get_job_nodes_list_bash(self) -> str:
        return "$SLURM_JOB_NODELIST"

    def get_node_ip_bash(self, node: str) -> str:
        return f"$(srun --nodes=1 --ntasks=1 -w \"{node}\" hostname --ip-address)"

    def get_job_id(self) -> str:
        return os.environ["SLURM_JOB_ID"]

    def get_job_id_bash(self) -> str:
        return "$SLURM_JOB_ID"

    def check_job_running(self, job_id) -> bool:
        # check if this job is still running by executing "squeue -o '$i' --job job_id" and see if error
        # is thrown (invalid job id -> job is not running) or the job_id is returned
        try:
            subprocess.check_output(["squeue", "-o", "'%i'", "--job", job_id], stderr=subprocess.DEVNULL).decode("utf-8")
            job_running = True
        except subprocess.CalledProcessError:
            job_running = False
        return job_running

    def get_cpus_per_nodes(self, include_SMT: Optional[bool] = False) -> dict[str, int]:
        # get the necessary information
        job_id = os.environ["SLURM_JOB_ID"]
        p = subprocess.check_output(["scontrol", "show", "job", "-d", job_id]).decode("utf-8")

        resources = {}
        for line in p.split("\n"):
            # get list of cpus per node
            if "Nodes=" in line and not "NumNodes" in line:
                # first get ranges for nodes and cpus
                nodes, cpus, _, _ = line.split()
                nodes = nodes.split("=")[1]
                cpus = cpus.split("=")[1]

                # expand the possible list of nodes that may be present here, e. g. taurusi[7041-7042,7045-7046]
                if "[" in nodes:  # assuming sensible node names
                    nodes_range = nodes.split("[")[1].split("]")[0]
                    nodes_list_int = self._rangeexpand(nodes_range)
                    nodes_list = [nodes.split("[")[0] + str(node_num) for node_num in nodes_list_int]
                else:
                    nodes_list = [nodes]

                # now expand the range of cpus
                cpus_list = self._rangeexpand(cpus)

                for node in nodes_list:
                    resources[node] = cpus_list

        # convert list of cpus to number of cpus in each node
        num_cpus_per_node = {}
        for node, cpus_list in resources.items():
            num_cpus_per_node[node] = int(round(len(cpus_list) / (self.threads_per_cpu_core if not include_SMT else 1)))

        return num_cpus_per_node

    def _write_job_config(self, job: "SLURMClusterJob") -> str:
        runtime_hours = int(job.runtime)
        runtime_minutes = int(60 * (job.runtime - runtime_hours))
        runtime_seconds = int(3600 * (job.runtime - runtime_hours - 1 / 60 * runtime_minutes))

        config = f"#SBATCH --job-name={job.name}\n"
        config += f"#SBATCH --output={job.log_path_out}\n"
        config += f"#SBATCH --error={job.log_path_error}\n"
        config += f"#SBATCH --account={job.account}\n" if job.account is not None else ""
        config += f"#SBATCH --partition={job.partition}\n" if job.partition is not None else ""
        config += f"#SBATCH --time={f'{runtime_hours:02d}:{runtime_minutes:02d}:{runtime_seconds:02d}'}\n"
        config += f"#SBATCH --nodes={job.nodes}\n" if job.nodes is not None else ""
        config += f"#SBATCH --ntasks={job.tasks}\n" if job.tasks is not None else ""
        config += f"#SBATCH --tasks-per-node={job.tasks_per_node}\n" if job.tasks_per_node is not None else ""
        config += f"#SBATCH --cpus-per-task={job.cpus_per_task * (self.threads_per_cpu_core if (self.SMT_included_in_reservation and job.use_SMT) else 1)}\n" if job.cpus_per_task is not None else ""
        if job.gpus_per_node is not None:
            config += f"#SBATCH --gres=gpu:{job.gpus_per_node}\n" if (job.gpus_per_node > 0) else ""
        config += f"#SBATCH --mem-per-cpu={int(job.mem_per_cpu)}\n" if job.mem_per_cpu is not None else ""
        config += f"#SBATCH --exclude={','.join(job.excludes_list)}\n" if job.excludes_list not in [None, []] else ""
        config += "#SBATCH --hint=nomultithread\n" if not job.use_SMT else ""
        return config

    def _submit(self, job_file_path: str) -> None:
        job_proc = ["sbatch", job_file_path]
        print("SubmitString: " + " ".join(job_proc))
        jid = subprocess.check_output(job_proc, stderr=subprocess.STDOUT).decode("utf-8")
        print("Return: {}".format(jid))

    def _execute_single_cmd(self, cmd: str, cpus: Optional[int] = None, node: Optional[str] = None, overcommit: Optional[bool] = False) -> str:
        return f"srun --nodes=1 --ntasks=1 {f'--cpus-per-task {cpus} ' if cpus is not None else ''}" \
               f"{f'-w {node} ' if node is not None else ''}{'--overcommit ' if overcommit else ''}{cmd}"

    def _rangeexpand(self, txt):
        """Helper function to expand ranges like "1-3,4,5,7-9,10" to a list of integers containing all values."""
        lst = []
        for r in txt.split(','):
            if '-' in r[1:]:
                r0, r1 = r[1:].split('-', 1)
                lst += range(int(r[0] + r0), int(r1) + 1)
            else:
                lst.append(int(r))
        return lst


class SLURMClusterJob(ClusterJob):
    def __init__(
            self,
            *args,
            partition: Optional[str] = None,
            account: Optional[str] = None,
            excludes_list: Optional[str] = None,
            **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.partition = partition
        self.account = account
        self.excludes_list = excludes_list

    @property
    def partition(self):
        return self._partition

    @partition.setter
    def partition(self, value):
        self._partition = value

    @property
    def account(self):
        return self._account

    @account.setter
    def account(self, value):
        self._account = value

    @property
    def excludes_list(self):
        return self._excludes_list

    @excludes_list.setter
    def excludes_list(self, value):
        self._excludes_list = value

Classes

class Cluster

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class Cluster(ABC):
    type = None

    cpus_per_node = None
    gpus_per_node = None
    mem_per_cpu = None
    threads_per_cpu_core = None  # number of threads per cpu core (SMT)
    SMT_included_in_reservation = None  # if requesting N cores, do we get N logical (SMT_included_in_reservation=True) or N physical cores (SMT_included_in_reservation=False)

    # str; path to pickle file containing a list of type [[job_id, head_node_name, instance_num], [...]] marking which
    # node already runs a Ray head node and which instance_num is used for it; if it does not exist, it will be created
    # by helpers/manage_ray_nodes.py
    # Importantly, this file needs to be writable from the worker nodes!
    ray_headnodes_path = None
    ray_temp_path = "/tmp/ray"  # path where Ray's temporary session files can be saved to; this should preferably be a local directory on each worker node.

    def submit_job(self, job: "ClusterJob", job_file_path: str, dry_run: bool = False) -> None:
        """Writes the job file for the provided job and submits it to the batch system."""
        if os.path.dirname(job_file_path) != "" and not os.path.exists(os.path.dirname(job_file_path)):
            os.makedirs(os.path.dirname(job_file_path))
        with open(job_file_path, "w") as f:
            f.write(self._write_job_config(job) + "\n")
            f.write(job.payload)

        if not dry_run:
            self._submit(job_file_path)

    @abstractmethod
    def get_job_nodes_list(self) -> list:
        """Returns a list of nodes for the current job."""
        raise NotImplementedError

    @abstractmethod
    def get_job_nodes_list_bash(self) -> str:
        """Returns a string containing a bash command to fetch the list of node names for the current job."""
        raise NotImplementedError

    @abstractmethod
    def get_node_ip_bash(self, node: str) -> str:
        """Returns a string containing a bash command to fetch the ip address of the provided node."""
        raise NotImplementedError

    @abstractmethod
    def get_job_id(self) -> str:
        """Returns a string containing the id of the current job."""
        raise NotImplementedError

    @abstractmethod
    def get_job_id_bash(self) -> str:
        """Returns a string containing a bash command to fetch the id of the current job."""
        raise NotImplementedError

    @abstractmethod
    def check_job_running(self, job_id) -> bool:
        """Checks if a job for a given job_id is still running."""
        raise NotImplementedError

    @abstractmethod
    def get_cpus_per_nodes(self, include_SMT: Optional[bool] = False) -> dict[str, int]:
        """Returns a dict of type {'node_name': num_cpus, ...} containing the number of cpus reserved on each node in the job."""
        raise NotImplementedError

    @abstractmethod
    def get_ports(self) -> dict[str, tuple[int, int]]:
        """
        Returns a dict of port numbers that can be used for the communication between the Ray nodes.

        See https://docs.ray.io/en/releases-2.5.1/ray-core/configure.html#ports-configurations for explanations of the
        available ports.

        The keys must be:

        - `'port'`
        - `'node_manager_port'`
        - object_manager_port
        - ray_client_server_port
        - redis_shard_ports
        - min_worker_port
        - max_worker_port

        In order to allow multiple parallel Ray clusters, the corresponding ports must be different for each job. Thus,
        instead of only a single port number, each value must be a tuple of two integers. The first value is the port
        to be used for the first job, and the second number if the offset that is added for each additional job. For example,
        a value of `(6700, 10)` would mean that the first job uses port 6700, the second job port 6710, etc.
        """
        raise NotImplementedError

    def start_ray_node(
            self,
            node: str,
            head_ip: str,
            num_cpus: Union[int, str],
            num_gpus: Union[int, str],
            head: bool = False,
            port: Optional[Union[int, str]] = None,
            node_manager_port: Optional[Union[int, str]] = None,
            object_manager_port: Optional[Union[int, str]] = None,
            ray_client_server_port: Optional[Union[int, str]] = None,
            redis_shard_ports: Optional[Union[int, str]] = None,
            min_worker_port: Optional[Union[int, str]] = None,
            max_worker_port: Optional[Union[int, str]] = None,
    ) -> str:
        """Returns a string containing a bash command to start a ray node on the provided node."""
        if head:
            cmd = f" \\\n\tray start --head \\\n"
            cmd += f"\t\t--node-ip-address=\"{head_ip}\" \\\n"
            cmd += f"\t\t--port={port} \\\n"
            cmd += f"\t\t--node-manager-port={node_manager_port} \\\n"
            cmd += f"\t\t--object-manager-port={object_manager_port} \\\n"
            cmd += f"\t\t--ray-client-server-port={ray_client_server_port} \\\n"
            cmd += f"\t\t--redis-shard-ports={redis_shard_ports} \\\n"
            cmd += f"\t\t--min-worker-port={min_worker_port} \\\n"
            cmd += f"\t\t--max-worker-port={max_worker_port} \\\n"
            cmd += f"\t\t--num-cpus {num_cpus} \\\n"
            cmd += f"\t\t--num-gpus {num_gpus} \\\n"
            cmd += f"\t\t--temp-dir {self.ray_temp_path} \\\n"
            cmd += f"\t\t--include-dashboard false \\\n"
            cmd += f"\t\t--block --disable-usage-stats &"
        else:
            cmd = f" \\\n\tray start \\\n"
            cmd += f"\t\t--address {head_ip} \\\n"
            cmd += f"\t\t--num-cpus {num_cpus} \\\n"
            cmd += f"\t\t--num-gpus {num_gpus} \\\n"
            cmd += f"\t\t--block --disable-usage-stats &"

        cmd = self._execute_single_cmd(cmd, num_cpus, node)
        return cmd

    @abstractmethod
    def _write_job_config(self, job: "ClusterJob") -> str:
        """Helper function that returns the header of the job file defining the job config, i.e. number of cpus etc."""
        raise NotImplementedError

    @abstractmethod
    def _submit(self, job_file_path: str) -> None:
        """Helper function that submits a job to the batch system."""
        raise NotImplementedError

    @abstractmethod
    def _execute_single_cmd(self, cmd: str, cpus: Optional[int] = None, node: Optional[str] = None) -> str:
        """Helper function that returns a bash command that executes a single command on a specific node of the cluster."""
        raise NotImplementedError

Ancestors

  • abc.ABC

Subclasses

Class variables

var SMT_included_in_reservation
var cpus_per_node
var gpus_per_node
var mem_per_cpu
var ray_headnodes_path
var ray_temp_path
var threads_per_cpu_core
var type

Methods

def check_job_running(self, job_id) ‑> bool

Checks if a job for a given job_id is still running.

Expand source code
@abstractmethod
def check_job_running(self, job_id) -> bool:
    """Checks if a job for a given job_id is still running."""
    raise NotImplementedError
def get_cpus_per_nodes(self, include_SMT: Optional[bool] = False) ‑> dict[str, int]

Returns a dict of type {'node_name': num_cpus, …} containing the number of cpus reserved on each node in the job.

Expand source code
@abstractmethod
def get_cpus_per_nodes(self, include_SMT: Optional[bool] = False) -> dict[str, int]:
    """Returns a dict of type {'node_name': num_cpus, ...} containing the number of cpus reserved on each node in the job."""
    raise NotImplementedError
def get_job_id(self) ‑> str

Returns a string containing the id of the current job.

Expand source code
@abstractmethod
def get_job_id(self) -> str:
    """Returns a string containing the id of the current job."""
    raise NotImplementedError
def get_job_id_bash(self) ‑> str

Returns a string containing a bash command to fetch the id of the current job.

Expand source code
@abstractmethod
def get_job_id_bash(self) -> str:
    """Returns a string containing a bash command to fetch the id of the current job."""
    raise NotImplementedError
def get_job_nodes_list(self) ‑> list

Returns a list of nodes for the current job.

Expand source code
@abstractmethod
def get_job_nodes_list(self) -> list:
    """Returns a list of nodes for the current job."""
    raise NotImplementedError
def get_job_nodes_list_bash(self) ‑> str

Returns a string containing a bash command to fetch the list of node names for the current job.

Expand source code
@abstractmethod
def get_job_nodes_list_bash(self) -> str:
    """Returns a string containing a bash command to fetch the list of node names for the current job."""
    raise NotImplementedError
def get_node_ip_bash(self, node: str) ‑> str

Returns a string containing a bash command to fetch the ip address of the provided node.

Expand source code
@abstractmethod
def get_node_ip_bash(self, node: str) -> str:
    """Returns a string containing a bash command to fetch the ip address of the provided node."""
    raise NotImplementedError
def get_ports(self) ‑> dict[str, tuple[int, int]]

Returns a dict of port numbers that can be used for the communication between the Ray nodes.

See https://docs.ray.io/en/releases-2.5.1/ray-core/configure.html#ports-configurations for explanations of the available ports.

The keys must be:

  • 'port'
  • 'node_manager_port'
  • object_manager_port
  • ray_client_server_port
  • redis_shard_ports
  • min_worker_port
  • max_worker_port

In order to allow multiple parallel Ray clusters, the corresponding ports must be different for each job. Thus, instead of only a single port number, each value must be a tuple of two integers. The first value is the port to be used for the first job, and the second number if the offset that is added for each additional job. For example, a value of (6700, 10) would mean that the first job uses port 6700, the second job port 6710, etc.

Expand source code
@abstractmethod
def get_ports(self) -> dict[str, tuple[int, int]]:
    """
    Returns a dict of port numbers that can be used for the communication between the Ray nodes.

    See https://docs.ray.io/en/releases-2.5.1/ray-core/configure.html#ports-configurations for explanations of the
    available ports.

    The keys must be:

    - `'port'`
    - `'node_manager_port'`
    - object_manager_port
    - ray_client_server_port
    - redis_shard_ports
    - min_worker_port
    - max_worker_port

    In order to allow multiple parallel Ray clusters, the corresponding ports must be different for each job. Thus,
    instead of only a single port number, each value must be a tuple of two integers. The first value is the port
    to be used for the first job, and the second number if the offset that is added for each additional job. For example,
    a value of `(6700, 10)` would mean that the first job uses port 6700, the second job port 6710, etc.
    """
    raise NotImplementedError
def start_ray_node(self, node: str, head_ip: str, num_cpus: Union[int, str], num_gpus: Union[int, str], head: bool = False, port: Union[int, str, ForwardRef(None)] = None, node_manager_port: Union[int, str, ForwardRef(None)] = None, object_manager_port: Union[int, str, ForwardRef(None)] = None, ray_client_server_port: Union[int, str, ForwardRef(None)] = None, redis_shard_ports: Union[int, str, ForwardRef(None)] = None, min_worker_port: Union[int, str, ForwardRef(None)] = None, max_worker_port: Union[int, str, ForwardRef(None)] = None) ‑> str

Returns a string containing a bash command to start a ray node on the provided node.

Expand source code
def start_ray_node(
        self,
        node: str,
        head_ip: str,
        num_cpus: Union[int, str],
        num_gpus: Union[int, str],
        head: bool = False,
        port: Optional[Union[int, str]] = None,
        node_manager_port: Optional[Union[int, str]] = None,
        object_manager_port: Optional[Union[int, str]] = None,
        ray_client_server_port: Optional[Union[int, str]] = None,
        redis_shard_ports: Optional[Union[int, str]] = None,
        min_worker_port: Optional[Union[int, str]] = None,
        max_worker_port: Optional[Union[int, str]] = None,
) -> str:
    """Returns a string containing a bash command to start a ray node on the provided node."""
    if head:
        cmd = f" \\\n\tray start --head \\\n"
        cmd += f"\t\t--node-ip-address=\"{head_ip}\" \\\n"
        cmd += f"\t\t--port={port} \\\n"
        cmd += f"\t\t--node-manager-port={node_manager_port} \\\n"
        cmd += f"\t\t--object-manager-port={object_manager_port} \\\n"
        cmd += f"\t\t--ray-client-server-port={ray_client_server_port} \\\n"
        cmd += f"\t\t--redis-shard-ports={redis_shard_ports} \\\n"
        cmd += f"\t\t--min-worker-port={min_worker_port} \\\n"
        cmd += f"\t\t--max-worker-port={max_worker_port} \\\n"
        cmd += f"\t\t--num-cpus {num_cpus} \\\n"
        cmd += f"\t\t--num-gpus {num_gpus} \\\n"
        cmd += f"\t\t--temp-dir {self.ray_temp_path} \\\n"
        cmd += f"\t\t--include-dashboard false \\\n"
        cmd += f"\t\t--block --disable-usage-stats &"
    else:
        cmd = f" \\\n\tray start \\\n"
        cmd += f"\t\t--address {head_ip} \\\n"
        cmd += f"\t\t--num-cpus {num_cpus} \\\n"
        cmd += f"\t\t--num-gpus {num_gpus} \\\n"
        cmd += f"\t\t--block --disable-usage-stats &"

    cmd = self._execute_single_cmd(cmd, num_cpus, node)
    return cmd
def submit_job(self, job: ClusterJob, job_file_path: str, dry_run: bool = False) ‑> None

Writes the job file for the provided job and submits it to the batch system.

Expand source code
def submit_job(self, job: "ClusterJob", job_file_path: str, dry_run: bool = False) -> None:
    """Writes the job file for the provided job and submits it to the batch system."""
    if os.path.dirname(job_file_path) != "" and not os.path.exists(os.path.dirname(job_file_path)):
        os.makedirs(os.path.dirname(job_file_path))
    with open(job_file_path, "w") as f:
        f.write(self._write_job_config(job) + "\n")
        f.write(job.payload)

    if not dry_run:
        self._submit(job_file_path)
class ClusterJob (name: Optional[str] = None, cpus: Optional[int] = None, gpus: Optional[int] = None, nodes: Optional[int] = None, tasks: Optional[int] = None, tasks_per_node: Optional[int] = None, gpus_per_node: Optional[int] = None, cpus_per_task: Optional[int] = None, mem_per_cpu: Union[int, float, ForwardRef(None)] = None, use_SMT: Optional[bool] = None, runtime: Union[int, float, ForwardRef(None)] = None, log_path_out: Optional[str] = None, log_path_error: Optional[str] = None, payload: Optional[str] = None)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class ClusterJob(ABC):
    def __init__(
            self,
            name: Optional[str] = None,
            cpus: Optional[int] = None,
            gpus: Optional[int] = None,
            nodes: Optional[int] = None,
            tasks: Optional[int] = None,
            tasks_per_node: Optional[int] = None,
            gpus_per_node: Optional[int] = None,
            cpus_per_task: Optional[int] = None,
            mem_per_cpu: Optional[Union[int, float]] = None,
            use_SMT: Optional[bool] = None,
            runtime: Optional[Union[int, float]] = None,
            log_path_out: Optional[str] = None,
            log_path_error: Optional[str] = None,
            payload: Optional[str] = None,
    ):
        self.name = name
        self.cpus = cpus
        self.gpus = gpus
        self.nodes = nodes
        self.tasks = tasks
        self.tasks_per_node = tasks_per_node
        self.gpus_per_node = gpus_per_node
        self.cpus_per_task = cpus_per_task
        self.mem_per_cpu = mem_per_cpu
        self.use_SMT = use_SMT
        self.runtime = runtime
        self.log_path_out = log_path_out
        self.log_path_error = log_path_error
        self.payload = payload

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @property
    def cpus(self):
        return self._cpus

    @cpus.setter
    def cpus(self, value):
        self._cpus = value

    @property
    def gpus(self):
        return self._gpus

    @gpus.setter
    def gpus(self, value):
        self._gpus = value

    @property
    def nodes(self):
        return self._nodes

    @nodes.setter
    def nodes(self, value):
        self._nodes = value

    @property
    def tasks(self):
        return self._tasks

    @tasks.setter
    def tasks(self, value):
        self._tasks = value

    @property
    def tasks_per_node(self):
        return self._tasks_per_node

    @tasks_per_node.setter
    def tasks_per_node(self, value):
        self._tasks_per_node = value

    @property
    def gpus_per_node(self):
        return self._gpus_per_node

    @gpus_per_node.setter
    def gpus_per_node(self, value):
        self._gpus_per_node = value

    @property
    def cpus_per_task(self):
        return self._cpus_per_task

    @cpus_per_task.setter
    def cpus_per_task(self, value):
        self._cpus_per_task = value

    @property
    def mem_per_cpu(self):
        return self._mem_per_cpu

    @mem_per_cpu.setter
    def mem_per_cpu(self, value):
        self._mem_per_cpu = value

    @property
    def use_SMT(self):
        return self._use_SMT

    @use_SMT.setter
    def use_SMT(self, value):
        self._use_SMT = value

    @property
    def runtime(self):
        return self._runtime

    @runtime.setter
    def runtime(self, value):
        self._runtime = value

    @property
    def log_path_out(self):
        return self._log_path_out

    @log_path_out.setter
    def log_path_out(self, value):
        self._log_path_out = value

    @property
    def log_path_error(self):
        return self._log_path_error

    @log_path_error.setter
    def log_path_error(self, value):
        self._log_path_error = value

    @property
    def payload(self):
        return self._payload

    @payload.setter
    def payload(self, value):
        self._payload = value

Ancestors

  • abc.ABC

Subclasses

Instance variables

var cpus
Expand source code
@property
def cpus(self):
    return self._cpus
var cpus_per_task
Expand source code
@property
def cpus_per_task(self):
    return self._cpus_per_task
var gpus
Expand source code
@property
def gpus(self):
    return self._gpus
var gpus_per_node
Expand source code
@property
def gpus_per_node(self):
    return self._gpus_per_node
var log_path_error
Expand source code
@property
def log_path_error(self):
    return self._log_path_error
var log_path_out
Expand source code
@property
def log_path_out(self):
    return self._log_path_out
var mem_per_cpu
Expand source code
@property
def mem_per_cpu(self):
    return self._mem_per_cpu
var name
Expand source code
@property
def name(self):
    return self._name
var nodes
Expand source code
@property
def nodes(self):
    return self._nodes
var payload
Expand source code
@property
def payload(self):
    return self._payload
var runtime
Expand source code
@property
def runtime(self):
    return self._runtime
var tasks
Expand source code
@property
def tasks(self):
    return self._tasks
var tasks_per_node
Expand source code
@property
def tasks_per_node(self):
    return self._tasks_per_node
var use_SMT
Expand source code
@property
def use_SMT(self):
    return self._use_SMT
class SLURMCluster

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class SLURMCluster(Cluster):
    type = "SLURM"

    def get_job_nodes_list(self) -> list:
        # get the list of nodes
        nodes = os.environ["SLURM_JOB_NODELIST"]

        # expand the possible list of nodes that may be present here, e. g. taurusi[7041-7042,7045-7046]
        if "[" in nodes:  # assuming sensible node names
            nodes_range = nodes.split("[")[1].split("]")[0]
            nodes_list_int = self._rangeexpand(nodes_range)
            nodes_list = [nodes.split("[")[0] + str(node_num) for node_num in nodes_list_int]
        else:
            nodes_list = [nodes]

        return nodes_list

    def get_job_nodes_list_bash(self) -> str:
        return "$SLURM_JOB_NODELIST"

    def get_node_ip_bash(self, node: str) -> str:
        return f"$(srun --nodes=1 --ntasks=1 -w \"{node}\" hostname --ip-address)"

    def get_job_id(self) -> str:
        return os.environ["SLURM_JOB_ID"]

    def get_job_id_bash(self) -> str:
        return "$SLURM_JOB_ID"

    def check_job_running(self, job_id) -> bool:
        # check if this job is still running by executing "squeue -o '$i' --job job_id" and see if error
        # is thrown (invalid job id -> job is not running) or the job_id is returned
        try:
            subprocess.check_output(["squeue", "-o", "'%i'", "--job", job_id], stderr=subprocess.DEVNULL).decode("utf-8")
            job_running = True
        except subprocess.CalledProcessError:
            job_running = False
        return job_running

    def get_cpus_per_nodes(self, include_SMT: Optional[bool] = False) -> dict[str, int]:
        # get the necessary information
        job_id = os.environ["SLURM_JOB_ID"]
        p = subprocess.check_output(["scontrol", "show", "job", "-d", job_id]).decode("utf-8")

        resources = {}
        for line in p.split("\n"):
            # get list of cpus per node
            if "Nodes=" in line and not "NumNodes" in line:
                # first get ranges for nodes and cpus
                nodes, cpus, _, _ = line.split()
                nodes = nodes.split("=")[1]
                cpus = cpus.split("=")[1]

                # expand the possible list of nodes that may be present here, e. g. taurusi[7041-7042,7045-7046]
                if "[" in nodes:  # assuming sensible node names
                    nodes_range = nodes.split("[")[1].split("]")[0]
                    nodes_list_int = self._rangeexpand(nodes_range)
                    nodes_list = [nodes.split("[")[0] + str(node_num) for node_num in nodes_list_int]
                else:
                    nodes_list = [nodes]

                # now expand the range of cpus
                cpus_list = self._rangeexpand(cpus)

                for node in nodes_list:
                    resources[node] = cpus_list

        # convert list of cpus to number of cpus in each node
        num_cpus_per_node = {}
        for node, cpus_list in resources.items():
            num_cpus_per_node[node] = int(round(len(cpus_list) / (self.threads_per_cpu_core if not include_SMT else 1)))

        return num_cpus_per_node

    def _write_job_config(self, job: "SLURMClusterJob") -> str:
        runtime_hours = int(job.runtime)
        runtime_minutes = int(60 * (job.runtime - runtime_hours))
        runtime_seconds = int(3600 * (job.runtime - runtime_hours - 1 / 60 * runtime_minutes))

        config = f"#SBATCH --job-name={job.name}\n"
        config += f"#SBATCH --output={job.log_path_out}\n"
        config += f"#SBATCH --error={job.log_path_error}\n"
        config += f"#SBATCH --account={job.account}\n" if job.account is not None else ""
        config += f"#SBATCH --partition={job.partition}\n" if job.partition is not None else ""
        config += f"#SBATCH --time={f'{runtime_hours:02d}:{runtime_minutes:02d}:{runtime_seconds:02d}'}\n"
        config += f"#SBATCH --nodes={job.nodes}\n" if job.nodes is not None else ""
        config += f"#SBATCH --ntasks={job.tasks}\n" if job.tasks is not None else ""
        config += f"#SBATCH --tasks-per-node={job.tasks_per_node}\n" if job.tasks_per_node is not None else ""
        config += f"#SBATCH --cpus-per-task={job.cpus_per_task * (self.threads_per_cpu_core if (self.SMT_included_in_reservation and job.use_SMT) else 1)}\n" if job.cpus_per_task is not None else ""
        if job.gpus_per_node is not None:
            config += f"#SBATCH --gres=gpu:{job.gpus_per_node}\n" if (job.gpus_per_node > 0) else ""
        config += f"#SBATCH --mem-per-cpu={int(job.mem_per_cpu)}\n" if job.mem_per_cpu is not None else ""
        config += f"#SBATCH --exclude={','.join(job.excludes_list)}\n" if job.excludes_list not in [None, []] else ""
        config += "#SBATCH --hint=nomultithread\n" if not job.use_SMT else ""
        return config

    def _submit(self, job_file_path: str) -> None:
        job_proc = ["sbatch", job_file_path]
        print("SubmitString: " + " ".join(job_proc))
        jid = subprocess.check_output(job_proc, stderr=subprocess.STDOUT).decode("utf-8")
        print("Return: {}".format(jid))

    def _execute_single_cmd(self, cmd: str, cpus: Optional[int] = None, node: Optional[str] = None, overcommit: Optional[bool] = False) -> str:
        return f"srun --nodes=1 --ntasks=1 {f'--cpus-per-task {cpus} ' if cpus is not None else ''}" \
               f"{f'-w {node} ' if node is not None else ''}{'--overcommit ' if overcommit else ''}{cmd}"

    def _rangeexpand(self, txt):
        """Helper function to expand ranges like "1-3,4,5,7-9,10" to a list of integers containing all values."""
        lst = []
        for r in txt.split(','):
            if '-' in r[1:]:
                r0, r1 = r[1:].split('-', 1)
                lst += range(int(r[0] + r0), int(r1) + 1)
            else:
                lst.append(int(r))
        return lst

Ancestors

Subclasses

Class variables

var type

Inherited members

class SLURMClusterJob (*args, partition: Optional[str] = None, account: Optional[str] = None, excludes_list: Optional[str] = None, **kwargs)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class SLURMClusterJob(ClusterJob):
    def __init__(
            self,
            *args,
            partition: Optional[str] = None,
            account: Optional[str] = None,
            excludes_list: Optional[str] = None,
            **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.partition = partition
        self.account = account
        self.excludes_list = excludes_list

    @property
    def partition(self):
        return self._partition

    @partition.setter
    def partition(self, value):
        self._partition = value

    @property
    def account(self):
        return self._account

    @account.setter
    def account(self, value):
        self._account = value

    @property
    def excludes_list(self):
        return self._excludes_list

    @excludes_list.setter
    def excludes_list(self, value):
        self._excludes_list = value

Ancestors

Instance variables

var account
Expand source code
@property
def account(self):
    return self._account
var excludes_list
Expand source code
@property
def excludes_list(self):
    return self._excludes_list
var partition
Expand source code
@property
def partition(self):
    return self._partition