# SPDX-License-Identifier: LGPL-3.0-or-later
from functools import (
lru_cache,
)
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Tuple,
Union,
)
import numpy as np
from deepmd.common import (
make_default_mesh,
)
from deepmd.dpmodel.output_def import (
ModelOutputDef,
OutputVariableCategory,
)
from deepmd.infer.deep_dipole import (
DeepDipole,
)
from deepmd.infer.deep_dos import (
DeepDOS,
)
from deepmd.infer.deep_eval import (
DeepEvalBackend,
)
from deepmd.infer.deep_polar import (
DeepGlobalPolar,
DeepPolar,
)
from deepmd.infer.deep_pot import (
DeepPot,
)
from deepmd.infer.deep_wfc import (
DeepWFC,
)
from deepmd.tf.env import (
MODEL_VERSION,
default_tf_session_config,
tf,
)
from deepmd.tf.utils.batch_size import (
AutoBatchSize,
)
from deepmd.tf.utils.sess import (
run_sess,
)
if TYPE_CHECKING:
from pathlib import (
Path,
)
from deepmd.infer.deep_eval import DeepEval as DeepEvalWrapper
[docs]
class DeepEval(DeepEvalBackend):
"""TensorFlow backend implementation for DeepEval.
Parameters
----------
model_file : Path
The name of the frozen model file.
output_def : ModelOutputDef
The output definition of the model.
*args : list
Positional arguments.
load_prefix: str
The prefix in the load computational graph
default_tf_graph : bool
If uses the default tf graph, otherwise build a new tf graph for evaluation
auto_batch_size : bool or int or AutomaticBatchSize, default: False
If True, automatic batch size will be used. If int, it will be used
as the initial batch size.
input_map : dict, optional
The input map for tf.import_graph_def. Only work with default tf graph
neighbor_list : ase.neighborlist.NewPrimitiveNeighborList, optional
The ASE neighbor list class to produce the neighbor list. If None, the
neighbor list will be built natively in the model.
**kwargs : dict
Keyword arguments.
"""
def __init__(
self,
model_file: "Path",
output_def: ModelOutputDef,
*args: list,
load_prefix: str = "load",
default_tf_graph: bool = False,
auto_batch_size: Union[bool, int, AutoBatchSize] = False,
input_map: Optional[dict] = None,
neighbor_list=None,
**kwargs: dict,
):
self.graph = self._load_graph(
model_file,
prefix=load_prefix,
default_tf_graph=default_tf_graph,
input_map=input_map,
)
self.load_prefix = load_prefix
# graph_compatable should be called after graph and prefix are set
if not self._graph_compatable():
raise RuntimeError(
f"model in graph (version {self.model_version}) is incompatible"
f"with the model (version {MODEL_VERSION}) supported by the current code."
"See https://deepmd.rtfd.io/compatability/ for details."
)
# set default to False, as subclasses may not support
if isinstance(auto_batch_size, bool):
if auto_batch_size:
self.auto_batch_size = AutoBatchSize()
else:
self.auto_batch_size = None
elif isinstance(auto_batch_size, int):
self.auto_batch_size = AutoBatchSize(auto_batch_size)
elif isinstance(auto_batch_size, AutoBatchSize):
self.auto_batch_size = auto_batch_size
else:
raise TypeError("auto_batch_size should be bool, int, or AutoBatchSize")
self.neighbor_list = neighbor_list
self.output_def = output_def
self._init_tensors()
self._init_attr()
self.has_efield = self.tensors["efield"] is not None
self.has_fparam = self.tensors["fparam"] is not None
self.has_aparam = self.tensors["aparam"] is not None
self.has_spin = self.ntypes_spin > 0
self.modifier_type = None
# looks ugly...
if self.modifier_type == "dipole_charge":
from deepmd.tf.infer.data_modifier import (
DipoleChargeModifier,
)
t_mdl_name = self._get_tensor("modifier_attr/mdl_name:0")
t_mdl_charge_map = self._get_tensor("modifier_attr/mdl_charge_map:0")
t_sys_charge_map = self._get_tensor("modifier_attr/sys_charge_map:0")
t_ewald_h = self._get_tensor("modifier_attr/ewald_h:0")
t_ewald_beta = self._get_tensor("modifier_attr/ewald_beta:0")
[mdl_name, mdl_charge_map, sys_charge_map, ewald_h, ewald_beta] = run_sess(
self.sess,
[
t_mdl_name,
t_mdl_charge_map,
t_sys_charge_map,
t_ewald_h,
t_ewald_beta,
],
)
mdl_name = mdl_name.decode("UTF-8")
mdl_charge_map = [int(ii) for ii in mdl_charge_map.decode("UTF-8").split()]
sys_charge_map = [int(ii) for ii in sys_charge_map.decode("UTF-8").split()]
self.dm = DipoleChargeModifier(
mdl_name,
mdl_charge_map,
sys_charge_map,
ewald_h=ewald_h,
ewald_beta=ewald_beta,
)
[docs]
def _init_tensors(self):
tensor_names = {
# descrpt attrs
"ntypes": "descrpt_attr/ntypes:0",
"rcut": "descrpt_attr/rcut:0",
# model attrs
"tmap": "model_attr/tmap:0",
# inputs
"coord": "t_coord:0",
"type": "t_type:0",
"natoms": "t_natoms:0",
"box": "t_box:0",
"mesh": "t_mesh:0",
}
optional_tensor_names = {
# fitting attrs
"dfparam": "fitting_attr/dfparam:0",
"daparam": "fitting_attr/daparam:0",
"numb_dos": "fitting_attr/numb_dos:0",
# model attrs
"sel_type": "model_attr/sel_type:0",
# additonal inputs
"efield": "t_efield:0",
"fparam": "t_fparam:0",
"aparam": "t_aparam:0",
"ntypes_spin": "spin_attr/ntypes_spin:0",
# descriptor
"descriptor": "o_descriptor:0",
}
# output tensors
output_tensor_names = {}
for vv in self.output_def.var_defs:
output_tensor_names[vv] = f"o_{self._OUTDEF_DP2BACKEND[vv]}:0"
self.tensors = {}
for tensor_key, tensor_name in tensor_names.items():
self.tensors[tensor_key] = self._get_tensor(tensor_name)
for tensor_key, tensor_name in optional_tensor_names.items():
try:
self.tensors[tensor_key] = self._get_tensor(tensor_name)
except KeyError:
self.tensors[tensor_key] = None
self.output_tensors = {}
removed_defs = []
for ii, (tensor_key, tensor_name) in enumerate(output_tensor_names.items()):
try:
self.output_tensors[tensor_key] = self._get_tensor(tensor_name)
except KeyError:
# do not output
removed_defs.append(ii)
for ii in sorted(removed_defs, reverse=True):
del self.output_def.var_defs[list(self.output_def.var_defs.keys())[ii]]
[docs]
def _init_attr(self):
[
self.ntypes,
self.rcut,
tmap,
] = run_sess(
self.sess,
[
self.tensors["ntypes"],
self.tensors["rcut"],
self.tensors["tmap"],
],
)
if self.tensors["ntypes_spin"] is not None:
self.ntypes_spin = run_sess(self.sess, [self.tensors["ntypes_spin"]])[0]
else:
self.ntypes_spin = 0
if self.tensors["dfparam"] is not None:
self.dfparam = run_sess(self.sess, [self.tensors["dfparam"]])[0]
else:
self.dfparam = 0
if self.tensors["daparam"] is not None:
self.daparam = run_sess(self.sess, [self.tensors["daparam"]])[0]
else:
self.daparam = 0
if self.tensors["sel_type"] is not None:
self.sel_type = run_sess(self.sess, [self.tensors["sel_type"]])[0]
else:
self.sel_type = None
if self.tensors["numb_dos"] is not None:
self.numb_dos = run_sess(self.sess, [self.tensors["numb_dos"]])[0]
else:
self.numb_dos = 0
self.tmap = tmap.decode("utf-8").split()
@property
@lru_cache(maxsize=None)
[docs]
def model_type(self) -> "DeepEvalWrapper":
"""Get type of model.
:type:str
"""
t_mt = self._get_tensor("model_attr/model_type:0")
[mt] = run_sess(self.sess, [t_mt], feed_dict={})
model_type = mt.decode("utf-8")
if model_type == "ener":
return DeepPot
elif model_type == "dos":
return DeepDOS
elif model_type == "dipole":
return DeepDipole
elif model_type == "polar":
return DeepPolar
elif model_type == "global_polar":
return DeepGlobalPolar
elif model_type == "wfc":
return DeepWFC
else:
raise RuntimeError(f"unknown model type {model_type}")
@property
@lru_cache(maxsize=None)
[docs]
def model_version(self) -> str:
"""Get version of model.
Returns
-------
str
version of model
"""
try:
t_mt = self._get_tensor("model_attr/model_version:0")
except KeyError:
# For deepmd-kit version 0.x - 1.x, set model version to 0.0
return "0.0"
else:
[mt] = run_sess(self.sess, [t_mt], feed_dict={})
return mt.decode("utf-8")
@property
@lru_cache(maxsize=None)
[docs]
def sess(self) -> tf.Session:
"""Get TF session."""
# start a tf session associated to the graph
return tf.Session(graph=self.graph, config=default_tf_session_config)
[docs]
def _graph_compatable(self) -> bool:
"""Check the model compatability.
Returns
-------
bool
If the model stored in the graph file is compatable with the current code
"""
model_version_major = int(self.model_version.split(".")[0])
model_version_minor = int(self.model_version.split(".")[1])
MODEL_VERSION_MAJOR = int(MODEL_VERSION.split(".")[0])
MODEL_VERSION_MINOR = int(MODEL_VERSION.split(".")[1])
if (model_version_major != MODEL_VERSION_MAJOR) or (
model_version_minor > MODEL_VERSION_MINOR
):
return False
else:
return True
[docs]
def _get_tensor(
self,
tensor_name: str,
) -> tf.Tensor:
"""Get TF graph tensor.
Parameters
----------
tensor_name : str
name of tensor to get
Returns
-------
tf.Tensor
loaded tensor
"""
# do not use os.path.join as it doesn't work on Windows
tensor_path = "/".join((self.load_prefix, tensor_name))
tensor = self.graph.get_tensor_by_name(tensor_path)
return tensor
@staticmethod
[docs]
def _load_graph(
frozen_graph_filename: "Path",
prefix: str = "load",
default_tf_graph: bool = False,
input_map: Optional[dict] = None,
):
# We load the protobuf file from the disk and parse it to retrieve the
# unserialized graph_def
with tf.gfile.GFile(str(frozen_graph_filename), "rb") as f:
graph_def = tf.GraphDef()
graph_def.ParseFromString(f.read())
if default_tf_graph:
tf.import_graph_def(
graph_def,
input_map=input_map,
return_elements=None,
name=prefix,
producer_op_list=None,
)
graph = tf.get_default_graph()
else:
# Then, we can use again a convenient built-in function to import
# a graph_def into the current default Graph
with tf.Graph().as_default() as graph:
tf.import_graph_def(
graph_def,
input_map=None,
return_elements=None,
name=prefix,
producer_op_list=None,
)
return graph
@staticmethod
@staticmethod
[docs]
def reverse_map(vec: np.ndarray, imap: List[int]) -> np.ndarray:
"""Reverse mapping of a vector according to the index map.
Parameters
----------
vec
Input vector. Be of shape [nframes, natoms, -1]
imap
Index map. Be of shape [natoms]
Returns
-------
vec_out
Reverse mapped vector.
"""
ret = np.zeros(vec.shape)
ret[:, imap, :] = vec
return ret
[docs]
def make_natoms_vec(
self,
atom_types: np.ndarray,
) -> np.ndarray:
"""Make the natom vector used by deepmd-kit.
Parameters
----------
atom_types
The type of atoms
Returns
-------
natoms
The number of atoms. This tensor has the length of Ntypes + 2
natoms[0]: number of local atoms
natoms[1]: total number of atoms held by this processor
natoms[i]: 2 <= i < Ntypes+2, number of type i atoms
"""
natoms_vec = np.zeros(self.ntypes + 2).astype(int)
natoms = atom_types[0].size
natoms_vec[0] = natoms
natoms_vec[1] = natoms
for ii in range(self.ntypes):
natoms_vec[ii + 2] = np.count_nonzero(atom_types[0] == ii)
if np.count_nonzero(atom_types[0] == -1) > 0:
# contains virtual atoms
# energy fitting sums over natoms_vec[2:] instead of reading from natoms_vec[0]
# causing errors for shape mismatch
natoms_vec[2] += np.count_nonzero(atom_types[0] == -1)
return natoms_vec
[docs]
def eval_typeebd(self) -> np.ndarray:
"""Evaluate output of type embedding network by using this model.
Returns
-------
np.ndarray
The output of type embedding network. The shape is [ntypes, o_size],
where ntypes is the number of types, and o_size is the number of nodes
in the output layer.
Raises
------
KeyError
If the model does not enable type embedding.
See Also
--------
deepmd.tf.utils.type_embed.TypeEmbedNet : The type embedding network.
Examples
--------
Get the output of type embedding network of `graph.pb`:
>>> from deepmd.tf.infer import DeepPotential
>>> dp = DeepPotential("graph.pb")
>>> dp.eval_typeebd()
"""
t_typeebd = self._get_tensor("t_typeebd:0")
[typeebd] = run_sess(self.sess, [t_typeebd], feed_dict={})
return typeebd
[docs]
def build_neighbor_list(
self,
coords: np.ndarray,
cell: Optional[np.ndarray],
atype: np.ndarray,
imap: np.ndarray,
neighbor_list,
):
"""Make the mesh with neighbor list for a single frame.
Parameters
----------
coords : np.ndarray
The coordinates of atoms. Should be of shape [natoms, 3]
cell : Optional[np.ndarray]
The cell of the system. Should be of shape [3, 3]
atype : np.ndarray
The type of atoms. Should be of shape [natoms]
imap : np.ndarray
The index map of atoms. Should be of shape [natoms]
neighbor_list : ase.neighborlist.NewPrimitiveNeighborList
ASE neighbor list. The following method or attribute will be
used/set: bothways, self_interaction, update, build, first_neigh,
pair_second, offset_vec.
Returns
-------
natoms_vec : np.ndarray
The number of atoms. This tensor has the length of Ntypes + 2
natoms[0]: nloc
natoms[1]: nall
natoms[i]: 2 <= i < Ntypes+2, number of type i atoms for nloc
coords : np.ndarray
The coordinates of atoms, including ghost atoms. Should be of
shape [nframes, nall, 3]
atype : np.ndarray
The type of atoms, including ghost atoms. Should be of shape [nall]
mesh : np.ndarray
The mesh in nei_mode=4.
imap : np.ndarray
The index map of atoms. Should be of shape [nall]
ghost_map : np.ndarray
The index map of ghost atoms. Should be of shape [nghost]
"""
pbc = np.repeat(cell is not None, 3)
cell = cell.reshape(3, 3)
positions = coords.reshape(-1, 3)
neighbor_list.bothways = True
neighbor_list.self_interaction = False
if neighbor_list.update(pbc, cell, positions):
neighbor_list.build(pbc, cell, positions)
first_neigh = neighbor_list.first_neigh.copy()
pair_second = neighbor_list.pair_second.copy()
offset_vec = neighbor_list.offset_vec.copy()
# get out-of-box neighbors
out_mask = np.any(offset_vec != 0, axis=1)
out_idx = pair_second[out_mask]
out_offset = offset_vec[out_mask]
out_coords = positions[out_idx] + out_offset.dot(cell)
atype = np.array(atype, dtype=int).reshape(-1)
out_atype = atype[out_idx]
nloc = positions.shape[0]
nghost = out_idx.size
all_coords = np.concatenate((positions, out_coords), axis=0)
all_atype = np.concatenate((atype, out_atype), axis=0)
# convert neighbor indexes
ghost_map = pair_second[out_mask]
pair_second[out_mask] = np.arange(nloc, nloc + nghost)
# get the mesh
mesh = np.zeros(16 + nloc * 2 + pair_second.size, dtype=int)
mesh[0] = nloc
# ilist
mesh[16 : 16 + nloc] = np.arange(nloc)
# numnei
mesh[16 + nloc : 16 + nloc * 2] = first_neigh[1:] - first_neigh[:-1]
# jlist
mesh[16 + nloc * 2 :] = pair_second
# natoms_vec
natoms_vec = np.zeros(self.ntypes + 2).astype(int)
natoms_vec[0] = nloc
natoms_vec[1] = nloc + nghost
for ii in range(self.ntypes):
natoms_vec[ii + 2] = np.count_nonzero(atype == ii)
# imap append ghost atoms
imap = np.concatenate((imap, np.arange(nloc, nloc + nghost)))
return natoms_vec, all_coords, all_atype, mesh, imap, ghost_map
[docs]
def get_ntypes(self) -> int:
"""Get the number of atom types of this model."""
return self.ntypes
[docs]
def get_ntypes_spin(self) -> int:
"""Get the number of spin atom types of this model."""
return self.ntypes_spin
[docs]
def get_rcut(self) -> float:
"""Get the cut-off radius of this model."""
return self.rcut
[docs]
def get_type_map(self) -> List[str]:
"""Get the type map (element name of the atom types) of this model."""
return self.tmap
[docs]
def get_sel_type(self) -> Optional[np.ndarray]:
"""Get the selected atom types of this model.
Only atoms with selected atom types have atomic contribution
to the result of the model.
If returning an empty list, all atom types are selected.
"""
return np.array(self.sel_type).ravel()
[docs]
def get_dim_fparam(self) -> int:
"""Get the number (dimension) of frame parameters of this DP."""
return self.dfparam
[docs]
def get_dim_aparam(self) -> int:
"""Get the number (dimension) of atomic parameters of this DP."""
return self.daparam
[docs]
def _eval_func(self, inner_func: Callable, numb_test: int, natoms: int) -> Callable:
"""Wrapper method with auto batch size.
Parameters
----------
inner_func : Callable
the method to be wrapped
numb_test : int
number of tests
natoms : int
number of atoms
Returns
-------
Callable
the wrapper
"""
if self.auto_batch_size is not None:
def eval_func(*args, **kwargs):
return self.auto_batch_size.execute_all(
inner_func, numb_test, natoms, *args, **kwargs
)
else:
eval_func = inner_func
return eval_func
[docs]
def _get_natoms_and_nframes(
self,
coords: np.ndarray,
atom_types: Union[List[int], np.ndarray],
) -> Tuple[int, int]:
natoms = len(atom_types[0])
if natoms == 0:
assert coords.size == 0
else:
coords = np.reshape(np.array(coords), [-1, natoms * 3])
nframes = coords.shape[0]
return natoms, nframes
[docs]
def eval(
self,
coords: np.ndarray,
cells: np.ndarray,
atom_types: np.ndarray,
atomic: bool = False,
fparam: Optional[np.ndarray] = None,
aparam: Optional[np.ndarray] = None,
efield: Optional[np.ndarray] = None,
**kwargs: Dict[str, Any],
) -> Dict[str, np.ndarray]:
"""Evaluate the energy, force and virial by using this DP.
Parameters
----------
coords
The coordinates of atoms.
The array should be of size nframes x natoms x 3
cells
The cell of the region.
If None then non-PBC is assumed, otherwise using PBC.
The array should be of size nframes x 9
atom_types
The atom types
The list should contain natoms ints
atomic
Calculate the atomic energy and virial
fparam
The frame parameter.
The array can be of size :
- nframes x dim_fparam.
- dim_fparam. Then all frames are assumed to be provided with the same fparam.
aparam
The atomic parameter
The array can be of size :
- nframes x natoms x dim_aparam.
- natoms x dim_aparam. Then all frames are assumed to be provided with the same aparam.
- dim_aparam. Then all frames and atoms are provided with the same aparam.
efield
The external field on atoms.
The array should be of size nframes x natoms x 3
**kwargs
Other parameters
Returns
-------
output_dict : dict
The output of the evaluation. The keys are the names of the output
variables, and the values are the corresponding output arrays.
"""
# reshape coords before getting shape
natoms, numb_test = self._get_natoms_and_nframes(
coords,
atom_types,
)
output = self._eval_func(self._eval_inner, numb_test, natoms)(
coords,
cells,
atom_types,
fparam=fparam,
aparam=aparam,
atomic=atomic,
efield=efield,
)
if not isinstance(output, tuple):
output = (output,)
output_dict = {
odef.name: oo for oo, odef in zip(output, self.output_def.var_defs.values())
}
# ugly!!
if self.modifier_type is not None and isinstance(self.model_type, DeepPot):
if atomic:
raise RuntimeError("modifier does not support atomic modification")
me, mf, mv = self.dm.eval(coords, cells, atom_types)
output = list(output) # tuple to list
e, f, v = output[:3]
output_dict["energy_redu"] += me.reshape(e.shape)
output_dict["energy_deri_r"] += mf.reshape(f.shape)
output_dict["energy_deri_c_redu"] += mv.reshape(v.shape)
return output_dict
[docs]
def _prepare_feed_dict(
self,
coords,
cells,
atom_types,
fparam=None,
aparam=None,
efield=None,
):
# standarize the shape of inputs
natoms, nframes = self._get_natoms_and_nframes(
coords,
atom_types,
)
atom_types = np.array(atom_types, dtype=int).reshape([nframes, natoms])
coords = np.reshape(np.array(coords), [nframes, natoms * 3])
if cells is None:
pbc = False
# make cells to work around the requirement of pbc
cells = np.tile(np.eye(3), [nframes, 1]).reshape([nframes, 9])
else:
pbc = True
cells = np.array(cells).reshape([nframes, 9])
if self.has_fparam:
assert fparam is not None
fparam = np.array(fparam)
if self.has_aparam:
assert aparam is not None
aparam = np.array(aparam)
if self.has_efield:
assert (
efield is not None
), "you are using a model with external field, parameter efield should be provided"
efield = np.array(efield)
# reshape the inputs
if self.has_fparam:
fdim = self.get_dim_fparam()
if fparam.size == nframes * fdim:
fparam = np.reshape(fparam, [nframes, fdim])
elif fparam.size == fdim:
fparam = np.tile(fparam.reshape([-1]), [nframes, 1])
else:
raise RuntimeError(
"got wrong size of frame param, should be either %d x %d or %d"
% (nframes, fdim, fdim)
)
if self.has_aparam:
fdim = self.get_dim_aparam()
if aparam.size == nframes * natoms * fdim:
aparam = np.reshape(aparam, [nframes, natoms * fdim])
elif aparam.size == natoms * fdim:
aparam = np.tile(aparam.reshape([-1]), [nframes, 1])
elif aparam.size == fdim:
aparam = np.tile(aparam.reshape([-1]), [nframes, natoms])
else:
raise RuntimeError(
"got wrong size of frame param, should be either %d x %d x %d or %d x %d or %d"
% (nframes, natoms, fdim, natoms, fdim, fdim)
)
# sort inputs
coords, atom_types, imap, sel_at, sel_imap = self.sort_input(
coords, atom_types, sel_atoms=self.get_sel_type()
)
if self.has_efield:
efield = np.reshape(efield, [nframes, natoms, 3])
efield = efield[:, imap, :]
efield = np.reshape(efield, [nframes, natoms * 3])
if self.has_aparam:
aparam = np.reshape(aparam, [nframes, natoms, fdim])
aparam = aparam[:, imap, :]
aparam = np.reshape(aparam, [nframes, natoms * fdim])
# make natoms_vec and default_mesh
if self.neighbor_list is None:
natoms_vec = self.make_natoms_vec(atom_types)
assert natoms_vec[0] == natoms
mesh = make_default_mesh(pbc, not self._check_mixed_types(atom_types))
ghost_map = None
else:
if nframes > 1:
raise NotImplementedError(
"neighbor_list does not support multiple frames"
)
(
natoms_vec,
coords,
atom_types,
mesh,
imap,
ghost_map,
) = self.build_neighbor_list(
coords,
cells if cells is not None else None,
atom_types,
imap,
self.neighbor_list,
)
# evaluate
feed_dict_test = {}
feed_dict_test[self.tensors["natoms"]] = natoms_vec
feed_dict_test[self.tensors["type"]] = atom_types.reshape([-1])
feed_dict_test[self.tensors["coord"]] = np.reshape(coords, [-1])
if len(self.tensors["box"].shape) == 1:
feed_dict_test[self.tensors["box"]] = np.reshape(cells, [-1])
elif len(self.tensors["box"].shape) == 2:
feed_dict_test[self.tensors["box"]] = cells
else:
raise RuntimeError
if self.has_efield:
feed_dict_test[self.tensors["efield"]] = np.reshape(efield, [-1])
feed_dict_test[self.tensors["mesh"]] = mesh
if self.has_fparam:
feed_dict_test[self.tensors["fparam"]] = np.reshape(fparam, [-1])
if self.has_aparam:
feed_dict_test[self.tensors["aparam"]] = np.reshape(aparam, [-1])
return feed_dict_test, imap, natoms_vec, ghost_map, sel_at, sel_imap
[docs]
def _eval_inner(
self,
coords,
cells,
atom_types,
fparam=None,
aparam=None,
efield=None,
**kwargs,
):
natoms, nframes = self._get_natoms_and_nframes(
coords,
atom_types,
)
(
feed_dict_test,
imap,
natoms_vec,
ghost_map,
sel_at,
sel_imap,
) = self._prepare_feed_dict(
coords,
cells,
atom_types,
fparam,
aparam,
efield,
)
nloc = natoms_vec[0]
nloc_sel = sel_at.shape[1]
nall = natoms_vec[1]
t_out = list(self.output_tensors.values())
v_out = run_sess(self.sess, t_out, feed_dict=feed_dict_test)
if nloc_sel == 0:
nloc_sel = nloc
sel_imap = imap
if self.has_spin:
ntypes_real = self.ntypes - self.ntypes_spin
natoms_real = sum(
[
np.count_nonzero(np.array(atom_types[0]) == ii)
for ii in range(ntypes_real)
]
)
else:
natoms_real = nloc_sel
if ghost_map is not None:
# add the value of ghost atoms to real atoms
for ii, odef in enumerate(self.output_def.var_defs.values()):
# when the shape is nall
if odef.category in (
OutputVariableCategory.DERV_R,
OutputVariableCategory.DERV_C,
):
odef_shape = self._get_output_shape(odef, nframes, nall)
tmp_shape = [np.prod(odef_shape[:-2]), *odef_shape[-2:]]
v_out[ii] = np.reshape(v_out[ii], tmp_shape)
for jj in range(v_out[ii].shape[0]):
np.add.at(v_out[ii][jj], ghost_map, v_out[ii][jj, nloc:])
for ii, odef in enumerate(self.output_def.var_defs.values()):
if odef.category in (
OutputVariableCategory.DERV_R,
OutputVariableCategory.DERV_C,
):
odef_shape = self._get_output_shape(odef, nframes, nall)
tmp_shape = [np.prod(odef_shape[:-2]), *odef_shape[-2:]]
# reverse map of the outputs
v_out[ii] = self.reverse_map(np.reshape(v_out[ii], tmp_shape), imap)
v_out[ii] = np.reshape(v_out[ii], odef_shape)
if nloc < nall:
v_out[ii] = v_out[ii][:, :, :nloc]
elif odef.category == OutputVariableCategory.OUT:
odef_shape = self._get_output_shape(odef, nframes, natoms_real)
v_out[ii] = self.reverse_map(
np.reshape(v_out[ii], odef_shape), sel_imap[:natoms_real]
)
if nloc_sel < nloc:
# convert shape from nsel to nloc
# sel_atoms was applied before sort; see sort_input
# do not consider mixed_types here (as it is never supported)
sel_mask = np.isin(atom_types[0], self.sel_type)
out_nsel = v_out[ii]
out_nloc = np.zeros(
(nframes, nloc, *out_nsel.shape[2:]), dtype=out_nsel.dtype
)
out_nloc[:, sel_mask] = out_nsel
v_out[ii] = out_nloc
odef_shape = self._get_output_shape(odef, nframes, nloc)
v_out[ii] = np.reshape(v_out[ii], odef_shape)
elif odef.category in (
OutputVariableCategory.REDU,
OutputVariableCategory.DERV_C_REDU,
):
odef_shape = self._get_output_shape(odef, nframes, 0)
v_out[ii] = np.reshape(v_out[ii], odef_shape)
else:
raise RuntimeError("unknown category")
return tuple(v_out)
[docs]
def _get_output_shape(self, odef, nframes, natoms):
if odef.category == OutputVariableCategory.DERV_C_REDU:
# virial
return [nframes, *odef.shape[:-1], 9]
elif odef.category == OutputVariableCategory.REDU:
# energy
return [nframes, *odef.shape, 1]
elif odef.category == OutputVariableCategory.DERV_C:
# atom_virial
return [nframes, *odef.shape[:-1], natoms, 9]
elif odef.category == OutputVariableCategory.DERV_R:
# force
return [nframes, *odef.shape[:-1], natoms, 3]
elif odef.category == OutputVariableCategory.OUT:
# atom_energy, atom_tensor
# Something wrong here?
# return [nframes, *shape, natoms, 1]
return [nframes, natoms, *odef.shape, 1]
else:
raise RuntimeError("unknown category")
[docs]
def eval_descriptor(
self,
coords: np.ndarray,
cells: np.ndarray,
atom_types: np.ndarray,
fparam: Optional[np.ndarray] = None,
aparam: Optional[np.ndarray] = None,
efield: Optional[np.ndarray] = None,
) -> np.ndarray:
"""Evaluate descriptors by using this DP.
Parameters
----------
coords
The coordinates of atoms.
The array should be of size nframes x natoms x 3
cells
The cell of the region.
If None then non-PBC is assumed, otherwise using PBC.
The array should be of size nframes x 9
atom_types
The atom types
The list should contain natoms ints
fparam
The frame parameter.
The array can be of size :
- nframes x dim_fparam.
- dim_fparam. Then all frames are assumed to be provided with the same fparam.
aparam
The atomic parameter
The array can be of size :
- nframes x natoms x dim_aparam.
- natoms x dim_aparam. Then all frames are assumed to be provided with the same aparam.
- dim_aparam. Then all frames and atoms are provided with the same aparam.
efield
The external field on atoms.
The array should be of size nframes x natoms x 3
Returns
-------
descriptor
Descriptors.
"""
natoms, numb_test = self._get_natoms_and_nframes(
coords,
atom_types,
)
descriptor = self._eval_func(self._eval_descriptor_inner, numb_test, natoms)(
coords,
cells,
atom_types,
fparam=fparam,
aparam=aparam,
efield=efield,
)
return descriptor
[docs]
def _eval_descriptor_inner(
self,
coords: np.ndarray,
cells: np.ndarray,
atom_types: np.ndarray,
fparam: Optional[np.ndarray] = None,
aparam: Optional[np.ndarray] = None,
efield: Optional[np.ndarray] = None,
) -> np.ndarray:
natoms, nframes = self._get_natoms_and_nframes(
coords,
atom_types,
)
(
feed_dict_test,
imap,
natoms_vec,
ghost_map,
sel_at,
sel_imap,
) = self._prepare_feed_dict(
coords,
cells,
atom_types,
fparam,
aparam,
efield,
)
(descriptor,) = run_sess(
self.sess, [self.tensors["descriptor"]], feed_dict=feed_dict_test
)
imap = imap[:natoms]
return self.reverse_map(np.reshape(descriptor, [nframes, natoms, -1]), imap)
[docs]
def get_numb_dos(self) -> int:
return self.numb_dos
[docs]
def get_has_efield(self) -> bool:
return self.has_efield
[docs]
class DeepEvalOld:
# old class for DipoleChargeModifier only
"""Common methods for DeepPot, DeepWFC, DeepPolar, ...
Parameters
----------
model_file : Path
The name of the frozen model file.
load_prefix: str
The prefix in the load computational graph
default_tf_graph : bool
If uses the default tf graph, otherwise build a new tf graph for evaluation
auto_batch_size : bool or int or AutomaticBatchSize, default: False
If True, automatic batch size will be used. If int, it will be used
as the initial batch size.
input_map : dict, optional
The input map for tf.import_graph_def. Only work with default tf graph
neighbor_list : ase.neighborlist.NewPrimitiveNeighborList, optional
The ASE neighbor list class to produce the neighbor list. If None, the
neighbor list will be built natively in the model.
"""
[docs]
load_prefix: str # set by subclass
def __init__(
self,
model_file: "Path",
load_prefix: str = "load",
default_tf_graph: bool = False,
auto_batch_size: Union[bool, int, AutoBatchSize] = False,
input_map: Optional[dict] = None,
neighbor_list=None,
):
self.graph = self._load_graph(
model_file,
prefix=load_prefix,
default_tf_graph=default_tf_graph,
input_map=input_map,
)
self.load_prefix = load_prefix
# graph_compatable should be called after graph and prefix are set
if not self._graph_compatable():
raise RuntimeError(
f"model in graph (version {self.model_version}) is incompatible"
f"with the model (version {MODEL_VERSION}) supported by the current code."
"See https://deepmd.rtfd.io/compatability/ for details."
)
# set default to False, as subclasses may not support
if isinstance(auto_batch_size, bool):
if auto_batch_size:
self.auto_batch_size = AutoBatchSize()
else:
self.auto_batch_size = None
elif isinstance(auto_batch_size, int):
self.auto_batch_size = AutoBatchSize(auto_batch_size)
elif isinstance(auto_batch_size, AutoBatchSize):
self.auto_batch_size = auto_batch_size
else:
raise TypeError("auto_batch_size should be bool, int, or AutoBatchSize")
self.neighbor_list = neighbor_list
@property
@lru_cache(maxsize=None)
[docs]
def model_type(self) -> str:
"""Get type of model.
:type:str
"""
t_mt = self._get_tensor("model_attr/model_type:0")
[mt] = run_sess(self.sess, [t_mt], feed_dict={})
return mt.decode("utf-8")
@property
@lru_cache(maxsize=None)
[docs]
def model_version(self) -> str:
"""Get version of model.
Returns
-------
str
version of model
"""
try:
t_mt = self._get_tensor("model_attr/model_version:0")
except KeyError:
# For deepmd-kit version 0.x - 1.x, set model version to 0.0
return "0.0"
else:
[mt] = run_sess(self.sess, [t_mt], feed_dict={})
return mt.decode("utf-8")
@property
@lru_cache(maxsize=None)
[docs]
def sess(self) -> tf.Session:
"""Get TF session."""
# start a tf session associated to the graph
return tf.Session(graph=self.graph, config=default_tf_session_config)
[docs]
def _graph_compatable(self) -> bool:
"""Check the model compatability.
Returns
-------
bool
If the model stored in the graph file is compatable with the current code
"""
model_version_major = int(self.model_version.split(".")[0])
model_version_minor = int(self.model_version.split(".")[1])
MODEL_VERSION_MAJOR = int(MODEL_VERSION.split(".")[0])
MODEL_VERSION_MINOR = int(MODEL_VERSION.split(".")[1])
if (model_version_major != MODEL_VERSION_MAJOR) or (
model_version_minor > MODEL_VERSION_MINOR
):
return False
else:
return True
[docs]
def _get_tensor(
self, tensor_name: str, attr_name: Optional[str] = None
) -> tf.Tensor:
"""Get TF graph tensor and assign it to class namespace.
Parameters
----------
tensor_name : str
name of tensor to get
attr_name : Optional[str], optional
if specified, class attribute with this name will be created and tensor will
be assigned to it, by default None
Returns
-------
tf.Tensor
loaded tensor
"""
# do not use os.path.join as it doesn't work on Windows
tensor_path = "/".join((self.load_prefix, tensor_name))
tensor = self.graph.get_tensor_by_name(tensor_path)
if attr_name:
setattr(self, attr_name, tensor)
return tensor
else:
return tensor
@staticmethod
[docs]
def _load_graph(
frozen_graph_filename: "Path",
prefix: str = "load",
default_tf_graph: bool = False,
input_map: Optional[dict] = None,
):
# We load the protobuf file from the disk and parse it to retrieve the
# unserialized graph_def
with tf.gfile.GFile(str(frozen_graph_filename), "rb") as f:
graph_def = tf.GraphDef()
graph_def.ParseFromString(f.read())
if default_tf_graph:
tf.import_graph_def(
graph_def,
input_map=input_map,
return_elements=None,
name=prefix,
producer_op_list=None,
)
graph = tf.get_default_graph()
else:
# Then, we can use again a convenient built-in function to import
# a graph_def into the current default Graph
with tf.Graph().as_default() as graph:
tf.import_graph_def(
graph_def,
input_map=None,
return_elements=None,
name=prefix,
producer_op_list=None,
)
return graph
@staticmethod
@staticmethod
[docs]
def reverse_map(vec: np.ndarray, imap: List[int]) -> np.ndarray:
"""Reverse mapping of a vector according to the index map.
Parameters
----------
vec
Input vector. Be of shape [nframes, natoms, -1]
imap
Index map. Be of shape [natoms]
Returns
-------
vec_out
Reverse mapped vector.
"""
ret = np.zeros(vec.shape)
# for idx,ii in enumerate(imap) :
# ret[:,ii,:] = vec[:,idx,:]
ret[:, imap, :] = vec
return ret
[docs]
def make_natoms_vec(
self, atom_types: np.ndarray, mixed_type: bool = False
) -> np.ndarray:
"""Make the natom vector used by deepmd-kit.
Parameters
----------
atom_types
The type of atoms
mixed_type
Whether to perform the mixed_type mode.
If True, the input data has the mixed_type format (see doc/model/train_se_atten.md),
in which frames in a system may have different natoms_vec(s), with the same nloc.
Returns
-------
natoms
The number of atoms. This tensor has the length of Ntypes + 2
natoms[0]: number of local atoms
natoms[1]: total number of atoms held by this processor
natoms[i]: 2 <= i < Ntypes+2, number of type i atoms
"""
natoms_vec = np.zeros(self.ntypes + 2).astype(int)
if mixed_type:
natoms = atom_types[0].size
else:
natoms = atom_types.size
natoms_vec[0] = natoms
natoms_vec[1] = natoms
if mixed_type:
natoms_vec[2] = natoms
return natoms_vec
for ii in range(self.ntypes):
natoms_vec[ii + 2] = np.count_nonzero(atom_types == ii)
return natoms_vec
[docs]
def eval_typeebd(self) -> np.ndarray:
"""Evaluate output of type embedding network by using this model.
Returns
-------
np.ndarray
The output of type embedding network. The shape is [ntypes, o_size],
where ntypes is the number of types, and o_size is the number of nodes
in the output layer.
Raises
------
KeyError
If the model does not enable type embedding.
See Also
--------
deepmd.tf.utils.type_embed.TypeEmbedNet : The type embedding network.
Examples
--------
Get the output of type embedding network of `graph.pb`:
>>> from deepmd.tf.infer import DeepPotential
>>> dp = DeepPotential("graph.pb")
>>> dp.eval_typeebd()
"""
t_typeebd = self._get_tensor("t_typeebd:0")
[typeebd] = run_sess(self.sess, [t_typeebd], feed_dict={})
return typeebd
[docs]
def build_neighbor_list(
self,
coords: np.ndarray,
cell: Optional[np.ndarray],
atype: np.ndarray,
imap: np.ndarray,
neighbor_list,
):
"""Make the mesh with neighbor list for a single frame.
Parameters
----------
coords : np.ndarray
The coordinates of atoms. Should be of shape [natoms, 3]
cell : Optional[np.ndarray]
The cell of the system. Should be of shape [3, 3]
atype : np.ndarray
The type of atoms. Should be of shape [natoms]
imap : np.ndarray
The index map of atoms. Should be of shape [natoms]
neighbor_list : ase.neighborlist.NewPrimitiveNeighborList
ASE neighbor list. The following method or attribute will be
used/set: bothways, self_interaction, update, build, first_neigh,
pair_second, offset_vec.
Returns
-------
natoms_vec : np.ndarray
The number of atoms. This tensor has the length of Ntypes + 2
natoms[0]: nloc
natoms[1]: nall
natoms[i]: 2 <= i < Ntypes+2, number of type i atoms for nloc
coords : np.ndarray
The coordinates of atoms, including ghost atoms. Should be of
shape [nframes, nall, 3]
atype : np.ndarray
The type of atoms, including ghost atoms. Should be of shape [nall]
mesh : np.ndarray
The mesh in nei_mode=4.
imap : np.ndarray
The index map of atoms. Should be of shape [nall]
ghost_map : np.ndarray
The index map of ghost atoms. Should be of shape [nghost]
"""
pbc = np.repeat(cell is not None, 3)
cell = cell.reshape(3, 3)
positions = coords.reshape(-1, 3)
neighbor_list.bothways = True
neighbor_list.self_interaction = False
if neighbor_list.update(pbc, cell, positions):
neighbor_list.build(pbc, cell, positions)
first_neigh = neighbor_list.first_neigh.copy()
pair_second = neighbor_list.pair_second.copy()
offset_vec = neighbor_list.offset_vec.copy()
# get out-of-box neighbors
out_mask = np.any(offset_vec != 0, axis=1)
out_idx = pair_second[out_mask]
out_offset = offset_vec[out_mask]
out_coords = positions[out_idx] + out_offset.dot(cell)
atype = np.array(atype, dtype=int)
out_atype = atype[out_idx]
nloc = positions.shape[0]
nghost = out_idx.size
all_coords = np.concatenate((positions, out_coords), axis=0)
all_atype = np.concatenate((atype, out_atype), axis=0)
# convert neighbor indexes
ghost_map = pair_second[out_mask]
pair_second[out_mask] = np.arange(nloc, nloc + nghost)
# get the mesh
mesh = np.zeros(16 + nloc * 2 + pair_second.size, dtype=int)
mesh[0] = nloc
# ilist
mesh[16 : 16 + nloc] = np.arange(nloc)
# numnei
mesh[16 + nloc : 16 + nloc * 2] = first_neigh[1:] - first_neigh[:-1]
# jlist
mesh[16 + nloc * 2 :] = pair_second
# natoms_vec
natoms_vec = np.zeros(self.ntypes + 2).astype(int)
natoms_vec[0] = nloc
natoms_vec[1] = nloc + nghost
for ii in range(self.ntypes):
natoms_vec[ii + 2] = np.count_nonzero(atype == ii)
# imap append ghost atoms
imap = np.concatenate((imap, np.arange(nloc, nloc + nghost)))
return natoms_vec, all_coords, all_atype, mesh, imap, ghost_map