# Source code for deepmd.tf.utils.neighbor_stat

# SPDX-License-Identifier: LGPL-3.0-or-later
import logging
from typing import (
    Iterator,
    Optional,
    Tuple,
)

import numpy as np

from deepmd.tf.env import (
    GLOBAL_NP_FLOAT_PRECISION,
    GLOBAL_TF_FLOAT_PRECISION,
    default_tf_session_config,
    tf,
)
from deepmd.tf.utils.batch_size import (
    AutoBatchSize,
)
from deepmd.tf.utils.data_system import (
    DeepmdDataSystem,
)
from deepmd.tf.utils.nlist import (
    extend_coord_with_ghosts,
)
from deepmd.tf.utils.sess import (
    run_sess,
)
from deepmd.utils.neighbor_stat import NeighborStat as BaseNeighborStat

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class NeighborStatOP:
    """Graph builder for neighbor statistics.

    Builds TensorFlow ops that compute, per frame, the minimal squared
    interatomic distance and the maximal number of neighbors found within
    the cut-off radius.

    Parameters
    ----------
    ntypes
        The num of atom types
    rcut
        The cut-off radius
    mixed_types : bool, optional
        If True, treat neighbors of all types as a single type.
    """

    def __init__(
        self,
        ntypes: int,
        rcut: float,
        mixed_types: bool,
    ) -> None:
        super().__init__()
        self.rcut = rcut
        self.ntypes = ntypes
        self.mixed_types = mixed_types

    def build(
        self,
        coord: tf.Tensor,
        atype: tf.Tensor,
        cell: tf.Tensor,
        pbc: tf.Tensor,
    ) -> Tuple[tf.Tensor, tf.Tensor]:
        """Build the ops for the minimal squared distance and the neighbor count.

        Parameters
        ----------
        coord
            The coordinates of atoms. Reshaped internally to
            (nframes, nloc * 3).
        atype
            The atom types. Negative values mark virtual atoms, which are
            excluded from both statistics.
        cell
            The cell.
        pbc
            Whether the cell is periodic. Forwarded unchanged to
            ``extend_coord_with_ghosts``.

        Returns
        -------
        tf.Tensor
            The minimal squared distance between two atoms, in the shape of
            (nframes,)
        tf.Tensor
            The maximal number of neighbors over the local atoms, in the
            shape of (nframes, ntypes), or (nframes, 1) when ``mixed_types``
            is True
        """
        # generated by GitHub Copilot, converted from PT codes
        nframes = tf.shape(coord)[0]
        coord = tf.reshape(coord, [nframes, -1, 3])
        nloc = tf.shape(coord)[1]
        coord = tf.reshape(coord, [nframes, nloc * 3])
        extend_coord, extend_atype, _ = extend_coord_with_ghosts(
            coord, atype, cell, self.rcut, pbc
        )

        coord1 = tf.reshape(extend_coord, [nframes, -1])
        nall = tf.shape(coord1)[1] // 3
        # The first nloc atoms of the extended system are taken as the local
        # atoms.
        coord0 = coord1[:, : nloc * 3]
        # shape of diff: nframes, nloc, nall, 3
        diff = (
            tf.reshape(coord1, [nframes, -1, 3])[:, None, :, :]
            - tf.reshape(coord0, [nframes, -1, 3])[:, :, None, :]
        )

        # Pairs that must be ignored: an atom paired with itself (the
        # diagonal) ...
        excluded = tf.eye(nloc, nall, dtype=tf.bool)
        excluded = tf.tile(excluded[None, :, :], [nframes, 1, 1])
        # ... and any pair that involves a virtual atom (type < 0).
        virtual_type_mask_i = tf.tile(tf.less(atype, 0)[:, :, None], [1, 1, nall])
        virtual_type_mask_j = tf.tile(
            tf.less(extend_atype, 0)[:, None, :], [1, nloc, 1]
        )
        excluded = excluded | virtual_type_mask_i | virtual_type_mask_j

        # Excluded pairs get an infinite distance so they can neither be the
        # minimum nor fall inside the cut-off. The fill tensor is tiled to the
        # full (nframes, nloc, nall) shape so that both branches handed to
        # tf.where have identical shapes.
        inf_fill = tf.constant(
            float("inf"), dtype=GLOBAL_TF_FLOAT_PRECISION, shape=[1, 1, 1]
        )
        inf_fill = tf.tile(inf_fill, [nframes, nloc, nall])
        rr2 = tf.reduce_sum(tf.square(diff), axis=-1)
        rr2 = tf.where(excluded, inf_fill, rr2)
        min_rr2 = tf.reduce_min(rr2, axis=(1, 2))

        # count the number of neighbors
        within_rcut = rr2 < self.rcut**2
        if self.mixed_types:
            # All real (type >= 0) neighbors are counted in a single channel.
            n_channels = 1
            nnei = tf.reshape(
                tf.reduce_sum(
                    tf.cast(
                        within_rcut & tf.greater_equal(extend_atype, 0)[:, None, :],
                        tf.int32,
                    ),
                    axis=-1,
                ),
                [nframes, nloc, 1],
            )
        else:
            # One channel per neighbor type.
            n_channels = self.ntypes
            per_type_counts = [
                tf.reduce_sum(
                    tf.cast(
                        within_rcut & tf.equal(extend_atype, type_idx)[:, None, :],
                        tf.int32,
                    ),
                    axis=-1,
                )
                for type_idx in range(self.ntypes)
            ]
            nnei = tf.stack(per_type_counts, axis=-1)

        # nnei: nframes, nloc, n_channels
        # virtual central atoms (type < 0) report zero neighbors
        nnei = tf.where(
            tf.tile(tf.less(atype, 0)[:, :, None], [1, 1, n_channels]),
            tf.zeros_like(nnei, dtype=tf.int32),
            nnei,
        )
        max_nnei = tf.reduce_max(nnei, axis=1)
        return min_rr2, max_nnei
class NeighborStat(BaseNeighborStat):
    """Class for getting training data information.

    It loads data from DeepmdData object, and measures the data info,
    including nearest nbor distance between atoms, max nbor size of atoms
    and the output data range of the environment matrix.

    Parameters
    ----------
    ntypes
        The num of atom types
    rcut
        The cut-off radius
    mixed_type : bool, optional, default=False
        Treat all types as a single type.
    """

    def __init__(
        self,
        ntypes: int,
        rcut: float,
        mixed_type: bool = False,
    ) -> None:
        """Constructor."""
        super().__init__(ntypes, rcut, mixed_type)
        self.auto_batch_size = AutoBatchSize()
        self.neighbor_stat = NeighborStatOP(ntypes, rcut, mixed_type)
        self.place_holders = {}
        # The ops live in a dedicated graph with its own session, separate
        # from whatever graph is the default when this object is created.
        with tf.Graph().as_default() as sub_graph:
            self.op = self.build()
        self.sub_sess = tf.Session(graph=sub_graph, config=default_tf_session_config)

    def build(self) -> Tuple[tf.Tensor, tf.Tensor]:
        """Build the graph.

        Creates the ``coord``, ``box``, ``type`` and ``pbc`` placeholders,
        stores them in ``self.place_holders`` and wires them into
        ``NeighborStatOP.build``.

        Returns
        -------
        tf.Tensor
            The minimal squared distance between two atoms, in the shape of
            (nframes,)
        tf.Tensor
            The maximal number of neighbors
        """
        for key in ("coord", "box"):
            self.place_holders[key] = tf.placeholder(
                GLOBAL_NP_FLOAT_PRECISION, [None, None], name=f"t_{key}"
            )
        self.place_holders["type"] = tf.placeholder(
            tf.int32, [None, None], name="t_type"
        )
        # pbc is a scalar flag shared by every frame of a fed batch.
        self.place_holders["pbc"] = tf.placeholder(tf.bool, [], name="t_pbc")
        return self.neighbor_stat.build(
            self.place_holders["coord"],
            self.place_holders["type"],
            self.place_holders["box"],
            self.place_holders["pbc"],
        )

    def iterator(
        self, data: DeepmdDataSystem
    ) -> Iterator[Tuple[np.ndarray, float, str]]:
        """Produce data.

        One item is yielded per set directory of every system in ``data``.

        Parameters
        ----------
        data
            The data system

        Yields
        ------
        np.ndarray
            The maximal number of neighbors
        float
            The squared minimal distance between two atoms
        str
            The directory of the data system
        """
        for ii in range(len(data.system_dirs)):
            # Look the system up once instead of once per set directory.
            data_set = data.data_systems[ii]
            for jj in data_set.dirs:
                data_set_data = data_set._load_set(jj)
                # Frames are pushed through the graph in automatically sized
                # batches; the per-frame results come back for the whole set.
                minrr2, max_nnei = self.auto_batch_size.execute_all(
                    self._execute,
                    data_set_data["coord"].shape[0],
                    data_set.get_natoms(),
                    data_set_data["coord"],
                    data_set_data["type"],
                    data_set_data["box"],
                    data_set.pbc,
                )
                # Reduce over frames: largest neighbor count per channel and
                # the smallest squared distance in the set.
                yield np.max(max_nnei, axis=0), np.min(minrr2), jj

    def _execute(
        self,
        coord: np.ndarray,
        atype: np.ndarray,
        box: Optional[np.ndarray],
        pbc: bool,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Execute the operation.

        Parameters
        ----------
        coord
            The coordinates of atoms.
        atype
            The atom types.
        box
            The box.
        pbc
            Whether the box is periodic.

        Returns
        -------
        np.ndarray
            The minimal squared distance between two atoms, one value per
            frame
        np.ndarray
            The maximal number of neighbors, one row per frame
        """
        feed_dict = {
            self.place_holders["coord"]: coord,
            self.place_holders["type"]: atype,
            self.place_holders["box"]: box,
            self.place_holders["pbc"]: pbc,
        }
        minrr2, max_nnei = run_sess(self.sub_sess, self.op, feed_dict=feed_dict)
        return minrr2, max_nnei