Source code for deepmd.entrypoints.test

# SPDX-License-Identifier: LGPL-3.0-or-later
"""Test trained DeePMD model."""

import logging
from pathlib import (
    Path,
)
from typing import (
    TYPE_CHECKING,
    Dict,
    List,
    Optional,
    Tuple,
)

import numpy as np

from deepmd.common import (
    expand_sys_str,
)
from deepmd.infer.deep_dipole import (
    DeepDipole,
)
from deepmd.infer.deep_dos import (
    DeepDOS,
)
from deepmd.infer.deep_eval import (
    DeepEval,
)
from deepmd.infer.deep_polar import (
    DeepGlobalPolar,
    DeepPolar,
)
from deepmd.infer.deep_pot import (
    DeepPot,
)
from deepmd.infer.deep_wfc import (
    DeepWFC,
)
from deepmd.utils import random as dp_random
from deepmd.utils.data import (
    DeepmdData,
)
from deepmd.utils.weight_avg import (
    weighted_average,
)

if TYPE_CHECKING:
    from deepmd.tf.infer import (
        DeepDipole,
        DeepDOS,
        DeepPolar,
        DeepPot,
        DeepWFC,
    )
    from deepmd.tf.infer.deep_tensor import (
        DeepTensor,
    )

__all__ = ["test"]

log = logging.getLogger(__name__)


[docs]def test( *, model: str, system: str, datafile: str, set_prefix: str, numb_test: int, rand_seed: Optional[int], shuffle_test: bool, detail_file: str, atomic: bool, head: Optional[str] = None, **kwargs, ): """Test model predictions. Parameters ---------- model : str path where model is stored system : str system directory datafile : str the path to the list of systems to test set_prefix : str string prefix of set numb_test : int munber of tests to do. 0 means all data. rand_seed : Optional[int] seed for random generator shuffle_test : bool whether to shuffle tests detail_file : Optional[str] file where test details will be output atomic : bool whether per atom quantities should be computed head : Optional[str], optional (Supported backend: PyTorch) Task head to test if in multi-task mode. **kwargs additional arguments Raises ------ RuntimeError if no valid system was found """ if numb_test == 0: # only float has inf, but should work for min numb_test = float("inf") if datafile is not None: with open(datafile) as datalist: all_sys = datalist.read().splitlines() else: all_sys = expand_sys_str(system) if len(all_sys) == 0: raise RuntimeError("Did not find valid system") err_coll = [] siz_coll = [] # init random seed if rand_seed is not None: dp_random.seed(rand_seed % (2**32)) # init model dp = DeepEval(model, head=head) for cc, system in enumerate(all_sys): log.info("# ---------------output of dp test--------------- ") log.info(f"# testing system : {system}") # create data class tmap = dp.get_type_map() if isinstance(dp, DeepPot) else None data = DeepmdData( system, set_prefix, shuffle_test=shuffle_test, type_map=tmap, sort_atoms=False, ) if isinstance(dp, DeepPot): err = test_ener( dp, data, system, numb_test, detail_file, atomic, append_detail=(cc != 0), ) elif isinstance(dp, DeepDOS): err = test_dos( dp, data, system, numb_test, detail_file, atomic, append_detail=(cc != 0), ) elif isinstance(dp, DeepDipole): err = test_dipole(dp, data, numb_test, detail_file, atomic) elif isinstance(dp, DeepPolar): err = test_polar(dp, data, numb_test, detail_file, atomic=atomic) elif isinstance(dp, DeepGlobalPolar): # should not appear in this new version log.warning( "Global polar model is not currently supported. Please directly use the polar mode and change loss parameters." ) err = test_polar( dp, data, numb_test, detail_file, atomic=False ) # YWolfeee: downward compatibility log.info("# ----------------------------------------------- ") err_coll.append(err) avg_err = weighted_average(err_coll) if len(all_sys) != len(err_coll): log.warning("Not all systems are tested! Check if the systems are valid") if len(all_sys) > 1: log.info("# ----------weighted average of errors----------- ") log.info(f"# number of systems : {len(all_sys)}") if isinstance(dp, DeepPot): print_ener_sys_avg(avg_err) elif isinstance(dp, DeepDOS): print_dos_sys_avg(avg_err) elif isinstance(dp, DeepDipole): print_dipole_sys_avg(avg_err) elif isinstance(dp, DeepPolar): print_polar_sys_avg(avg_err) elif isinstance(dp, DeepGlobalPolar): print_polar_sys_avg(avg_err) elif isinstance(dp, DeepGlobalPolar): print_wfc_sys_avg(avg_err) log.info("# ----------------------------------------------- ")
def mae(diff: np.ndarray) -> float: """Calcalte mean absulote error. Parameters ---------- diff : np.ndarray difference Returns ------- float mean absulote error """ return np.mean(np.abs(diff)) def rmse(diff: np.ndarray) -> float: """Calculate root mean square error. Parameters ---------- diff : np.ndarray difference Returns ------- float root mean square error """ return np.sqrt(np.average(diff * diff)) def save_txt_file( fname: Path, data: np.ndarray, header: str = "", append: bool = False ): """Save numpy array to test file. Parameters ---------- fname : str filename data : np.ndarray data to save to disk header : str, optional header string to use in file, by default "" append : bool, optional if true file will be appended insted of overwriting, by default False """ flags = "ab" if append else "w" with fname.open(flags) as fp: np.savetxt(fp, data, header=header) def test_ener( dp: "DeepPot", data: DeepmdData, system: str, numb_test: int, detail_file: Optional[str], has_atom_ener: bool, append_detail: bool = False, ) -> Tuple[List[np.ndarray], List[int]]: """Test energy type model. Parameters ---------- dp : DeepPot instance of deep potential data : DeepmdData data container object system : str system directory numb_test : int munber of tests to do detail_file : Optional[str] file where test details will be output has_atom_ener : bool whether per atom quantities should be computed append_detail : bool, optional if true append output detail file, by default False Returns ------- Tuple[List[np.ndarray], List[int]] arrays with results and their shapes """ data.add("energy", 1, atomic=False, must=False, high_prec=True) data.add("force", 3, atomic=True, must=False, high_prec=False) data.add("virial", 9, atomic=False, must=False, high_prec=False) if dp.has_efield: data.add("efield", 3, atomic=True, must=True, high_prec=False) if has_atom_ener: data.add("atom_ener", 1, atomic=True, must=True, high_prec=False) if dp.get_dim_fparam() > 0: data.add( "fparam", dp.get_dim_fparam(), atomic=False, must=True, high_prec=False ) if dp.get_dim_aparam() > 0: data.add("aparam", dp.get_dim_aparam(), atomic=True, must=True, high_prec=False) if dp.has_spin: data.add("spin", 3, atomic=True, must=True, high_prec=False) data.add("force_mag", 3, atomic=True, must=False, high_prec=False) test_data = data.get_test() mixed_type = data.mixed_type natoms = len(test_data["type"][0]) nframes = test_data["box"].shape[0] numb_test = min(nframes, numb_test) coord = test_data["coord"][:numb_test].reshape([numb_test, -1]) box = test_data["box"][:numb_test] if dp.has_efield: efield = test_data["efield"][:numb_test].reshape([numb_test, -1]) else: efield = None if dp.has_spin: spin = test_data["spin"][:numb_test].reshape([numb_test, -1]) else: spin = None if not data.pbc: box = None if mixed_type: atype = test_data["type"][:numb_test].reshape([numb_test, -1]) else: atype = test_data["type"][0] if dp.get_dim_fparam() > 0: fparam = test_data["fparam"][:numb_test] else: fparam = None if dp.get_dim_aparam() > 0: aparam = test_data["aparam"][:numb_test] else: aparam = None ret = dp.eval( coord, box, atype, fparam=fparam, aparam=aparam, atomic=has_atom_ener, efield=efield, mixed_type=mixed_type, spin=spin, ) energy = ret[0] force = ret[1] virial = ret[2] energy = energy.reshape([numb_test, 1]) force = force.reshape([numb_test, -1]) virial = virial.reshape([numb_test, 9]) if has_atom_ener: ae = ret[3] av = ret[4] ae = ae.reshape([numb_test, -1]) av = av.reshape([numb_test, -1]) if dp.has_spin: force_m = ret[5] force_m = force_m.reshape([numb_test, -1]) mask_mag = ret[6] mask_mag = mask_mag.reshape([numb_test, -1]) else: if dp.has_spin: force_m = ret[3] force_m = force_m.reshape([numb_test, -1]) mask_mag = ret[4] mask_mag = mask_mag.reshape([numb_test, -1]) out_put_spin = dp.get_ntypes_spin() != 0 or dp.has_spin if out_put_spin: if dp.get_ntypes_spin() != 0: # old tf support for spin ntypes_real = dp.get_ntypes() - dp.get_ntypes_spin() nloc = natoms nloc_real = sum( [np.count_nonzero(atype == ii) for ii in range(ntypes_real)] ) force_r = np.split( force, indices_or_sections=[nloc_real * 3, nloc * 3], axis=1 )[0] force_m = np.split( force, indices_or_sections=[nloc_real * 3, nloc * 3], axis=1 )[1] test_force_r = np.split( test_data["force"][:numb_test], indices_or_sections=[nloc_real * 3, nloc * 3], axis=1, )[0] test_force_m = np.split( test_data["force"][:numb_test], indices_or_sections=[nloc_real * 3, nloc * 3], axis=1, )[1] else: # pt support for spin force_r = force test_force_r = test_data["force"][:numb_test] # The shape of force_m and test_force_m are [-1, 3], # which is designed for mixed_type cases force_m = force_m.reshape(-1, 3)[mask_mag.reshape(-1)] test_force_m = test_data["force_mag"][:numb_test].reshape(-1, 3)[ mask_mag.reshape(-1) ] diff_e = energy - test_data["energy"][:numb_test].reshape([-1, 1]) mae_e = mae(diff_e) rmse_e = rmse(diff_e) diff_f = force - test_data["force"][:numb_test] mae_f = mae(diff_f) rmse_f = rmse(diff_f) diff_v = virial - test_data["virial"][:numb_test] mae_v = mae(diff_v) rmse_v = rmse(diff_v) mae_ea = mae_e / natoms rmse_ea = rmse_e / natoms mae_va = mae_v / natoms rmse_va = rmse_v / natoms if has_atom_ener: diff_ae = test_data["atom_ener"][:numb_test].reshape([-1]) - ae.reshape([-1]) mae_ae = mae(diff_ae) rmse_ae = rmse(diff_ae) if out_put_spin: mae_fr = mae(force_r - test_force_r) mae_fm = mae(force_m - test_force_m) rmse_fr = rmse(force_r - test_force_r) rmse_fm = rmse(force_m - test_force_m) log.info(f"# number of test data : {numb_test:d} ") log.info(f"Energy MAE : {mae_e:e} eV") log.info(f"Energy RMSE : {rmse_e:e} eV") log.info(f"Energy MAE/Natoms : {mae_ea:e} eV") log.info(f"Energy RMSE/Natoms : {rmse_ea:e} eV") if not out_put_spin: log.info(f"Force MAE : {mae_f:e} eV/A") log.info(f"Force RMSE : {rmse_f:e} eV/A") else: log.info(f"Force atom MAE : {mae_fr:e} eV/A") log.info(f"Force atom RMSE : {rmse_fr:e} eV/A") log.info(f"Force spin MAE : {mae_fm:e} eV/uB") log.info(f"Force spin RMSE : {rmse_fm:e} eV/uB") if data.pbc and not out_put_spin: log.info(f"Virial MAE : {mae_v:e} eV") log.info(f"Virial RMSE : {rmse_v:e} eV") log.info(f"Virial MAE/Natoms : {mae_va:e} eV") log.info(f"Virial RMSE/Natoms : {rmse_va:e} eV") if has_atom_ener: log.info(f"Atomic ener MAE : {mae_ae:e} eV") log.info(f"Atomic ener RMSE : {rmse_ae:e} eV") if detail_file is not None: detail_path = Path(detail_file) pe = np.concatenate( ( np.reshape(test_data["energy"][:numb_test], [-1, 1]), np.reshape(energy, [-1, 1]), ), axis=1, ) save_txt_file( detail_path.with_suffix(".e.out"), pe, header="%s: data_e pred_e" % system, append=append_detail, ) pe_atom = pe / natoms save_txt_file( detail_path.with_suffix(".e_peratom.out"), pe_atom, header="%s: data_e pred_e" % system, append=append_detail, ) if not out_put_spin: pf = np.concatenate( ( np.reshape(test_data["force"][:numb_test], [-1, 3]), np.reshape(force, [-1, 3]), ), axis=1, ) save_txt_file( detail_path.with_suffix(".f.out"), pf, header="%s: data_fx data_fy data_fz pred_fx pred_fy pred_fz" % system, append=append_detail, ) else: pf_real = np.concatenate( (np.reshape(test_force_r, [-1, 3]), np.reshape(force_r, [-1, 3])), axis=1, ) pf_mag = np.concatenate( (np.reshape(test_force_m, [-1, 3]), np.reshape(force_m, [-1, 3])), axis=1, ) save_txt_file( detail_path.with_suffix(".fr.out"), pf_real, header="%s: data_fx data_fy data_fz pred_fx pred_fy pred_fz" % system, append=append_detail, ) save_txt_file( detail_path.with_suffix(".fm.out"), pf_mag, header="%s: data_fmx data_fmy data_fmz pred_fmx pred_fmy pred_fmz" % system, append=append_detail, ) pv = np.concatenate( ( np.reshape(test_data["virial"][:numb_test], [-1, 9]), np.reshape(virial, [-1, 9]), ), axis=1, ) save_txt_file( detail_path.with_suffix(".v.out"), pv, header=f"{system}: data_vxx data_vxy data_vxz data_vyx data_vyy " "data_vyz data_vzx data_vzy data_vzz pred_vxx pred_vxy pred_vxz pred_vyx " "pred_vyy pred_vyz pred_vzx pred_vzy pred_vzz", append=append_detail, ) pv_atom = pv / natoms save_txt_file( detail_path.with_suffix(".v_peratom.out"), pv_atom, header=f"{system}: data_vxx data_vxy data_vxz data_vyx data_vyy " "data_vyz data_vzx data_vzy data_vzz pred_vxx pred_vxy pred_vxz pred_vyx " "pred_vyy pred_vyz pred_vzx pred_vzy pred_vzz", append=append_detail, ) if not out_put_spin: return { "mae_e": (mae_e, energy.size), "mae_ea": (mae_ea, energy.size), "mae_f": (mae_f, force.size), "mae_v": (mae_v, virial.size), "mae_va": (mae_va, virial.size), "rmse_e": (rmse_e, energy.size), "rmse_ea": (rmse_ea, energy.size), "rmse_f": (rmse_f, force.size), "rmse_v": (rmse_v, virial.size), "rmse_va": (rmse_va, virial.size), } else: return { "mae_e": (mae_e, energy.size), "mae_ea": (mae_ea, energy.size), "mae_fr": (mae_fr, force_r.size), "mae_fm": (mae_fm, force_m.size), "mae_v": (mae_v, virial.size), "mae_va": (mae_va, virial.size), "rmse_e": (rmse_e, energy.size), "rmse_ea": (rmse_ea, energy.size), "rmse_fr": (rmse_fr, force_r.size), "rmse_fm": (rmse_fm, force_m.size), "rmse_v": (rmse_v, virial.size), "rmse_va": (rmse_va, virial.size), } def print_ener_sys_avg(avg: Dict[str, float]): """Print errors summary for energy type potential. Parameters ---------- avg : np.ndarray array with summaries """ log.info(f"Energy MAE : {avg['mae_e']:e} eV") log.info(f"Energy RMSE : {avg['rmse_e']:e} eV") log.info(f"Energy MAE/Natoms : {avg['mae_ea']:e} eV") log.info(f"Energy RMSE/Natoms : {avg['rmse_ea']:e} eV") if "rmse_f" in avg.keys(): log.info(f"Force MAE : {avg['mae_f']:e} eV/A") log.info(f"Force RMSE : {avg['rmse_f']:e} eV/A") else: log.info(f"Force atom MAE : {avg['mae_fr']:e} eV/A") log.info(f"Force spin MAE : {avg['mae_fm']:e} eV/uB") log.info(f"Force atom RMSE : {avg['rmse_fr']:e} eV/A") log.info(f"Force spin RMSE : {avg['rmse_fm']:e} eV/uB") log.info(f"Virial MAE : {avg['mae_v']:e} eV") log.info(f"Virial RMSE : {avg['rmse_v']:e} eV") log.info(f"Virial MAE/Natoms : {avg['mae_va']:e} eV") log.info(f"Virial RMSE/Natoms : {avg['rmse_va']:e} eV") def test_dos( dp: "DeepDOS", data: DeepmdData, system: str, numb_test: int, detail_file: Optional[str], has_atom_dos: bool, append_detail: bool = False, ) -> Tuple[List[np.ndarray], List[int]]: """Test DOS type model. Parameters ---------- dp : DeepDOS instance of deep potential data : DeepmdData data container object system : str system directory numb_test : int munber of tests to do detail_file : Optional[str] file where test details will be output has_atom_dos : bool whether per atom quantities should be computed append_detail : bool, optional if true append output detail file, by default False Returns ------- Tuple[List[np.ndarray], List[int]] arrays with results and their shapes """ data.add("dos", dp.numb_dos, atomic=False, must=True, high_prec=True) if has_atom_dos: data.add("atom_dos", dp.numb_dos, atomic=True, must=False, high_prec=True) if dp.get_dim_fparam() > 0: data.add( "fparam", dp.get_dim_fparam(), atomic=False, must=True, high_prec=False ) if dp.get_dim_aparam() > 0: data.add("aparam", dp.get_dim_aparam(), atomic=True, must=True, high_prec=False) test_data = data.get_test() mixed_type = data.mixed_type natoms = len(test_data["type"][0]) nframes = test_data["box"].shape[0] numb_test = min(nframes, numb_test) coord = test_data["coord"][:numb_test].reshape([numb_test, -1]) box = test_data["box"][:numb_test] if not data.pbc: box = None if mixed_type: atype = test_data["type"][:numb_test].reshape([numb_test, -1]) else: atype = test_data["type"][0] if dp.get_dim_fparam() > 0: fparam = test_data["fparam"][:numb_test] else: fparam = None if dp.get_dim_aparam() > 0: aparam = test_data["aparam"][:numb_test] else: aparam = None ret = dp.eval( coord, box, atype, fparam=fparam, aparam=aparam, atomic=has_atom_dos, mixed_type=mixed_type, ) dos = ret[0] dos = dos.reshape([numb_test, dp.numb_dos]) if has_atom_dos: ados = ret[1] ados = ados.reshape([numb_test, natoms * dp.numb_dos]) diff_dos = dos - test_data["dos"][:numb_test] mae_dos = mae(diff_dos) rmse_dos = rmse(diff_dos) mae_dosa = mae_dos / natoms rmse_dosa = rmse_dos / natoms if has_atom_dos: diff_ados = ados - test_data["atom_dos"][:numb_test] mae_ados = mae(diff_ados) rmse_ados = rmse(diff_ados) log.info(f"# number of test data : {numb_test:d} ") log.info(f"DOS MAE : {mae_dos:e} Occupation/eV") log.info(f"DOS RMSE : {rmse_dos:e} Occupation/eV") log.info(f"DOS MAE/Natoms : {mae_dosa:e} Occupation/eV") log.info(f"DOS RMSE/Natoms : {rmse_dosa:e} Occupation/eV") if has_atom_dos: log.info(f"Atomic DOS MAE : {mae_ados:e} Occupation/eV") log.info(f"Atomic DOS RMSE : {rmse_ados:e} Occupation/eV") if detail_file is not None: detail_path = Path(detail_file) for ii in range(numb_test): test_out = test_data["dos"][ii].reshape(-1, 1) pred_out = dos[ii].reshape(-1, 1) frame_output = np.hstack((test_out, pred_out)) save_txt_file( detail_path.with_suffix(".dos.out.%.d" % ii), frame_output, header="%s - %.d: data_dos pred_dos" % (system, ii), append=append_detail, ) if has_atom_dos: for ii in range(numb_test): test_out = test_data["atom_dos"][ii].reshape(-1, 1) pred_out = ados[ii].reshape(-1, 1) frame_output = np.hstack((test_out, pred_out)) save_txt_file( detail_path.with_suffix(".ados.out.%.d" % ii), frame_output, header="%s - %.d: data_ados pred_ados" % (system, ii), append=append_detail, ) return { "mae_dos": (mae_dos, dos.size), "mae_dosa": (mae_dosa, dos.size), "rmse_dos": (rmse_dos, dos.size), "rmse_dosa": (rmse_dosa, dos.size), } def print_dos_sys_avg(avg: Dict[str, float]): """Print errors summary for DOS type potential. Parameters ---------- avg : np.ndarray array with summaries """ log.info(f"DOS MAE : {avg['mae_dos']:e} Occupation/eV") log.info(f"DOS RMSE : {avg['rmse_dos']:e} Occupation/eV") log.info(f"DOS MAE/Natoms : {avg['mae_dosa']:e} Occupation/eV") log.info(f"DOS RMSE/Natoms : {avg['rmse_dosa']:e} Occupation/eV") def run_test(dp: "DeepTensor", test_data: dict, numb_test: int, test_sys: DeepmdData): """Run tests. Parameters ---------- dp : DeepTensor instance of deep potential test_data : dict dictionary with test data numb_test : int munber of tests to do test_sys : DeepmdData test system Returns ------- [type] [description] """ nframes = test_data["box"].shape[0] numb_test = min(nframes, numb_test) coord = test_data["coord"][:numb_test].reshape([numb_test, -1]) if test_sys.pbc: box = test_data["box"][:numb_test] else: box = None atype = test_data["type"][0] prediction = dp.eval(coord, box, atype) return prediction.reshape([numb_test, -1]), numb_test, atype def test_wfc( dp: "DeepWFC", data: DeepmdData, numb_test: int, detail_file: Optional[str], ) -> Tuple[List[np.ndarray], List[int]]: """Test energy type model. Parameters ---------- dp : DeepPot instance of deep potential data : DeepmdData data container object numb_test : int munber of tests to do detail_file : Optional[str] file where test details will be output Returns ------- Tuple[List[np.ndarray], List[int]] arrays with results and their shapes """ data.add( "wfc", 12, atomic=True, must=True, high_prec=False, type_sel=dp.get_sel_type() ) test_data = data.get_test() wfc, numb_test, _ = run_test(dp, test_data, numb_test, data) rmse_f = rmse(wfc - test_data["wfc"][:numb_test]) log.info("# number of test data : {numb_test:d} ") log.info("WFC RMSE : {rmse_f:e} eV/A") if detail_file is not None: detail_path = Path(detail_file) pe = np.concatenate( ( np.reshape(test_data["wfc"][:numb_test], [-1, 12]), np.reshape(wfc, [-1, 12]), ), axis=1, ) np.savetxt( detail_path.with_suffix(".out"), pe, header="ref_wfc(12 dofs) predicted_wfc(12 dofs)", ) return {"rmse": (rmse_f, wfc.size)} def print_wfc_sys_avg(avg): """Print errors summary for wfc type potential. Parameters ---------- avg : np.ndarray array with summaries """ log.info(f"WFC RMSE : {avg['rmse']:e} eV/A") def test_polar( dp: "DeepPolar", data: DeepmdData, numb_test: int, detail_file: Optional[str], *, atomic: bool, ) -> Tuple[List[np.ndarray], List[int]]: """Test energy type model. Parameters ---------- dp : DeepPot instance of deep potential data : DeepmdData data container object numb_test : int munber of tests to do detail_file : Optional[str] file where test details will be output atomic : bool wheter to use glovbal version of polar potential Returns ------- Tuple[List[np.ndarray], List[int]] arrays with results and their shapes """ data.add( "polarizability" if not atomic else "atomic_polarizability", 9, atomic=atomic, must=True, high_prec=False, type_sel=dp.get_sel_type(), ) test_data = data.get_test() polar, numb_test, atype = run_test(dp, test_data, numb_test, data) sel_type = dp.get_sel_type() sel_natoms = 0 for ii in sel_type: sel_natoms += sum(atype == ii) # YWolfeee: do summation in global polar mode if not atomic: polar = np.sum(polar.reshape((polar.shape[0], -1, 9)), axis=1) rmse_f = rmse(polar - test_data["polarizability"][:numb_test]) rmse_fs = rmse_f / np.sqrt(sel_natoms) rmse_fa = rmse_f / sel_natoms else: sel_mask = np.isin(atype, sel_type) polar = polar.reshape((polar.shape[0], -1, 9))[:, sel_mask, :].reshape( (polar.shape[0], -1) ) rmse_f = rmse(polar - test_data["atom_polarizability"][:numb_test]) log.info(f"# number of test data : {numb_test:d} ") log.info(f"Polarizability RMSE : {rmse_f:e}") if not atomic: log.info(f"Polarizability RMSE/sqrtN : {rmse_fs:e}") log.info(f"Polarizability RMSE/N : {rmse_fa:e}") log.info("The unit of error is the same as the unit of provided label.") if detail_file is not None: detail_path = Path(detail_file) if not atomic: pe = np.concatenate( ( np.reshape(test_data["polarizability"][:numb_test], [-1, 9]), np.reshape(polar, [-1, 9]), ), axis=1, ) header_text = ( "data_pxx data_pxy data_pxz data_pyx data_pyy data_pyz data_pzx " "data_pzy data_pzz pred_pxx pred_pxy pred_pxz pred_pyx pred_pyy " "pred_pyz pred_pzx pred_pzy pred_pzz" ) else: pe = np.concatenate( ( np.reshape( test_data["atom_polarizability"][:numb_test], [-1, 9 * sel_natoms], ), np.reshape(polar, [-1, 9 * sel_natoms]), ), axis=1, ) header_text = [ f"{letter}{number}" for number in range(1, sel_natoms + 1) for letter in [ "data_pxx", "data_pxy", "data_pxz", "data_pyx", "data_pyy", "data_pyz", "data_pzx", "data_pzy", "data_pzz", ] ] + [ f"{letter}{number}" for number in range(1, sel_natoms + 1) for letter in [ "pred_pxx", "pred_pxy", "pred_pxz", "pred_pyx", "pred_pyy", "pred_pyz", "pred_pzx", "pred_pzy", "pred_pzz", ] ] header_text = " ".join(header_text) np.savetxt( detail_path.with_suffix(".out"), pe, header=header_text, ) return {"rmse": (rmse_f, polar.size)} def print_polar_sys_avg(avg): """Print errors summary for polar type potential. Parameters ---------- avg : np.ndarray array with summaries """ log.info(f"Polarizability RMSE : {avg['rmse']:e} eV/A") def test_dipole( dp: "DeepDipole", data: DeepmdData, numb_test: int, detail_file: Optional[str], atomic: bool, ) -> Tuple[List[np.ndarray], List[int]]: """Test energy type model. Parameters ---------- dp : DeepPot instance of deep potential data : DeepmdData data container object numb_test : int munber of tests to do detail_file : Optional[str] file where test details will be output atomic : bool whether atomic dipole is provided Returns ------- Tuple[List[np.ndarray], List[int]] arrays with results and their shapes """ data.add( "dipole" if not atomic else "atomic_dipole", 3, atomic=atomic, must=True, high_prec=False, type_sel=dp.get_sel_type(), ) test_data = data.get_test() dipole, numb_test, atype = run_test(dp, test_data, numb_test, data) sel_type = dp.get_sel_type() sel_natoms = 0 for ii in sel_type: sel_natoms += sum(atype == ii) # do summation in atom dimension if not atomic: dipole = np.sum(dipole.reshape((dipole.shape[0], -1, 3)), axis=1) rmse_f = rmse(dipole - test_data["dipole"][:numb_test]) rmse_fs = rmse_f / np.sqrt(sel_natoms) rmse_fa = rmse_f / sel_natoms else: sel_mask = np.isin(atype, sel_type) dipole = dipole.reshape((dipole.shape[0], -1, 3))[:, sel_mask, :].reshape( (dipole.shape[0], -1) ) rmse_f = rmse(dipole - test_data["atom_dipole"][:numb_test]) log.info(f"# number of test data : {numb_test:d}") log.info(f"Dipole RMSE : {rmse_f:e}") if not atomic: log.info(f"Dipole RMSE/sqrtN : {rmse_fs:e}") log.info(f"Dipole RMSE/N : {rmse_fa:e}") log.info("The unit of error is the same as the unit of provided label.") if detail_file is not None: detail_path = Path(detail_file) if not atomic: pe = np.concatenate( ( np.reshape(test_data["dipole"][:numb_test], [-1, 3]), np.reshape(dipole, [-1, 3]), ), axis=1, ) header_text = "data_x data_y data_z pred_x pred_y pred_z" else: pe = np.concatenate( ( np.reshape( test_data["atom_dipole"][:numb_test], [-1, 3 * sel_natoms] ), np.reshape(dipole, [-1, 3 * sel_natoms]), ), axis=1, ) header_text = [ f"{letter}{number}" for number in range(1, sel_natoms + 1) for letter in ["data_x", "data_y", "data_z"] ] + [ f"{letter}{number}" for number in range(1, sel_natoms + 1) for letter in ["pred_x", "pred_y", "pred_z"] ] header_text = " ".join(header_text) np.savetxt( detail_path.with_suffix(".out"), pe, header=header_text, ) return {"rmse": (rmse_f, dipole.size)} def print_dipole_sys_avg(avg): """Print errors summary for dipole type potential. Parameters ---------- avg : np.ndarray array with summaries """ log.info(f"Dipole RMSE : {avg['rmse']:e} eV/A")