Source code for gpaw.core.atom_arrays

from __future__ import annotations

import numbers
from typing import Sequence, overload, Literal

import numpy as np
from gpaw.core.matrix import Matrix
from gpaw.gpu import cupy as cp, XP
from gpaw.mpi import MPIComm, serial_comm
from gpaw.new import prod, zips
from gpaw.typing import Array1D, ArrayLike1D
from gpaw.new.c import dH_aii_times_P_ani_gpu
from gpaw.utilities import as_real_dtype


[docs] class AtomArraysLayout(XP): def __init__(self, shapes: Sequence[int | tuple[int, ...]], atomdist: AtomDistribution | MPIComm = serial_comm, dtype=float, xp=None): """Description of layout of atom arrays. Parameters ---------- shapes: Shapse of arrays - one for each atom. atomdist: Distribution of atoms. dtype: Data-type (float or complex). """ self.shape_a = [shape if isinstance(shape, tuple) else (shape,) for shape in shapes] if not isinstance(atomdist, AtomDistribution): atomdist = AtomDistribution(np.zeros(len(shapes), int), atomdist) self.atomdist = atomdist self.dtype = np.dtype(dtype) XP.__init__(self, xp or np) self.size = sum(prod(shape) for shape in self.shape_a) self.myindices = [] self.mysize = 0 I1 = 0 for a in atomdist.indices: I2 = I1 + prod(self.shape_a[a]) self.myindices.append((a, I1, I2)) self.mysize += I2 - I1 I1 = I2 def __len__(self): return len(self.shape_a) def __repr__(self): return (f'AtomArraysLayout({self.shape_a}, {self.atomdist}, ' f'{self.dtype}, xp={self.xp.__name__})')
[docs] def new(self, atomdist=None, dtype=None, xp=None): """Create new AtomsArrayLayout object with new atomdist.""" return AtomArraysLayout(self.shape_a, atomdist or self.atomdist, dtype or self.dtype, xp or self.xp)
[docs] def empty(self, dims: int | tuple[int, ...] = (), comm: MPIComm = serial_comm) -> AtomArrays: """Create new AtomArrays object. parameters ---------- dims: Extra dimensions. comm: Distribute dimensions along this communicator. """ return AtomArrays(self, dims, comm)
[docs] def zeros(self, dims: int | tuple[int, ...] = (), comm: MPIComm = serial_comm) -> AtomArrays: aa = self.empty(dims, comm) aa.data[:] = 0.0 return aa
[docs] def sizes(self) -> tuple[list[dict[int, int]], Array1D]: """Compute array sizes for all ranks. >>> AtomArraysLayout([3, 4]).sizes() ([{0: 3, 1: 4}], array([7])) """ comm = self.atomdist.comm size_ra: list[dict[int, int]] = [{} for _ in range(comm.size)] size_r = np.zeros(comm.size, int) for a, (rank, shape) in enumerate(zips(self.atomdist.rank_a, self.shape_a)): size = prod(shape) size_ra[rank][a] = size size_r[rank] += size return size_ra, size_r
[docs] class AtomDistribution: def __init__(self, ranks: ArrayLike1D, comm: MPIComm = serial_comm): """Atom-distribution. Parameters ---------- ranks: List of ranks, one rank per atom. comm: MPI-communicator. """ self.comm = comm self.rank_a = np.array(ranks) # convert from np.int64 -> int: self.indices = [int(a) for a in np.where(self.rank_a == comm.rank)[0]] def __len__(self) -> int: return len(self.rank_a)
[docs] @classmethod def from_number_of_atoms(cls, natoms: int, comm: MPIComm = serial_comm) -> AtomDistribution: """Distribute atoms evenly. >>> AtomDistribution.from_number_of_atoms(3).rank_a array([0, 0, 0]) """ blocksize = (natoms + comm.size - 1) // comm.size rank_a = np.empty(natoms, int) a1 = 0 for rank in range(comm.size): a2 = a1 + blocksize rank_a[a1:a2] = rank if a2 >= natoms: break a1 = a2 return cls(rank_a, comm)
[docs] @classmethod def from_atom_indices(cls, atom_indices: Sequence[int], comm: MPIComm = serial_comm, *, natoms: int | None = None) -> AtomDistribution: """Create distribution from atom indices. >>> AtomDistribution.from_atom_indices([0, 1, 2]).rank_a array([0, 0, 0]) """ if natoms is None: natoms = comm.max_scalar(max(atom_indices)) + 1 rank_a = np.zeros(natoms, int) # type: ignore rank_a[atom_indices] = comm.rank comm.sum(rank_a) return cls(rank_a, comm)
def __repr__(self): return (f'AtomDistribution(ranks={self.rank_a}, ' f'comm={self.comm.rank}/{self.comm.size})')
[docs] def gather(self): return AtomDistribution(np.zeros(len(self.rank_a), int))
[docs] class AtomArrays: def __init__(self, layout: AtomArraysLayout, dims: int | Sequence[int] = (), comm: MPIComm = serial_comm, data: np.ndarray | None = None): """AtomArrays object. parameters ---------- layout: Layout-description. dims: Extra dimensions. comm: Distribute dimensions along this communicator. data: Data array for storage. """ myshape = (layout.mysize,) domain_comm = layout.atomdist.comm dtype = layout.dtype self.myshape = myshape self.comm = comm self.domain_comm = domain_comm # convert int to tuple: self.dims = tuple(dims) if not isinstance(dims, int) else (dims,) if self.dims: d1, d2 = self.my_slice() mydims0 = d2 - d1 self.mydims = (mydims0,) + self.dims[1:] else: self.mydims = () fullshape = self.mydims + self.myshape if data is not None: if data.shape != fullshape: raise ValueError( f'Bad shape for data: {data.shape} != {fullshape}') if data.dtype != dtype: raise ValueError( f'Bad dtype for data: {data.dtype} != {dtype}') else: data = layout.xp.empty(fullshape, dtype) self.data = data self._matrix: Matrix | None = None # matrix view self.layout = layout self._arrays = {} for a, I1, I2 in layout.myindices: self._arrays[a] = self.data[..., I1:I2].reshape( self.mydims + layout.shape_a[a]) def __len__(self) -> int: return len(self.layout)
[docs] def my_slice(self) -> tuple[int, int]: mydims0 = (self.dims[0] + self.comm.size - 1) // self.comm.size d1 = min(self.comm.rank * mydims0, self.dims[0]) d2 = min((self.comm.rank + 1) * mydims0, self.dims[0]) return d1, d2
def __repr__(self): txt = f'AtomArrays({self.layout}, dims={self.dims}' if self.comm.size > 1: txt += f', comm={self.comm.rank}/{self.comm.size}' return txt + ')' @property def matrix(self) -> Matrix: if self._matrix is not None: return self._matrix shape = (self.dims[0], prod(self.dims[1:]) * prod(self.myshape)) myshape = (self.mydims[0], prod(self.mydims[1:]) * prod(self.myshape)) dist = (self.comm, -1, 1) data = self.data.reshape(myshape) self._matrix = Matrix(*shape, data=data, dist=dist) return self._matrix
[docs] def new(self, *, layout=None, data=None, xp=None): """Create new AtomArrays object of same kind. Parameters ---------- layout: Layout-description. data: Array to use for storage. """ if xp is np: assert layout is None assert data is None assert self.layout.xp is cp layout = self.layout.new(xp=np) return AtomArrays(layout or self.layout, self.dims, self.comm, data=data)
[docs] def to_cpu(self): if self.layout.xp is np: return self return self.new(layout=self.layout.new(xp=np), data=cp.asnumpy(self.data))
[docs] def to_xp(self, xp): if self.layout.xp is xp: assert xp is np, 'cp -> cp should not be needed!' return self if xp is np: return self.new(layout=self.layout.new(xp=np), data=cp.asnumpy(self.data)) return self.new(layout=self.layout.new(xp=cp), data=cp.asarray(self.data))
@overload def __getitem__(self, a: int) -> np.ndarray: ... @overload def __getitem__(self, a: tuple) -> AtomArrays: ... def __getitem__(self, a): if isinstance(a, numbers.Integral): return self._arrays[a] assert len(self.dims) >= 1 a0, a1 = a assert a0 == slice(None) data = self.data[a1] a_ai = AtomArrays(self.layout, dims=data.shape[:-1], data=data) return a_ai
[docs] def copy(self): return self.new(data=self.data.copy())
[docs] def get(self, a): return self._arrays.get(a)
def __setitem__(self, a, value): self._arrays[a][:] = value def __contains__(self, a): return a in self._arrays
[docs] def items(self): return self._arrays.items()
[docs] def keys(self): return self._arrays.keys()
[docs] def values(self): return self._arrays.values()
@overload def gather(self, broadcast: Literal[False] = False, copy: bool = False ) -> AtomArrays | None: ... @overload def gather(self, broadcast: Literal[True], copy: bool = False ) -> AtomArrays: ...
[docs] def gather(self, broadcast=False, copy=False): """Gather all atoms on master.""" comm = self.layout.atomdist.comm if comm.size == 1: if copy: aa = self.new() aa.data[:] = self.data return aa return self if comm.rank == 0 or broadcast: aa = self.new(layout=self.layout.new(atomdist=serial_comm)) else: aa = None if comm.rank == 0: size_ra, size_r = self.layout.sizes() n = prod(self.mydims) m = size_r.max() buffer = self.layout.xp.empty(n * m, self.layout.dtype) for rank in range(1, comm.size): buf = buffer[:n * size_r[rank]].reshape((n, size_r[rank])) comm.receive(buf, rank) b1 = 0 for a, size in size_ra[rank].items(): b2 = b1 + size A = aa[a] A[:] = buf[:, b1:b2].reshape(A.shape) b1 = b2 for a, array in self._arrays.items(): aa[a] = array else: comm.send(self.data, 0) if broadcast: comm.broadcast(aa.data, 0) return aa
[docs] def scatter_from(self, data: np.ndarray | AtomArrays | None = None) -> None: """Scatter atoms.""" if isinstance(data, AtomArrays): data = data.data comm = self.layout.atomdist.comm xp = self.layout.xp if comm.size == 1: assert data is not None self.data[:] = data return if comm.rank != 0: comm.receive(self.data, 0, 42) return size_ra, size_r = self.layout.sizes() aa = self.new(layout=self.layout.new(atomdist=serial_comm), data=data) requests = [] for rank, (totsize, size_a) in enumerate(zips(size_r, size_ra)): if rank != 0: buf = xp.empty(self.mydims + (totsize,), self.layout.dtype) b1 = 0 for a, size in size_a.items(): b2 = b1 + size buf[..., b1:b2] = aa[a].reshape(self.mydims + (size,)) b1 = b2 request = comm.send(buf, rank, 42, False) # Remember to store a reference to the # send buffer (buf) so that is isn't # deallocated requests.append((request, buf)) else: for a in size_a: self[a] = aa[a] for request, _ in requests: comm.wait(request)
[docs] def to_lower_triangle(self): """Convert `N*N` matrices to `N*(N+1)/2` vectors. >>> a = AtomArraysLayout([(3, 3)]).empty() >>> a[0][:] = [[11, 12, 13], ... [12, 22, 23], ... [13, 23, 33]] >>> a.to_lower_triangle()[0] array([11., 12., 22., 13., 23., 33.]) """ shape_a = [] for i1, i2 in self.layout.shape_a: assert i1 == i2 shape_a.append((i1 * (i1 + 1) // 2,)) xp = self.layout.xp layout = AtomArraysLayout(shape_a, self.layout.atomdist, dtype=self.layout.dtype, xp=xp) a_axp = layout.empty(self.dims) for a_xii, a_xp in zips(self.values(), a_axp.values()): i = a_xii.shape[-1] L = xp.tril_indices(i) for a_p, a_ii in zips(a_xp.reshape((-1, i * (i + 1) // 2)), a_xii.reshape((-1, i, i))): a_p[:] = a_ii[L] return a_axp
[docs] def to_full(self): r"""Convert `N(N+1)/2` vectors to `N\times N` matrices. >>> a = AtomArraysLayout([6]).empty() >>> a[0][:] = [1, 2, 3, 4, 5, 6] >>> a.to_full()[0] array([[1., 2., 4.], [2., 3., 5.], [4., 5., 6.]]) """ shape_a = [] for (p,) in self.layout.shape_a: i = int((2 * p + 0.25)**0.5) shape_a.append((i, i)) layout = AtomArraysLayout(shape_a, self.layout.atomdist, self.layout.dtype) a_axii = layout.empty(self.dims) for a_xp, a_xii in zips(self.values(), a_axii.values()): i = a_xii.shape[-1] a_xii[(...,) + np.tril_indices(i)] = a_xp u = (...,) + np.triu_indices(i, 1) a_xii[u] = np.swapaxes(a_xii, -1, -2)[u].conj() return a_axii
[docs] def moved(self, atomdist): if (self.layout.atomdist.rank_a == atomdist.rank_a).all(): return self assert self.comm.size == 1 layout = self.layout.new(atomdist=atomdist) new = layout.empty(self.dims) comm = atomdist.comm xp = self.layout.xp requests = [] for a, I1, I2 in self.layout.myindices: r = layout.atomdist.rank_a[a] if r == comm.rank: new[a][:] = self[a] else: requests.append(comm.send(xp.ascontiguousarray(self[a]), r, block=False)) for a, I1, I2 in layout.myindices: r = self.layout.atomdist.rank_a[a] if r != comm.rank: target = new[a] buf = xp.empty_like(target) comm.receive(buf, r) target[:] = buf comm.waitall(requests) return new
[docs] def redist(self, atomdist: AtomDistribution, comm1: MPIComm, comm2: MPIComm) -> AtomArrays: layout = self.layout.new(atomdist=atomdist) result = layout.empty(dims=self.dims) if comm1.rank == 0: a = self.gather() else: a = None if comm2.rank == 0: result.scatter_from(a) comm2.broadcast(result.data, 0) return result
[docs] def block_diag_multiply(self, block_diag_matrix_axii: AtomArrays, out_ani: AtomArrays, index: int | None = None) -> None: """Multiply by block diagonal matrix. with A, B and C refering to ``self``, ``block_diag_matrix_axii`` and ``out_ani``::: -- a a a > A B -> C -- ni ij nj i If index is not None, ``block_diag_matrix_axii`` must have an extra dimension: :math:`B_{ij}^{ax}` and x=index is used. """ xp = self.layout.xp if xp is np: if index is not None: block_diag_matrix_axii = block_diag_matrix_axii[:, index] for P_ni, dX_ii, out_ni in zips(self.values(), block_diag_matrix_axii.values(), out_ani.values()): out_ni[:] = P_ni @ dX_ii return ni_a = xp.array( [I2 - I1 for a, I1, I2 in self.layout.myindices], dtype=np.int32) data = block_diag_matrix_axii.data if index is not None: data = data[index] if self.data.size > 0: realdtype = as_real_dtype(self.data.dtype) dH_aii_times_P_ani_gpu(xp.asarray(data, dtype=realdtype), ni_a, self.data, out_ani.data)