Module OPTIMA.helpers.manage_ray_nodes
Helper module to manage the ray nodes on a cluster.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Helper module to manage the ray nodes on a cluster."""
import os
import pickle
import argparse
import sys
import time
import logging
from OPTIMA.hardware_configs.helpers import get_cluster
def get_and_verify_running_head_nodes(cluster, update_ray_nodes_file=True):
    """Load the list of running ray head nodes and drop all entries whose jobs have terminated.

    Parameters
    ----------
    cluster : cluster object
        Cluster as returned by get_cluster(); provides ray_headnodes_path and check_job_running().
    update_ray_nodes_file : bool
        If True, write the verified list back to the ray head nodes file. (Default value = True)

    Returns
    -------
    list
        Entries [job_id, head_node, instance_num] whose jobs are still running.
    """
# get the list of machines that is currently running ray head nodes
if not os.path.exists(cluster.ray_headnodes_path):
with open(cluster.ray_headnodes_path, "wb") as file:
pickle.dump([], file)
os.chmod(cluster.ray_headnodes_path, 0o664)
return []
else:
with open(cluster.ray_headnodes_path, "rb") as file:
running_head_nodes = pickle.load(file)
# get all head nodes; check if the corresponding job is still running
verified_running_head_nodes = []
logging.info("Verifying running ray head nodes...")
for job_id, head_node, instance_num in running_head_nodes:
logging.info(f"Checking if job {job_id} is still running...")
job_running = cluster.check_job_running(job_id)
# update the list for all jobs that are still running
if job_running:
logging.info(f"Job {job_id} is still running.")
verified_running_head_nodes.append([job_id, head_node, instance_num])
else:
            logging.info(
                f"Job {job_id} has been terminated."
                + (" Removing the entry from the ray head nodes file." if update_ray_nodes_file else "")
            )
if update_ray_nodes_file:
with open(cluster.ray_headnodes_path, "wb") as file:
pickle.dump(verified_running_head_nodes, file)
return verified_running_head_nodes
def sort_nodes(running_head_nodes, cluster):
    """Return a sorted list of nodes for this job, starting with the free nodes (i.e. no ray head node), followed by the occupied nodes.

    This function fetches all nodes of this job, checks on which of them a ray head node is already running, and
    returns the sorted list together with a boolean that indicates if any of the nodes is still free.

    Parameters
    ----------
    running_head_nodes : list
        Entries [job_id, head_node, instance_num], e.g. as returned by get_and_verify_running_head_nodes().
    cluster : cluster object
        Cluster as returned by get_cluster(); provides get_job_nodes_list().

    Returns
    -------
    tuple
        The sorted list of nodes and a boolean that is True if at least one node is free.
    """
nodes_list = cluster.get_job_nodes_list()
# get the list of running head nodes
running_head_nodes_list = []
for _, head_node, _ in running_head_nodes:
running_head_nodes_list.append(head_node)
# go through all nodes of this job and check if they are in running_head_nodes_list
available_nodes = []
occupied_nodes = []
for node in nodes_list:
if node in running_head_nodes_list:
occupied_nodes.append(node)
else:
available_nodes.append(node)
return available_nodes + occupied_nodes, len(available_nodes) > 0
def main():
    """Determine a free head node, a free instance number and the sorted node list for this job.

    The results are written to the files given via the command line arguments. If no node is free, the script
    exits with code 129 to signal the caller that no head node could be assigned.
    """
# TODO: add config!
parser = argparse.ArgumentParser(
description="Helper script that contains useful function to manage ray nodes",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--cluster",
help="Specify which cluster the job should be executed on. This must be one of the "
"possible values given in get_cluster() in hardware_config.common.",
)
parser.add_argument(
"--sorted_nodes_path",
default="temp_sorted_nodes.txt",
help="Path to the file where the sorted list of nodes for this job should be saved.",
)
parser.add_argument(
"--sorted_cpus_per_node_path",
default="temp_sorted_cpus_per_node.txt",
help="Path to the file where the sorted list of cpus per node for this job should be saved.",
)
parser.add_argument(
"--instance_num_path",
default="temp_instance_num.txt",
help="Path to the file where the instance_num that is to be used for this job should be saved. This is needed "
"to ensure different Ray clusters use different ports for the communication.",
)
args = parser.parse_args(sys.argv[1:])
# logging config
DFormat = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=DFormat, level=logging.INFO)
# get the cluster for this job
cluster = get_cluster(args.cluster)
# in order to prevent multiple jobs from accessing running_headnodes.pickle at the same time, create a queue to
# schedule the accesses based on the job id
# wrap in try...finally block to make sure the queue file is deleted in the end
try:
job_id = cluster.get_job_id()
node_folder = os.path.dirname(cluster.ray_headnodes_path)
queue_file_path = os.path.join(
node_folder, job_id
) # temporary file to tell other jobs that this job wants to access the running_headnodes.pickle file
open(queue_file_path, "w").close() # create temporary file
        # get all files in the working dir, excluding the running_headnodes.pickle file --> these should only be
        # temporary files created by waiting jobs, whose names are the corresponding job ids --> sort the ids and only
        # continue if this job's id is the lowest one; if not, check again in a few seconds
can_start = False
while not can_start:
# get all files and remove the .pickle file (if present)
file_list = os.listdir(node_folder)
if os.path.basename(cluster.ray_headnodes_path) in file_list:
file_list.remove(os.path.basename(cluster.ray_headnodes_path))
# convert to int and sort
file_list = [int(file) for file in file_list]
file_list.sort()
            # check if the jobs associated with the queue files are still running (files may remain if a job crashes
            # or is canceled at this point); iterate over a copy since entries are removed inside the loop
            for other_job_id in file_list.copy():
if not cluster.check_job_running(str(other_job_id)):
logging.info(
f"Job {other_job_id} has been terminated, but entry is still present in queue. Removing..."
)
try:
file_list.remove(other_job_id)
os.remove(os.path.join(node_folder, str(other_job_id)))
except FileNotFoundError:
pass
# start if this job id is the first entry in the sorted list
queue_spot = file_list.index(int(job_id)) + 1
if queue_spot == 1:
logging.info("Number 1 in queue, starting...")
can_start = True
else:
logging.info(f"Number {queue_spot} in queue, waiting...")
time.sleep(15)
running_head_nodes = get_and_verify_running_head_nodes(cluster)
sorted_nodes, node_available = sort_nodes(running_head_nodes, cluster)
if node_available:
# get the smallest instance num that is still free
instance_nums_taken = [running_head_node[2] for running_head_node in running_head_nodes]
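            # the generator below finds the first "gap": e.g. for taken nums [1, 2, 4], enumerating
            # [1, 2, 4, None] from 1 yields (1, 1), (2, 2), (3, 4) -> the first index i with i != entry is 3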
next_free_instance_num = next(i for i, e in enumerate(sorted(instance_nums_taken) + [None], 1) if i != e)
logging.info(f"Using instance num {next_free_instance_num}.")
# update list of running head nodes
running_head_nodes.append([str(job_id), sorted_nodes[0], next_free_instance_num])
with open(cluster.ray_headnodes_path, "wb") as file:
pickle.dump(running_head_nodes, file)
# save sorted nodes list to file
with open(args.sorted_nodes_path, "w") as file:
file.write(" ".join(sorted_nodes))
# also dump the number of cpus per node (in the same order as sorted_nodes)
num_cpus_per_node = cluster.get_cpus_per_nodes()
sorted_num_cpus = [str(num_cpus_per_node[node]) for node in sorted_nodes]
with open(args.sorted_cpus_per_node_path, "w") as file:
file.write(" ".join(sorted_num_cpus))
# finally save the instance_num that is to be used
with open(args.instance_num_path, "w") as file:
file.write(str(next_free_instance_num))
else:
        # exit code 129 to tell the optimization script that no free node was found
sys.exit(129)
finally:
# remove the queue file
os.remove(queue_file_path)
if __name__ == "__main__":
main()
Functions
def get_and_verify_running_head_nodes(cluster, update_ray_nodes_file=True)
Load the list of running ray head nodes and drop all entries whose jobs have terminated.
Parameters
cluster : cluster object as returned by get_cluster(); provides ray_headnodes_path and check_job_running().
update_ray_nodes_file : bool; if True, write the verified list back to the ray head nodes file. (Default value = True)
Returns
list : Entries [job_id, head_node, instance_num] whose jobs are still running.
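A minimal usage sketch (the cluster name "my_cluster" is a placeholder; any value accepted by get_cluster() works):
from OPTIMA.hardware_configs.helpers import get_cluster
cluster = get_cluster("my_cluster")  # hypothetical cluster name
running = get_and_verify_running_head_nodes(cluster, update_ray_nodes_file=False)  # read-only: pickle stays untouched
for job_id, head_node, instance_num in running:
    print(f"job {job_id} runs a head node on {head_node} (instance {instance_num})")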
def main()
Determine a free head node, a free instance number and the sorted node list for this job.
The results are written to the files given via the command line arguments. If no node is free, the script exits with code 129 to signal the caller that no head node could be assigned.
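After a successful run, a calling script can read the three output files back like this (a sketch; the file names are the argparse defaults above):
# invoked e.g. as: python -m OPTIMA.helpers.manage_ray_nodes --cluster my_cluster (cluster name is a placeholder)
with open("temp_sorted_nodes.txt") as file:
    sorted_nodes = file.read().split()
with open("temp_sorted_cpus_per_node.txt") as file:
    cpus_per_node = [int(n) for n in file.read().split()]
with open("temp_instance_num.txt") as file:
    instance_num = int(file.read())
# sorted_nodes[0] is the node chosen for this job's ray head node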
def sort_nodes(running_head_nodes, cluster)
Return a sorted list of nodes for this job, starting with the free nodes (i.e. no ray head node), followed by the occupied nodes.
This function fetches all nodes of this job, checks on which of them a ray head node is already running, and returns the sorted list together with a boolean that indicates if any of the nodes is still free.
Parameters
running_head_nodes : list of entries [job_id, head_node, instance_num], e.g. as returned by get_and_verify_running_head_nodes().
cluster : cluster object as returned by get_cluster(); provides get_job_nodes_list().
Returns
tuple : The sorted list of nodes and a boolean that is True if at least one node is free.
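A small worked example of the ordering (node names and the job id are hypothetical):
running_head_nodes = [["4711", "node-b", 1]]  # entries are [job_id, head_node, instance_num]
# if cluster.get_job_nodes_list() returned ["node-a", "node-b", "node-c"], then
# sort_nodes(running_head_nodes, cluster) would return (["node-a", "node-c", "node-b"], True):
# free nodes first, the occupied node last, and True because at least one node is free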