dpdispatcher package

class dpdispatcher.Job(job_task_list, *, resources, machine=None)[source]

Bases: object

Job is generated by Submission automatically. A job usually has many tasks and may request computing resources from a job scheduler system. Each Job can generate a script file to be submitted to the job scheduler system or executed locally.

Parameters:
job_task_listlist of Task

the tasks belonging to the job

resourcesResources

the machine resources. Passed from Submission when it constructs jobs.

machineMachine

machine object to execute the job. Passed from Submission when it constructs jobs.

Methods

deserialize(job_dict[, machine])

Convert the job_dict to a Job class object.

get_job_state()

Get the job state.

get_last_error_message()

Get last error message when the job is terminated.

serialize([if_static])

Convert the Job class instance to a dictionary.

get_hash

handle_unexpected_job_state

job_to_json

register_job_id

submit_job

classmethod deserialize(job_dict, machine=None)[source]

Convert the job_dict to a Job class object.

Parameters:
job_dictdict

the dictionary which contains the job information

machineMachine

the machine object to execute the job

Returns:
jobJob

the Job class instance converted from the job_dict

get_hash()[source]
get_job_state()[source]

Get the job state. Usually, this method queries the database of the Slurm or PBS job scheduler system and gets the result.

Notes

This method will not submit or resubmit the job if it is unsubmitted.

get_last_error_message() str | None[source]

Get last error message when the job is terminated.

handle_unexpected_job_state()[source]
job_to_json()[source]
register_job_id(job_id)[source]
serialize(if_static=False)[source]

Convert the Job class instance to a dictionary.

Parameters:
if_staticbool

whether to dump the job runtime information (job_id, job_state, fail_count, job_uuid etc.) to the dictionary.

Returns:
job_dictdict

the dictionary converted from the Job class instance

submit_job()[source]
class dpdispatcher.Machine(*args, **kwargs)[source]

Bases: object

A machine is used to handle the connection with remote machines.

Parameters:
contextSubClass derived from BaseContext

The context is used to maintain the connection with the remote machine.

Methods

do_submit(job)

Submit a single job, assuming that no job is running there.

get_exit_code(job)

Get exit code of the job.

kill(job)

Kill the job.

resources_arginfo()

Generate the resources arginfo.

resources_subfields()

Generate the resources subfields.

arginfo

bind_context

check_finish_tag

check_if_recover

check_status

default_resources

deserialize

gen_command_env_cuda_devices

gen_script

gen_script_command

gen_script_custom_flags_lines

gen_script_end

gen_script_env

gen_script_header

gen_script_run_command

gen_script_wait

load_from_dict

load_from_json

load_from_yaml

serialize

sub_script_cmd

sub_script_head

alias: Tuple[str, ...] = ()
classmethod arginfo()[source]
bind_context(context)[source]
abstract check_finish_tag(**kwargs)[source]
check_if_recover(submission)[source]
abstract check_status(job)[source]
default_resources(res)[source]
classmethod deserialize(machine_dict)[source]
abstract do_submit(job)[source]

Submit a single job, assuming that no job is running there.

gen_command_env_cuda_devices(resources)[source]
gen_script(job)[source]
gen_script_command(job)[source]
gen_script_custom_flags_lines(job)[source]
gen_script_end(job)[source]
gen_script_env(job)[source]
abstract gen_script_header(job)[source]
gen_script_run_command(job)[source]
gen_script_wait(resources)[source]
get_exit_code(job)[source]

Get exit code of the job.

Parameters:
jobJob

the job to query

kill(job)[source]

Kill the job.

If not implemented, pass and let the user manually kill it.

Parameters:
jobJob

the job to kill

classmethod load_from_dict(machine_dict)[source]
classmethod load_from_json(json_path)[source]
classmethod load_from_yaml(yaml_path)[source]
options = {'Bohrium', 'DistributedShell', 'Fugaku', 'LSF', 'OpenAPI', 'PBS', 'SGE', 'Shell', 'Slurm', 'SlurmJobArray', 'Torque'}
classmethod resources_arginfo() Argument[source]

Generate the resources arginfo.

Returns:
Argument

resources arginfo

classmethod resources_subfields() List[Argument][source]

Generate the resources subfields.

Returns:
list[Argument]

resources subfields

serialize(if_empty_remote_profile=False)[source]
sub_script_cmd(res)[source]
sub_script_head(res)[source]
subclasses_dict = {'Bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'DistributedShell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'DpCloudServer': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'Fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'LSF': <class 'dpdispatcher.machines.lsf.LSF'>, 'Lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'OpenAPI': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'PBS': <class 'dpdispatcher.machines.pbs.PBS'>, 'SGE': <class 'dpdispatcher.machines.pbs.SGE'>, 'Shell': <class 'dpdispatcher.machines.shell.Shell'>, 'Slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'SlurmJobArray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'Torque': <class 'dpdispatcher.machines.pbs.Torque'>, 'bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'distributedshell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'dpcloudserver': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'lsf': <class 'dpdispatcher.machines.lsf.LSF'>, 'openapi': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'pbs': <class 'dpdispatcher.machines.pbs.PBS'>, 'sge': <class 'dpdispatcher.machines.pbs.SGE'>, 'shell': <class 'dpdispatcher.machines.shell.Shell'>, 'slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'slurmjobarray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'torque': <class 'dpdispatcher.machines.pbs.Torque'>}
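The subclasses_dict above maps every machine name, in both its original and its lowercase spelling, to a class, and several legacy names resolve to the same class. A minimal sketch of such a registry follows. The Plugin base class and the lookup helper are hypothetical names for illustration, and this is not dpdispatcher's own implementation.

```python
class Plugin:
    """Hypothetical base class that records every subclass by name."""

    subclasses_dict = {}
    alias = ()  # extra names a subclass also answers to

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        for name in (cls.__name__, *cls.alias):
            # Register both the original and the lowercase spelling.
            Plugin.subclasses_dict[name] = cls
            Plugin.subclasses_dict[name.lower()] = cls


class Shell(Plugin):
    pass


class Bohrium(Plugin):
    alias = ("Lebesgue", "DpCloudServer")


def lookup(name):
    """Return the class registered under name, or raise KeyError."""
    return Plugin.subclasses_dict[name]
```

With this layout, lookup("shell") and lookup("Shell") return the same class, and the aliases of Bohrium resolve to Bohrium.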
class dpdispatcher.Resources(number_node, cpu_per_node, gpu_per_node, queue_name, group_size, *, custom_flags=[], strategy={'if_cuda_multi_devices': False, 'ratio_unfinished': 0.0}, para_deg=1, module_unload_list=[], module_purge=False, module_list=[], source_list=[], envs={}, prepend_script=[], append_script=[], wait_time=0, **kwargs)[source]

Bases: object

Resources is used to describe the machine resources we need to do calculations.

Parameters:
number_nodeint

The number of nodes needed for each job.

cpu_per_nodeint

The number of CPUs on each node.

gpu_per_nodeint

The number of GPUs on each node.

queue_namestr

The queue name of the batch job scheduler system.

group_sizeint

The number of tasks in a job.

custom_flagslist of str

The extra lines passed to the header of the job submission script.

strategydict

Strategies used to generate the job submission scripts.

if_cuda_multi_devicesbool

Whether to assign the tasks to different GPUs when there are multiple NVIDIA GPUs on the node. If true, dpdispatcher will export the environment variable CUDA_VISIBLE_DEVICES for the different tasks. Usually, this option is used together with the Task.task_need_resources variable.

ratio_unfinishedfloat

The ratio of tasks that can be unfinished.

customized_script_header_template_filestr

The customized template file to generate job submitting script header, which overrides the default file.

para_degint

Decides how many tasks will be run in parallel. Usually used together with strategy['if_cuda_multi_devices'].

source_listlist of Path

The environment files to be sourced before the command is executed.

wait_timeint

The waiting time in seconds after a single task is submitted. Default: 0.
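To illustrate the if_cuda_multi_devices strategy, the sketch below spreads tasks over the GPUs of a node by giving each command its own CUDA_VISIBLE_DEVICES value. The round-robin rule and the function name assign_cuda_devices are assumptions made for illustration. The assignment dpdispatcher actually performs may differ.

```python
def assign_cuda_devices(commands, gpu_per_node):
    """Prefix each command with a CUDA_VISIBLE_DEVICES export (round-robin)."""
    if gpu_per_node < 1:
        raise ValueError("gpu_per_node must be at least 1")
    lines = []
    for index, command in enumerate(commands):
        device = index % gpu_per_node  # hypothetical round-robin choice
        lines.append(f"export CUDA_VISIBLE_DEVICES={device}; {command}")
    return lines


script_lines = assign_cuda_devices(
    ["python a.py", "python b.py", "python c.py"], gpu_per_node=2
)
```

Here the third command wraps around to device 0 again, because only two GPUs are available on the node.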

Methods

arginfo

deserialize

load_from_dict

load_from_json

load_from_yaml

serialize

static arginfo(detail_kwargs=True)[source]
classmethod deserialize(resources_dict)[source]
classmethod load_from_dict(resources_dict)[source]
classmethod load_from_json(json_file)[source]
classmethod load_from_yaml(yaml_file)[source]
serialize()[source]
class dpdispatcher.Submission(work_base, machine=None, resources=None, forward_common_files=[], backward_common_files=[], *, task_list=[])[source]

Bases: object

A submission represents a collection of tasks. These tasks are usually located in a common directory, and they may share common files to be uploaded and downloaded.

Parameters:
work_basePath

the base directory of the local tasks. It is usually the directory name of the project.

machineMachine

machine class object (for example, PBS, Slurm, Shell) to execute the jobs. The machine can still be bound after the instantiation with the bind_machine method.

resourcesResources

the machine resources (cpu or gpu) used to generate the slurm/pbs script

forward_common_fileslist

the common files to be uploaded to other computers before the jobs begin

backward_common_fileslist

the common files to be downloaded from other computers after the jobs finish

task_listlist of Task

a list of tasks to be run.

Methods

async_run_submission(**kwargs)

Async interface of run_submission.

bind_machine(machine)

Bind this submission to a machine.

check_all_finished()

Check whether all the jobs in the submission are finished.

check_ratio_unfinished(ratio_unfinished)

Calculate the ratio of unfinished tasks in the submission.

deserialize(submission_dict[, machine])

Convert the submission_dict to a Submission class object.

generate_jobs()

After tasks are registered to self.belonging_tasks, this method generates the jobs and adds them to self.belonging_jobs.

handle_unexpected_submission_state()

Handle unexpected job state of the submission.

run_submission(*[, dry_run, exit_on_submit, ...])

Main method to execute the submission.

serialize([if_static])

Convert the Submission class instance to a dictionary.

update_submission_state()

Update the state of all the jobs in the submission.

clean_jobs

download_jobs

get_hash

register_task

register_task_list

remove_unfinished_tasks

submission_from_json

submission_to_json

try_download_result

try_recover_from_json

upload_jobs

async async_run_submission(**kwargs)[source]

Async interface of run_submission.

Examples

>>> import asyncio
>>> from dpdispatcher import Machine, Resources, Submission, Task
>>> async def run_jobs():
...     # keep strong references so the tasks are not garbage-collected
...     background_tasks = set()
...     # task1
...     task1 = Task(...)
...     submission1 = Submission(..., task_list=[task1])
...     background_task = asyncio.create_task(
...         submission1.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     # task2
...     task2 = Task(...)
...     submission2 = Submission(..., task_list=[task2])
...     background_task = asyncio.create_task(
...         submission2.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     result = await asyncio.gather(*background_tasks)
...     return result
>>> asyncio.run(run_jobs())

May raise an error if clean=True is passed explicitly when submitting to PBS or Slurm.

bind_machine(machine)[source]

Bind this submission to a machine and update the remote_root and local_root of the machine's context.

Parameters:
machineMachine

the machine to bind with

check_all_finished()[source]

Check whether all the jobs in the submission are finished.

Notes

This method will not handle unexpected job state in the submission.

check_ratio_unfinished(ratio_unfinished: float) bool[source]

Calculate the ratio of unfinished tasks in the submission.

Parameters:
ratio_unfinishedfloat

the ratio of unfinished tasks in the submission

Returns:
bool

whether the ratio of unfinished tasks in the submission is larger than ratio_unfinished
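The comparison described above can be sketched in a few lines. The function name ratio_exceeded and the list of booleans are hypothetical stand-ins for the task states that a Submission tracks, and the handling of an empty list is an assumption.

```python
def ratio_exceeded(task_finished_flags, ratio_unfinished):
    """Return True if the share of unfinished tasks is larger than the threshold."""
    if not task_finished_flags:
        return False  # assumption: an empty submission has nothing unfinished
    unfinished = sum(1 for finished in task_finished_flags if not finished)
    return unfinished / len(task_finished_flags) > ratio_unfinished
```

With four tasks of which two are unfinished, the ratio is 0.5, so a threshold of 0.25 is exceeded. With only one unfinished task the ratio equals the threshold and the strict comparison is not satisfied.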

clean_jobs()[source]
classmethod deserialize(submission_dict, machine=None)[source]

Convert the submission_dict to a Submission class object.

Parameters:
submission_dictdict

the dictionary which contains the submission information

machineMachine

Machine class object to execute the jobs

Returns:
submissionSubmission

the Submission class instance converted from the submission_dict

download_jobs()[source]
generate_jobs()[source]

After tasks are registered to self.belonging_tasks, this method generates the jobs and adds them to self.belonging_jobs. The jobs are generated from the randomly shuffled tasks, with self.resources.group_size tasks in each job. The tasks are shuffled for the sake of load balancing. The random seed is a constant (42), which ensures that the same jobs are generated when the program is re-run.
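The grouping described above can be sketched with the standard library. The helper group_tasks is a hypothetical name, and the sketch assumes group_size is at least 1. It shows the idea of a seeded shuffle followed by chunking, not dpdispatcher's exact code.

```python
import random


def group_tasks(tasks, group_size, seed=42):
    """Shuffle tasks with a fixed seed, then cut them into groups."""
    if group_size < 1:
        raise ValueError("group_size must be at least 1")
    shuffled = list(tasks)
    random.Random(seed).shuffle(shuffled)  # private RNG, global state untouched
    return [
        shuffled[start : start + group_size]
        for start in range(0, len(shuffled), group_size)
    ]


tasks = [f"task_{i}" for i in range(7)]
first_run = group_tasks(tasks, group_size=3)
second_run = group_tasks(tasks, group_size=3)
```

Because the seed is fixed, first_run and second_run contain the same groups, which is what makes a re-run map the tasks to the same jobs.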

get_hash()[source]
handle_unexpected_submission_state()[source]

Handle unexpected job states of the submission. If the job state is unsubmitted, submit the job. If the job state is terminated (killed unexpectedly), resubmit the job. If the job state is unknown, raise an error.
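The three rules above amount to a small dispatch on the job state. The State enum and the next_action function below are hypothetical names used only to make the rules concrete. The real state type and error class in dpdispatcher may differ.

```python
from enum import Enum


class State(Enum):
    UNSUBMITTED = "unsubmitted"
    TERMINATED = "terminated"
    RUNNING = "running"
    FINISHED = "finished"
    UNKNOWN = "unknown"


def next_action(state):
    """Map a job state to the action described above."""
    if state is State.UNSUBMITTED:
        return "submit"
    if state is State.TERMINATED:
        return "resubmit"
    if state is State.UNKNOWN:
        raise RuntimeError("job state is unknown")
    return "none"  # running or finished jobs need no handling
```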

register_task(task)[source]
register_task_list(task_list)[source]
remove_unfinished_tasks()[source]
run_submission(*, dry_run=False, exit_on_submit=False, clean=True, check_interval=30)[source]

Main method to execute the submission. First, check whether an old Submission exists on the remote machine and try to recover from it. Second, upload the local files to the remote machine where the tasks are to be executed. Third, run the submission defined previously. Fourth, wait until the tasks in the submission have finished and download the result files to the local directory. If dry_run is True, the submission will be uploaded but not executed, and the method exits. If exit_on_submit is True, the method exits after the submission has been submitted.
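The order of the phases and the two early exits can be sketched as follows. The function run_flow and the step callbacks are hypothetical, and the point at which exit_on_submit returns (right after submitting) is an assumption based on the parameter name.

```python
def run_flow(steps, *, dry_run=False, exit_on_submit=False):
    """Call the hypothetical step callbacks in the order described above."""
    steps["recover"]()  # 1. try to recover an old submission
    steps["upload"]()  # 2. upload the local files
    if dry_run:
        return "uploaded"
    steps["submit"]()  # 3. run the submission
    if exit_on_submit:
        return "submitted"
    steps["wait"]()  # 4. wait for the tasks, then download the results
    steps["download"]()
    return "finished"


calls = []
names = ["recover", "upload", "submit", "wait", "download"]
# Each callback only records its own name, so the call order can be inspected.
steps = {name: (lambda name=name: calls.append(name)) for name in names}
outcome = run_flow(steps, dry_run=True)
```

After the dry run, calls holds only the recover and upload steps, which mirrors the statement that the submission is uploaded but not executed.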

serialize(if_static=False)[source]

Convert the Submission class instance to a dictionary.

Parameters:
if_staticbool

whether to dump the job runtime information (like job_id, job_state, fail_count) to the dictionary.

Returns:
submission_dictdict

the dictionary converted from the Submission class instance

classmethod submission_from_json(json_file_name='submission.json')[source]
submission_to_json()[source]
try_download_result()[source]
try_recover_from_json()[source]
update_submission_state()[source]

Update the state of all the jobs in the submission.

Notes

This method will not handle unexpected job states in the submission (for example, it will not resubmit terminated jobs).

upload_jobs()[source]
class dpdispatcher.Task(command, task_work_path, forward_files=[], backward_files=[], outlog='log', errlog='err')[source]

Bases: object

A task is a sequential command to be executed, as well as the files it depends on to transmit forward and backward.

Parameters:
commandStr

the command to be executed.

task_work_pathPath

the directory of the task, relative to which the task's files are located.

forward_fileslist of Path

the files to be transmitted to the remote machine before the command is executed.

backward_fileslist of Path

the files to be transmitted from the remote machine after the command has finished.

outlogStr

the name of the file to which the command's stdout is redirected

errlogStr

the name of the file to which the command's stderr is redirected

Methods

deserialize(task_dict)

Convert the task_dict to a Task class object.

get_task_state(context)

Get the task state by checking the tag file.

arginfo

get_hash

load_from_dict

load_from_json

load_from_yaml

serialize

static arginfo()[source]
classmethod deserialize(task_dict)[source]

Convert the task_dict to a Task class object.

Parameters:
task_dictdict

the dictionary which contains the task information

Returns:
taskTask

the Task class instance converted from the task_dict
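A serialize and deserialize round trip of this kind can be sketched with a dataclass. MiniTask is a hypothetical stand-in that only borrows the constructor arguments listed for Task. It does not reproduce the keys or the validation of the real class.

```python
from dataclasses import asdict, dataclass, field


@dataclass
class MiniTask:
    """Hypothetical stand-in with the same constructor arguments as Task."""

    command: str
    task_work_path: str
    forward_files: list = field(default_factory=list)
    backward_files: list = field(default_factory=list)
    outlog: str = "log"
    errlog: str = "err"

    def serialize(self):
        """Convert the instance to a plain dictionary."""
        return asdict(self)

    @classmethod
    def deserialize(cls, task_dict):
        """Build an instance from a dictionary produced by serialize."""
        return cls(**task_dict)


task = MiniTask(command="echo hello", task_work_path="task_0/")
restored = MiniTask.deserialize(task.serialize())
```

The sketch uses default_factory for the list arguments so that two instances never share the same default list.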

get_hash()[source]
get_task_state(context)[source]

Get the task state by checking the tag file.

Parameters:
contextContext

the context of the task

classmethod load_from_dict(task_dict: dict) Task[source]
classmethod load_from_json(json_file)[source]
classmethod load_from_yaml(yaml_file)[source]
serialize()[source]

Subpackages

Submodules

dpdispatcher.arginfo module

dpdispatcher.base_context module

class dpdispatcher.base_context.BaseContext(*args, **kwargs)[source]

Bases: object

Methods

machine_arginfo()

Generate the machine arginfo.

machine_subfields()

Generate the machine subfields.

bind_submission

check_finish

clean

download

load_from_dict

read_file

upload

write_file

alias: Tuple[str, ...] = ()
bind_submission(submission)[source]
check_finish(proc)[source]
abstract clean()[source]
abstract download(submission, check_exists=False, mark_failure=True, back_error=False)[source]
classmethod load_from_dict(context_dict)[source]
classmethod machine_arginfo() Argument[source]

Generate the machine arginfo.

Returns:
Argument

machine arginfo

classmethod machine_subfields() List[Argument][source]

Generate the machine subfields.

Returns:
list[Argument]

machine subfields

options = {'BohriumContext', 'HDFSContext', 'LazyLocalContext', 'LocalContext', 'OpenAPIContext', 'SSHContext'}
abstract read_file(fname)[source]
subclasses_dict = {'Bohrium': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'BohriumContext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'DpCloudServer': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'DpCloudServerContext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'HDFS': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'HDFSContext': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'LazyLocal': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'LazyLocalContext': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'Lebesgue': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'LebesgueContext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'Local': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'LocalContext': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'OpenAPI': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'OpenAPIContext': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'SSH': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>, 'SSHContext': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>, 'bohrium': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'bohriumcontext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'dpcloudserver': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'dpcloudservercontext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'hdfs': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'hdfscontext': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'lazylocal': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'lazylocalcontext': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'lebesgue': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'lebesguecontext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'local': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'localcontext': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'openapi': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'openapicontext': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'ssh': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>, 'sshcontext': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>}
abstract upload(submission)[source]
abstract write_file(fname, write_str)[source]
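The abstract methods listed for BaseContext have to be supplied by each concrete context. The sketch below shows that pattern with Python's abc module. FileContext and InMemoryContext are hypothetical classes that mirror only the write_file and read_file signatures shown above.

```python
from abc import ABC, abstractmethod


class FileContext(ABC):
    """Hypothetical base class with two of the abstract methods listed above."""

    @abstractmethod
    def write_file(self, fname, write_str):
        """Store write_str under the name fname."""

    @abstractmethod
    def read_file(self, fname):
        """Return the content stored under the name fname."""


class InMemoryContext(FileContext):
    """Toy context that keeps files in a dict instead of on a remote machine."""

    def __init__(self):
        self._files = {}

    def write_file(self, fname, write_str):
        self._files[fname] = write_str

    def read_file(self, fname):
        return self._files[fname]


context = InMemoryContext()
context.write_file("job.sub", "#!/bin/bash\n")
```

Instantiating FileContext directly raises TypeError, which is how the abstract marker forces subclasses to implement both methods.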

dpdispatcher.dlog module

dpdispatcher.dpdisp module

dpdispatcher.dpdisp.main()[source]
dpdispatcher.dpdisp.main_parser() ArgumentParser[source]

Dpdispatcher commandline options argument parser.

Returns:
argparse.ArgumentParser

the argument parser

Notes

This function is used by documentation.

dpdispatcher.dpdisp.parse_args(args: List[str] | None = None)[source]

Dpdispatcher commandline options argument parsing.

Parameters:
argsList[str]

list of command line arguments, mainly intended for testing; the default, None, takes the arguments from sys.argv
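The args=None convention follows argparse itself: when parse_args receives None, argparse reads sys.argv[1:]. The sketch below uses a hypothetical parser with a made-up --verbose option. It does not reflect the real dpdisp command line options.

```python
import argparse


def parse_args(args=None):
    """Parse args, or sys.argv[1:] when args is None."""
    parser = argparse.ArgumentParser(prog="demo")
    parser.add_argument("--verbose", action="store_true")  # hypothetical option
    return parser.parse_args(args)


parsed = parse_args(["--verbose"])
```

Passing an explicit list, as in the last line, is what makes the function easy to call from tests without touching sys.argv.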

dpdispatcher.machine module

class dpdispatcher.machine.Machine(*args, **kwargs)[source]

Bases: object

A machine is used to handle the connection with remote machines.

Parameters:
contextSubClass derived from BaseContext

The context is used to maintain the connection with the remote machine.

Methods

do_submit(job)

Submit a single job, assuming that no job is running there.

get_exit_code(job)

Get exit code of the job.

kill(job)

Kill the job.

resources_arginfo()

Generate the resources arginfo.

resources_subfields()

Generate the resources subfields.

arginfo

bind_context

check_finish_tag

check_if_recover

check_status

default_resources

deserialize

gen_command_env_cuda_devices

gen_script

gen_script_command

gen_script_custom_flags_lines

gen_script_end

gen_script_env

gen_script_header

gen_script_run_command

gen_script_wait

load_from_dict

load_from_json

load_from_yaml

serialize

sub_script_cmd

sub_script_head

alias: Tuple[str, ...] = ()
classmethod arginfo()[source]
bind_context(context)[source]
abstract check_finish_tag(**kwargs)[source]
check_if_recover(submission)[source]
abstract check_status(job)[source]
default_resources(res)[source]
classmethod deserialize(machine_dict)[source]
abstract do_submit(job)[source]

Submit a single job, assuming that no job is running there.

gen_command_env_cuda_devices(resources)[source]
gen_script(job)[source]
gen_script_command(job)[source]
gen_script_custom_flags_lines(job)[source]
gen_script_end(job)[source]
gen_script_env(job)[source]
abstract gen_script_header(job)[source]
gen_script_run_command(job)[source]
gen_script_wait(resources)[source]
get_exit_code(job)[source]

Get exit code of the job.

Parameters:
jobJob

the job to query

kill(job)[source]

Kill the job.

If not implemented, pass and let the user manually kill it.

Parameters:
jobJob

the job to kill

classmethod load_from_dict(machine_dict)[source]
classmethod load_from_json(json_path)[source]
classmethod load_from_yaml(yaml_path)[source]
options = {'Bohrium', 'DistributedShell', 'Fugaku', 'LSF', 'OpenAPI', 'PBS', 'SGE', 'Shell', 'Slurm', 'SlurmJobArray', 'Torque'}
classmethod resources_arginfo() Argument[source]

Generate the resources arginfo.

Returns:
Argument

resources arginfo

classmethod resources_subfields() List[Argument][source]

Generate the resources subfields.

Returns:
list[Argument]

resources subfields

serialize(if_empty_remote_profile=False)[source]
sub_script_cmd(res)[source]
sub_script_head(res)[source]
subclasses_dict = {'Bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'DistributedShell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'DpCloudServer': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'Fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'LSF': <class 'dpdispatcher.machines.lsf.LSF'>, 'Lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'OpenAPI': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'PBS': <class 'dpdispatcher.machines.pbs.PBS'>, 'SGE': <class 'dpdispatcher.machines.pbs.SGE'>, 'Shell': <class 'dpdispatcher.machines.shell.Shell'>, 'Slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'SlurmJobArray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'Torque': <class 'dpdispatcher.machines.pbs.Torque'>, 'bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'distributedshell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'dpcloudserver': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'lsf': <class 'dpdispatcher.machines.lsf.LSF'>, 'openapi': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'pbs': <class 'dpdispatcher.machines.pbs.PBS'>, 'sge': <class 'dpdispatcher.machines.pbs.SGE'>, 'shell': <class 'dpdispatcher.machines.shell.Shell'>, 'slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'slurmjobarray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'torque': <class 'dpdispatcher.machines.pbs.Torque'>}

dpdispatcher.submission module

class dpdispatcher.submission.Job(job_task_list, *, resources, machine=None)[source]

Bases: object

Job is generated by Submission automatically. A job usually has many tasks and may request computing resources from a job scheduler system. Each Job can generate a script file to be submitted to the job scheduler system or executed locally.

Parameters:
job_task_listlist of Task

the tasks belonging to the job

resourcesResources

the machine resources. Passed from Submission when it constructs jobs.

machineMachine

machine object to execute the job. Passed from Submission when it constructs jobs.

Methods

deserialize(job_dict[, machine])

Convert the job_dict to a Job class object.

get_job_state()

Get the job state.

get_last_error_message()

Get last error message when the job is terminated.

serialize([if_static])

Convert the Job class instance to a dictionary.

get_hash

handle_unexpected_job_state

job_to_json

register_job_id

submit_job

classmethod deserialize(job_dict, machine=None)[source]

Convert the job_dict to a Job class object.

Parameters:
job_dictdict

the dictionary which contains the job information

machineMachine

the machine object to execute the job

Returns:
jobJob

the Job class instance converted from the job_dict

get_hash()[source]
get_job_state()[source]

Get the job state. Usually, this method queries the database of the Slurm or PBS job scheduler system and gets the result.

Notes

This method will not submit or resubmit the job if it is unsubmitted.

get_last_error_message() str | None[source]

Get last error message when the job is terminated.

handle_unexpected_job_state()[source]
job_to_json()[source]
register_job_id(job_id)[source]
serialize(if_static=False)[source]

Convert the Job class instance to a dictionary.

Parameters:
if_staticbool

whether to dump the job runtime information (job_id, job_state, fail_count, job_uuid etc.) to the dictionary.

Returns:
job_dictdict

the dictionary converted from the Job class instance

submit_job()[source]
class dpdispatcher.submission.Resources(number_node, cpu_per_node, gpu_per_node, queue_name, group_size, *, custom_flags=[], strategy={'if_cuda_multi_devices': False, 'ratio_unfinished': 0.0}, para_deg=1, module_unload_list=[], module_purge=False, module_list=[], source_list=[], envs={}, prepend_script=[], append_script=[], wait_time=0, **kwargs)[source]

Bases: object

Resources is used to describe the machine resources we need to do calculations.

Parameters:
number_nodeint

The number of nodes needed for each job.

cpu_per_nodeint

The number of CPUs on each node.

gpu_per_nodeint

The number of GPUs on each node.

queue_namestr

The queue name of the batch job scheduler system.

group_sizeint

The number of tasks in a job.

custom_flagslist of str

The extra lines passed to the header of the job submission script.

strategydict

Strategies used to generate the job submission scripts.

if_cuda_multi_devicesbool

Whether to assign the tasks to different GPUs when there are multiple NVIDIA GPUs on the node. If true, dpdispatcher will export the environment variable CUDA_VISIBLE_DEVICES for the different tasks. Usually, this option is used together with the Task.task_need_resources variable.

ratio_unfinishedfloat

The ratio of tasks that can be unfinished.

customized_script_header_template_filestr

The customized template file to generate job submitting script header, which overrides the default file.

para_degint

Decides how many tasks will be run in parallel. Usually used together with strategy['if_cuda_multi_devices'].

source_listlist of Path

The environment files to be sourced before the command is executed.

wait_timeint

The waiting time in seconds after a single task is submitted. Default: 0.

Methods

arginfo

deserialize

load_from_dict

load_from_json

load_from_yaml

serialize

static arginfo(detail_kwargs=True)[source]
classmethod deserialize(resources_dict)[source]
classmethod load_from_dict(resources_dict)[source]
classmethod load_from_json(json_file)[source]
classmethod load_from_yaml(yaml_file)[source]
serialize()[source]
class dpdispatcher.submission.Submission(work_base, machine=None, resources=None, forward_common_files=[], backward_common_files=[], *, task_list=[])[source]

Bases: object

A submission represents a collection of tasks. These tasks are usually located in a common directory, and they may share common files to be uploaded and downloaded.

Parameters:
work_basePath

the base directory of the local tasks. It is usually the directory name of the project.

machineMachine

machine class object (for example, PBS, Slurm, Shell) to execute the jobs. The machine can still be bound after the instantiation with the bind_machine method.

resourcesResources

the machine resources (cpu or gpu) used to generate the slurm/pbs script

forward_common_fileslist

the common files to be uploaded to other computers before the jobs begin

backward_common_fileslist

the common files to be downloaded from other computers after the jobs finish

task_listlist of Task

a list of tasks to be run.

Methods

async_run_submission(**kwargs)

Async interface of run_submission.

bind_machine(machine)

Bind this submission to a machine.

check_all_finished()

Check whether all the jobs in the submission are finished.

check_ratio_unfinished(ratio_unfinished)

Calculate the ratio of unfinished tasks in the submission.

deserialize(submission_dict[, machine])

Convert the submission_dict to a Submission class object.

generate_jobs()

After tasks are registered to self.belonging_tasks, this method generates the jobs and adds them to self.belonging_jobs.

handle_unexpected_submission_state()

Handle unexpected job state of the submission.

run_submission(*[, dry_run, exit_on_submit, ...])

Main method to execute the submission.

serialize([if_static])

Convert the Submission class instance to a dictionary.

update_submission_state()

Update the state of all the jobs in the submission.

clean_jobs

download_jobs

get_hash

register_task

register_task_list

remove_unfinished_tasks

submission_from_json

submission_to_json

try_download_result

try_recover_from_json

upload_jobs

async async_run_submission(**kwargs)[source]

Async interface of run_submission.

Examples

>>> import asyncio
>>> from dpdispatcher import Machine, Resources, Submission, Task
>>> async def run_jobs():
...     # keep strong references so the tasks are not garbage-collected
...     background_tasks = set()
...     # task1
...     task1 = Task(...)
...     submission1 = Submission(..., task_list=[task1])
...     background_task = asyncio.create_task(
...         submission1.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     # task2
...     task2 = Task(...)
...     submission2 = Submission(..., task_list=[task2])
...     background_task = asyncio.create_task(
...         submission2.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     result = await asyncio.gather(*background_tasks)
...     return result
>>> asyncio.run(run_jobs())

May raise an error if clean=True is passed explicitly when submitting to PBS or Slurm.

bind_machine(machine)[source]

Bind this submission to a machine and update the remote_root and local_root of the machine's context.

Parameters:
machineMachine

the machine to bind with

check_all_finished()[source]

Check whether all the jobs in the submission are finished.

Notes

This method will not handle unexpected job state in the submission.

check_ratio_unfinished(ratio_unfinished: float) bool[source]

Calculate the ratio of unfinished tasks in the submission.

Parameters:
ratio_unfinishedfloat

the ratio of unfinished tasks in the submission

Returns:
bool

whether the ratio of unfinished tasks in the submission is larger than ratio_unfinished

clean_jobs()[source]
classmethod deserialize(submission_dict, machine=None)[source]

Convert the submission_dict to a Submission class object.

Parameters:
submission_dictdict

the dictionary which contains the submission information

machineMachine

Machine class object to execute the jobs

Returns:
submissionSubmission

the Submission class instance converted from the submission_dict

download_jobs()[source]
generate_jobs()[source]

After tasks are registered to self.belonging_tasks, this method generates the jobs and adds them to self.belonging_jobs. The jobs are generated from the randomly shuffled tasks, with self.resources.group_size tasks in each job. The tasks are shuffled for the sake of load balancing. The random seed is a constant (42), which ensures that the same jobs are generated when the program is re-run.

get_hash()[source]
handle_unexpected_submission_state()[source]

Handle unexpected job states of the submission. If the job state is unsubmitted, submit the job. If the job state is terminated (killed unexpectedly), resubmit the job. If the job state is unknown, raise an error.

register_task(task)[source]
register_task_list(task_list)[source]
remove_unfinished_tasks()[source]
run_submission(*, dry_run=False, exit_on_submit=False, clean=True, check_interval=30)[source]

Main method to execute the submission. First, check whether an old Submission exists on the remote machine and try to recover from it. Second, upload the local files to the remote machine where the tasks are to be executed. Third, run the submission defined previously. Fourth, wait until the tasks in the submission have finished and download the result files to the local directory. If dry_run is True, the submission will be uploaded but not executed, and the method exits. If exit_on_submit is True, the method exits after the submission has been submitted.

serialize(if_static=False)[source]

Convert the Submission class instance to a dictionary.

Parameters:
if_staticbool

whether to dump the job runtime information (like job_id, job_state, fail_count) to the dictionary.

Returns:
submission_dictdict

the dictionary converted from the Submission class instance

classmethod submission_from_json(json_file_name='submission.json')[source]
submission_to_json()[source]
try_download_result()[source]
try_recover_from_json()[source]
update_submission_state()[source]

Update the state of all the jobs in the submission.

Notes

This method will not handle unexpected job states in the submission (for example, it will not resubmit terminated jobs).

upload_jobs()[source]
class dpdispatcher.submission.Task(command, task_work_path, forward_files=[], backward_files=[], outlog='log', errlog='err')[source]

Bases: object

A task is a sequential command to be executed, as well as the files it depends on to transmit forward and backward.

Parameters:
commandStr

the command to be executed.

task_work_pathPath

the directory of the task, relative to which the task's files are located.

forward_fileslist of Path

the files to be transmitted to the remote machine before the command is executed.

backward_fileslist of Path

the files to be transmitted from the remote machine after the command has finished.

outlogStr

the name of the file to which the command's stdout is redirected

errlogStr

the name of the file to which the command's stderr is redirected

Methods

deserialize(task_dict)

Convert the task_dict to a Task class object.

get_task_state(context)

Get the task state by checking the tag file.

arginfo

get_hash

load_from_dict

load_from_json

load_from_yaml

serialize

static arginfo()[source]
classmethod deserialize(task_dict)[source]

Convert the task_dict to a Task class object.

Parameters:
task_dictdict

the dictionary which contains the task information

Returns:
taskTask

the Task class instance converted from the task_dict

get_hash()[source]
get_task_state(context)[source]

Get the task state by checking the tag file.

Parameters:
contextContext

the context of the task

classmethod load_from_dict(task_dict: dict) Task[source]
classmethod load_from_json(json_file)[source]
classmethod load_from_yaml(yaml_file)[source]
serialize()[source]