Source code for deepmd.dpmodel.atomic_model.base_atomic_model

# SPDX-License-Identifier: LGPL-3.0-or-later
import copy
from typing import (
    Dict,
    List,
    Optional,
    Tuple,
)

import numpy as np

from deepmd.dpmodel.output_def import (
    FittingOutputDef,
    OutputVariableDef,
)
from deepmd.dpmodel.utils import (
    AtomExcludeMask,
    PairExcludeMask,
)

from .make_base_atomic_model import (
    make_base_atomic_model,
)

[docs] BaseAtomicModel_ = make_base_atomic_model(np.ndarray)
[docs] class BaseAtomicModel(BaseAtomicModel_): def __init__( self, type_map: List[str], atom_exclude_types: List[int] = [], pair_exclude_types: List[Tuple[int, int]] = [], rcond: Optional[float] = None, preset_out_bias: Optional[Dict[str, np.ndarray]] = None, ): super().__init__() self.type_map = type_map self.reinit_atom_exclude(atom_exclude_types) self.reinit_pair_exclude(pair_exclude_types) self.rcond = rcond self.preset_out_bias = preset_out_bias
[docs] def init_out_stat(self): """Initialize the output bias.""" ntypes = self.get_ntypes() self.bias_keys: List[str] = list(self.fitting_output_def().keys()) self.max_out_size = max( [self.atomic_output_def()[kk].size for kk in self.bias_keys] ) self.n_out = len(self.bias_keys) out_bias_data = np.zeros([self.n_out, ntypes, self.max_out_size]) out_std_data = np.ones([self.n_out, ntypes, self.max_out_size]) self.out_bias = out_bias_data self.out_std = out_std_data
[docs] def __setitem__(self, key, value): if key in ["out_bias"]: self.out_bias = value elif key in ["out_std"]: self.out_std = value else: raise KeyError(key)
[docs] def __getitem__(self, key): if key in ["out_bias"]: return self.out_bias elif key in ["out_std"]: return self.out_std else: raise KeyError(key)
[docs] def get_type_map(self) -> List[str]: """Get the type map.""" return self.type_map
[docs] def reinit_atom_exclude( self, exclude_types: List[int] = [], ): self.atom_exclude_types = exclude_types if exclude_types == []: self.atom_excl = None else: self.atom_excl = AtomExcludeMask(self.get_ntypes(), self.atom_exclude_types)
[docs] def reinit_pair_exclude( self, exclude_types: List[Tuple[int, int]] = [], ): self.pair_exclude_types = exclude_types if exclude_types == []: self.pair_excl = None else: self.pair_excl = PairExcludeMask(self.get_ntypes(), self.pair_exclude_types)
[docs] def atomic_output_def(self) -> FittingOutputDef: old_def = self.fitting_output_def() old_list = list(old_def.get_data().values()) return FittingOutputDef( old_list # noqa:RUF005 + [ OutputVariableDef( name="mask", shape=[1], reduciable=False, r_differentiable=False, c_differentiable=False, ) ] )
[docs] def forward_common_atomic( self, extended_coord: np.ndarray, extended_atype: np.ndarray, nlist: np.ndarray, mapping: Optional[np.ndarray] = None, fparam: Optional[np.ndarray] = None, aparam: Optional[np.ndarray] = None, ) -> Dict[str, np.ndarray]: """Common interface for atomic inference. This method accept extended coordinates, extended atom typs, neighbor list, and predict the atomic contribution of the fit property. Parameters ---------- extended_coord extended coodinates, shape: nf x (nall x 3) extended_atype extended atom typs, shape: nf x nall for a type < 0 indicating the atomic is virtual. nlist neighbor list, shape: nf x nloc x nsel mapping extended to local index mapping, shape: nf x nall fparam frame parameters, shape: nf x dim_fparam aparam atomic parameter, shape: nf x nloc x dim_aparam Returns ------- ret_dict dict of output atomic properties. should implement the definition of `fitting_output_def`. ret_dict["mask"] of shape nf x nloc will be provided. ret_dict["mask"][ff,ii] == 1 indicating the ii-th atom of the ff-th frame is real. ret_dict["mask"][ff,ii] == 0 indicating the ii-th atom of the ff-th frame is virtual. """ _, nloc, _ = nlist.shape atype = extended_atype[:, :nloc] if self.pair_excl is not None: pair_mask = self.pair_excl.build_type_exclude_mask(nlist, extended_atype) # exclude neighbors in the nlist nlist = np.where(pair_mask == 1, nlist, -1) ext_atom_mask = self.make_atom_mask(extended_atype) ret_dict = self.forward_atomic( extended_coord, np.where(ext_atom_mask, extended_atype, 0), nlist, mapping=mapping, fparam=fparam, aparam=aparam, ) ret_dict = self.apply_out_stat(ret_dict, atype) # nf x nloc atom_mask = ext_atom_mask[:, :nloc].astype(np.int32) if self.atom_excl is not None: atom_mask *= self.atom_excl.build_type_exclude_mask(atype) for kk in ret_dict.keys(): out_shape = ret_dict[kk].shape ret_dict[kk] = ( ret_dict[kk].reshape([out_shape[0], out_shape[1], -1]) * atom_mask[:, :, None] ).reshape(out_shape) ret_dict["mask"] = atom_mask return ret_dict
[docs] def serialize(self) -> dict: return { "type_map": self.type_map, "atom_exclude_types": self.atom_exclude_types, "pair_exclude_types": self.pair_exclude_types, "rcond": self.rcond, "preset_out_bias": self.preset_out_bias, "@variables": { "out_bias": self.out_bias, "out_std": self.out_std, }, }
@classmethod
[docs] def deserialize(cls, data: dict) -> "BaseAtomicModel": data = copy.deepcopy(data) variables = data.pop("@variables") obj = cls(**data) for kk in variables.keys(): obj[kk] = variables[kk] return obj
[docs] def apply_out_stat( self, ret: Dict[str, np.ndarray], atype: np.ndarray, ): """Apply the stat to each atomic output. The developer may override the method to define how the bias is applied to the atomic output of the model. Parameters ---------- ret The returned dict by the forward_atomic method atype The atom types. nf x nloc """ out_bias, out_std = self._fetch_out_stat(self.bias_keys) for kk in self.bias_keys: # nf x nloc x odims, out_bias: ntypes x odims ret[kk] = ret[kk] + out_bias[kk][atype] return ret
[docs] def _varsize( self, shape: List[int], ) -> int: output_size = 1 len_shape = len(shape) for i in range(len_shape): output_size *= shape[i] return output_size
[docs] def _get_bias_index( self, kk: str, ) -> int: res: List[int] = [] for i, e in enumerate(self.bias_keys): if e == kk: res.append(i) assert len(res) == 1 return res[0]
[docs] def _fetch_out_stat( self, keys: List[str], ) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray]]: ret_bias = {} ret_std = {} ntypes = self.get_ntypes() for kk in keys: idx = self._get_bias_index(kk) isize = self._varsize(self.atomic_output_def()[kk].shape) ret_bias[kk] = self.out_bias[idx, :, :isize].reshape( [ntypes] + list(self.atomic_output_def()[kk].shape) # noqa: RUF005 ) ret_std[kk] = self.out_std[idx, :, :isize].reshape( [ntypes] + list(self.atomic_output_def()[kk].shape) # noqa: RUF005 ) return ret_bias, ret_std