Source code for CAT.attachment.distribution

"""Functions for creating distributions of atomic indices (*i.e.* core anchor atoms).

Index
-----
.. currentmodule:: CAT.distribution
.. autosummary::
    uniform_idx
    distribute_idx

API
---
.. autofunction:: uniform_idx
.. autofunction:: distribute_idx

"""

import sys
import reprlib
import functools
from types import MappingProxyType
from itertools import islice, takewhile
from collections import abc
from typing import (
    cast,
    Generator,
    Optional,
    Iterable,
    FrozenSet,
    Any,
    Union,
    Callable,
    Mapping,
    TYPE_CHECKING,
)

import numpy as np
from scipy.spatial.distance import cdist
from nanoutils import as_nd_array

from CAT.utils import cycle_accumulate
from CAT.attachment.edge_distance import edge_dist

__all__ = ['distribute_idx', 'uniform_idx']

if TYPE_CHECKING:
    from numpy.typing import ArrayLike

    if sys.version_info >= (3, 8):
        from typing import Literal
    else:
        from typing_extensions import Literal

    _OpKind = Literal['min', 'max']
    _ModeKind = Literal['uniform', 'random', 'cluster']
else:
    ArrayLike = "numpy.typing.ArrayLike"
    _OpKind = "Literal['min', 'max']"
    _ModeKind = "Literal['uniform', 'random', 'cluster']"

#: A set of allowed values for the **mode** parameter in :func:`distribute_idx`.
MODE_SET: FrozenSet[str] = frozenset({'uniform', 'random', 'cluster'})

#: Map the **operation** parameter in :func:`uniform_idx` to either
#: :func:`numpy.nanargmin` or :func:`numpy.nanargmax`.
_ArgFunc = Callable[[np.ndarray], np.intp]
OPERATION_MAPPING: Mapping[Union[str, Callable], _ArgFunc] = MappingProxyType({
    'min': np.nanargmin,
    min: np.nanargmin,
    np.min: np.nanargmin,
    'max': np.nanargmax,
    max: np.nanargmax,
    np.max: np.nanargmax,
})
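
# Editorial note: the string aliases and the corresponding builtin/NumPy callables
# all resolve to the same nan-aware reductions defined in the mapping above, e.g.
#
#     >>> OPERATION_MAPPING['max'] is np.nanargmax
#     True
#     >>> OPERATION_MAPPING[min] is np.nanargmin
#     True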


def distribute_idx(
    core: ArrayLike,
    idx: Union[int, Iterable[int]],
    f: float,
    mode: _ModeKind = 'uniform',
    **kwargs: Any,
) -> np.ndarray:
    r"""Create a new distribution of atomic indices from **idx** of length :code:`f * len(idx)`.

    Parameters
    ----------
    core : array-like [:class:`float`], shape :math:`(m, 3)`
        A 2D array-like object (such as a :class:`Molecule` instance) consisting
        of Cartesian coordinates.

    idx : :class:`int` or :class:`Iterable<collections.abc.Iterable>` [:class:`int`], shape :math:`(i,)`
        An integer or iterable of unique integers representing the 0-based indices
        of all anchor atoms in **core**.

    f : :class:`float`
        A float obeying the following condition: :math:`0.0 < f \le 1.0`.
        Represents the fraction of **idx** that will be returned.

    mode : :class:`str`
        How the subset of to-be returned indices will be generated.
        Accepts one of the following values:

        * ``"random"``: A random distribution.
        * ``"uniform"``: A uniform distribution; the distance between each successive atom and
          all previous points is maximized.
        * ``"cluster"``: A clustered distribution; the distance between each successive atom and
          all previous points is minimized.

    \**kwargs : :data:`Any<typing.Any>`
        Further keyword arguments for the **mode**-specific functions.

    Returns
    -------
    :class:`numpy.ndarray` [:class:`int`], shape :math:`(f*i,)`
        A 1D array of atomic indices.
        If **idx** has :math:`i` elements,
        then the length of the returned array is equal to :math:`\max(1, f*i)`.

    See Also
    --------
    :func:`uniform_idx`
        Yield the column-indices of **dist** which yield a uniform or clustered distribution.

    :func:`cluster_idx`
        Return the column-indices of **dist** which yield a clustered distribution.

    """  # noqa
    # Convert **idx** into an array
    idx_ar = as_nd_array(idx, dtype=int)

    # Validate the input
    if mode not in MODE_SET:
        raise ValueError(f"Invalid `mode`: {mode!r}")
    elif not (0.0 < f <= 1.0):
        raise ValueError("'f' should be larger than 0.0 and smaller than or equal to 1.0")

    # Create an array of indices
    stop = max(1, int(round(f * len(idx_ar))))
    if mode in ('uniform', 'cluster'):
        xyz = np.array(core, dtype=np.float64, ndmin=2, copy=False)[idx_ar]
        dist = edge_dist(xyz) if kwargs.get('follow_edge', False) else cdist(xyz, xyz)
        operation: _OpKind = 'min' if mode == 'uniform' else 'max'

        generator1 = uniform_idx(
            dist, operation=operation,
            start=kwargs.get('start', None),
            cluster_size=kwargs.get('cluster_size', 1),
            randomness=kwargs.get('randomness', None),
            weight=kwargs.get('weight', lambda x: np.exp(-x))
        )
        generator2 = islice(generator1, stop)
        ret = idx_ar[np.fromiter(generator2, count=stop, dtype=np.intp)]
    elif mode == 'random':
        ret = np.random.permutation(idx_ar)

    # Return an array of `f * len(idx)` atomic indices
    return ret[:stop]


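# Editorial usage sketch (not part of the original module): select a uniformly
# distributed 50% subset of 50 hypothetical anchor atoms from a random set of
# Cartesian coordinates.  The names ``xyz_example`` and ``anchor_idx`` are
# illustrative placeholders.
#
#     >>> import numpy as np
#     >>> xyz_example = np.random.rand(100, 3)   # (m, 3) Cartesian coordinates
#     >>> anchor_idx = np.arange(50)             # 0-based anchor-atom indices
#     >>> subset = distribute_idx(xyz_example, anchor_idx, f=0.5, mode='uniform')
#     >>> subset.shape                           # max(1, round(0.5 * 50)) indices
#     (25,)

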
def uniform_idx(
    dist: np.ndarray,
    operation: _OpKind = 'min',
    cluster_size: Union[int, Iterable[int]] = 1,
    start: Optional[int] = None,
    randomness: Optional[float] = None,
    weight: Callable[[np.ndarray], np.ndarray] = lambda x: np.exp(-x)
) -> Generator[int, None, None]:
    r"""Yield the column-indices of **dist** which yield a uniform or clustered distribution.

    Given the (symmetric) distance matrix :math:`\boldsymbol{D} \in \mathbb{R}^{n,n}` and
    the vector :math:`\boldsymbol{a} \in \mathbb{N}^{m}`
    (representing a subset of indices in :math:`\boldsymbol{D}`),
    then the :math:`i`'th element :math:`a_{i}` is defined below.
    All elements of :math:`\boldsymbol{a}` are furthermore constrained to be unique.
    :math:`f(x)` is herein an, as of yet unspecified,
    function for weighting each individual distance.

    Following the convention used in python, the :math:`\boldsymbol{X}[0:3, 1:5]` notation is
    herein used to denote the submatrix created
    by intersecting rows :math:`0` up to (but not including) :math:`3` and
    columns :math:`1` up to (but not including) :math:`5`.

    .. math::

        \DeclareMathOperator*{\argmin}{\arg\!\min}
        a_{i} = \begin{cases}
            \argmin\limits_{k \in \mathbb{N}} \sum
                f \bigl( \boldsymbol{D}_{k,:} \bigr)
            & \text{if} & i=0 \\
            \argmin\limits_{k \in \mathbb{N}} \sum
                f \bigl( \boldsymbol{D}[k, \boldsymbol{a}[0:i]] \bigr)
            & \text{if} & i > 0
        \end{cases}

    Default weighting function: :math:`f(x) = e^{-x}`.

    The row in :math:`\boldsymbol{D}` corresponding to :math:`a_{0}`
    can alternatively be specified by **start**.

    The :math:`\text{argmin}` operation can be exchanged for :math:`\text{argmax}` by setting
    **operation** to ``"max"``,
    thus yielding a clustered rather than a uniform distribution.

    The **cluster_size** parameter allows for the creation of uniformly
    distributed clusters of size :math:`r`.
    Herein the vector of indices :math:`\boldsymbol{a} \in \mathbb{N}^{m}` is,
    for bookkeeping purposes,
    reshaped into the matrix :math:`\boldsymbol{A} \in \mathbb{N}^{q, r} \; \text{with} \; q*r = m`.
    All elements of :math:`\boldsymbol{A}` are, again, constrained to be unique.

    .. math::

        \DeclareMathOperator*{\argmin}{\arg\!\min}
        A_{i,j} = \begin{cases}
            \argmin\limits_{k \in \mathbb{N}} \sum
                f \bigl( \boldsymbol{D}_{k,:} \bigr)
            & \text{if} & i=0; \; j=0 \\
            \argmin\limits_{k \in \mathbb{N}} \sum
                f \bigl( \boldsymbol{D}[k, \boldsymbol{A}[0:i, 0:r]] \bigr)
            & \text{if} & i > 0; \; j = 0 \\
            \argmin\limits_{k \in \mathbb{N}} \dfrac
                {\sum f \bigl( \boldsymbol{D}[k, \boldsymbol{A}[0:i, 0:r]] \bigr)}
                {\sum f \bigl( \boldsymbol{D}[k, \boldsymbol{A}[i, 0:j]] \bigr)}
            & \text{if} & j > 0
        \end{cases}

    Examples
    --------
    .. code-block:: python

        >>> import numpy as np
        >>> from CAT.distribution import uniform_idx

        >>> dist: np.ndarray = np.random.rand(10, 10)

        >>> out1 = uniform_idx(dist)
        >>> idx_ar1 = np.fromiter(out1, dtype=np.intp)

        >>> out2 = uniform_idx(dist, operation="min")
        >>> out3 = uniform_idx(dist, cluster_size=5)
        >>> out4 = uniform_idx(dist, cluster_size=[1, 1, 1, 1, 2, 2, 4])
        >>> out5 = uniform_idx(dist, start=5)
        >>> out6 = uniform_idx(dist, randomness=0.75)
        >>> out7 = uniform_idx(dist, weight=lambda x: x**-1)

    Parameters
    ----------
    dist : :class:`numpy.ndarray` [:class:`float`], shape :math:`(n, n)`
        A symmetric 2D NumPy array (:math:`D_{i,j} = D_{j,i}`) representing the
        distance matrix :math:`D`.

    operation : :class:`str`
        Whether to use :func:`~numpy.argmin` or :func:`~numpy.argmax`.
        Accepted values are ``"min"`` and ``"max"``.

    cluster_size : :class:`int` or :class:`Iterable<collections.abc.Iterable>` [:class:`int`]
        An integer or iterable of integers representing the size of clusters.
        Used in conjunction with :code:`operation = "max"` for creating a uniform
        distribution of clusters.
        :code:`cluster_size = 1` is equivalent to a normal uniform distribution.

        Providing **cluster_size** as an iterable of integers will create clusters
        of varying, user-specified, sizes.
        For example, :code:`cluster_size = range(1, 4)` will continuously create
        clusters of sizes 1, 2 and 3.
        The iteration process is repeated until all atoms represented by **dist**
        are exhausted.

    start : :class:`int`, optional
        The index of the starting row in **dist**.
        If ``None``, start in whichever row contains the global minimum
        (:math:`\DeclareMathOperator*{\argmin}{\arg\!\min} \argmin\limits_{k \in \mathbb{N}} ||\boldsymbol{D}_{k, :}||_{p}`) or
        maximum (:math:`\DeclareMathOperator*{\argmax}{\arg\!\max} \argmax\limits_{k \in \mathbb{N}} ||\boldsymbol{D}_{k, :}||_{p}`).
        See **operation**.

    randomness : :class:`float`, optional
        If not ``None``, represents the probability that a random index
        will be yielded rather than obeying **operation**.
        Should obey the following condition: :math:`0 \le randomness \le 1`.

    weight : :data:`Callable<typing.Callable>`
        A callable for applying weights to the distances; default: :math:`e^{-x}`.
        The callable should take an array as argument and return a new array,
        *e.g.* :func:`numpy.exp`.

    Yields
    ------
    :class:`int`
        Yield the column-indices specified in :math:`\boldsymbol{a}`.

    """  # noqa
    # Truncate and weight the distance matrix
    dist_w = np.array(dist, dtype=float, copy=True)
    if not np.diagonal(dist_w).any():
        np.fill_diagonal(dist_w, np.nan)
    dist_w = weight(dist_w)

    # Use either argmin or argmax
    try:
        arg_func = OPERATION_MAPPING[operation]
    except KeyError:
        raise ValueError(f"Invalid `operation`: {operation!r}") from None

    if start is None:
        start = cast(int, arg_func(np.nansum(dist_w, axis=1)))
    if randomness is not None:
        arg_func = _parse_randomness(randomness, arg_func, len(dist))

    # Yield the first index
    try:
        dist_w_1d: np.ndarray = dist_w[start].copy()
    except IndexError:
        if not hasattr(start, '__index__'):
            raise TypeError("'start' expected an integer or 'None'; observed type: "
                            f"'{start.__class__.__name__}'") from None
        raise ValueError("'start' is out of bounds") from None
    dist_w_1d[start] = np.nan
    yield start

    # Construct a generator for yielding the remaining indices
    if cluster_size == 1:
        generator = _min_or_max(dist_w, dist_w_1d, arg_func)
    else:
        generator = _min_and_max(dist_w, dist_w_1d, arg_func, cluster_size)
    for i in generator:
        yield i


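# Editorial sketch (not part of the original module): with ``cluster_size=3`` the
# indices yielded by :func:`uniform_idx` can be regrouped into the bookkeeping
# matrix ``A`` described in the docstring above, one row per cluster.  ``dist``
# is an arbitrary symmetric example matrix.
#
#     >>> import numpy as np
#     >>> dist = np.random.rand(9, 9)
#     >>> dist = (dist + dist.T) / 2                             # symmetrize
#     >>> idx = np.fromiter(uniform_idx(dist, cluster_size=3), dtype=np.intp)
#     >>> A = idx.reshape(-1, 3)                                 # shape (q, r) = (3, 3)

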
def _min_or_max(dist_sqr: np.ndarray, dist_1d_sqr: np.ndarray,
                arg_func: Callable[[np.ndarray], int]) -> Generator[int, None, None]:
    """Helper function for :func:`uniform_idx` if :code:`cluster_size == 1`."""
    for _ in range(len(dist_1d_sqr) - 1):
        i = arg_func(dist_1d_sqr)
        dist_1d_sqr[i] = np.nan
        dist_1d_sqr += dist_sqr[i]
        yield i


def _min_and_max(dist_sqr: np.ndarray, dist_1d_sqr: np.ndarray,
                 arg_func: Callable[[np.ndarray], int],
                 cluster_size: Union[int, Iterable[int]] = 1) -> Generator[int, None, None]:
    """Helper function for :func:`uniform_idx` if :code:`cluster_size != 1`."""
    # Construct a boolean array denoting the start of new clusters
    bool_ar = np.zeros(len(dist_1d_sqr), dtype=bool)
    if isinstance(cluster_size, abc.Iterable):
        bool_indices = _parse_cluster_size(len(bool_ar), cluster_size)
        bool_ar[bool_indices] = True
    else:
        try:
            bool_ar[::cluster_size] = True
        except ValueError as ex:
            raise ValueError("'cluster_size' cannot be zero; observed value: "
                             f"{reprlib.repr(cluster_size)}") from ex
        except TypeError as ex:
            raise TypeError("'cluster_size' expected a non-zero integer or iterable of integers; "
                            f"observed type: '{cluster_size.__class__.__name__}'") from ex

    # Skip the first element as it was already yielded in uniform_idx()
    bool_ar = bool_ar[1:]

    dist_cluster = dist_1d_sqr.copy()
    for bool_ in bool_ar:
        if bool_:  # The start of a new cluster
            dist_1d_sqr += dist_cluster
            dist_cluster[:] = 0
            dist_1d = dist_1d_sqr
        else:  # The growth of a cluster
            dist_1d = dist_1d_sqr / dist_cluster

        j = arg_func(dist_1d)
        dist_1d_sqr[j] = np.nan
        dist_cluster += dist_sqr[j]
        yield j


def _random_arg_func(dist_1d: np.ndarray,
                     arg_func: Callable[[np.ndarray], int],
                     threshold: float,
                     idx: np.ndarray,
                     rand_func: Callable[[], float] = np.random.sample) -> int:
    """Return a random element from **idx** if :code:`threshold > rand_func()`, otherwise call :code:`arg_func(dist_1d)`.

    Elements in **dist_1d** which are `nan` will be used for masking **idx**.

    """  # noqa
    if threshold > rand_func():
        return np.random.choice(idx[~np.isnan(dist_1d)])
    return arg_func(dist_1d)


def _parse_cluster_size(ar_size: int, clusters: Iterable[int]) -> np.ndarray:
    """Return indices for all ``True`` values in the boolean array of :func:`_min_and_max`."""
    generator = takewhile(lambda x: x < ar_size, cycle_accumulate(clusters))
    try:
        return np.fromiter(generator, dtype=int)
    except TypeError as ex:
        raise TypeError("'cluster_size' expected a non-zero integer or iterable of integers; "
                        f"{ex}") from ex


def _parse_randomness(randomness: float,
                      arg_func: Callable[[np.ndarray], int],
                      n: int) -> Callable[[np.ndarray], int]:
    """Modify **arg_func** such that there is a **randomness** chance to return a random index from the range **n**."""  # noqa
    try:
        assert 0 <= randomness <= 1
    except TypeError as ex:
        raise TypeError("'randomness' expected a float between 0.0 and 1.0 (inclusive); "
                        f"observed type: '{randomness.__class__.__name__}'") from ex
    except AssertionError as ex:
        raise ValueError("'randomness' expected a float between 0.0 and 1.0 (inclusive); "
                         f"observed value: {reprlib.repr(randomness)}") from ex

    return functools.partial(_random_arg_func, arg_func=arg_func,
                             threshold=randomness, idx=np.arange(n))
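

# Editorial note (assuming :func:`CAT.utils.cycle_accumulate` yields a running
# total over the endlessly cycled iterable): for a 10-atom system and
# ``clusters=range(1, 4)`` the cluster-start indices produced by
# :func:`_parse_cluster_size` would be the cumulative sums smaller than 10:
#
#     >>> _parse_cluster_size(10, range(1, 4))
#     array([1, 3, 6, 7, 9])
#
# :func:`_min_and_max` then flags these positions as the start of a new cluster.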