import os
import shutil
import tarfile
from glob import glob
from dpdispatcher.base_context import BaseContext
from dpdispatcher.dlog import dlog
from dpdispatcher.utils.hdfs_cli import HDFS


class HDFSContext(BaseContext):
def __init__(
self,
local_root,
remote_root,
remote_profile={},
*args,
**kwargs,
):
assert isinstance(local_root, str)
self.init_local_root = local_root
self.init_remote_root = remote_root
self.temp_local_root = os.path.abspath(local_root)
self.temp_remote_root = remote_root
self.remote_profile = remote_profile

    @classmethod
def load_from_dict(cls, context_dict):
local_root = context_dict["local_root"]
remote_root = context_dict["remote_root"]
remote_profile = context_dict.get("remote_profile", {})
instance = cls(
local_root=local_root,
remote_root=remote_root,
remote_profile=remote_profile,
)
return instance
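
    # Illustrative ``context_dict`` accepted by ``load_from_dict`` (the values
    # below are placeholders, not defaults):
    #
    #     {
    #         "local_root": "./work",
    #         "remote_root": "/user/hadoop/dpdispatcher/jobs",
    #         "remote_profile": {},
    #     }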

    def get_job_root(self):
return self.remote_root

    def bind_submission(self, submission):
self.submission = submission
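        # Per-submission layout: the local work dir is
        # <local_root>/<work_base>, while the HDFS side keys its directory on
        # the submission hash.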
self.local_root = os.path.join(self.temp_local_root, submission.work_base)
self.remote_root = os.path.join(
self.temp_remote_root, submission.submission_hash
)
HDFS.mkdir(self.remote_root)
def _put_files(self, files, dereference=True):
of = self.submission.submission_hash + "_upload.tgz"
# local tar
if os.path.isfile(os.path.join(self.local_root, of)):
os.remove(os.path.join(self.local_root, of))
with tarfile.open(
os.path.join(self.local_root, of), "w:gz", dereference=dereference
) as tar:
for ii in files:
ii_full = os.path.join(self.local_root, ii)
tar.add(ii_full, arcname=ii)
# trans
        from_f = os.path.join(self.local_root, of)
        HDFS.copy_from_local(from_f, self.remote_root)
# clean up
os.remove(from_f)

    def upload(self, submission, dereference=True):
"""Upload forward files and forward command files to HDFS root dir.
Parameters
----------
submission : Submission class instance
represents a collection of tasks, such as forward file names
dereference : bool
whether to dereference symbolic links
Returns
-------
none
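
        Examples
        --------
        A minimal sketch; the paths and the ``submission`` object are
        illustrative, not defaults::

            context = HDFSContext(
                local_root="./work",
                remote_root="/user/hadoop/dpdispatcher/jobs",
            )
            context.bind_submission(submission)
            context.upload(submission)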
"""
file_list = []
for task in submission.belonging_tasks:
local_job = os.path.join(self.local_root, task.task_work_path)
for ff in task.forward_files:
abs_file_list = glob(os.path.join(local_job, ff))
if not abs_file_list:
raise FileNotFoundError(
"cannot find upload file " + os.path.join(local_job, ff)
)
rel_file_list = [
os.path.relpath(ii, self.local_root) for ii in abs_file_list
]
file_list.extend(rel_file_list)
local_job = self.local_root
for fc in submission.forward_common_files:
abs_file_list = glob(os.path.join(local_job, fc))
if not abs_file_list:
raise FileNotFoundError(
"cannot find upload file " + os.path.join(local_job, fc)
)
rel_file_list = [
os.path.relpath(ii, self.local_root) for ii in abs_file_list
]
file_list.extend(rel_file_list)
self._put_files(file_list, dereference=dereference)

    def download(
self, submission, check_exists=False, mark_failure=True, back_error=False
):
"""Download backward files from HDFS root dir.
Parameters
----------
submission : Submission class instance
represents a collection of tasks, such as backward file names
check_exists : bool
whether to check if the file exists
mark_failure : bool
whether to mark the task as failed if the file does not exist
back_error : bool
whether to download error files
Returns
-------
none
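
        Examples
        --------
        A minimal sketch, reusing a ``context`` that has already been bound to
        ``submission`` (as in :meth:`upload`)::

            context.download(submission, check_exists=True, mark_failure=True)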
"""
        # download all HDFS result archives into a local tmp dir
        gz_dir = os.path.join(self.local_root, "tmp")
        if os.path.exists(gz_dir):
            shutil.rmtree(gz_dir, ignore_errors=True)
        os.mkdir(gz_dir)
rfile_tgz = f"{self.remote_root}/{submission.submission_hash}_*_download.tar.gz"
lfile_tgz = f"{self.local_root}/tmp/"
HDFS.copy_to_local(rfile_tgz, lfile_tgz)
tgz_file_list = glob(os.path.join(self.local_root, "tmp/*_download.tar.gz"))
for tgz in tgz_file_list:
with tarfile.open(tgz, "r:gz") as tar:
tar.extractall(path=gz_dir)
for task in submission.belonging_tasks:
local_job = os.path.join(self.local_root, task.task_work_path)
remote_job = os.path.join(gz_dir, task.task_work_path)
            # copy so that appended error files do not mutate task.backward_files
            flist = list(task.backward_files)
if back_error:
errors = glob(os.path.join(remote_job, "error*"))
flist.extend(errors)
for jj in flist:
rfile = os.path.join(remote_job, jj)
lfile = os.path.join(local_job, jj)
if not os.path.exists(rfile):
if check_exists:
if mark_failure:
with open(
os.path.join(
self.local_root,
task.task_work_path,
f"tag_failure_download_{jj}",
),
"w",
) as fp:
pass
else:
                            raise FileNotFoundError(
                                "cannot find download file " + rfile
                            )
else:
raise FileNotFoundError("do not find download file " + rfile)
else:
if os.path.exists(lfile):
dlog.info(f"find existing {lfile}, replacing by {rfile}")
if os.path.isdir(lfile):
shutil.rmtree(lfile, ignore_errors=True)
elif os.path.isfile(lfile):
os.remove(lfile)
shutil.move(rfile, lfile)
local_job = self.local_root
remote_job = gz_dir
        # copy so that appended error files do not mutate submission.backward_common_files
        flist = list(submission.backward_common_files)
if back_error:
errors = glob(os.path.join(remote_job, "error*"))
flist.extend(errors)
for jj in flist:
rfile = os.path.join(remote_job, jj)
lfile = os.path.join(local_job, jj)
if not os.path.exists(rfile):
if check_exists:
if mark_failure:
with open(
os.path.join(self.local_root, f"tag_failure_download_{jj}"),
"w",
) as fp:
pass
else:
raise FileNotFoundError("do not find download file " + rfile)
else:
raise FileNotFoundError("do not find download file " + rfile)
else:
if os.path.exists(lfile):
dlog.info(f"find existing {lfile}, replacing by {rfile}")
if os.path.isdir(lfile):
shutil.rmtree(lfile, ignore_errors=True)
elif os.path.isfile(lfile):
os.remove(lfile)
shutil.move(rfile, lfile)
# remove tmp dir
shutil.rmtree(gz_dir, ignore_errors=True)

    def check_file_exists(self, fname):
"""Check whether the given file exists, often used in checking whether the belonging job has finished.
Parameters
----------
fname : string
file name to be checked
Returns
-------
status: boolean
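
        Examples
        --------
        A hedged sketch; ``"tag_finished"`` is an illustrative file name
        relative to ``remote_root``::

            if context.check_file_exists("tag_finished"):
                pass  # the corresponding job is treated as finished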
"""
return HDFS.exists(os.path.join(self.remote_root, fname))

    def clean(self):
HDFS.remove(self.remote_root)

    def write_file(self, fname, write_str):
        """Write ``write_str`` to ``fname`` under the remote root via a local temporary file."""
local_file = os.path.join("/tmp/", fname)
with open(local_file, "w") as fp:
fp.write(write_str)
HDFS.copy_from_local(local_file, os.path.join(self.remote_root, fname))
return local_file

    def read_file(self, fname):
        """Read and return the content of ``fname`` under the remote root."""
return HDFS.read_hdfs_file(os.path.join(self.remote_root, fname))
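
    # Illustrative round trip through write_file/read_file; ``context`` is a
    # bound HDFSContext and the file name/content are placeholders:
    #
    #     context.write_file("flag.txt", "done")
    #     content = context.read_file("flag.txt")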