Source code for dpdispatcher.machines.fugaku

import shlex

from dpdispatcher.dlog import dlog
from dpdispatcher.machine import Machine
from dpdispatcher.utils.job_status import JobStatus
from dpdispatcher.utils.utils import customized_script_header_template

# Built-in PJM header: one placeholder per directive line (resource group,
# node count, MPI processes per node), each followed by a newline. The
# placeholders are filled via ``str.format`` in ``Fugaku.gen_script_header``.
fugaku_script_header_template = "\n".join(
    (
        "{queue_name_line}",
        "{fugaku_node_number_line}",
        "{fugaku_ntasks_per_node_line}",
        "",  # trailing empty entry keeps the final newline
    )
)


class Fugaku(Machine):
    """Machine backend for Fugaku's PJM batch system.

    Jobs are submitted with ``pjsub`` and polled with ``pjstat``.
    """

    # PJM states read from the STATUS column (4th field) of ``pjstat`` output.
    # NOTE(review): ACC (accepted), RNP (prologue), RNO (exit processing) and
    # RJT (rejected) were added beyond the originally handled states so that
    # transitional states do not map to ``unknown``; confirm against the
    # Fugaku / Technical Computing Suite pjstat documentation.
    _WAITING_STATES = frozenset({"ACC", "QUE", "HLD", "RNA", "SPD"})
    _RUNNING_STATES = frozenset({"RUN", "RNP", "RNE", "RNO"})
    _ENDED_STATES = frozenset({"EXT", "CCL", "ERR", "RJT"})

    def gen_script(self, job):
        """Return the full job script for ``job`` (delegates to ``Machine``)."""
        fugaku_script = super().gen_script(job)
        return fugaku_script

    def gen_script_header(self, job):
        """Build the ``#PJM`` directive header for ``job``.

        If ``resources["strategy"]`` provides a
        ``customized_script_header_template_file``, that template is rendered
        instead of the built-in ``fugaku_script_header_template``.

        Parameters
        ----------
        job : Job
            Job whose ``resources`` supply ``number_node``, ``cpu_per_node``
            and ``queue_name`` (used as the PJM resource group).

        Returns
        -------
        str
            The script header text.
        """
        resources = job.resources
        fugaku_script_header_dict = {
            "fugaku_node_number_line": f'#PJM -L "node={resources.number_node}" ',
            "fugaku_ntasks_per_node_line": (
                f'#PJM --mpi "max-proc-per-node={resources.cpu_per_node}"'
            ),
            "queue_name_line": f'#PJM -L "rscgrp={resources.queue_name}"',
        }
        template_file = resources["strategy"].get(
            "customized_script_header_template_file"
        )
        if template_file is not None:
            return customized_script_header_template(template_file, resources)
        return fugaku_script_header_template.format(**fugaku_script_header_dict)

    def do_submit(self, job):
        """Write the job script and its ``.run`` companion, then submit with ``pjsub``.

        Parameters
        ----------
        job : Job
            The job to submit.

        Returns
        -------
        str
            The job id parsed from ``pjsub`` output; it is also written to the
            ``<job_hash>_job_id`` file through the context.
        """
        script_file_name = job.script_file_name
        script_str = self.gen_script(job)
        job_id_name = job.job_hash + "_job_id"
        self.context.write_file(fname=script_file_name, write_str=script_str)

        script_run_str = self.gen_script_command(job)
        script_run_file_name = f"{job.script_file_name}.run"
        self.context.write_file(fname=script_run_file_name, write_str=script_run_str)

        script_file_dir = self.context.remote_root
        _, stdout, _ = self.context.block_checkcall(
            f"cd {shlex.quote(script_file_dir)} && "
            f"pjsub {shlex.quote(script_file_name)}"
        )
        subret = stdout.readlines()
        # The job id is the 6th whitespace-separated token of the first output
        # line (presumably "[INFO] PJM 0000 pjsub Job <id> submitted." — confirm).
        job_id = subret[0].split()[5]
        self.context.write_file(job_id_name, job_id)
        return job_id

    def default_resources(self, resources):
        """No Fugaku-specific default resources are applied."""
        pass

    def check_status(self, job):
        """Return the current ``JobStatus`` of ``job``.

        The live ``pjstat`` listing is queried first. If it prints no job line,
        the history listing (``pjstat -H``) is consulted. The original code
        noted that ``pjstat`` only reports jobs that are still waiting or
        running.
        """
        job_id = job.job_id
        if job_id == "":
            return JobStatus.unsubmitted
        # Quote the id, consistent with how do_submit builds its shell command.
        quoted_id = shlex.quote(job_id)
        _, _, stdout, _ = self.context.block_call("pjstat " + quoted_id)
        lines = stdout.read().decode("utf-8").split("\n")
        if len(lines) < 2:
            # No job line in the live listing: look the job up in the history.
            _, _, stdout, _ = self.context.block_call("pjstat -H " + quoted_id)
            lines = stdout.read().decode("utf-8").split("\n")
        # The last non-empty line is the job's entry; field 4 is its STATUS.
        status_word = lines[-2].split()[3]
        return self._status_from_word(job, status_word)

    def _status_from_word(self, job, status_word):
        """Map a PJM STATUS word to a ``JobStatus`` (ended jobs use the finish tag)."""
        if status_word in self._WAITING_STATES:
            return JobStatus.waiting
        if status_word in self._RUNNING_STATES:
            return JobStatus.running
        if status_word in self._ENDED_STATES:
            if self.check_finish_tag(job):
                dlog.info(f"job: {job.job_hash} {job.job_id} finished")
                return JobStatus.finished
            return JobStatus.terminated
        return JobStatus.unknown

    def check_finish_tag(self, job):
        """Return whether the ``<job_hash>_job_tag_finished`` file exists in the context."""
        job_tag_finished = job.job_hash + "_job_tag_finished"
        return self.context.check_file_exists(job_tag_finished)