Source code for deepmd.dpmodel.utils.network

# SPDX-License-Identifier: LGPL-3.0-or-later
"""Native DP model format for multiple backends.

See issue #2982 for more information.
"""

import copy
import itertools
from typing import (
    Callable,
    ClassVar,
    Dict,
    List,
    Optional,
    Union,
)

import numpy as np

from deepmd.utils.version import (
    check_version_compatibility,
)

try:
    from deepmd._version import version as __version__
except ImportError:
    __version__ = "unknown"

from deepmd.dpmodel import (
    DEFAULT_PRECISION,
    PRECISION_DICT,
    NativeOP,
)


class NativeLayer(NativeOP):
    """Native representation of a layer.

    Parameters
    ----------
    num_in : int
        The input dimension of the layer.
    num_out : int
        The output dimension of the layer.
    bias : bool, optional
        Whether the layer has a bias term.
    use_timestep : bool, optional
        Whether to use a learnable timestep (idt) in the resnet
        connection; only effective when a skip connection is possible.
    activation_function : str, optional
        The activation function of the layer.
    resnet : bool, optional
        Whether the layer is a residual layer.
    precision : str, optional
        Floating point precision of the layer parameters.
    """

    def __init__(
        self,
        num_in,
        num_out,
        bias: bool = True,
        use_timestep: bool = False,
        activation_function: Optional[str] = None,
        resnet: bool = False,
        precision: str = DEFAULT_PRECISION,
    ) -> None:
        prec = PRECISION_DICT[precision.lower()]
        self.precision = precision
        # only use_timestep when a skip connection is established
        use_timestep = use_timestep and (num_out == num_in or num_out == num_in * 2)
        rng = np.random.default_rng()
        self.w = rng.normal(size=(num_in, num_out)).astype(prec)
        self.b = rng.normal(size=(num_out,)).astype(prec) if bias else None
        self.idt = rng.normal(size=(num_out,)).astype(prec) if use_timestep else None
        self.activation_function = (
            activation_function if activation_function is not None else "none"
        )
        self.resnet = resnet
        self.check_type_consistency()
        self.check_shape_consistency()

    def serialize(self) -> dict:
        """Serialize the layer to a dict.

        Returns
        -------
        dict
            The serialized layer.
        """
        data = {
            "w": self.w,
            "b": self.b,
            "idt": self.idt,
        }
        return {
            "@class": "Layer",
            "@version": 1,
            "bias": self.b is not None,
            "use_timestep": self.idt is not None,
            "activation_function": self.activation_function,
            "resnet": self.resnet,
            # make deterministic
            "precision": np.dtype(PRECISION_DICT[self.precision]).name,
            "@variables": data,
        }

    @classmethod
    def deserialize(cls, data: dict) -> "NativeLayer":
        """Deserialize the layer from a dict.

        Parameters
        ----------
        data : dict
            The dict to deserialize from.
        """
        data = copy.deepcopy(data)
        check_version_compatibility(data.pop("@version", 1), 1, 1)
        data.pop("@class", None)
        variables = data.pop("@variables")
        assert variables["w"] is not None and len(variables["w"].shape) == 2
        num_in, num_out = variables["w"].shape
        obj = cls(
            num_in,
            num_out,
            **data,
        )
        obj.w, obj.b, obj.idt = (
            variables["w"],
            variables.get("b", None),
            variables.get("idt", None),
        )
        if obj.b is not None:
            obj.b = obj.b.ravel()
        if obj.idt is not None:
            obj.idt = obj.idt.ravel()
        obj.check_shape_consistency()
        return obj
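
    # Illustrative serialize()/deserialize() round trip (a hedged sketch,
    # not part of the original module):
    #
    # >>> layer = NativeLayer(3, 5, activation_function="tanh")
    # >>> restored = NativeLayer.deserialize(layer.serialize())
    # >>> x = np.zeros((2, 3), dtype=layer.w.dtype)
    # >>> bool(np.allclose(layer.call(x), restored.call(x)))
    # True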

    def check_shape_consistency(self):
        if self.b is not None and self.w.shape[1] != self.b.shape[0]:
            raise ValueError(
                f"dim 1 of w {self.w.shape[1]} is not equal to shape "
                f"of b {self.b.shape[0]}",
            )
        if self.idt is not None and self.w.shape[1] != self.idt.shape[0]:
            raise ValueError(
                f"dim 1 of w {self.w.shape[1]} is not equal to shape "
                f"of idt {self.idt.shape[0]}",
            )

    def check_type_consistency(self):
        precision = self.precision

        def check_var(var):
            if var is not None:
                # the assertion "float64" == "double" would fail
                assert PRECISION_DICT[var.dtype.name] is PRECISION_DICT[precision]

        check_var(self.w)
        check_var(self.b)
        check_var(self.idt)

    def __setitem__(self, key, value):
        if key in ("w", "matrix"):
            self.w = value
        elif key in ("b", "bias"):
            self.b = value
        elif key == "idt":
            self.idt = value
        elif key == "activation_function":
            self.activation_function = value
        elif key == "resnet":
            self.resnet = value
        elif key == "precision":
            self.precision = value
        else:
            raise KeyError(key)

    def __getitem__(self, key):
        if key in ("w", "matrix"):
            return self.w
        elif key in ("b", "bias"):
            return self.b
        elif key == "idt":
            return self.idt
        elif key == "activation_function":
            return self.activation_function
        elif key == "resnet":
            return self.resnet
        elif key == "precision":
            return self.precision
        else:
            raise KeyError(key)

    def dim_in(self) -> int:
        return self.w.shape[0]

    def dim_out(self) -> int:
        return self.w.shape[1]

    def call(self, x: np.ndarray) -> np.ndarray:
        """Forward pass.

        Parameters
        ----------
        x : np.ndarray
            The input.

        Returns
        -------
        np.ndarray
            The output.
        """
        if self.w is None or self.activation_function is None:
            raise ValueError("w and activation_function must be set")
        fn = get_activation_fn(self.activation_function)
        y = (
            np.matmul(x, self.w) + self.b
            if self.b is not None
            else np.matmul(x, self.w)
        )
        y = fn(y)
        if self.idt is not None:
            y *= self.idt
        if self.resnet and self.w.shape[1] == self.w.shape[0]:
            y += x
        elif self.resnet and self.w.shape[1] == 2 * self.w.shape[0]:
            y += np.concatenate([x, x], axis=-1)
        return y
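

# Illustrative forward pass (a hedged sketch, not part of the original
# module): with num_out == 2 * num_in and resnet=True, the input is
# concatenated with itself and added to the output.
#
# >>> layer = NativeLayer(2, 4, activation_function="none", resnet=True)
# >>> x = np.ones((1, 2), dtype=layer.w.dtype)
# >>> layer.call(x).shape
# (1, 4)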


def get_activation_fn(activation_function: str) -> Callable[[np.ndarray], np.ndarray]:
    activation_function = activation_function.lower()
    if activation_function == "tanh":
        return np.tanh
    elif activation_function == "relu":

        def fn(x):
            # https://stackoverflow.com/a/47936476/9567349
            return x * (x > 0)

        return fn
    elif activation_function in ("gelu", "gelu_tf"):

        def fn(x):
            # generated by GitHub Copilot
            return 0.5 * x * (1 + np.tanh(np.sqrt(2 / np.pi) * (x + 0.044715 * x**3)))

        return fn
    elif activation_function == "relu6":

        def fn(x):
            # generated by GitHub Copilot
            return np.minimum(np.maximum(x, 0), 6)

        return fn
    elif activation_function == "softplus":

        def fn(x):
            # generated by GitHub Copilot
            return np.log(1 + np.exp(x))

        return fn
    elif activation_function == "sigmoid":

        def fn(x):
            # generated by GitHub Copilot
            return 1 / (1 + np.exp(-x))

        return fn
    elif activation_function in ("none", "linear"):

        def fn(x):
            return x

        return fn
    else:
        raise NotImplementedError(activation_function)
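

# Illustrative use of get_activation_fn (a hedged sketch, not part of the
# original module):
#
# >>> relu = get_activation_fn("relu")
# >>> bool(np.all(relu(np.array([-2.0, 3.0])) == np.array([0.0, 3.0])))
# True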


class LayerNorm(NativeLayer):
    """Implementation of Layer Normalization layer.

    Parameters
    ----------
    num_in : int
        The input dimension of the layer.
    eps : float, optional
        A small value added to prevent division by zero in calculations.
    uni_init : bool, optional
        Whether to initialize the weights to ones and the biases to zeros.
    trainable : bool, optional
        Whether the parameters are trainable; kept only for consistency
        with other backends.
    precision : str, optional
        Floating point precision of the layer parameters.
    """

    def __init__(
        self,
        num_in: int,
        eps: float = 1e-5,
        uni_init: bool = True,
        trainable: bool = True,
        precision: str = DEFAULT_PRECISION,
    ) -> None:
        self.eps = eps
        self.uni_init = uni_init
        self.num_in = num_in
        super().__init__(
            num_in=1,
            num_out=num_in,
            bias=True,
            use_timestep=False,
            activation_function=None,
            resnet=False,
            precision=precision,
        )
        self.w = self.w.squeeze(0)  # keep the weight shape to be [num_in]
        if self.uni_init:
            self.w = np.ones_like(self.w)
            self.b = np.zeros_like(self.b)
        # only to keep consistent with other backends
        self.trainable = trainable

    def serialize(self) -> dict:
        """Serialize the layer to a dict.

        Returns
        -------
        dict
            The serialized layer.
        """
        data = {
            "w": self.w,
            "b": self.b,
        }
        return {
            "@class": "LayerNorm",
            "@version": 1,
            "eps": self.eps,
            "trainable": self.trainable,
            "precision": self.precision,
            "@variables": data,
        }

    @classmethod
    def deserialize(cls, data: dict) -> "LayerNorm":
        """Deserialize the layer from a dict.

        Parameters
        ----------
        data : dict
            The dict to deserialize from.
        """
        data = copy.deepcopy(data)
        check_version_compatibility(data.pop("@version", 1), 1, 1)
        data.pop("@class", None)
        variables = data.pop("@variables")
        if variables["w"] is not None:
            assert len(variables["w"].shape) == 1
        if variables["b"] is not None:
            assert len(variables["b"].shape) == 1
        (num_in,) = variables["w"].shape
        obj = cls(
            num_in,
            **data,
        )
        obj.w = variables["w"]
        obj.b = variables["b"]
        obj._check_shape_consistency()
        return obj

    def _check_shape_consistency(self):
        if self.b is not None and self.w.shape[0] != self.b.shape[0]:
            raise ValueError(
                f"dim 0 of w {self.w.shape[0]} is not equal to shape "
                f"of b {self.b.shape[0]}",
            )

    def __setitem__(self, key, value):
        if key in ("w", "matrix"):
            self.w = value
        elif key in ("b", "bias"):
            self.b = value
        elif key == "trainable":
            self.trainable = value
        elif key == "precision":
            self.precision = value
        elif key == "eps":
            self.eps = value
        else:
            raise KeyError(key)

    def __getitem__(self, key):
        if key in ("w", "matrix"):
            return self.w
        elif key in ("b", "bias"):
            return self.b
        elif key == "trainable":
            return self.trainable
        elif key == "precision":
            return self.precision
        elif key == "eps":
            return self.eps
        else:
            raise KeyError(key)

    def dim_out(self) -> int:
        return self.w.shape[0]

    def call(self, x: np.ndarray) -> np.ndarray:
        """Forward pass.

        Parameters
        ----------
        x : np.ndarray
            The input.

        Returns
        -------
        np.ndarray
            The output.
        """
        y = self.layer_norm_numpy(x, (self.num_in,), self.w, self.b, self.eps)
        return y

    @staticmethod
    def layer_norm_numpy(x, shape, weight=None, bias=None, eps=1e-5):
        # mean and variance
        mean = np.mean(x, axis=tuple(range(-len(shape), 0)), keepdims=True)
        var = np.var(x, axis=tuple(range(-len(shape), 0)), keepdims=True)
        # normalize
        x_normalized = (x - mean) / np.sqrt(var + eps)
        # shift and scale
        if weight is not None and bias is not None:
            x_normalized = x_normalized * weight + bias
        return x_normalized
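

# Illustrative normalization (a hedged sketch, not part of the original
# module): with the default uni_init=True, the layer reduces to plain
# standardization over the last axis.
#
# >>> ln = LayerNorm(4)
# >>> y = ln.call(np.arange(8, dtype=np.float64).reshape(2, 4))
# >>> bool(np.allclose(y.mean(axis=-1), 0.0))
# True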


def make_multilayer_network(T_NetworkLayer, ModuleBase):
    class NN(ModuleBase):
        """Native representation of a neural network.

        Parameters
        ----------
        layers : list[NativeLayer], optional
            The layers of the network.
        """

        def __init__(self, layers: Optional[List[dict]] = None) -> None:
            super().__init__()
            if layers is None:
                layers = []
            self.layers = [T_NetworkLayer.deserialize(layer) for layer in layers]
            self.check_shape_consistency()

        def serialize(self) -> dict:
            """Serialize the network to a dict.

            Returns
            -------
            dict
                The serialized network.
            """
            return {
                "@class": "NN",
                "@version": 1,
                "layers": [layer.serialize() for layer in self.layers],
            }

        @classmethod
        def deserialize(cls, data: dict) -> "NN":
            """Deserialize the network from a dict.

            Parameters
            ----------
            data : dict
                The dict to deserialize from.
            """
            data = data.copy()
            check_version_compatibility(data.pop("@version", 1), 1, 1)
            data.pop("@class", None)
            return cls(data["layers"])

        def __getitem__(self, key):
            assert isinstance(key, int)
            return self.layers[key]

        def __setitem__(self, key, value):
            assert isinstance(key, int)
            self.layers[key] = value

        def check_shape_consistency(self):
            for ii in range(len(self.layers) - 1):
                if self.layers[ii].dim_out() != self.layers[ii + 1].dim_in():
                    raise ValueError(
                        f"the output dim of layer {ii} "
                        f"({self.layers[ii].dim_out()}) does not match the "
                        f"input dim of layer {ii + 1} "
                        f"({self.layers[ii + 1].dim_in()})"
                    )

        def call(self, x):
            """Forward pass.

            Parameters
            ----------
            x : np.ndarray
                The input.

            Returns
            -------
            np.ndarray
                The output.
            """
            for layer in self.layers:
                x = layer(x)
            return x

        def clear(self):
            """Clear the network parameters to zero."""
            for layer in self.layers:
                layer.w.fill(0.0)
                if layer.b is not None:
                    layer.b.fill(0.0)
                if layer.idt is not None:
                    layer.idt.fill(0.0)

    return NN


NativeNet = make_multilayer_network(NativeLayer, NativeOP)
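

# Illustrative use of NativeNet (a hedged sketch, not part of the original
# module): build a two-layer net from serialized layers and run a forward
# pass.
#
# >>> net = NativeNet(
# ...     [
# ...         NativeLayer(2, 4, activation_function="tanh").serialize(),
# ...         NativeLayer(4, 1, activation_function="none").serialize(),
# ...     ]
# ... )
# >>> net.call(np.zeros((3, 2))).shape
# (3, 1)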


def make_embedding_network(T_Network, T_NetworkLayer):
    class EN(T_Network):
        """The embedding network.

        Parameters
        ----------
        in_dim
            Input dimension.
        neuron
            The number of neurons in each layer. The output dimension
            is the same as the dimension of the last layer.
        activation_function
            The activation function.
        resnet_dt
            Use time step at the resnet architecture.
        precision
            Floating point precision for the model parameters.
        """

        def __init__(
            self,
            in_dim,
            neuron: List[int] = [24, 48, 96],
            activation_function: str = "tanh",
            resnet_dt: bool = False,
            precision: str = DEFAULT_PRECISION,
        ):
            layers = []
            i_in = in_dim
            for idx, ii in enumerate(neuron):
                i_ot = ii
                layers.append(
                    T_NetworkLayer(
                        i_in,
                        i_ot,
                        bias=True,
                        use_timestep=resnet_dt,
                        activation_function=activation_function,
                        resnet=True,
                        precision=precision,
                    ).serialize()
                )
                i_in = i_ot
            super().__init__(layers)
            self.in_dim = in_dim
            self.neuron = neuron
            self.activation_function = activation_function
            self.resnet_dt = resnet_dt
            self.precision = precision

        def serialize(self) -> dict:
            """Serialize the network to a dict.

            Returns
            -------
            dict
                The serialized network.
            """
            return {
                "@class": "EmbeddingNetwork",
                "@version": 1,
                "in_dim": self.in_dim,
                "neuron": self.neuron.copy(),
                "activation_function": self.activation_function,
                "resnet_dt": self.resnet_dt,
                # make deterministic
                "precision": np.dtype(PRECISION_DICT[self.precision]).name,
                "layers": [layer.serialize() for layer in self.layers],
            }

        @classmethod
        def deserialize(cls, data: dict) -> "EmbeddingNet":
            """Deserialize the network from a dict.

            Parameters
            ----------
            data : dict
                The dict to deserialize from.
            """
            data = copy.deepcopy(data)
            check_version_compatibility(data.pop("@version", 1), 1, 1)
            data.pop("@class", None)
            layers = data.pop("layers")
            obj = cls(**data)
            super(EN, obj).__init__(layers)
            return obj

    return EN


EmbeddingNet = make_embedding_network(NativeNet, NativeLayer)
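

# Illustrative use of EmbeddingNet (a hedged sketch, not part of the
# original module): the output dimension equals the last entry of
# ``neuron``.
#
# >>> en = EmbeddingNet(1, neuron=[8, 16])
# >>> en.call(np.ones((5, 1))).shape
# (5, 16)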


def make_fitting_network(T_EmbeddingNet, T_Network, T_NetworkLayer):
    class FN(T_EmbeddingNet):
        """The fitting network. It may be implemented as an embedding
        net connected with a linear output layer.

        Parameters
        ----------
        in_dim
            Input dimension.
        out_dim
            Output dimension.
        neuron
            The number of neurons in each hidden layer.
        activation_function
            The activation function.
        resnet_dt
            Use time step at the resnet architecture.
        precision
            Floating point precision for the model parameters.
        bias_out
            The last linear layer has bias.
        """

        def __init__(
            self,
            in_dim,
            out_dim,
            neuron: List[int] = [24, 48, 96],
            activation_function: str = "tanh",
            resnet_dt: bool = False,
            precision: str = DEFAULT_PRECISION,
            bias_out: bool = True,
        ):
            super().__init__(
                in_dim,
                neuron=neuron,
                activation_function=activation_function,
                resnet_dt=resnet_dt,
                precision=precision,
            )
            i_in = neuron[-1] if len(neuron) > 0 else in_dim
            i_ot = out_dim
            self.layers.append(
                T_NetworkLayer(
                    i_in,
                    i_ot,
                    bias=bias_out,
                    use_timestep=False,
                    activation_function=None,
                    resnet=False,
                    precision=precision,
                )
            )
            self.out_dim = out_dim
            self.bias_out = bias_out

        def serialize(self) -> dict:
            """Serialize the network to a dict.

            Returns
            -------
            dict
                The serialized network.
            """
            return {
                "@class": "FittingNetwork",
                "@version": 1,
                "in_dim": self.in_dim,
                "out_dim": self.out_dim,
                "neuron": self.neuron.copy(),
                "activation_function": self.activation_function,
                "resnet_dt": self.resnet_dt,
                "precision": self.precision,
                "bias_out": self.bias_out,
                "layers": [layer.serialize() for layer in self.layers],
            }

        @classmethod
        def deserialize(cls, data: dict) -> "FittingNet":
            """Deserialize the network from a dict.

            Parameters
            ----------
            data : dict
                The dict to deserialize from.
            """
            data = copy.deepcopy(data)
            check_version_compatibility(data.pop("@version", 1), 1, 1)
            data.pop("@class", None)
            layers = data.pop("layers")
            obj = cls(**data)
            T_Network.__init__(obj, layers)
            return obj

    return FN


FittingNet = make_fitting_network(EmbeddingNet, NativeNet, NativeLayer)
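

# Illustrative use of FittingNet (a hedged sketch, not part of the original
# module): an embedding net followed by a linear output layer.
#
# >>> fn = FittingNet(4, 1, neuron=[8, 8])
# >>> fn.call(np.ones((10, 4))).shape
# (10, 1)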


class NetworkCollection:
    """A collection of networks for multiple elements.

    The number of dimensions for types might be 0, 1, or 2.
    - 0: embedding or fitting with type embedding, in ()
    - 1: embedding with type_one_side, or fitting, in (type_i)
    - 2: embedding without type_one_side, in (type_i, type_j)

    Parameters
    ----------
    ndim : int
        The number of dimensions.
    ntypes : int
        The number of types.
    network_type : str, optional
        The type of the network.
    networks : dict, optional
        The networks to initialize with.
    """

    # subclass may override this
    NETWORK_TYPE_MAP: ClassVar[Dict[str, type]] = {
        "network": NativeNet,
        "embedding_network": EmbeddingNet,
        "fitting_network": FittingNet,
    }

    def __init__(
        self,
        ndim: int,
        ntypes: int,
        network_type: str = "network",
        networks: List[Union[NativeNet, dict]] = [],
    ):
        self.ndim = ndim
        self.ntypes = ntypes
        self.network_type = self.NETWORK_TYPE_MAP[network_type]
        self._networks = [None for ii in range(ntypes**ndim)]
        for ii, network in enumerate(networks):
            self[ii] = network
        if len(networks):
            self.check_completeness()

    def check_completeness(self):
        """Check whether the collection is complete.

        Raises
        ------
        RuntimeError
            If the collection is incomplete.
        """
        for tt in itertools.product(range(self.ntypes), repeat=self.ndim):
            if self[tuple(tt)] is None:
                raise RuntimeError(f"network for {tt} not found")

    def _convert_key(self, key):
        if isinstance(key, int):
            idx = key
        else:
            if isinstance(key, tuple):
                pass
            elif isinstance(key, str):
                key = tuple([int(tt) for tt in key.split("_")[1:]])
            else:
                raise TypeError(key)
            assert isinstance(key, tuple)
            assert len(key) == self.ndim
            idx = sum([tt * self.ntypes**ii for ii, tt in enumerate(key)])
        return idx
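
    # Indexing sketch (illustrative, not part of the original module):
    # for ndim=2 and ntypes=3, the tuple key (1, 2) maps to the flat index
    # 1 * 3**0 + 2 * 3**1 == 7; the string key "type_1_2" maps identically.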

    def __getitem__(self, key):
        return self._networks[self._convert_key(key)]

    def __setitem__(self, key, value):
        if isinstance(value, self.network_type):
            pass
        elif isinstance(value, dict):
            value = self.network_type.deserialize(value)
        else:
            raise TypeError(value)
        self._networks[self._convert_key(key)] = value

    def serialize(self) -> dict:
        """Serialize the networks to a dict.

        Returns
        -------
        dict
            The serialized networks.
        """
        network_type_map_inv = {v: k for k, v in self.NETWORK_TYPE_MAP.items()}
        network_type_name = network_type_map_inv[self.network_type]
        return {
            "@class": "NetworkCollection",
            "@version": 1,
            "ndim": self.ndim,
            "ntypes": self.ntypes,
            "network_type": network_type_name,
            "networks": [nn.serialize() for nn in self._networks],
        }

    @classmethod
    def deserialize(cls, data: dict) -> "NetworkCollection":
        """Deserialize the networks from a dict.

        Parameters
        ----------
        data : dict
            The dict to deserialize from.
        """
        data = data.copy()
        check_version_compatibility(data.pop("@version", 1), 1, 1)
        data.pop("@class", None)
        return cls(**data)
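

# Illustrative use of NetworkCollection (a hedged sketch, not part of the
# original module): a two-type, one-dimensional collection indexed by type.
#
# >>> collection = NetworkCollection(ndim=1, ntypes=2)
# >>> collection[(0,)] = NativeNet([NativeLayer(2, 3).serialize()])
# >>> collection[(1,)] = NativeNet([NativeLayer(2, 3).serialize()])
# >>> collection[(0,)].call(np.zeros((1, 2))).shape
# (1, 3)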