dpdispatcher.utils package

Utils.

Subpackages

Submodules

dpdispatcher.utils.hdfs_cli module

class dpdispatcher.utils.hdfs_cli.HDFS[source]

Bases: object

Fundamental class for HDFS basic manipulation.

Methods

copy_from_local(local_path, to_uri)

Copy a local path to an HDFS URI. Returns: True on success. Raises: on unexpected error.

exists(uri)

Check existence of an HDFS URI. Returns: True if it exists. Raises: RuntimeError.

mkdir(uri)

Make a new HDFS directory. Returns: True on success. Raises: RuntimeError.

remove(uri)

Remove an HDFS URI. Returns: True on success. Raises: RuntimeError.

copy_to_local

move

read_hdfs_file

static copy_from_local(local_path, to_uri)[source]

Copy a local path to an HDFS URI. Returns: True on success. Raises: on unexpected error.

static copy_to_local(from_uri, local_path)[source]

static exists(uri)[source]

Check existence of an HDFS URI. Returns: True if it exists. Raises: RuntimeError.

static mkdir(uri)[source]

Make a new HDFS directory. Returns: True on success. Raises: RuntimeError.

static move(from_uri, to_uri)[source]

static read_hdfs_file(uri)[source]

static remove(uri)[source]

Remove an HDFS URI. Returns: True on success. Raises: RuntimeError.

dpdispatcher.utils.job_status module

class dpdispatcher.utils.job_status.JobStatus(value)[source]

Bases: IntEnum

An enumeration.

completing = 6
finished = 5
running = 3
terminated = 4
unknown = 100
unsubmitted = 1
waiting = 2
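Because JobStatus derives from IntEnum, its members can be looked up by value or by name and compared as plain integers. The sketch below uses a stand-in enum, `JobStatusSketch` (a hypothetical name), that copies the values listed above so that it runs without dpdispatcher installed. It illustrates IntEnum behaviour only and is not the library class.

```python
from enum import IntEnum


class JobStatusSketch(IntEnum):
    """Stand-in copy of the values documented for JobStatus (not the library class)."""

    unsubmitted = 1
    waiting = 2
    running = 3
    terminated = 4
    finished = 5
    completing = 6
    unknown = 100


# Look up a member by integer value or by name.
by_value = JobStatusSketch(5)
by_name = JobStatusSketch["running"]

# IntEnum members behave like integers, so numeric comparison and sorting work.
is_int_equal = JobStatusSketch.finished == 5
sorted_names = [status.name for status in sorted(JobStatusSketch)]
```

With the real class, the same lookups would presumably be written against `dpdispatcher.utils.job_status.JobStatus`.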

dpdispatcher.utils.record module

dpdispatcher.utils.utils module

exception dpdispatcher.utils.utils.RetrySignal[source]

Bases: Exception

Exception to give a signal to retry the function.

dpdispatcher.utils.utils.customized_script_header_template(filename: PathLike, resources: Resources) → str[source]

dpdispatcher.utils.utils.generate_totp(secret: str, period: int = 30, token_length: int = 6) → str[source]

Generate time-based one time password (TOTP) from the secret.

Some HPCs use TOTP for two-factor authentication for safety.

Parameters:
secret: str

The encoded secret provided by the HPC. It’s usually extracted from a 2D code and base32 encoded.

period: int, default=30

Time period, in seconds, during which the code is valid.

token_length: int, default=6

The token length.

Returns:
token: str

The generated token.

References

https://github.com/lepture/otpauth/blob/49914d83d36dbcd33c9e26f65002b21ce09a6303/otpauth.py#L143-L160
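To make the TOTP calculation concrete, here is a minimal standard-library sketch following RFC 6238 with HMAC-SHA1. The function name `totp_sketch` and its `now` parameter are hypothetical and added only so the result is reproducible. This is not the dpdispatcher implementation, and it assumes the secret is base32 text as described above.

```python
import base64
import hashlib
import hmac
import struct
import time


def totp_sketch(secret, period=30, token_length=6, now=None):
    """Compute an RFC 6238 style token from a base32 secret (illustrative only)."""
    if now is None:
        now = time.time()
    # Base32 input must be padded to a multiple of 8 characters.
    padded = secret.upper() + "=" * (-len(secret) % 8)
    key = base64.b32decode(padded)
    # The moving factor is the number of whole periods since the Unix epoch.
    counter = int(now // period)
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    # Dynamic truncation: the low nibble of the last byte selects a 4-byte window.
    offset = digest[-1] & 0x0F
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** token_length).zfill(token_length)


# The RFC 6238 test secret is the ASCII string "12345678901234567890".
rfc_secret = base64.b32encode(b"12345678901234567890").decode()
token_8 = totp_sketch(rfc_secret, token_length=8, now=59)
token_6 = totp_sketch(rfc_secret, now=59)
```

Passing a fixed `now` keeps the token deterministic for checking; in normal use it would be left as None so the current time is used.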

dpdispatcher.utils.utils.get_sha256(filename)[source]

Get sha256 of a file.

Parameters:
filename: str

The filename.

Returns:
sha256: str

The sha256.
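For reference, a file's SHA-256 can be computed with the standard library alone. The sketch below is one common way to do it, reading in chunks so large files do not need to fit in memory. The name `sha256_of_file_sketch` and the `chunk_size` parameter are hypothetical, and this is not necessarily how get_sha256 is implemented.

```python
import hashlib
import os
import tempfile


def sha256_of_file_sketch(filename, chunk_size=65536):
    """Return the hex SHA-256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(filename, "rb") as handle:
        # iter() with a sentinel stops when read() returns empty bytes at EOF.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Usage: hash a small temporary file, then clean it up.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"hello dpdispatcher")
    tmp_path = tmp.name
file_digest = sha256_of_file_sketch(tmp_path)
os.remove(tmp_path)
```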

dpdispatcher.utils.utils.hotp(key: str, period: int, token_length: int = 6, digest='sha1')[source]

dpdispatcher.utils.utils.retry(max_retry: int = 3, sleep: int | float = 60, catch_exception: Type[BaseException] = RetrySignal) → Callable[source]

Retry the function until it succeeds or has failed the maximum number of times.

Parameters:
max_retry: int, default=3

The maximum number of retries. If None, it will retry forever.

sleep: int or float, default=60

The sleep time between attempts, in seconds.

catch_exception: Exception type, default=RetrySignal

The exception to catch.

Returns:
decorator: Callable

The decorator.

Examples

>>> @retry(max_retry=3, sleep=60, catch_exception=RetrySignal)
... def func():
...     raise RetrySignal("Failed")
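The retry behaviour described above can be sketched as a small standalone decorator. The names `retry_sketch` and `RetrySignalSketch` are hypothetical stand-ins so the block runs without dpdispatcher. Raising RuntimeError once the retries are exhausted is an assumption made for this sketch, not a documented behaviour of the library.

```python
import functools
import time


class RetrySignalSketch(Exception):
    """Hypothetical stand-in for RetrySignal."""


def retry_sketch(max_retry=3, sleep=60, catch_exception=RetrySignalSketch):
    """Build a decorator that re-runs a function when catch_exception is raised."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            # max_retry=None means keep trying until the call succeeds.
            while max_retry is None or attempt < max_retry:
                try:
                    return func(*args, **kwargs)
                except catch_exception:
                    attempt += 1
                    time.sleep(sleep)
            # Assumption: signal exhaustion with RuntimeError.
            raise RuntimeError(f"{func.__name__} failed {max_retry} times")

        return wrapper

    return decorator


calls = []


@retry_sketch(max_retry=3, sleep=0)
def flaky():
    """Fail twice, then succeed on the third call."""
    calls.append(1)
    if len(calls) < 3:
        raise RetrySignalSketch("not yet")
    return "ok"


result = flaky()
```

Only the exception type named in `catch_exception` triggers a retry; any other exception propagates immediately.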

dpdispatcher.utils.utils.rsync(from_file: str, to_file: str, port: int = 22, key_filename: str | None = None, timeout: int | float = 10)[source]

Call rsync to transfer files.

Parameters:
from_file: str

Source path (rsync SRC).

to_file: str

Destination path (rsync DEST).

port: int, default=22

Port for SSH.

key_filename: str, optional

Identity file name.

timeout: int or float, default=10

Timeout for SSH, in seconds.

Raises:
RuntimeError

when return code is not 0
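As an illustration of how these parameters might map onto an rsync-over-SSH command line, the sketch below only assembles an argument list and does not run anything. The helper name `build_rsync_command_sketch` is hypothetical, and the exact flags are assumptions (standard rsync `-a`, `-z`, `-e` and ssh `-p`, `-i`, `-o ConnectTimeout=`), not necessarily the ones dpdispatcher passes.

```python
import shlex


def build_rsync_command_sketch(from_file, to_file, port=22, key_filename=None, timeout=10):
    """Assemble an rsync-over-ssh argument list (hypothetical helper)."""
    ssh_cmd = ["ssh", "-o", f"ConnectTimeout={timeout}", "-p", str(port)]
    if key_filename is not None:
        ssh_cmd += ["-i", key_filename]
    # rsync's -e option takes the remote shell command as a single string.
    return ["rsync", "-az", "-e", shlex.join(ssh_cmd), from_file, to_file]


cmd = build_rsync_command_sketch(
    "data/", "user@host:/tmp/data/", port=2222, key_filename="id_rsa"
)
```

A list like this could be passed to `subprocess.run`, with a non-zero return code then translated into the RuntimeError documented above.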

dpdispatcher.utils.utils.run_cmd_with_all_output(cmd, shell=True)[source]