"""Logger initialization for package."""
import logging
import os
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from pathlib import Path
from mpi4py import MPI
_MPI_APPEND_MODE = MPI.MODE_CREATE | MPI.MODE_APPEND
logging.getLogger(__name__)
__all__ = ["set_log_handles"]
# logger formater
FFORMATTER = logging.Formatter(
"[%(asctime)s] %(app_name)s %(levelname)-7s %(name)-45s %(message)s"
)
CFORMATTER = logging.Formatter(
# "%(app_name)s %(levelname)-7s |-> %(name)-45s %(message)s"
"%(app_name)s %(levelname)-7s %(message)s"
)
FFORMATTER_MPI = logging.Formatter(
"[%(asctime)s] %(app_name)s rank:%(rank)-2s %(levelname)-7s %(name)-45s %(message)s"
)
CFORMATTER_MPI = logging.Formatter(
# "%(app_name)s rank:%(rank)-2s %(levelname)-7s |-> %(name)-45s %(message)s"
"%(app_name)s rank:%(rank)-2s %(levelname)-7s %(message)s"
)
class _AppFilter(logging.Filter):
"""Add field `app_name` to log messages."""
def filter(self, record):
record.app_name = "DEEPMD"
return True
class _MPIRankFilter(logging.Filter):
"""Add MPI rank number to log messages, adds field `rank`."""
def __init__(self, rank: int) -> None:
super().__init__(name="MPI_rank_id")
self.mpi_rank = str(rank)
def filter(self, record):
record.rank = self.mpi_rank
return True
class _MPIMasterFilter(logging.Filter):
"""Filter that lets through only messages emited from rank==0."""
def __init__(self, rank: int) -> None:
super().__init__(name="MPI_master_log")
self.mpi_rank = rank
def filter(self, record):
if self.mpi_rank == 0:
return True
else:
return False
class _MPIFileStream:
"""Wrap MPI.File` so it has the same API as python file streams.
Parameters
----------
filename : Path
disk location of the file stream
MPI : MPI
MPI communicator object
mode : str, optional
file write mode, by default _MPI_APPEND_MODE
"""
def __init__(
self, filename: "Path", MPI: "MPI", mode: str = "_MPI_APPEND_MODE"
) -> None:
self.stream = MPI.File.Open(MPI.COMM_WORLD, filename, mode)
self.stream.Set_atomicity(True)
self.name = "MPIfilestream"
def write(self, msg: str):
"""Write to MPI shared file stream.
Parameters
----------
msg : str
message to write
"""
b = bytearray()
b.extend(map(ord, msg))
self.stream.Write_shared(b)
def close(self):
"""Synchronize and close MPI file stream."""
self.stream.Sync()
self.stream.Close()
class _MPIHandler(logging.FileHandler):
"""Emulate `logging.FileHandler` with MPI shared File that all ranks can write to.
Parameters
----------
filename : Path
file path
MPI : MPI
MPI communicator object
mode : str, optional
file access mode, by default "_MPI_APPEND_MODE"
"""
def __init__(
self,
filename: "Path",
MPI: "MPI",
mode: str = "_MPI_APPEND_MODE",
) -> None:
self.MPI = MPI
super().__init__(filename, mode=mode, encoding=None, delay=False)
def _open(self):
return _MPIFileStream(self.baseFilename, self.MPI, self.mode)
def setStream(self, stream):
"""Stream canot be reasigned in MPI mode."""
raise NotImplementedError("Unable to do for MPI file handler!")
[docs]def set_log_handles(
level: int,
log_path: Optional["Path"] = None,
mpi_log: Optional[str] = None
):
"""Set desired level for package loggers and add file handlers.
Parameters
----------
level: int
logging level
log_path: Optional[str]
path to log file, if None logs will be send only to console. If the parent
directory does not exist it will be automatically created, by default None
mpi_log : Optional[str], optional
mpi log type. Has three options. `master` will output logs to file and console
only from rank==0. `collect` will write messages from all ranks to one file
opened under rank==0 and to console. `workers` will open one log file for each
worker designated by its rank, console behaviour is the same as for `collect`.
If this argument is specified, package 'mpi4py' must be already installed.
by default None
Raises
------
RuntimeError
If the argument `mpi_log` is specified, package `mpi4py` is not installed.
References
----------
https://groups.google.com/g/mpi4py/c/SaNzc8bdj6U
https://stackoverflow.com/questions/35869137/avoid-tensorflow-print-on-standard-error
https://stackoverflow.com/questions/56085015/suppress-openmp-debug-messages-when-running-tensorflow-on-cpu
Notes
-----
Logging levels:
+---------+--------------+----------------+----------------+----------------+
| | our notation | python logging | tensorflow cpp | OpenMP |
+=========+==============+================+================+================+
| debug | 10 | 10 | 0 | 1/on/true/yes |
+---------+--------------+----------------+----------------+----------------+
| info | 20 | 20 | 1 | 0/off/false/no |
+---------+--------------+----------------+----------------+----------------+
| warning | 30 | 30 | 2 | 0/off/false/no |
+---------+--------------+----------------+----------------+----------------+
| error | 40 | 40 | 3 | 0/off/false/no |
+---------+--------------+----------------+----------------+----------------+
"""
# silence logging for OpenMP when running on CPU if level is any other than debug
if level <= 10:
os.environ["KMP_WARNINGS"] = "FALSE"
# set TF cpp internal logging level
os.environ['TF_CPP_MIN_LOG_LEVEL'] = str(int((level / 10) - 1))
# get root logger
root_log = logging.getLogger()
# remove all old handlers
root_log.setLevel(level)
for hdlr in root_log.handlers[:]:
root_log.removeHandler(hdlr)
# check if arguments are present
MPI = None
if mpi_log:
try:
from mpi4py import MPI
except ImportError as e:
raise RuntimeError("You cannot specify 'mpi_log' when mpi4py not installed") from e
# * add console handler ************************************************************
ch = logging.StreamHandler()
if MPI:
rank = MPI.COMM_WORLD.Get_rank()
if mpi_log == "master":
ch.setFormatter(CFORMATTER)
ch.addFilter(_MPIMasterFilter(rank))
else:
ch.setFormatter(CFORMATTER_MPI)
ch.addFilter(_MPIRankFilter(rank))
else:
ch.setFormatter(CFORMATTER)
ch.setLevel(level)
ch.addFilter(_AppFilter())
root_log.addHandler(ch)
# * add file handler ***************************************************************
if log_path:
# create directory
log_path.parent.mkdir(exist_ok=True, parents=True)
fh = None
if mpi_log == "master":
rank = MPI.COMM_WORLD.Get_rank()
if rank == 0:
fh = logging.FileHandler(log_path, mode="w")
fh.addFilter(_MPIMasterFilter(rank))
fh.setFormatter(FFORMATTER)
elif mpi_log == "collect":
rank = MPI.COMM_WORLD.Get_rank()
fh = _MPIHandler(log_path, MPI, mode=MPI.MODE_WRONLY | MPI.MODE_CREATE)
fh.addFilter(_MPIRankFilter(rank))
fh.setFormatter(FFORMATTER_MPI)
elif mpi_log == "workers":
rank = MPI.COMM_WORLD.Get_rank()
# if file has suffix than inser rank number before suffix
# e.g deepmd.log -> deepmd_<rank>.log
# if no suffix is present, insert rank as suffix
# e.g. deepmdlog -> deepmdlog.<rank>
if log_path.suffix:
worker_log = (log_path.parent / f"{log_path.stem}_{rank}").with_suffix(
log_path.suffix
)
else:
worker_log = log_path.with_suffix(f".{rank}")
fh = logging.FileHandler(worker_log, mode="w")
fh.setFormatter(FFORMATTER)
else:
fh = logging.FileHandler(log_path, mode="w")
fh.setFormatter(FFORMATTER)
if fh:
fh.setLevel(level)
fh.addFilter(_AppFilter())
root_log.addHandler(fh)