# SPDX-License-Identifier: LGPL-3.0-or-later
import logging
import re
import warnings
from typing import (
Any,
List,
Optional,
Set,
Tuple,
Union,
)
import numpy as np
from packaging.version import (
Version,
)
from deepmd.dpmodel.utils.env_mat import (
EnvMat,
)
from deepmd.dpmodel.utils.network import (
EmbeddingNet,
LayerNorm,
NativeLayer,
NetworkCollection,
)
from deepmd.tf.common import (
cast_precision,
get_np_precision,
)
from deepmd.tf.env import (
ATTENTION_LAYER_PATTERN,
GLOBAL_NP_FLOAT_PRECISION,
GLOBAL_TF_FLOAT_PRECISION,
TF_VERSION,
default_tf_session_config,
op_module,
tf,
)
from deepmd.tf.nvnmd.descriptor.se_atten import (
build_davg_dstd,
build_op_descriptor,
check_switch_range,
descrpt2r4,
filter_GR2D,
filter_lower_R42GR,
)
from deepmd.tf.nvnmd.utils.config import (
nvnmd_cfg,
)
from deepmd.tf.utils.compress import (
get_extra_side_embedding_net_variable,
get_two_side_type_embedding,
make_data,
)
from deepmd.tf.utils.graph import (
get_attention_layer_variables_from_graph_def,
get_extra_embedding_net_suffix,
get_extra_embedding_net_variables_from_graph_def,
get_pattern_nodes_from_graph_def,
get_tensor_by_name_from_graph,
)
from deepmd.tf.utils.network import (
embedding_net,
layernorm,
one_layer,
)
from deepmd.tf.utils.sess import (
run_sess,
)
from deepmd.tf.utils.tabulate import (
DPTabulate,
)
from deepmd.tf.utils.type_embed import (
TypeEmbedNet,
)
from deepmd.tf.utils.update_sel import (
UpdateSel,
)
from deepmd.utils.version import (
check_version_compatibility,
)
from .descriptor import (
Descriptor,
)
from .se_a import (
DescrptSeA,
)
log = logging.getLogger(__name__)
@Descriptor.register("dpa1")
@Descriptor.register("se_atten")
class DescrptSeAtten(DescrptSeA):
r"""Smooth version descriptor with attention.
Parameters
----------
rcut: float
The cut-off radius :math:`r_c`
rcut_smth: float
From where the environment matrix should be smoothed :math:`r_s`
sel: list[int], int
list[int]: sel[i] specifies the maximum number of type i atoms in the cut-off radius
int: the total maximum number of atoms in the cut-off radius
ntypes: int
Number of element types
neuron: list[int]
Number of neurons in each hidden layers of the embedding net :math:`\mathcal{N}`
axis_neuron: int
Number of the axis neuron :math:`M_2` (number of columns of the sub-matrix of the embedding matrix)
resnet_dt: bool
Time-step `dt` in the resnet construction:
y = x + dt * \phi (Wx + b)
trainable: bool
If the weights of embedding net are trainable.
seed: int, Optional
Random seed for initializing the network parameters.
type_one_side: bool
If 'False', type embeddings of both neighbor and central atoms are considered.
If 'True', only type embeddings of neighbor atoms are considered.
Default is 'False'.
exclude_types : List[List[int]]
The excluded pairs of types which have no interaction with each other.
For example, `[[0, 1]]` means no interaction between type 0 and type 1.
set_davg_zero: bool
Set the shift of embedding net input to zero.
activation_function: str
The activation function in the embedding net. Supported options are |ACTIVATION_FN|
precision: str
The precision of the embedding net parameters. Supported options are |PRECISION|
uniform_seed: bool
Only for backward compatibility; retrieves the old behavior of using the random seed
attn: int
The length of the hidden vector used in the scaled-dot attention computation.
attn_layer: int
The number of layers in attention mechanism.
attn_dotr: bool
Whether to dot the relative coordinates into the attention weights as a gating scheme.
attn_mask: bool
Whether to mask the diagonal in the attention weights.
trainable_ln: bool
Whether to use trainable shift and scale weights in layer normalization.
ln_eps: float, Optional
The epsilon value for layer normalization.
multi_task: bool
If the model has multiple fitting nets to train.
tebd_input_mode: str
The input mode of the type embedding. Supported modes are ["concat", "strip"].
- "concat": Concatenate the type embedding with the smoothed radial information as the union input for the embedding network.
- "strip": Use a separated embedding network for the type embedding and combine the output with the radial embedding network output.
Default value will be `strip` in `se_atten_v2` descriptor.
smooth_type_embedding: bool
Whether to use a smooth process in the attention weights calculation.
When using stripped type embedding, also whether to dot the smooth factor into the type-embedding network output
to keep the network smooth, instead of setting `set_davg_zero` to True.
Default value will be True in the `se_atten_v2` descriptor.
stripped_type_embedding: bool, Optional
(Deprecated, kept only for compatibility.)
Whether to strip the type embedding into a separate embedding network.
Setting this parameter to `True` is equivalent to setting `tebd_input_mode` to 'strip'.
Setting it to `False` is equivalent to setting `tebd_input_mode` to 'concat'.
The default value is `None`, which means the `tebd_input_mode` setting will be used instead.
Raises
------
ValueError
If ntypes is 0.
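Examples
--------
A minimal construction sketch (hypothetical hyperparameter values; in
practice the descriptor is instantiated from the training JSON, and a
working TensorFlow backend is assumed):
>>> descrpt = DescrptSeAtten(rcut=6.0, rcut_smth=0.5, sel=120, ntypes=2)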
"""
def __init__(
self,
rcut: float,
rcut_smth: float,
sel: Union[List[int], int],
ntypes: int,
neuron: List[int] = [25, 50, 100],
axis_neuron: int = 8,
resnet_dt: bool = False,
trainable: bool = True,
seed: Optional[int] = None,
type_one_side: bool = True,
set_davg_zero: bool = True,
exclude_types: List[List[int]] = [],
activation_function: str = "tanh",
precision: str = "default",
uniform_seed: bool = False,
attn: int = 128,
attn_layer: int = 2,
attn_dotr: bool = True,
attn_mask: bool = False,
multi_task: bool = False,
smooth_type_embedding: bool = False,
tebd_input_mode: str = "concat",
# not implemented
scaling_factor=1.0,
normalize=True,
temperature=None,
trainable_ln: bool = True,
ln_eps: Optional[float] = 1e-3,
concat_output_tebd: bool = True,
env_protection: float = 0.0, # not implemented!
stripped_type_embedding: Optional[bool] = None,
**kwargs,
) -> None:
# Ensure compatibility with the deprecated stripped_type_embedding option.
if stripped_type_embedding is None:
stripped_type_embedding = tebd_input_mode == "strip"
else:
# Use the user-set stripped_type_embedding parameter first
tebd_input_mode = "strip" if stripped_type_embedding else "concat"
if not set_davg_zero and not (
stripped_type_embedding and smooth_type_embedding
):
warnings.warn(
"Setting 'set_davg_zero' to False in the 'se_atten' descriptor "
"may cause an unexpected discontinuity during model inference!"
)
if scaling_factor != 1.0:
raise NotImplementedError("scaling_factor is not supported.")
if not normalize:
raise NotImplementedError("normalize is not supported.")
if temperature is not None:
raise NotImplementedError("temperature is not supported.")
if not concat_output_tebd:
raise NotImplementedError("concat_output_tebd is not supported.")
if env_protection != 0.0:
raise NotImplementedError("env_protection != 0.0 is not supported.")
# keep consistent with the default value in this backend
if ln_eps is None:
ln_eps = 1e-3
if isinstance(sel, list):
sel = sum(sel)
DescrptSeA.__init__(
self,
rcut,
rcut_smth,
[sel],
neuron=neuron,
axis_neuron=axis_neuron,
resnet_dt=resnet_dt,
trainable=trainable,
seed=seed,
type_one_side=type_one_side,
exclude_types=exclude_types,
set_davg_zero=set_davg_zero,
activation_function=activation_function,
precision=precision,
uniform_seed=uniform_seed,
multi_task=multi_task,
)
"""
Constructor
"""
if not (nvnmd_cfg.enable and (nvnmd_cfg.version == 1)):
assert Version(TF_VERSION) > Version(
"2"
), "se_atten only supports TensorFlow version 2.0 or higher."
if ntypes == 0:
raise ValueError("`model/type_map` is not set or empty!")
self.stripped_type_embedding = stripped_type_embedding
self.tebd_input_mode = tebd_input_mode
self.smooth = smooth_type_embedding
self.trainable_ln = trainable_ln
self.ln_eps = ln_eps
self.ntypes = ntypes
self.att_n = attn
self.attn_layer = attn_layer
self.attn_mask = attn_mask
self.attn_dotr = attn_dotr
self.filter_np_precision = get_np_precision(precision)
self.two_side_embeeding_net_variables = None
self.layer_size = len(neuron)
# descrpt config
self.sel_all_a = [sel]
self.sel_all_r = [0]
avg_zero = np.zeros([self.ntypes, self.ndescrpt]).astype(
GLOBAL_NP_FLOAT_PRECISION
)
std_ones = np.ones([self.ntypes, self.ndescrpt]).astype(
GLOBAL_NP_FLOAT_PRECISION
)
self.attention_layer_variables = None
sub_graph = tf.Graph()
with sub_graph.as_default():
name_pfx = "d_sea_"
for ii in ["coord", "box"]:
self.place_holders[ii] = tf.placeholder(
GLOBAL_NP_FLOAT_PRECISION, [None, None], name=name_pfx + "t_" + ii
)
self.place_holders["type"] = tf.placeholder(
tf.int32, [None, None], name=name_pfx + "t_type"
)
self.place_holders["natoms_vec"] = tf.placeholder(
tf.int32, [self.ntypes + 2], name=name_pfx + "t_natoms"
)
self.place_holders["default_mesh"] = tf.placeholder(
tf.int32, [None], name=name_pfx + "t_mesh"
)
(
self.stat_descrpt,
self.descrpt_deriv_t,
self.rij_t,
self.nlist_t,
self.nei_type_vec_t,
self.nmask_t,
) = op_module.prod_env_mat_a_mix(
self.place_holders["coord"],
self.place_holders["type"],
self.place_holders["natoms_vec"],
self.place_holders["box"],
self.place_holders["default_mesh"],
tf.constant(avg_zero),
tf.constant(std_ones),
rcut_a=self.rcut_a,
rcut_r=self.rcut_r,
rcut_r_smth=self.rcut_r_smth,
sel_a=self.sel_all_a,
sel_r=self.sel_all_r,
)
if len(self.exclude_types):
# exclude types applied to data stat
mask = self.build_type_exclude_mask_mixed(
self.exclude_types,
self.ntypes,
self.sel_a,
self.ndescrpt,
# for data stat, nloc == nall
self.place_holders["type"],
tf.size(self.place_holders["type"]),
self.nei_type_vec_t, # extra input for atten
)
self.stat_descrpt *= tf.reshape(mask, tf.shape(self.stat_descrpt))
self.sub_sess = tf.Session(graph=sub_graph, config=default_tf_session_config)
def enable_compression(
self,
min_nbor_dist: float,
graph: tf.Graph,
graph_def: tf.GraphDef,
table_extrapolate: float = 5,
table_stride_1: float = 0.01,
table_stride_2: float = 0.1,
check_frequency: int = -1,
suffix: str = "",
) -> None:
"""Reveive the statisitcs (distance, max_nbor_size and env_mat_range) of the training data.
Parameters
----------
min_nbor_dist
The nearest distance between atoms
graph : tf.Graph
The graph of the model
graph_def : tf.GraphDef
The graph_def of the model
table_extrapolate
The scale of model extrapolation
table_stride_1
The uniform stride of the first table
table_stride_2
The uniform stride of the second table
check_frequency
The overflow check frequency
suffix : str, optional
The suffix of the scope
"""
# do some checks before the model compression process
assert (
not self.filter_resnet_dt
), "Model compression error: descriptor resnet_dt must be False!"
for tt in self.exclude_types:
if (tt[0] not in range(self.ntypes)) or (tt[1] not in range(self.ntypes)):
raise RuntimeError(
f"exclude types {tt} must be within the number of atomic types {self.ntypes}!"
)
if self.ntypes * self.ntypes - len(self.exclude_types) == 0:
raise RuntimeError(
"empty embedding-nets are not supported in model compression!"
)
if self.attn_layer != 0:
raise RuntimeError("can not compress model when attention layer is not 0.")
ret = get_pattern_nodes_from_graph_def(
graph_def,
f"filter_type_all{suffix}/.+{get_extra_embedding_net_suffix(type_one_side=False)}",
)
if len(ret) == 0:
raise RuntimeError(
f"cannot find variables of the embedding net `*{get_extra_embedding_net_suffix(type_one_side=False)}` from graph_def; maybe it is not a compressible model."
)
self.compress = True
self.table = DPTabulate(
self,
self.filter_neuron,
graph,
graph_def,
True,
self.exclude_types,
self.compress_activation_fn,
suffix=suffix,
)
self.table_config = [
table_extrapolate,
table_stride_1,
table_stride_2,
check_frequency,
]
self.lower, self.upper = self.table.build(
min_nbor_dist, table_extrapolate, table_stride_1, table_stride_2
)
self.final_type_embedding = get_two_side_type_embedding(self, graph)
type_side_suffix = get_extra_embedding_net_suffix(type_one_side=False)
self.matrix = get_extra_side_embedding_net_variable(
self, graph_def, type_side_suffix, "matrix", suffix
)
self.bias = get_extra_side_embedding_net_variable(
self, graph_def, type_side_suffix, "bias", suffix
)
self.two_embd = make_data(self, self.final_type_embedding)
self.davg = get_tensor_by_name_from_graph(graph, f"descrpt_attr{suffix}/t_avg")
self.dstd = get_tensor_by_name_from_graph(graph, f"descrpt_attr{suffix}/t_std")
def build(
self,
coord_: tf.Tensor,
atype_: tf.Tensor,
natoms: tf.Tensor,
box_: tf.Tensor,
mesh: tf.Tensor,
input_dict: dict,
reuse: Optional[bool] = None,
suffix: str = "",
) -> tf.Tensor:
"""Build the computational graph for the descriptor.
Parameters
----------
coord_
The coordinate of atoms
atype_
The type of atoms
natoms
The number of atoms. This tensor has the length of Ntypes + 2
natoms[0]: number of local atoms
natoms[1]: total number of atoms held by this processor
natoms[i]: 2 <= i < Ntypes+2, number of type i atoms
box_ : tf.Tensor
The box of the system
mesh
For historical reasons, only the length of the Tensor matters.
If the size of mesh == 6, PBC is assumed.
If the size of mesh == 0, no PBC is assumed.
input_dict
Dictionary for additional inputs
reuse
Whether the weights in the networks should be reused when getting the variables.
suffix
Name suffix to identify this descriptor
Returns
-------
descriptor
The output descriptor
"""
davg = self.davg
dstd = self.dstd
if nvnmd_cfg.enable:
nvnmd_cfg.set_ntype(self.ntypes)
if nvnmd_cfg.restore_descriptor:
davg, dstd = build_davg_dstd()
check_switch_range(davg, dstd)
with tf.variable_scope("descrpt_attr" + suffix, reuse=reuse):
if davg is None:
davg = np.zeros([self.ntypes, self.ndescrpt])
if dstd is None:
dstd = np.ones([self.ntypes, self.ndescrpt])
t_rcut = tf.constant(
np.max([self.rcut_r, self.rcut_a]),
name="rcut",
dtype=GLOBAL_TF_FLOAT_PRECISION,
)
t_ntypes = tf.constant(self.ntypes, name="ntypes", dtype=tf.int32)
t_ndescrpt = tf.constant(self.ndescrpt, name="ndescrpt", dtype=tf.int32)
t_sel = tf.constant(self.sel_a, name="sel", dtype=tf.int32)
t_original_sel = tf.constant(
self.original_sel if self.original_sel is not None else self.sel_a,
name="original_sel",
dtype=tf.int32,
)
self.t_avg = tf.get_variable(
"t_avg",
davg.shape,
dtype=GLOBAL_TF_FLOAT_PRECISION,
trainable=False,
initializer=tf.constant_initializer(davg),
)
self.t_std = tf.get_variable(
"t_std",
dstd.shape,
dtype=GLOBAL_TF_FLOAT_PRECISION,
trainable=False,
initializer=tf.constant_initializer(dstd),
)
with tf.control_dependencies([t_sel, t_original_sel]):
coord = tf.reshape(coord_, [-1, natoms[1] * 3])
box = tf.reshape(box_, [-1, 9])
atype = tf.reshape(atype_, [-1, natoms[1]])
self.attn_weight = [None for i in range(self.attn_layer)]
self.angular_weight = [None for i in range(self.attn_layer)]
self.attn_weight_final = [None for i in range(self.attn_layer)]
op_descriptor = (
build_op_descriptor() if nvnmd_cfg.enable else op_module.prod_env_mat_a_mix
)
(
self.descrpt,
self.descrpt_deriv,
self.rij,
self.nlist,
self.nei_type_vec,
self.nmask,
) = op_descriptor(
coord,
atype,
natoms,
box,
mesh,
self.t_avg,
self.t_std,
rcut_a=self.rcut_a,
rcut_r=self.rcut_r,
rcut_r_smth=self.rcut_r_smth,
sel_a=self.sel_all_a,
sel_r=self.sel_all_r,
)
self.nei_type_vec = tf.reshape(self.nei_type_vec, [-1])
self.nmask = tf.cast(
tf.reshape(self.nmask, [-1, 1, self.sel_all_a[0]]),
self.filter_precision,
)
self.negative_mask = -(2 << 32) * (1.0 - self.nmask)
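# -(2 << 32) == -2**33 acts as an effective -infinity: it is added to the
# attention logits of padded (non-existent) neighbors so that the softmax
# assigns them essentially zero weight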
# hard coding the magnitude of attention weight shift
self.smth_attn_w_shift = 20.0
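# in _scaled_dot_attn the logits are shifted up by this amount, gated by the
# smooth switch from both sides, and shifted back down, so fully
# switched-off neighbors end up near -20 before the softmax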
# only used when tensorboard is set to true
tf.summary.histogram("descrpt", self.descrpt)
tf.summary.histogram("rij", self.rij)
tf.summary.histogram("nlist", self.nlist)
self.descrpt_reshape = tf.reshape(self.descrpt, [-1, self.ndescrpt])
# prevent lookup errors; the actual atype has already been used for the nlist
atype = tf.clip_by_value(atype, 0, self.ntypes - 1)
self.atype_nloc = tf.reshape(
tf.slice(atype, [0, 0], [-1, natoms[0]]), [-1]
) ## LAMMPS will raise an error without this
self._identity_tensors(suffix=suffix)
if self.smooth:
self.sliced_avg = tf.reshape(
tf.slice(
tf.reshape(self.t_avg, [self.ntypes, -1, 4]), [0, 0, 0], [-1, 1, 1]
),
[self.ntypes, 1],
)
self.sliced_std = tf.reshape(
tf.slice(
tf.reshape(self.t_std, [self.ntypes, -1, 4]), [0, 0, 0], [-1, 1, 1]
),
[self.ntypes, 1],
)
self.avg_looked_up = tf.reshape(
tf.nn.embedding_lookup(self.sliced_avg, self.atype_nloc),
[-1, natoms[0], 1],
)
self.std_looked_up = tf.reshape(
tf.nn.embedding_lookup(self.sliced_std, self.atype_nloc),
[-1, natoms[0], 1],
)
self.recovered_r = (
tf.reshape(
tf.slice(
tf.reshape(self.descrpt_reshape, [-1, 4]), [0, 0], [-1, 1]
),
[-1, natoms[0], self.sel_all_a[0]],
)
* self.std_looked_up
+ self.avg_looked_up
)
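# cubic switching function: with u = 1 - rcut_smth * s(r), where s(r) is the
# recovered first column of the environment matrix, 1 - u^3 rises smoothly
# from 0 to 1 and is clipped to [0, 1] below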
uu = 1 - self.rcut_r_smth * self.recovered_r
self.recovered_switch = -uu * uu * uu + 1
self.recovered_switch = tf.clip_by_value(self.recovered_switch, 0.0, 1.0)
self.recovered_switch = tf.cast(
self.recovered_switch, self.filter_precision
)
self.dout, self.qmat = self._pass_filter(
self.descrpt_reshape,
self.atype_nloc,
natoms,
input_dict,
suffix=suffix,
reuse=reuse,
trainable=self.trainable,
)
# only used when tensorboard is set to true
tf.summary.histogram("embedding_net_output", self.dout)
return self.dout
def _pass_filter(
self, inputs, atype, natoms, input_dict, reuse=None, suffix="", trainable=True
):
assert (
input_dict is not None
and input_dict.get("type_embedding", None) is not None
), "se_atten desctiptor must use type_embedding"
type_embedding = input_dict.get("type_embedding", None)
inputs = tf.reshape(inputs, [-1, natoms[0], self.ndescrpt])
output = []
output_qmat = []
inputs_i = inputs
inputs_i = tf.reshape(inputs_i, [-1, self.ndescrpt])
type_i = -1
if len(self.exclude_types):
mask = self.build_type_exclude_mask_mixed(
self.exclude_types,
self.ntypes,
self.sel_a,
self.ndescrpt,
self.atype_nloc, # when nloc != nall, pass nloc to mask
tf.shape(inputs_i)[0],
self.nei_type_vec, # extra input for atten
)
if self.smooth:
inputs_i = tf.where(
tf.cast(mask, tf.bool),
inputs_i,
# (nframes * nloc, 1) -> (nframes * nloc, ndescrpt)
tf.tile(
tf.reshape(self.avg_looked_up, [-1, 1]), [1, self.ndescrpt]
),
)
self.recovered_switch *= tf.reshape(
tf.slice(
tf.reshape(tf.cast(mask, self.filter_precision), [-1, 4]),
[0, 0],
[-1, 1],
),
[-1, natoms[0], self.sel_all_a[0]],
)
else:
inputs_i *= mask
if nvnmd_cfg.enable and nvnmd_cfg.quantize_descriptor:
inputs_i = descrpt2r4(inputs_i, atype)
layer, qmat = self._filter(
inputs_i,
type_i,
natoms,
name="filter_type_all" + suffix,
suffix=suffix,
reuse=reuse,
trainable=trainable,
activation_fn=self.filter_activation_fn,
type_embedding=type_embedding,
atype=atype,
)
layer = tf.reshape(layer, [tf.shape(inputs)[0], natoms[0], self.get_dim_out()])
qmat = tf.reshape(
qmat, [tf.shape(inputs)[0], natoms[0], self.get_dim_rot_mat_1() * 3]
)
output.append(layer)
output_qmat.append(qmat)
output = tf.concat(output, axis=1)
output_qmat = tf.concat(output_qmat, axis=1)
return output, output_qmat
def _compute_dstats_sys_smth(
self,
data_coord,
data_box,
data_atype,
natoms_vec,
mesh,
mixed_type=False,
real_natoms_vec=None,
):
dd_all, descrpt_deriv_t, rij_t, nlist_t, nei_type_vec_t, nmask_t = run_sess(
self.sub_sess,
[
self.stat_descrpt,
self.descrpt_deriv_t,
self.rij_t,
self.nlist_t,
self.nei_type_vec_t,
self.nmask_t,
],
feed_dict={
self.place_holders["coord"]: data_coord,
self.place_holders["type"]: data_atype,
self.place_holders["natoms_vec"]: natoms_vec,
self.place_holders["box"]: data_box,
self.place_holders["default_mesh"]: mesh,
},
)
if mixed_type:
nframes = dd_all.shape[0]
sysr = [0.0 for i in range(self.ntypes)]
sysa = [0.0 for i in range(self.ntypes)]
sysn = [0 for i in range(self.ntypes)]
sysr2 = [0.0 for i in range(self.ntypes)]
sysa2 = [0.0 for i in range(self.ntypes)]
for ff in range(nframes):
natoms = real_natoms_vec[ff]
dd_ff = np.reshape(dd_all[ff], [-1, self.ndescrpt * natoms_vec[0]])
start_index = 0
for type_i in range(self.ntypes):
end_index = (
start_index + self.ndescrpt * natoms[2 + type_i]
) # center atom split
dd = dd_ff[:, start_index:end_index]
dd = np.reshape(
dd, [-1, self.ndescrpt]
) # (nframes * n_type_i_atoms, nnei * 4)
start_index = end_index
# compute
dd = np.reshape(dd, [-1, 4]) # (nframes * n_type_i_atoms * nnei, 4)
ddr = dd[:, :1]
dda = dd[:, 1:]
sumr = np.sum(ddr)
suma = np.sum(dda) / 3.0
sumn = dd.shape[0]
sumr2 = np.sum(np.multiply(ddr, ddr))
suma2 = np.sum(np.multiply(dda, dda)) / 3.0
sysr[type_i] += sumr
sysa[type_i] += suma
sysn[type_i] += sumn
sysr2[type_i] += sumr2
sysa2[type_i] += suma2
else:
natoms = natoms_vec
dd_all = np.reshape(dd_all, [-1, self.ndescrpt * natoms[0]])
start_index = 0
sysr = []
sysa = []
sysn = []
sysr2 = []
sysa2 = []
for type_i in range(self.ntypes):
end_index = (
start_index + self.ndescrpt * natoms[2 + type_i]
) # center atom split
dd = dd_all[:, start_index:end_index]
dd = np.reshape(
dd, [-1, self.ndescrpt]
) # (nframes * n_type_i_atoms, nnei * 4)
start_index = end_index
# compute
dd = np.reshape(dd, [-1, 4]) # (nframes * n_type_i_atoms * nnei, 4)
ddr = dd[:, :1]
dda = dd[:, 1:]
sumr = np.sum(ddr)
suma = np.sum(dda) / 3.0
sumn = dd.shape[0]
sumr2 = np.sum(np.multiply(ddr, ddr))
suma2 = np.sum(np.multiply(dda, dda)) / 3.0
sysr.append(sumr)
sysa.append(suma)
sysn.append(sumn)
sysr2.append(sumr2)
sysa2.append(suma2)
return sysr, sysr2, sysa, sysa2, sysn
def _lookup_type_embedding(
self,
xyz_scatter,
natype,
type_embedding,
):
"""Concatenate `type_embedding` of neighbors and `xyz_scatter`.
If not self.type_one_side, concatenate `type_embedding` of center atoms as well.
Parameters
----------
xyz_scatter:
shape is [nframes*natoms[0]*self.nnei, 1]
natype:
neighbor atom type
type_embedding:
shape is [self.ntypes, Y] where Y=jdata['type_embedding']['neuron'][-1]
Returns
-------
embedding:
environment of each atom represented by embedding.
"""
te_out_dim = type_embedding.get_shape().as_list()[-1]
self.test_type_embedding = type_embedding
self.test_nei_embed = tf.nn.embedding_lookup(
type_embedding, self.nei_type_vec
) # shape is [nframes * natoms[0] * self.nnei, te_out_dim]
# nei_embed = tf.tile(nei_embed, (nframes * natoms[0], 1)) # shape is [nframes*natoms[0]*self.nnei, te_out_dim]
nei_embed = tf.reshape(self.test_nei_embed, [-1, te_out_dim])
self.embedding_input = tf.concat(
[xyz_scatter, nei_embed], 1
) # shape is [nframes*natoms[0]*self.nnei, 1+te_out_dim]
if not self.type_one_side:
self.atm_embed = tf.nn.embedding_lookup(
type_embedding, natype
) # shape is [nframes*natoms[0], te_out_dim]
self.atm_embed = tf.tile(
self.atm_embed, [1, self.nnei]
) # shape is [nframes*natoms[0], self.nnei*te_out_dim]
self.atm_embed = tf.reshape(
self.atm_embed, [-1, te_out_dim]
) # shape is [nframes*natoms[0]*self.nnei, te_out_dim]
self.embedding_input_2 = tf.concat(
[self.embedding_input, self.atm_embed], 1
) # shape is [nframes*natoms[0]*self.nnei, 1+te_out_dim+te_out_dim]
return self.embedding_input_2
return self.embedding_input
def _scaled_dot_attn(
self,
Q,
K,
V,
temperature,
input_r,
dotr=False,
do_mask=False,
layer=0,
save_weights=True,
):
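# scaled dot-product attention over neighbors:
# logits A = Q K^T / temperature; masked (or smoothly gated) entries are
# suppressed before the softmax and the gating is re-applied afterwards;
# optionally the normalized angular weights R R^T are multiplied in, and
# the result is finally applied to V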
attn = tf.matmul(Q / temperature, K, transpose_b=True)
if self.smooth:
# (nb x nloc) x nsel
nsel = self.sel_all_a[0]
attn = (attn + self.smth_attn_w_shift) * tf.reshape(
self.recovered_switch, [-1, 1, nsel]
) * tf.reshape(
self.recovered_switch, [-1, nsel, 1]
) - self.smth_attn_w_shift
else:
attn *= self.nmask
attn += self.negative_mask
attn = tf.nn.softmax(attn, axis=-1)
if self.smooth:
attn = (
attn
* tf.reshape(self.recovered_switch, [-1, 1, nsel])
* tf.reshape(self.recovered_switch, [-1, nsel, 1])
)
else:
attn *= tf.reshape(self.nmask, [-1, attn.shape[-1], 1])
if save_weights:
self.attn_weight[layer] = attn[0] # atom 0
if dotr:
angular_weight = tf.matmul(input_r, input_r, transpose_b=True) # normalized
attn *= angular_weight
if save_weights:
self.angular_weight[layer] = angular_weight[0] # atom 0
self.attn_weight_final[layer] = attn[0] # atom 0
if do_mask:
nei = int(attn.shape[-1])
mask = tf.cast(tf.ones((nei, nei)) - tf.eye(nei), self.filter_precision)
attn *= mask
output = tf.matmul(attn, V)
return output
def _attention_layers(
self,
input_xyz,
layer_num,
shape_i,
outputs_size,
input_r,
dotr=False,
do_mask=False,
trainable=True,
suffix="",
):
sd_k = tf.sqrt(tf.cast(1.0, dtype=self.filter_precision))
for i in range(layer_num):
name = f"attention_layer_{i}{suffix}"
with tf.variable_scope(name, reuse=tf.AUTO_REUSE):
# input_xyz_in = tf.nn.l2_normalize(input_xyz, -1)
Q_c = one_layer(
input_xyz,
self.att_n,
name="c_query",
scope=name + "/",
reuse=tf.AUTO_REUSE,
seed=self.seed,
activation_fn=None,
precision=self.filter_precision,
trainable=trainable,
uniform_seed=self.uniform_seed,
initial_variables=self.attention_layer_variables,
)
K_c = one_layer(
input_xyz,
self.att_n,
name="c_key",
scope=name + "/",
reuse=tf.AUTO_REUSE,
seed=self.seed,
activation_fn=None,
precision=self.filter_precision,
trainable=trainable,
uniform_seed=self.uniform_seed,
initial_variables=self.attention_layer_variables,
)
V_c = one_layer(
input_xyz,
self.att_n,
name="c_value",
scope=name + "/",
reuse=tf.AUTO_REUSE,
seed=self.seed,
activation_fn=None,
precision=self.filter_precision,
trainable=trainable,
uniform_seed=self.uniform_seed,
initial_variables=self.attention_layer_variables,
)
# # natom x nei_type_i x out_size
# xyz_scatter = tf.reshape(xyz_scatter, (-1, shape_i[1] // 4, outputs_size[-1]))
# natom x nei_type_i x att_n
Q_c = tf.nn.l2_normalize(
tf.reshape(Q_c, (-1, shape_i[1] // 4, self.att_n)), -1
)
K_c = tf.nn.l2_normalize(
tf.reshape(K_c, (-1, shape_i[1] // 4, self.att_n)), -1
)
V_c = tf.nn.l2_normalize(
tf.reshape(V_c, (-1, shape_i[1] // 4, self.att_n)), -1
)
input_att = self._scaled_dot_attn(
Q_c, K_c, V_c, sd_k, input_r, dotr=dotr, do_mask=do_mask, layer=i
)
input_att = tf.reshape(input_att, (-1, self.att_n))
# (natom x nei_type_i) x out_size
input_xyz += one_layer(
input_att,
outputs_size[-1],
name="c_out",
scope=name + "/",
reuse=tf.AUTO_REUSE,
seed=self.seed,
activation_fn=None,
precision=self.filter_precision,
trainable=trainable,
uniform_seed=self.uniform_seed,
initial_variables=self.attention_layer_variables,
)
input_xyz = layernorm(
input_xyz,
outputs_size[-1],
precision=self.filter_precision,
name="layer_normalization",
scope=name + "/",
reuse=tf.AUTO_REUSE,
seed=self.seed,
uniform_seed=self.uniform_seed,
trainable=self.trainable_ln,
eps=self.ln_eps,
initial_variables=self.attention_layer_variables,
)
return input_xyz
def _filter_lower(
self,
type_i,
type_input,
start_index,
incrs_index,
inputs,
type_embedding=None,
atype=None,
is_exclude=False,
activation_fn=None,
bavg=0.0,
stddev=1.0,
trainable=True,
suffix="",
name="filter_",
reuse=None,
):
"""Input env matrix, returns R.G."""
outputs_size = [1, *self.filter_neuron]
# cut-out inputs
# with natom x (nei_type_i x 4)
inputs_i = tf.slice(inputs, [0, start_index * 4], [-1, incrs_index * 4])
shape_i = inputs_i.get_shape().as_list()
natom = tf.shape(inputs_i)[0]
# with (natom x nei_type_i) x 4
inputs_reshape = tf.reshape(inputs_i, [-1, 4])
# with (natom x nei_type_i) x 1
xyz_scatter = tf.reshape(tf.slice(inputs_reshape, [0, 0], [-1, 1]), [-1, 1])
assert atype is not None, "atype must exist!"
type_embedding = tf.cast(type_embedding, self.filter_precision) # ntypes * Y
# natom x 4 x outputs_size
if not is_exclude:
with tf.variable_scope(name, reuse=reuse):
# with (natom x nei_type_i) x out_size
if not self.stripped_type_embedding:
log.info("use the previous se_atten model")
xyz_scatter = self._lookup_type_embedding(
xyz_scatter, atype, type_embedding
)
xyz_scatter = embedding_net(
xyz_scatter,
self.filter_neuron,
self.filter_precision,
activation_fn=activation_fn,
resnet_dt=self.filter_resnet_dt,
name_suffix="",
stddev=stddev,
bavg=bavg,
seed=self.seed,
trainable=trainable,
uniform_seed=self.uniform_seed,
initial_variables=self.embedding_net_variables,
mixed_prec=self.mixed_prec,
)
else:
if self.attn_layer == 0:
log.info(
"use the compressible model with stripped type embedding"
)
else:
log.info(
"use the non-compressible model with stripped type embedding"
)
if nvnmd_cfg.enable:
if nvnmd_cfg.quantize_descriptor:
return filter_lower_R42GR(
inputs_i,
atype,
self.nei_type_vec,
)
elif nvnmd_cfg.restore_descriptor:
self.embedding_net_variables = (
nvnmd_cfg.get_dp_init_weights()
)
self.two_side_embeeding_net_variables = (
nvnmd_cfg.get_dp_init_weights()
)
if not self.compress:
xyz_scatter = embedding_net(
xyz_scatter,
self.filter_neuron,
self.filter_precision,
activation_fn=activation_fn,
resnet_dt=self.filter_resnet_dt,
name_suffix="",
stddev=stddev,
bavg=bavg,
seed=self.seed,
trainable=trainable,
uniform_seed=self.uniform_seed,
initial_variables=self.embedding_net_variables,
mixed_prec=self.mixed_prec,
)
else:
net = "filter_net"
info = [
self.lower[net],
self.upper[net],
self.upper[net] * self.table_config[0],
self.table_config[1],
self.table_config[2],
self.table_config[3],
]
padding_ntypes = type_embedding.shape[
0
] # this must be self.ntypes + 1
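# build a flattened pair index center_type * (ntypes + 1) + neighbor_type;
# it addresses the row of the (padded) two-side type-embedding table that
# corresponds to each (center, neighbor) type pair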
atype_expand = tf.reshape(atype, [-1, 1])
idx_i = tf.tile(atype_expand * padding_ntypes, [1, self.nnei])
idx_j = tf.reshape(self.nei_type_vec, [-1, self.nnei])
idx = idx_i + idx_j
index_of_two_side = tf.reshape(idx, [-1])
if self.compress:
two_embd = tf.nn.embedding_lookup(
self.two_embd, index_of_two_side
)
else:
type_embedding_nei = tf.tile(
tf.reshape(type_embedding, [1, padding_ntypes, -1]),
[padding_ntypes, 1, 1],
) # (ntypes) * ntypes * Y
type_embedding_center = tf.tile(
tf.reshape(type_embedding, [padding_ntypes, 1, -1]),
[1, padding_ntypes, 1],
) # ntypes * (ntypes) * Y
two_side_type_embedding = tf.concat(
[type_embedding_nei, type_embedding_center], -1
) # ntypes * ntypes * (Y+Y)
two_side_type_embedding = tf.reshape(
two_side_type_embedding,
[-1, two_side_type_embedding.shape[-1]],
)
embedding_of_two_side_type_embedding = embedding_net(
two_side_type_embedding,
self.filter_neuron,
self.filter_precision,
activation_fn=activation_fn,
resnet_dt=self.filter_resnet_dt,
name_suffix=get_extra_embedding_net_suffix(
type_one_side=False
),
stddev=stddev,
bavg=bavg,
seed=self.seed,
trainable=trainable,
uniform_seed=self.uniform_seed,
initial_variables=self.two_side_embeeding_net_variables,
mixed_prec=self.mixed_prec,
)
two_embd = tf.nn.embedding_lookup(
embedding_of_two_side_type_embedding, index_of_two_side
)
if self.smooth:
two_embd = two_embd * tf.reshape(self.recovered_switch, [-1, 1])
if not self.compress:
xyz_scatter = xyz_scatter * two_embd + xyz_scatter
else:
return op_module.tabulate_fusion_se_atten(
tf.cast(self.table.data[net], self.filter_precision),
info,
xyz_scatter,
tf.reshape(inputs_i, [natom, shape_i[1] // 4, 4]),
two_embd,
last_layer_size=outputs_size[-1],
is_sorted=len(self.exclude_types) == 0,
)
if (not self.uniform_seed) and (self.seed is not None):
self.seed += self.seed_shift
input_r = tf.slice(
tf.reshape(inputs_i, (-1, shape_i[1] // 4, 4)), [0, 0, 1], [-1, -1, 3]
)
input_r = tf.nn.l2_normalize(input_r, -1)
# natom x nei_type_i x out_size
xyz_scatter_att = tf.reshape(
self._attention_layers(
xyz_scatter,
self.attn_layer,
shape_i,
outputs_size,
input_r,
dotr=self.attn_dotr,
do_mask=self.attn_mask,
trainable=trainable,
suffix=suffix,
),
(-1, shape_i[1] // 4, outputs_size[-1]),
)
# xyz_scatter = tf.reshape(xyz_scatter, (-1, shape_i[1] // 4, outputs_size[-1]))
else:
raise RuntimeError("this should not be touched")
# When using tf.reshape(inputs_i, [-1, shape_i[1]//4, 4]) below
# [588 24] -> [588 6 4] correct
# but if sel is zero
# [588 0] -> [147 0 4] incorrect; the correct one is [588 0 4]
# So we need to explicitly assign the shape to tf.shape(inputs_i)[0] instead of -1
return tf.matmul(
tf.reshape(inputs_i, [natom, shape_i[1] // 4, 4]),
xyz_scatter_att,
transpose_a=True,
)
@cast_precision
def _filter(
self,
inputs,
type_input,
natoms,
type_embedding=None,
atype=None,
activation_fn=tf.nn.tanh,
stddev=1.0,
bavg=0.0,
suffix="",
name="linear",
reuse=None,
trainable=True,
):
nframes = tf.shape(tf.reshape(inputs, [-1, natoms[0], self.ndescrpt]))[0]
# natom x (nei x 4)
shape = inputs.get_shape().as_list()
outputs_size = [1, *self.filter_neuron]
outputs_size_2 = self.n_axis_neuron
start_index = 0
type_i = 0
# natom x 4 x outputs_size
xyz_scatter_1 = self._filter_lower(
type_i,
type_input,
start_index,
np.cumsum(self.sel_a)[-1],
inputs,
type_embedding=type_embedding,
is_exclude=False,
activation_fn=activation_fn,
stddev=stddev,
bavg=bavg,
trainable=trainable,
suffix=suffix,
name=name,
reuse=reuse,
atype=atype,
)
if nvnmd_cfg.enable:
return filter_GR2D(xyz_scatter_1)
# natom x nei x outputs_size
# xyz_scatter = tf.concat(xyz_scatter_total, axis=1)
# natom x nei x 4
# inputs_reshape = tf.reshape(inputs, [-1, shape[1]//4, 4])
# natom x 4 x outputs_size
# xyz_scatter_1 = tf.matmul(inputs_reshape, xyz_scatter, transpose_a = True)
if self.original_sel is None:
# shape[1] = nnei * 4
nnei = shape[1] / 4
else:
nnei = tf.cast(
tf.Variable(
np.sum(self.original_sel),
dtype=tf.int32,
trainable=False,
name="nnei",
),
self.filter_precision,
)
xyz_scatter_1 = xyz_scatter_1 / nnei
# natom x 4 x outputs_size_2
xyz_scatter_2 = tf.slice(xyz_scatter_1, [0, 0, 0], [-1, -1, outputs_size_2])
# # natom x 3 x outputs_size_2
# qmat = tf.slice(xyz_scatter_2, [0,1,0], [-1, 3, -1])
# natom x 3 x outputs_size_1
qmat = tf.slice(xyz_scatter_1, [0, 1, 0], [-1, 3, -1])
# natom x outputs_size_1 x 3
qmat = tf.transpose(qmat, perm=[0, 2, 1])
# natom x outputs_size x outputs_size_2
result = tf.matmul(xyz_scatter_1, xyz_scatter_2, transpose_a=True)
# natom x (outputs_size x outputs_size_2)
result = tf.reshape(result, [-1, outputs_size_2 * outputs_size[-1]])
return result, qmat
def init_variables(
self,
graph: tf.Graph,
graph_def: tf.GraphDef,
suffix: str = "",
) -> None:
"""Init the embedding net variables with the given dict.
Parameters
----------
graph : tf.Graph
The input frozen model graph
graph_def : tf.GraphDef
The input frozen model graph_def
suffix : str, optional
The suffix of the scope
"""
super().init_variables(graph=graph, graph_def=graph_def, suffix=suffix)
self.attention_layer_variables = get_attention_layer_variables_from_graph_def(
graph_def, suffix=suffix
)
def compat_ln_pattern(old_key):
pattern = r"attention_layer_(\d+)/(layer_normalization)_\d+"
replacement = r"attention_layer_\1/\2"
if bool(re.search(pattern, old_key)):
new_key = re.sub(pattern, replacement, old_key)
v = self.attention_layer_variables.pop(old_key)
self.attention_layer_variables[new_key] = v
for item_key in list(self.attention_layer_variables.keys()):
compat_ln_pattern(item_key)
if self.stripped_type_embedding:
self.two_side_embeeding_net_variables = (
get_extra_embedding_net_variables_from_graph_def(
graph_def,
suffix,
get_extra_embedding_net_suffix(type_one_side=False),
)
)
def build_type_exclude_mask_mixed(
self,
exclude_types: Set[Tuple[int, int]],
ntypes: int,
sel: List[int],
ndescrpt: int,
atype: tf.Tensor,
shape0: tf.Tensor,
nei_type_vec: tf.Tensor,
) -> tf.Tensor:
r"""Build the type exclude mask for the attention descriptor.
Notes
-----
This method builds the type exclude mask in a similar way to
:meth:`deepmd.tf.descriptor.descriptor.Descriptor.build_type_exclude_mask`.
The mathematical expression has been explained in that method.
The difference is that the attention descriptor provides the types of
the neighbors (idx_j), which are not in order, so we take them from an
extra input.
Parameters
----------
exclude_types : List[Tuple[int, int]]
The list of excluded types, e.g. [(0, 1), (1, 0)] means the interaction
between type 0 and type 1 is excluded.
ntypes : int
The number of types.
sel : List[int]
The list of the number of selected neighbors for each type.
ndescrpt : int
The number of descriptors for each atom.
atype : tf.Tensor
The type of atoms, with the size of shape0.
shape0 : tf.Tensor
The shape of the first dimension of the inputs, which is equal to
nsamples * natoms.
nei_type_vec : tf.Tensor
The type of neighbors, with the size of (shape0, nnei).
Returns
-------
tf.Tensor
The type exclude mask, with the shape of (shape0, ndescrpt), and the
precision of GLOBAL_TF_FLOAT_PRECISION. The mask has the value of 1 if the
interaction between two types is not excluded, and 0 otherwise.
See Also
--------
deepmd.tf.descriptor.descriptor.Descriptor.build_type_exclude_mask
"""
# generate a mask
# op returns ntypes when the neighbor doesn't exist, so we need to add 1
type_mask = np.array(
[
[
1 if (tt_i, tt_j) not in exclude_types else 0
for tt_i in range(ntypes + 1)
]
for tt_j in range(ntypes)
],
dtype=bool,
)
type_mask = tf.convert_to_tensor(type_mask, dtype=GLOBAL_TF_FLOAT_PRECISION)
type_mask = tf.reshape(type_mask, [-1])
# (nsamples * natoms, 1)
atype_expand = tf.reshape(atype, [-1, 1])
# (nsamples * natoms, ndescrpt)
idx_i = tf.tile(atype_expand * (ntypes + 1), (1, ndescrpt))
# idx_j has been provided by atten op
# (nsamples * natoms, nnei, 1)
idx_j = tf.reshape(nei_type_vec, [shape0, sel[0], 1])
# (nsamples * natoms, nnei, ndescrpt // nnei)
idx_j = tf.tile(idx_j, (1, 1, ndescrpt // sel[0]))
# (nsamples * natoms, ndescrpt)
idx_j = tf.reshape(idx_j, [shape0, ndescrpt])
idx = idx_i + idx_j
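# idx flattens the (center_type, neighbor_type) pair into
# center_type * (ntypes + 1) + neighbor_type, matching the layout of the
# flattened type_mask lookup table above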
idx = tf.reshape(idx, [-1])
mask = tf.nn.embedding_lookup(type_mask, idx)
# same as inputs_i, (nsamples * natoms, ndescrpt)
mask = tf.reshape(mask, [-1, ndescrpt])
return mask
@property
def explicit_ntypes(self) -> bool:
"""Explicit ntypes with type embedding."""
return True
@classmethod
def update_sel(cls, global_jdata: dict, local_jdata: dict):
"""Update the selection and perform neighbor statistics.
Parameters
----------
global_jdata : dict
The global data, containing the training section
local_jdata : dict
The local data refer to the current class
"""
local_jdata_cpy = local_jdata.copy()
return UpdateSel().update_one_sel(global_jdata, local_jdata_cpy, True)
def serialize_attention_layers(
self,
nlayer: int,
nnei: int,
embed_dim: int,
hidden_dim: int,
dotr: bool,
do_mask: bool,
trainable_ln: bool,
ln_eps: float,
variables: dict,
bias: bool = True,
suffix: str = "",
) -> dict:
data = {
"layer_num": nlayer,
"nnei": nnei,
"embed_dim": embed_dim,
"hidden_dim": hidden_dim,
"dotr": dotr,
"do_mask": do_mask,
"trainable_ln": trainable_ln,
"ln_eps": ln_eps,
"precision": self.precision.name,
"attention_layers": [],
}
if suffix != "":
attention_layer_pattern = (
ATTENTION_LAYER_PATTERN.replace("/(c_query)", suffix + "/(c_query)")
.replace("/(c_key)", suffix + "/(c_key)")
.replace("/(c_value)", suffix + "/(c_value)")
.replace("/(c_out)", suffix + "/(c_out)")
.replace("/(layer_normalization)", suffix + "/(layer_normalization)")
)
else:
attention_layer_pattern = ATTENTION_LAYER_PATTERN
attention_layer_params = [{} for _ in range(nlayer)]
for key, value in variables.items():
m = re.search(attention_layer_pattern, key)
m = [mm for mm in m.groups() if mm is not None]
assert len(m) == 3
if m[1] not in attention_layer_params[int(m[0])]:
attention_layer_params[int(m[0])][m[1]] = {}
attention_layer_params[int(m[0])][m[1]][m[2]] = value
for layer_idx in range(nlayer):
in_proj = NativeLayer(
embed_dim,
hidden_dim * 3,
bias=bias,
use_timestep=False,
precision=self.precision.name,
)
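# pack the per-layer c_query/c_key/c_value weights into a single in_proj
# of width 3 * hidden_dim (query columns first, then key, then value)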
matrix_list = [
attention_layer_params[layer_idx][key]["matrix"]
for key in ["c_query", "c_key", "c_value"]
]
in_proj["matrix"] = np.concatenate(matrix_list, axis=-1)
if bias:
bias_list = [
attention_layer_params[layer_idx][key]["bias"]
for key in ["c_query", "c_key", "c_value"]
]
in_proj["bias"] = np.concatenate(bias_list, axis=-1)
out_proj = NativeLayer(
hidden_dim,
embed_dim,
bias=bias,
use_timestep=False,
precision=self.precision.name,
)
out_proj["matrix"] = attention_layer_params[layer_idx]["c_out"]["matrix"]
if bias:
out_proj["bias"] = attention_layer_params[layer_idx]["c_out"]["bias"]
layer_norm = LayerNorm(
embed_dim,
trainable=self.trainable_ln,
eps=self.ln_eps,
precision=self.precision.name,
)
layer_norm["matrix"] = attention_layer_params[layer_idx][
"layer_normalization"
]["gamma"]
layer_norm["bias"] = attention_layer_params[layer_idx][
"layer_normalization"
]["beta"]
data["attention_layers"].append(
{
"attention_layer": {
"in_proj": in_proj.serialize(),
"out_proj": out_proj.serialize(),
"bias": bias,
"smooth": self.smooth,
},
"attn_layer_norm": layer_norm.serialize(),
"trainable_ln": self.trainable_ln,
"ln_eps": self.ln_eps,
}
)
return data
def serialize_network_strip(
self,
ntypes: int,
ndim: int,
in_dim: int,
neuron: List[int],
activation_function: str,
resnet_dt: bool,
variables: dict,
suffix: str = "",
type_one_side: bool = False,
) -> dict:
"""Serialize network.
Parameters
----------
ntypes : int
The number of types
ndim : int
The dimension of elements
in_dim : int
The input dimension
neuron : List[int]
The neuron list
activation_function : str
The activation function
resnet_dt : bool
Whether to use resnet
variables : dict
The input variables
suffix : str, optional
The suffix of the scope
type_one_side : bool, optional
If 'False', type embeddings of both neighbor and central atoms are considered.
If 'True', only type embeddings of neighbor atoms are considered.
Default is 'False'.
Returns
-------
dict
The converted network data
"""
assert ndim == 0, "only supports descriptors with type embedding!"
embeddings = NetworkCollection(
ntypes=ntypes,
ndim=ndim,
network_type="embedding_network",
)
name_suffix = get_extra_embedding_net_suffix(type_one_side=type_one_side)
embedding_net_pattern_strip = str(
rf"filter_type_(all)/(matrix)_(\d+){name_suffix}|"
rf"filter_type_(all)/(bias)_(\d+){name_suffix}|"
rf"filter_type_(all)/(idt)_(\d+){name_suffix}|"
)[:-1]
if suffix != "":
embedding_net_pattern = (
embedding_net_pattern_strip.replace("/(idt)", suffix + "/(idt)")
.replace("/(bias)", suffix + "/(bias)")
.replace("/(matrix)", suffix + "/(matrix)")
)
else:
embedding_net_pattern = embedding_net_pattern_strip
for key, value in variables.items():
m = re.search(embedding_net_pattern, key)
m = [mm for mm in m.groups() if mm is not None]
layer_idx = int(m[2]) - 1
weight_name = m[1]
network_idx = ()
if embeddings[network_idx] is None:
# initialize the network if it is not initialized
embeddings[network_idx] = EmbeddingNet(
in_dim=in_dim,
neuron=neuron,
activation_function=activation_function,
resnet_dt=resnet_dt,
precision=self.precision.name,
)
assert embeddings[network_idx] is not None
if weight_name == "idt":
value = value.ravel()
embeddings[network_idx][layer_idx][weight_name] = value
return embeddings.serialize()
@classmethod
def deserialize_attention_layers(cls, data: dict, suffix: str = "") -> dict:
"""Deserialize attention layers.
Parameters
----------
data : dict
The input attention layer data
suffix : str, optional
The suffix of the scope
Returns
-------
variables : dict
The input variables
"""
attention_layer_variables = {}
nlayer = data["layer_num"]
hidden_dim = data["hidden_dim"]
for layer_idx in range(nlayer):
in_proj = NativeLayer.deserialize(
data["attention_layers"][layer_idx]["attention_layer"]["in_proj"]
)
out_proj = NativeLayer.deserialize(
data["attention_layers"][layer_idx]["attention_layer"]["out_proj"]
)
layer_norm = LayerNorm.deserialize(
data["attention_layers"][layer_idx]["attn_layer_norm"]
)
# Deserialize in_proj
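# in_proj columns are ordered query, key, value; slice them back out in
# hidden_dim-wide blocks (the inverse of serialize_attention_layers)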
c_query_matrix = in_proj["matrix"][:, :hidden_dim]
c_key_matrix = in_proj["matrix"][:, hidden_dim : 2 * hidden_dim]
c_value_matrix = in_proj["matrix"][:, 2 * hidden_dim :]
attention_layer_variables[
f"attention_layer_{layer_idx}{suffix}/c_query/matrix"
] = c_query_matrix
attention_layer_variables[
f"attention_layer_{layer_idx}{suffix}/c_key/matrix"
] = c_key_matrix
attention_layer_variables[
f"attention_layer_{layer_idx}{suffix}/c_value/matrix"
] = c_value_matrix
if data["attention_layers"][layer_idx]["attention_layer"]["bias"]:
c_query_bias = in_proj["bias"][:hidden_dim]
c_key_bias = in_proj["bias"][hidden_dim : 2 * hidden_dim]
c_value_bias = in_proj["bias"][2 * hidden_dim :]
attention_layer_variables[
f"attention_layer_{layer_idx}{suffix}/c_query/bias"
] = c_query_bias
attention_layer_variables[
f"attention_layer_{layer_idx}{suffix}/c_key/bias"
] = c_key_bias
attention_layer_variables[
f"attention_layer_{layer_idx}{suffix}/c_value/bias"
] = c_value_bias
# Deserialize out_proj
attention_layer_variables[
f"attention_layer_{layer_idx}{suffix}/c_out/matrix"
] = out_proj["matrix"]
if data["attention_layers"][layer_idx]["attention_layer"]["bias"]:
attention_layer_variables[
f"attention_layer_{layer_idx}{suffix}/c_out/bias"
] = out_proj["bias"]
# Deserialize layer_norm
attention_layer_variables[
f"attention_layer_{layer_idx}{suffix}/layer_normalization/beta"
] = layer_norm["bias"]
attention_layer_variables[
f"attention_layer_{layer_idx}{suffix}/layer_normalization/gamma"
] = layer_norm["matrix"]
return attention_layer_variables
@classmethod
def deserialize_network_strip(
cls, data: dict, suffix: str = "", type_one_side: bool = False
) -> dict:
"""Deserialize network.
Parameters
----------
data : dict
The input network data
suffix : str, optional
The suffix of the scope
type_one_side : bool, optional
If 'False', type embeddings of both neighbor and central atoms are considered.
If 'True', only type embeddings of neighbor atoms are considered.
Default is 'False'.
Returns
-------
variables : dict
The input variables
"""
embedding_net_variables = {}
embeddings = NetworkCollection.deserialize(data)
assert embeddings.ndim == 0, "only supports descriptors with type embedding!"
name_suffix = get_extra_embedding_net_suffix(type_one_side=type_one_side)
net_idx = ()
network = embeddings[net_idx]
assert network is not None
for layer_idx, layer in enumerate(network.layers):
embedding_net_variables[
f"filter_type_all{suffix}/matrix_{layer_idx + 1}{name_suffix}"
] = layer.w
embedding_net_variables[
f"filter_type_all{suffix}/bias_{layer_idx + 1}{name_suffix}"
] = layer.b
if layer.idt is not None:
embedding_net_variables[
f"filter_type_all{suffix}/idt_{layer_idx + 1}{name_suffix}"
] = layer.idt.reshape(1, -1)
else:
# prevent KeyError
embedding_net_variables[
f"filter_type_all{suffix}/idt_{layer_idx + 1}{name_suffix}"
] = 0.0
return embedding_net_variables
@classmethod
def deserialize(cls, data: dict, suffix: str = ""):
"""Deserialize the model.
Parameters
----------
data : dict
The serialized data
suffix : str, optional
The suffix of the scope
Returns
-------
Model
The deserialized model
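Examples
--------
A round-trip sketch (assuming `descrpt` is a DescrptSeAtten whose
variables were loaded via `init_variables` before serialization):
>>> data = descrpt.serialize()
>>> descrpt2 = DescrptSeAtten.deserialize(data)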
"""
if cls is not DescrptSeAtten:
raise NotImplementedError(f"Not implemented in class {cls.__name__}")
data = data.copy()
check_version_compatibility(data.pop("@version"), 1, 1)
data.pop("@class")
data.pop("type")
embedding_net_variables = cls.deserialize_network(
data.pop("embeddings"), suffix=suffix
)
attention_layer_variables = cls.deserialize_attention_layers(
data.pop("attention_layers"), suffix=suffix
)
data.pop("env_mat")
variables = data.pop("@variables")
tebd_input_mode = data["tebd_input_mode"]
if tebd_input_mode in ["strip"]:
raise ValueError(
"Deserialization is unsupported for `tebd_input_mode='strip'` in the native model."
)
descriptor = cls(**data)
descriptor.embedding_net_variables = embedding_net_variables
descriptor.attention_layer_variables = attention_layer_variables
descriptor.davg = variables["davg"].reshape(
descriptor.ntypes, descriptor.ndescrpt
)
descriptor.dstd = variables["dstd"].reshape(
descriptor.ntypes, descriptor.ndescrpt
)
return descriptor
def serialize(self, suffix: str = "") -> dict:
"""Serialize the model.
Parameters
----------
suffix : str, optional
The suffix of the scope
Returns
-------
dict
The serialized data
"""
if type(self) not in [DescrptSeAtten, DescrptDPA1Compat]:
raise NotImplementedError(
f"Not implemented in class {self.__class__.__name__}"
)
if self.stripped_type_embedding and type(self) is not DescrptDPA1Compat:
# only DescrptDPA1Compat can serialize when tebd_input_mode=='strip'
raise NotImplementedError(
"serialization is unsupported by the native model when tebd_input_mode=='strip'"
)
# todo support serialization when tebd_input_mode=='strip' and type_one_side is True
if self.stripped_type_embedding and self.type_one_side:
raise NotImplementedError(
"serialization is unsupported when tebd_input_mode=='strip' and type_one_side is True"
)
if (self.original_sel != self.sel_a).any():
raise NotImplementedError(
"Adjusting sel is unsupported by the native model"
)
if self.embedding_net_variables is None:
raise RuntimeError("init_variables must be called before serialize")
if self.spin is not None:
raise NotImplementedError("spin is unsupported")
assert self.davg is not None
assert self.dstd is not None
data = {
"@class": "Descriptor",
"type": "se_atten",
"@version": 1,
"rcut": self.rcut_r,
"rcut_smth": self.rcut_r_smth,
"sel": self.sel_a,
"ntypes": self.ntypes,
"neuron": self.filter_neuron,
"axis_neuron": self.n_axis_neuron,
"set_davg_zero": self.set_davg_zero,
"attn": self.att_n,
"attn_layer": self.attn_layer,
"attn_dotr": self.attn_dotr,
"attn_mask": self.attn_mask,
"activation_function": self.activation_function_name,
"resnet_dt": self.filter_resnet_dt,
"smooth_type_embedding": self.smooth,
"tebd_input_mode": self.tebd_input_mode,
"trainable_ln": self.trainable_ln,
"ln_eps": self.ln_eps,
"precision": self.filter_precision.name,
"embeddings": self.serialize_network(
ntypes=self.ntypes,
ndim=0,
in_dim=1
if not hasattr(self, "embd_input_dim")
else self.embd_input_dim,
neuron=self.filter_neuron,
activation_function=self.activation_function_name,
resnet_dt=self.filter_resnet_dt,
variables=self.embedding_net_variables,
excluded_types=self.exclude_types,
suffix=suffix,
),
"attention_layers": self.serialize_attention_layers(
nlayer=self.attn_layer,
nnei=self.nnei_a,
embed_dim=self.filter_neuron[-1],
hidden_dim=self.att_n,
dotr=self.attn_dotr,
do_mask=self.attn_mask,
trainable_ln=self.trainable_ln,
ln_eps=self.ln_eps,
variables=self.attention_layer_variables,
suffix=suffix,
),
"env_mat": EnvMat(self.rcut_r, self.rcut_r_smth).serialize(),
"exclude_types": list(self.orig_exclude_types),
"env_protection": self.env_protection,
"@variables": {
"davg": self.davg.reshape(self.ntypes, self.nnei_a, 4),
"dstd": self.dstd.reshape(self.ntypes, self.nnei_a, 4),
},
"trainable": self.trainable,
"type_one_side": self.type_one_side,
"spin": self.spin,
}
if self.tebd_input_mode in ["strip"]:
assert (
type(self) is DescrptDPA1Compat
), "only DescrptDPA1Compat can serialize when tebd_input_mode=='strip'"
data.update(
{
"embeddings_strip": self.serialize_network_strip(
ntypes=self.ntypes,
ndim=0,
in_dim=2
* self.tebd_dim, # only DescrptDPA1Compat has this attribute
neuron=self.filter_neuron,
activation_function=self.activation_function_name,
resnet_dt=self.filter_resnet_dt,
variables=self.two_side_embeeding_net_variables,
suffix=suffix,
type_one_side=self.type_one_side,
)
}
)
return data
class DescrptDPA1Compat(DescrptSeAtten):
r"""Consistent version of the model for testing with other backend references.
This model includes the type_embedding as attributes and other additional parameters.
Parameters
----------
rcut: float
The cut-off radius :math:`r_c`
rcut_smth: float
From where the environment matrix should be smoothed :math:`r_s`
sel: list[int], int
list[int]: sel[i] specifies the maximum number of type i atoms in the cut-off radius
int: the total maximum number of atoms in the cut-off radius
ntypes: int
Number of element types
neuron: list[int]
Number of neurons in each hidden layers of the embedding net :math:`\mathcal{N}`
axis_neuron: int
Number of the axis neuron :math:`M_2` (number of columns of the sub-matrix of the embedding matrix)
tebd_dim: int
Dimension of the type embedding
tebd_input_mode: str
The input mode of the type embedding. Supported modes are ["concat", "strip"].
- "concat": Concatenate the type embedding with the smoothed radial information as the union input for the embedding network.
- "strip": Use a separated embedding network for the type embedding and combine the output with the radial embedding network output.
resnet_dt: bool
Time-step `dt` in the resnet construction:
y = x + dt * \phi (Wx + b)
trainable: bool
If the weights of this descriptors are trainable.
trainable_ln: bool
Whether to use trainable shift and scale weights in layer normalization.
ln_eps: float, Optional
The epsilon value for layer normalization.
type_one_side: bool
If 'False', type embeddings of both neighbor and central atoms are considered.
If 'True', only type embeddings of neighbor atoms are considered.
Default is 'False'.
attn: int
Hidden dimension of the attention vectors
attn_layer: int
Number of attention layers
attn_dotr: bool
Whether to dot the angular gate into the attention weights
attn_mask: bool
(Only support False to keep consistent with other backend references.)
Whether to mask the diagonal of the attention weights
exclude_types : List[List[int]]
The excluded pairs of types which have no interaction with each other.
For example, `[[0, 1]]` means no interaction between type 0 and type 1.
env_protection: float
Protection parameter to prevent division by zero errors during environment matrix calculations.
set_davg_zero: bool
Set the shift of embedding net input to zero.
activation_function: str
The activation function in the embedding net. Supported options are |ACTIVATION_FN|
precision: str
The precision of the embedding net parameters. Supported options are |PRECISION|
scaling_factor: float
(Only to keep consistent with other backend references.)
(Not used in this version.)
The scaling factor of normalization in calculations of attention weights.
If `temperature` is None, the scaling of attention weights is (N_dim * scaling_factor)**0.5
normalize: bool
(Only support True to keep consistent with other backend references.)
(Not used in this version.)
Whether to normalize the hidden vectors in attention weights calculation.
temperature: float
(Only support 1.0 to keep consistent with other backend references.)
(Not used in this version.)
If not None, the scaling of attention weights is `temperature` itself.
smooth_type_embedding: bool
(Only support False to keep consistent with other backend references.)
Whether to use smooth process in attention weights calculation.
concat_output_tebd: bool
Whether to concat type embedding at the output of the descriptor.
spin
(Only support None to keep consistent with old implementation.)
The old implementation of deepspin.
"""
def __init__(
self,
rcut: float,
rcut_smth: float,
sel: Union[List[int], int],
ntypes: int,
neuron: List[int] = [25, 50, 100],
axis_neuron: int = 8,
tebd_dim: int = 8,
tebd_input_mode: str = "concat",
resnet_dt: bool = False,
trainable: bool = True,
type_one_side: bool = True,
attn: int = 128,
attn_layer: int = 2,
attn_dotr: bool = True,
attn_mask: bool = False,
exclude_types: List[List[int]] = [],
env_protection: float = 0.0,
set_davg_zero: bool = False,
activation_function: str = "tanh",
precision: str = "default",
scaling_factor=1.0,
normalize: bool = True,
temperature: Optional[float] = None,
trainable_ln: bool = True,
ln_eps: Optional[float] = 1e-3,
smooth_type_embedding: bool = True,
concat_output_tebd: bool = True,
spin: Optional[Any] = None,
# consistent with argcheck, not used though
seed: Optional[int] = None,
uniform_seed: bool = False,
) -> None:
if not normalize:
raise NotImplementedError("Only support normalize == True in this version.")
if temperature != 1.0:
raise NotImplementedError(
"Only support temperature == 1.0 in this version."
)
if spin is not None:
raise NotImplementedError("Only support spin is None in this version.")
if attn_mask:
raise NotImplementedError(
"old implementation of attn_mask is not supported."
)
# keep consistent with the default value in this backend
if ln_eps is None:
ln_eps = 1e-3
super().__init__(
rcut,
rcut_smth,
sel,
ntypes,
neuron=neuron,
axis_neuron=axis_neuron,
resnet_dt=resnet_dt,
trainable=trainable,
seed=seed,
type_one_side=type_one_side,
set_davg_zero=set_davg_zero,
exclude_types=exclude_types,
activation_function=activation_function,
precision=precision,
uniform_seed=uniform_seed,
attn=attn,
attn_layer=attn_layer,
attn_dotr=attn_dotr,
attn_mask=attn_mask,
multi_task=True,
trainable_ln=trainable_ln,
ln_eps=ln_eps,
smooth_type_embedding=smooth_type_embedding,
tebd_input_mode=tebd_input_mode,
env_protection=env_protection,
)
self.tebd_dim = tebd_dim
self.scaling_factor = scaling_factor
self.normalize = normalize
self.temperature = temperature
self.type_embedding = TypeEmbedNet(
ntypes=self.ntypes,
neuron=[self.tebd_dim],
padding=True,
activation_function="Linear",
# precision=precision,
)
self.concat_output_tebd = concat_output_tebd
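# input width of the radial embedding net: the smoothed radial channel
# contributes 1; in "concat" mode the neighbor type embedding adds tebd_dim,
# plus another tebd_dim for the center atom when type_one_side is False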
if self.tebd_input_mode in ["concat"]:
if not self.type_one_side:
self.embd_input_dim = 1 + self.tebd_dim * 2
else:
self.embd_input_dim = 1 + self.tebd_dim
else:
self.embd_input_dim = 1
def build(
self,
coord_: tf.Tensor,
atype_: tf.Tensor,
natoms: tf.Tensor,
box_: tf.Tensor,
mesh: tf.Tensor,
input_dict: dict,
reuse: Optional[bool] = None,
suffix: str = "",
) -> tf.Tensor:
type_embedding = self.type_embedding.build(self.ntypes, suffix=suffix)
input_dict["type_embedding"] = type_embedding
# nf x nloc x out_dim
self.dout = super().build(
coord_,
atype_,
natoms,
box_,
mesh,
input_dict,
reuse=reuse,
suffix=suffix,
)
# self.dout = tf.cast(self.dout, self.filter_precision)
if self.concat_output_tebd:
atype = tf.reshape(atype_, [-1, natoms[1]])
atype_nloc = tf.reshape(
tf.slice(atype, [0, 0], [-1, natoms[0]]), [-1]
) ## LAMMPS will raise an error without this
atom_embed = tf.reshape(
tf.nn.embedding_lookup(type_embedding, atype_nloc),
[-1, natoms[0], self.tebd_dim],
)
atom_embed = tf.cast(atom_embed, GLOBAL_TF_FLOAT_PRECISION)
# nf x nloc x (out_dim + tebd_dim)
self.dout = tf.concat([self.dout, atom_embed], axis=-1)
return self.dout
def init_variables(
self,
graph: tf.Graph,
graph_def: tf.GraphDef,
suffix: str = "",
) -> None:
"""Init the embedding net variables with the given dict.
Parameters
----------
graph : tf.Graph
The input frozen model graph
graph_def : tf.GraphDef
The input frozen model graph_def
suffix : str, optional
The suffix of the scope
"""
super().init_variables(graph=graph, graph_def=graph_def, suffix=suffix)
self.type_embedding.init_variables(
graph=graph, graph_def=graph_def, suffix=suffix
)
def update_attention_layers_serialize(self, data: dict):
"""Update the serialized data to be consistent with other backend references."""
new_dict = {
"@class": "NeighborGatedAttention",
"@version": 1,
"scaling_factor": self.scaling_factor,
"normalize": self.normalize,
"temperature": self.temperature,
}
new_dict.update(data)
update_info = {
"nnei": self.nnei_a,
"embed_dim": self.filter_neuron[-1],
"hidden_dim": self.att_n,
"dotr": self.attn_dotr,
"do_mask": self.attn_mask,
"scaling_factor": self.scaling_factor,
"normalize": self.normalize,
"temperature": self.temperature,
"precision": self.filter_precision.name,
}
for layer_idx in range(self.attn_layer):
new_dict["attention_layers"][layer_idx].update(update_info)
new_dict["attention_layers"][layer_idx]["attention_layer"].update(
update_info
)
return new_dict
@classmethod
def deserialize(cls, data: dict, suffix: str = ""):
"""Deserialize the model.
Parameters
----------
data : dict
The serialized data
suffix : str, optional
The suffix of the scope
Returns
-------
Model
The deserialized model
"""
if cls is not DescrptDPA1Compat:
raise NotImplementedError(f"Not implemented in class {cls.__name__}")
data = data.copy()
check_version_compatibility(data.pop("@version"), 1, 1)
data.pop("@class")
data.pop("type")
embedding_net_variables = cls.deserialize_network(
data.pop("embeddings"), suffix=suffix
)
attention_layer_variables = cls.deserialize_attention_layers(
data.pop("attention_layers"), suffix=suffix
)
data.pop("env_mat")
variables = data.pop("@variables")
type_embedding = data.pop("type_embedding")
tebd_input_mode = data["tebd_input_mode"]
type_one_side = data["type_one_side"]
if tebd_input_mode in ["strip"]:
two_side_embeeding_net_variables = cls.deserialize_network_strip(
data.pop("embeddings_strip"),
suffix=suffix,
type_one_side=type_one_side,
)
else:
two_side_embeeding_net_variables = None
descriptor = cls(**data)
descriptor.embedding_net_variables = embedding_net_variables
descriptor.attention_layer_variables = attention_layer_variables
descriptor.two_side_embeeding_net_variables = two_side_embeeding_net_variables
descriptor.davg = variables["davg"].reshape(
descriptor.ntypes, descriptor.ndescrpt
)
descriptor.dstd = variables["dstd"].reshape(
descriptor.ntypes, descriptor.ndescrpt
)
descriptor.type_embedding = TypeEmbedNet.deserialize(
type_embedding, suffix=suffix
)
return descriptor
def serialize(self, suffix: str = "") -> dict:
"""Serialize the model.
Parameters
----------
suffix : str, optional
The suffix of the scope
Returns
-------
dict
The serialized data
"""
data = super().serialize(suffix)
data.update(
{
"type": "dpa1",
"tebd_dim": self.tebd_dim,
"scaling_factor": self.scaling_factor,
"normalize": self.normalize,
"temperature": self.temperature,
"concat_output_tebd": self.concat_output_tebd,
"type_embedding": self.type_embedding.serialize(suffix),
}
)
data["attention_layers"] = self.update_attention_layers_serialize(
data["attention_layers"]
)
return data