# SPDX-License-Identifier: LGPL-3.0-or-later
import copy
from typing import (
Dict,
List,
Optional,
Tuple,
)
import numpy as np
from deepmd.dpmodel.output_def import (
FittingOutputDef,
OutputVariableDef,
)
from deepmd.dpmodel.utils import (
AtomExcludeMask,
PairExcludeMask,
)
from .make_base_atomic_model import (
make_base_atomic_model,
)
[docs]
# Base class returned by the ``make_base_atomic_model`` factory when called
# with ``np.ndarray``; ``BaseAtomicModel`` below inherits from it.
BaseAtomicModel_ = make_base_atomic_model(np.ndarray)
[docs]
class BaseAtomicModel(BaseAtomicModel_):
def __init__(
    self,
    type_map: List[str],
    atom_exclude_types: Optional[List[int]] = None,
    pair_exclude_types: Optional[List[Tuple[int, int]]] = None,
    rcond: Optional[float] = None,
    preset_out_bias: Optional[Dict[str, np.ndarray]] = None,
):
    """Store the type map, build the exclusion masks and keep the stat options.

    Parameters
    ----------
    type_map
        Stored as ``self.type_map`` and returned by ``get_type_map``.
    atom_exclude_types
        Atom types handed to ``reinit_atom_exclude``. ``None`` (the default)
        is treated as an empty list, i.e. no atom type is excluded.
    pair_exclude_types
        Type pairs handed to ``reinit_pair_exclude``. ``None`` (the default)
        is treated as an empty list, i.e. no pair is excluded.
    rcond
        Stored as ``self.rcond`` and written out by ``serialize``.
    preset_out_bias
        Stored as ``self.preset_out_bias`` and written out by ``serialize``.
    """
    super().__init__()
    # The type map is assigned before the masks are built because the
    # ``reinit_*`` methods call ``self.get_ntypes()`` for non-empty lists
    # (presumably derived from the type map -- defined outside this class).
    self.type_map = type_map
    # A fresh list per instance: with a literal ``[]`` default the very same
    # list object would be stored on every instance built without an explicit
    # argument, so mutating one model's exclude list would affect all others.
    self.reinit_atom_exclude(
        [] if atom_exclude_types is None else atom_exclude_types
    )
    self.reinit_pair_exclude(
        [] if pair_exclude_types is None else pair_exclude_types
    )
    self.rcond = rcond
    self.preset_out_bias = preset_out_bias
[docs]
def init_out_stat(self):
    """Allocate the default output statistics: zero bias and unit std.

    One slot is created per key of ``fitting_output_def()``. Both
    ``self.out_bias`` and ``self.out_std`` get the shape
    ``(n_out, ntypes, max_out_size)``, where ``max_out_size`` is the largest
    ``size`` found among those keys in ``atomic_output_def()``.
    """
    ntypes = self.get_ntypes()
    self.bias_keys: List[str] = list(self.fitting_output_def().keys())
    # Every key shares one array, so the last axis must fit the widest output.
    self.max_out_size = max(
        self.atomic_output_def()[key].size for key in self.bias_keys
    )
    self.n_out = len(self.bias_keys)
    stat_shape = (self.n_out, ntypes, self.max_out_size)
    self.out_bias = np.zeros(stat_shape)
    self.out_std = np.ones(stat_shape)
[docs]
def __setitem__(self, key, value):
if key in ["out_bias"]:
self.out_bias = value
elif key in ["out_std"]:
self.out_std = value
else:
raise KeyError(key)
[docs]
def __getitem__(self, key):
if key in ["out_bias"]:
return self.out_bias
elif key in ["out_std"]:
return self.out_std
else:
raise KeyError(key)
[docs]
def get_type_map(self) -> List[str]:
    """Return the type map given at construction (the stored list, not a copy)."""
    type_map = self.type_map
    return type_map
[docs]
def reinit_atom_exclude(
    self,
    exclude_types: Optional[List[int]] = None,
):
    """(Re)build the mask used to exclude whole atom types.

    Parameters
    ----------
    exclude_types
        Atom types to exclude; stored as ``self.atom_exclude_types``.
        ``None`` (the default) is treated as an empty list. With an empty
        list ``self.atom_excl`` is set to ``None``; otherwise it becomes an
        ``AtomExcludeMask`` built from ``self.get_ntypes()`` and the list.
    """
    # ``None`` replaces the former literal ``[]`` default: that single list
    # object was stored on every instance calling this without an argument,
    # so mutating one instance's ``atom_exclude_types`` changed all of them.
    if exclude_types is None:
        exclude_types = []
    self.atom_exclude_types = exclude_types
    if exclude_types == []:
        self.atom_excl = None
    else:
        self.atom_excl = AtomExcludeMask(self.get_ntypes(), self.atom_exclude_types)
[docs]
def reinit_pair_exclude(
    self,
    exclude_types: Optional[List[Tuple[int, int]]] = None,
):
    """(Re)build the mask used to exclude pairs of atom types.

    Parameters
    ----------
    exclude_types
        Type pairs to exclude; stored as ``self.pair_exclude_types``.
        ``None`` (the default) is treated as an empty list. With an empty
        list ``self.pair_excl`` is set to ``None``; otherwise it becomes a
        ``PairExcludeMask`` built from ``self.get_ntypes()`` and the list.
    """
    # ``None`` replaces the former literal ``[]`` default: that single list
    # object was stored on every instance calling this without an argument,
    # so mutating one instance's ``pair_exclude_types`` changed all of them.
    if exclude_types is None:
        exclude_types = []
    self.pair_exclude_types = exclude_types
    if exclude_types == []:
        self.pair_excl = None
    else:
        self.pair_excl = PairExcludeMask(self.get_ntypes(), self.pair_exclude_types)
[docs]
def atomic_output_def(self) -> FittingOutputDef:
    """Return the fitting output definition extended with a ``"mask"`` entry.

    The variables of ``fitting_output_def()`` are kept in their original
    order and a ``"mask"`` variable of shape ``[1]`` -- not reducible and not
    differentiable -- is appended at the end. A new ``FittingOutputDef`` is
    built on every call.
    """
    # ``list(...)`` makes a fresh list, so appending does not touch the
    # mapping returned by ``get_data()``.
    var_defs = list(self.fitting_output_def().get_data().values())
    mask_def = OutputVariableDef(
        name="mask",
        shape=[1],
        reduciable=False,
        r_differentiable=False,
        c_differentiable=False,
    )
    var_defs.append(mask_def)
    return FittingOutputDef(var_defs)
[docs]
def forward_common_atomic(
    self,
    extended_coord: np.ndarray,
    extended_atype: np.ndarray,
    nlist: np.ndarray,
    mapping: Optional[np.ndarray] = None,
    fparam: Optional[np.ndarray] = None,
    aparam: Optional[np.ndarray] = None,
) -> Dict[str, np.ndarray]:
    """Common interface for atomic inference.

    This method accepts extended coordinates, extended atom types and a
    neighbor list, and predicts the atomic contribution of the fit property.

    Parameters
    ----------
    extended_coord
        extended coordinates, shape: nf x (nall x 3)
    extended_atype
        extended atom types, shape: nf x nall
        a type < 0 indicates that the atom is virtual.
    nlist
        neighbor list, shape: nf x nloc x nsel
    mapping
        extended to local index mapping, shape: nf x nall
    fparam
        frame parameters, shape: nf x dim_fparam
    aparam
        atomic parameter, shape: nf x nloc x dim_aparam

    Returns
    -------
    ret_dict
        dict of output atomic properties.
        should implement the definition of `fitting_output_def`.
        ret_dict["mask"] of shape nf x nloc will be provided.
        ret_dict["mask"][ff,ii] == 1 indicating the ii-th atom of the ff-th frame is real.
        ret_dict["mask"][ff,ii] == 0 indicating the ii-th atom of the ff-th frame is virtual.
    """
    _, nloc, _ = nlist.shape
    # The local atoms are the first nloc entries of the extended arrays.
    local_atype = extended_atype[:, :nloc]
    if self.pair_excl is not None:
        keep_pair = self.pair_excl.build_type_exclude_mask(nlist, extended_atype)
        # Excluded neighbors are replaced by -1 in the neighbor list.
        nlist = np.where(keep_pair == 1, nlist, -1)
    # ``make_atom_mask`` is defined outside this block; its result is used
    # here as a truthy-for-real-atoms mask over the extended atoms.
    real_ext = self.make_atom_mask(extended_atype)
    # Masked-out atoms are given type 0 before the model is evaluated
    # (presumably so that ``forward_atomic`` only sees valid type indices).
    safe_atype = np.where(real_ext, extended_atype, 0)
    ret_dict = self.forward_atomic(
        extended_coord,
        safe_atype,
        nlist,
        mapping=mapping,
        fparam=fparam,
        aparam=aparam,
    )
    ret_dict = self.apply_out_stat(ret_dict, local_atype)
    # nf x nloc
    atom_mask = real_ext[:, :nloc].astype(np.int32)
    if self.atom_excl is not None:
        atom_mask *= self.atom_excl.build_type_exclude_mask(local_atype)
    # Zero the outputs of masked atoms: flatten the trailing dims, multiply by
    # the broadcast mask, then restore the original shape.
    mask_3d = atom_mask[:, :, None]
    for name, value in ret_dict.items():
        full_shape = value.shape
        flat = value.reshape([full_shape[0], full_shape[1], -1])
        ret_dict[name] = (flat * mask_3d).reshape(full_shape)
    ret_dict["mask"] = atom_mask
    return ret_dict
[docs]
def serialize(self) -> dict:
    """Return the constructor arguments and the statistics arrays as a dict.

    The top-level keys mirror the ``__init__`` parameters; ``out_bias`` and
    ``out_std`` are nested under ``"@variables"``. The stored objects are
    referenced, not copied.
    """
    payload = {
        "type_map": self.type_map,
        "atom_exclude_types": self.atom_exclude_types,
        "pair_exclude_types": self.pair_exclude_types,
        "rcond": self.rcond,
        "preset_out_bias": self.preset_out_bias,
    }
    payload["@variables"] = {
        "out_bias": self.out_bias,
        "out_std": self.out_std,
    }
    return payload
@classmethod
def deserialize(cls, data: dict) -> "BaseAtomicModel":
    """Rebuild a model from a dict shaped like the output of ``serialize``.

    ``data`` is deep-copied first, so the caller's dict is left untouched.
    The ``"@variables"`` entry is removed from the copy, the remaining items
    are passed to the constructor as keyword arguments, and every stored
    variable is then assigned through ``__setitem__``.
    """
    init_kwargs = copy.deepcopy(data)
    stored = init_kwargs.pop("@variables")
    model = cls(**init_kwargs)
    for name, value in stored.items():
        model[name] = value
    return model
[docs]
def apply_out_stat(
    self,
    ret: Dict[str, np.ndarray],
    atype: np.ndarray,
):
    """Apply the stat to each atomic output.

    The developer may override the method to define how the bias is applied
    to the atomic output of the model. This implementation adds the per-type
    bias to every key in ``self.bias_keys``; the std returned by
    ``_fetch_out_stat`` is not used here.

    Parameters
    ----------
    ret
        The returned dict by the forward_atomic method
    atype
        The atom types. nf x nloc

    Returns
    -------
    ret
        The same dict object, with the biased arrays stored under the
        bias keys.
    """
    bias_by_key, _ = self._fetch_out_stat(self.bias_keys)
    for key in self.bias_keys:
        # ret[key]: nf x nloc x odims; bias_by_key[key]: ntypes x odims.
        # Indexing the bias with atype selects one bias row per atom.
        per_atom_bias = bias_by_key[key][atype]
        ret[key] = ret[key] + per_atom_bias
    return ret
[docs]
def _varsize(
self,
shape: List[int],
) -> int:
output_size = 1
len_shape = len(shape)
for i in range(len_shape):
output_size *= shape[i]
return output_size
[docs]
def _get_bias_index(
self,
kk: str,
) -> int:
res: List[int] = []
for i, e in enumerate(self.bias_keys):
if e == kk:
res.append(i)
assert len(res) == 1
return res[0]
[docs]
def _fetch_out_stat(
self,
keys: List[str],
) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray]]:
ret_bias = {}
ret_std = {}
ntypes = self.get_ntypes()
for kk in keys:
idx = self._get_bias_index(kk)
isize = self._varsize(self.atomic_output_def()[kk].shape)
ret_bias[kk] = self.out_bias[idx, :, :isize].reshape(
[ntypes] + list(self.atomic_output_def()[kk].shape) # noqa: RUF005
)
ret_std[kk] = self.out_std[idx, :, :isize].reshape(
[ntypes] + list(self.atomic_output_def()[kk].shape) # noqa: RUF005
)
return ret_bias, ret_std