# Copyright (C) 2003 CAMP
# Please see the accompanying LICENSE file for further information.
"""Grid-descriptors
This module contains a classes defining uniform 3D grids.
For radial grid descriptors, look atom/radialgd.py.
"""
import numbers
from math import pi
from typing import Sequence
from numpy import lcm
from fractions import Fraction
import numpy as np
from scipy.ndimage import map_coordinates
import gpaw.cgpaw as cgpaw
import gpaw.mpi as mpi
from gpaw.domain import Domain
from gpaw.new import prod
from gpaw.typing import Array1D, Array3D, Vector
from gpaw.utilities.blas import mmm, r2k, rk
NONBLOCKING = False
class GridBoundsError(ValueError):
pass
class BadGridError(ValueError):
pass
class GridDescriptor(Domain):
r"""Descriptor-class for uniform 3D grid
A ``GridDescriptor`` object holds information on how functions, such
as wave functions and electron densities, are discretized in a
certain domain in space. The main information here is how many
grid points are used in each direction of the unit cell.
There are methods for tasks such as allocating arrays, performing
symmetry operations and integrating functions over space. All
methods also work correctly when the domain is parallelized via
domain decomposition.
This is how a 2x2x2 3D array is laid out in memory::

   3-----7
   |\    |\
   | \   | \
   |  1-----5      z
   2--|--6  |   y  |
    \ |   \ |    \ |
     \|    \|     \|
      0-----4      +-----x

Example:
>>> a = np.zeros((2, 2, 2))
>>> a.ravel()[:] = range(8)
>>> a
array([[[0., 1.],
[2., 3.]],
<BLANKLINE>
[[4., 5.],
[6., 7.]]])
"""
ndim = 3 # dimension of ndarrays
def __init__(self, N_c, cell_cv=[1, 1, 1], pbc_c=True,
comm=None, parsize_c=None, allow_empty_domains=False):
"""Construct grid-descriptor object.
parameters:
N_c: 3 ints
Number of grid points along axes.
cell_cv: 3 floats or 3x3 floats
Unit cell.
pbc_c: one or three bools
Periodic boundary conditions flag(s).
comm: MPI-communicator
Communicator for domain-decomposition.
parsize_c: tuple of 3 ints, a single int or None
Number of domains.
allow_empty_domains: bool
Allow parallelization that would generate empty domains.
Note that if pbc_c[c] is False, then the actual number of grid points
along axis c is one less than N_c[c].
Attributes:
========== ========================================================
``dv`` Volume per grid point.
``h_cv`` Array of the grid spacing along the three axes.
``N_c`` Array of the number of grid points along the three axes.
``n_c`` Number of grid points on this CPU.
``beg_c`` Beginning of grid-point indices (inclusive).
``end_c`` End of grid-point indices (exclusive).
``comm`` MPI-communicator for domain decomposition.
========== ========================================================
The length unit is Bohr.
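
Example (a minimal sketch; a serial communicator is assumed):

>>> gd = GridDescriptor([4, 4, 4], cell_cv=[2.0, 2.0, 2.0],
...                     comm=mpi.serial_comm)
>>> gd.N_c
array([4, 4, 4])
>>> float(gd.dv)  # volume per grid point: 2.0**3 / 4**3
0.125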
"""
if isinstance(pbc_c, int):
pbc_c = (pbc_c,) * 3
if comm is None:
comm = mpi.world
self.N_c = np.array(N_c, int)
if (self.N_c != N_c).any():
raise ValueError('Non-int number of grid points %s' % N_c)
Domain.__init__(self, cell_cv, pbc_c, comm, parsize_c, self.N_c)
self.rank = self.comm.rank
self.beg_c = np.empty(3, int)
self.end_c = np.empty(3, int)
self.n_cp = []
for c in range(3):
n_p = (np.arange(self.parsize_c[c] + 1) * float(self.N_c[c]) /
self.parsize_c[c])
n_p = np.around(n_p + 0.4999).astype(int)
if not self.pbc_c[c]:
n_p[0] = 1
if np.any(n_p[1:] == n_p[:-1]):
if allow_empty_domains:
# If there are empty domains, sort them to the end
n_p[:] = (np.arange(self.parsize_c[c] + 1) +
1 - self.pbc_c[c]).clip(0, self.N_c[c])
else:
msg = ('Grid {} too small for {} cores!'
.format('x'.join(str(n) for n in self.N_c),
'x'.join(str(n) for n in self.parsize_c)))
raise BadGridError(msg)
self.beg_c[c] = n_p[self.parpos_c[c]]
self.end_c[c] = n_p[self.parpos_c[c] + 1]
self.n_cp.append(n_p)
self.n_c = self.end_c - self.beg_c
self.h_cv = self.cell_cv / self.N_c[:, np.newaxis]
self.volume = abs(np.linalg.det(self.cell_cv))
self.dv = self.volume / self.N_c.prod()
self.orthogonal = not (self.cell_cv -
np.diag(self.cell_cv.diagonal())).any()
def __repr__(self):
if self.orthogonal:
cellstring = np.diag(self.cell_cv).tolist()
else:
cellstring = self.cell_cv.tolist()
pcoords = tuple(self.get_processor_position_from_rank())
return ('GridDescriptor(%s, cell_cv=%s, pbc_c=%s, comm=[%d/%d, '
'domain=%s], parsize=%s)'
% (self.N_c.tolist(), cellstring,
np.array(self.pbc_c).astype(int).tolist(), self.comm.rank,
self.comm.size, pcoords, self.parsize_c.tolist()))
def new_descriptor(self, N_c=None, cell_cv=None, pbc_c=None,
comm=None, parsize_c=None, allow_empty_domains=False):
"""Create new descriptor based on this one.
The new descriptor will use the same class (possibly a subclass)
and all arguments will be equal to those of this descriptor
unless new arguments are provided."""
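# Illustrative use (a sketch): a descriptor with doubled resolution in
# every direction but otherwise identical parameters:
#
#     fine_gd = gd.new_descriptor(N_c=gd.N_c * 2)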
if N_c is None:
N_c = self.N_c
if cell_cv is None:
cell_cv = self.cell_cv
if pbc_c is None:
pbc_c = self.pbc_c
if comm is None:
comm = self.comm
if parsize_c is None and comm.size == self.comm.size:
parsize_c = self.parsize_c
return self.__class__(N_c, cell_cv, pbc_c, comm, parsize_c,
allow_empty_domains)
def coords(self, c, pad=True):
"""Return coordinates along one of the three axes.
Useful for plotting::
import matplotlib.pyplot as plt
plt.plot(gd.coords(0), data[:, 0, 0])
plt.show()
"""
L = np.linalg.norm(self.cell_cv[c])
N = self.N_c[c]
h = L / N
p = self.pbc_c[c] or pad
return np.linspace((1 - p) * h, L, N - 1 + p, False)
def get_grid_spacings(self):
L_c = (np.linalg.inv(self.cell_cv)**2).sum(0)**-0.5
return L_c / self.N_c
def get_size_of_global_array(self, pad=False):
if pad:
return self.N_c
else:
return self.N_c - 1 + self.pbc_c
def flat_index(self, G_c):
g1, g2, g3 = G_c - self.beg_c
return g3 + self.n_c[2] * (g2 + g1 * self.n_c[1])
def get_slice(self):
return [slice(b - 1 + p, e - 1 + p) for b, e, p in
zip(self.beg_c, self.end_c, self.pbc_c)]
def zeros(self, n=(), dtype=float, global_array=False, pad=False, xp=np):
"""Return new zeroed 3D array for this domain.
The type can be set with the ``dtype`` keyword (default:
``float``). Extra dimensions can be added with ``n=dim``. A
global array spanning all domains can be allocated with
``global_array=True``."""
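# Example use (a sketch):
#
#     n_sG = gd.zeros(2)                   # two zeroed local arrays
#     psit_G = gd.empty(dtype=complex)     # one uninitialized local array
#     nt_G = gd.zeros(global_array=True)   # spans all domains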
return self._new_array(n, dtype, True, global_array, pad, xp)
def empty(self, n=(), dtype=float, global_array=False, pad=False, xp=np):
"""Return new uninitialized 3D array for this domain.
The type can be set with the ``dtype`` keyword (default:
``float``). Extra dimensions can be added with ``n=dim``. A
global array spanning all domains can be allocated with
``global_array=True``."""
return self._new_array(n, dtype, False, global_array, pad, xp)
def _new_array(self, n=(), dtype=float, zero=True,
global_array=False, pad=False, xp=np):
if global_array:
shape = self.get_size_of_global_array(pad)
else:
shape = self.n_c
if isinstance(n, numbers.Integral):
n = (n,)
shape = n + tuple(shape)
if zero:
return xp.zeros(shape, dtype)
else:
return xp.empty(shape, dtype)
def get_axial_communicator(self, axis):
peer_ranks = []
pos_c = self.parpos_c.copy()
for i in range(self.parsize_c[axis]):
pos_c[axis] = i
peer_ranks.append(self.get_rank_from_processor_position(pos_c))
peer_comm = self.comm.new_communicator(peer_ranks)
return peer_comm
def integrate(self, a_xg, b_yg=None,
global_integral=True, hermitian=False):
"""Integrate function(s) over domain.
a_xg: ndarray
Function(s) to be integrated.
b_yg: ndarray
If present, integrate a_xg.conj() * b_yg.
global_integral: bool
If the array(s) are distributed over several domains, then the
total sum will be returned. To get the local contribution
only, use global_integral=False.
hermitian: bool
Result is hermitian.
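
Example (a sketch; with periodic boundary conditions the result
equals the cell volume)::

a_G = gd.zeros()
a_G += 1.0
volume = gd.integrate(a_G)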
"""
xshape = a_xg.shape[:-3]
if b_yg is None:
# Only one array:
result = a_xg.reshape(xshape + (-1,)).sum(axis=-1) * self.dv
if global_integral:
if result.ndim == 0:
result = self.comm.sum_scalar(result.item())
else:
self.comm.sum(result)
return result
gsize = prod(a_xg.shape[-3:])
A_xg = np.ascontiguousarray(a_xg.reshape((-1, gsize)))
B_yg = np.ascontiguousarray(b_yg.reshape((-1, gsize)))
result_yx = np.zeros((len(B_yg), len(A_xg)), A_xg.dtype)
if a_xg is b_yg:
rk(self.dv, A_xg, 0.0, result_yx)
elif hermitian:
r2k(0.5 * self.dv, A_xg, B_yg, 0.0, result_yx)
else:
# gemm(self.dv, A_xg, B_yg, 0.0, result_yx, 'c')
mmm(self.dv, B_yg, 'N', A_xg, 'C', 0.0, result_yx)
if global_integral:
self.comm.sum(result_yx)
yshape = b_yg.shape[:-3]
result = result_yx.T.reshape(xshape + yshape)
if result.ndim == 0:
return result.item()
else:
return result
def coarsen(self):
"""Return coarsened `GridDescriptor` object.
Returned descriptor has 2x2x2 fewer grid points."""
if (self.N_c % 2).any():
raise ValueError('Grid %s not divisible by 2!' % self.N_c)
return self.new_descriptor(self.N_c // 2)
def refine(self):
"""Return refined `GridDescriptor` object.
Returned descriptor has 2x2x2 more grid points."""
return self.new_descriptor(self.N_c * 2)
def get_boxes(self, spos_c, rcut, cut=True):
"""Find boxes enclosing sphere."""
N_c = self.N_c
ncut = rcut * (self.icell_cv**2).sum(axis=1)**0.5 * self.N_c
npos_c = spos_c * N_c
beg_c = np.ceil(npos_c - ncut).astype(int)
end_c = np.ceil(npos_c + ncut).astype(int)
if cut:
for c in range(3):
if not self.pbc_c[c]:
if beg_c[c] < 0:
beg_c[c] = 0
if end_c[c] > N_c[c]:
end_c[c] = N_c[c]
else:
for c in range(3):
if (not self.pbc_c[c] and
(beg_c[c] < 0 or end_c[c] > N_c[c])):
msg = ('Box at %.3f %.3f %.3f crosses boundary. '
'Beg. of box %s, end of box %s, max box size %s' %
(tuple(spos_c) + (beg_c, end_c, self.N_c)))
raise GridBoundsError(msg)
range_c = ([], [], [])
for c in range(3):
b = beg_c[c]
e = b
while e < end_c[c]:
b0 = b % N_c[c]
e = min(end_c[c], b + N_c[c] - b0)
if b0 < self.beg_c[c]:
b1 = b + self.beg_c[c] - b0
else:
b1 = b
e0 = b0 - b + e
if e0 > self.end_c[c]:
e1 = e - (e0 - self.end_c[c])
else:
e1 = e
if e1 > b1:
range_c[c].append((b1, e1))
b = e
boxes = []
for b0, e0 in range_c[0]:
for b1, e1 in range_c[1]:
for b2, e2 in range_c[2]:
b = np.array((b0, b1, b2))
e = np.array((e0, e1, e2))
beg_c = np.array((b0 % N_c[0], b1 % N_c[1], b2 % N_c[2]))
end_c = beg_c + e - b
disp = (b - beg_c) / N_c
beg_c = np.maximum(beg_c, self.beg_c)
end_c = np.minimum(end_c, self.end_c)
if (beg_c[0] < end_c[0] and
beg_c[1] < end_c[1] and
beg_c[2] < end_c[2]):
boxes.append((beg_c, end_c, disp))
return boxes
def get_nearest_grid_point(self, spos_c, force_to_this_domain=False):
"""Return index of nearest grid point.
The nearest grid point can be on a different CPU than the one the
nucleus belongs to (i.e. the returned index can be negative or larger
than gd.end_c), in which case something clever should be done.
The point can be forced into the grid descriptor's domain to be
consistent with self.get_rank_from_position(spos_c).
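
Example (a sketch; the returned index is relative to gd.beg_c)::

g_c = gd.get_nearest_grid_point(np.array([0.5, 0.5, 0.5]))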
"""
g_c = np.around(self.N_c * spos_c).astype(int)
if force_to_this_domain:
for c in range(3):
g_c[c] = max(g_c[c], self.beg_c[c])
g_c[c] = min(g_c[c], self.end_c[c] - 1)
return g_c - self.beg_c
def plane_wave(self, k_c):
"""Evaluate plane wave on grid.
Returns::

    _ _
   ik.r
  e    ,

where the wave vector is given by k_c (in units of reciprocal
lattice vectors)."""
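# Example (a sketch): the phase factor for the first reciprocal lattice
# vector, evaluated on the local part of the grid:
#
#     e_G = gd.plane_wave(np.array([1.0, 0.0, 0.0]))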
index_Gc = np.indices(self.n_c).T + self.beg_c
return np.exp(2j * pi * np.dot(index_Gc, k_c / self.N_c).T)
def symmetrize(self, a_g, op_scc, ft_sc=None):
# ft_sc: fractional translations
# XXXX documentation missing. This is some kind of array then?
if len(op_scc) == 1:
return
if ft_sc is not None and not ft_sc.any():
ft_sc = None
if ft_sc is not None:
compat = self.check_grid_compatibility(ft_sc)
if not compat:
newN_c = self.get_nearest_compatible_grid(ft_sc)
e = 'The specified number of grid points, ' \
+ str(self.N_c) + ', is not compatible with the'\
' symmetry of the atoms. Nearest compatible grid'\
' size is ' + str(newN_c) + '.'
raise ValueError(e)
A_g = self.collect(a_g)
if self.comm.rank == 0:
B_g = np.zeros_like(A_g)
for s, op_cc in enumerate(op_scc):
if ft_sc is None:
cgpaw.symmetrize(A_g, B_g, op_cc, 1 - self.pbc_c)
else:
t_c = (ft_sc[s] * self.N_c).round().astype(int)
cgpaw.symmetrize_ft(A_g, B_g, op_cc, t_c,
1 - self.pbc_c)
else:
B_g = None
self.distribute(B_g, a_g)
a_g /= len(op_scc)
def check_grid_compatibility(self, ft_sc):
# checks that grid is compatible with fractional translations
t_sc = ft_sc * self.N_c
intt_sc = t_sc.round().astype(int)
compat = np.allclose(t_sc, intt_sc, atol=1e-6)
return compat
def get_nearest_compatible_grid(self, ft_sc):
newN_c = np.zeros(self.N_c.shape)
for c, N in enumerate(self.N_c):
frac_s = [Fraction(str(ft_c[c])).limit_denominator(1000)
for ft_c in ft_sc]
lcm_denom = lcm.reduce([frac.denominator for frac in frac_s])
dNminus = N - (N % lcm_denom)
dNplus = dNminus + lcm_denom
if dNminus > 0 and np.abs(dNminus - N) < np.abs(dNplus - N):
newN_c[c] = dNminus
else:
newN_c[c] = dNplus
return newN_c.astype(int)
def collect(self, a_xg, out=None, broadcast=False):
"""Collect distributed array to master-CPU or all CPU's."""
if self.comm.size == 1:
if out is None:
return a_xg
out[:] = a_xg
return out
xshape = a_xg.shape[:-3]
# Collect all arrays on the master:
if self.rank != 0:
# There can be several sends before the corresponding receives
# are posted, so use synchronous send here
self.comm.ssend(a_xg, 0, 301)
if broadcast:
A_xg = self.empty(xshape, a_xg.dtype, global_array=True)
self.comm.broadcast(A_xg, 0)
return A_xg
else:
return np.nan
# Put the subdomains from the slaves into the big array
# for the whole domain:
if out is None:
A_xg = self.empty(xshape, a_xg.dtype, global_array=True)
else:
A_xg = out
parsize_c = self.parsize_c
r = 0
for n0 in range(parsize_c[0]):
b0, e0 = self.n_cp[0][n0:n0 + 2] - self.beg_c[0]
for n1 in range(parsize_c[1]):
b1, e1 = self.n_cp[1][n1:n1 + 2] - self.beg_c[1]
for n2 in range(parsize_c[2]):
b2, e2 = self.n_cp[2][n2:n2 + 2] - self.beg_c[2]
if r != 0:
a_xg = np.empty(xshape +
((e0 - b0), (e1 - b1), (e2 - b2)),
a_xg.dtype.char)
self.comm.receive(a_xg, r, 301)
A_xg[..., b0:e0, b1:e1, b2:e2] = a_xg
r += 1
if broadcast:
self.comm.broadcast(A_xg, 0)
return A_xg
def distribute(self, B_xg, out=None):
"""Distribute full array B_xg to subdomains, result in b_xg.
B_xg is not used by the slaves (i.e. it should be None on all slaves)
b_xg must be allocated on all nodes and will be overwritten.
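
A typical round trip together with collect() (a sketch)::

A_xg = gd.collect(a_xg)   # full array; only meaningful on rank 0
# ... modify A_xg on rank 0 ...
gd.distribute(A_xg if gd.comm.rank == 0 else None, a_xg)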
"""
if self.comm.size == 1:
if out is None:
return B_xg
out[:] = B_xg
return out
if out is None:
out = self.empty(B_xg.shape[:-3], dtype=B_xg.dtype)
if self.rank != 0:
self.comm.receive(out, 0, 42)
else:
parsize_c = self.parsize_c
requests = []
r = 0
for n0 in range(parsize_c[0]):
b0, e0 = self.n_cp[0][n0:n0 + 2] - self.beg_c[0]
for n1 in range(parsize_c[1]):
b1, e1 = self.n_cp[1][n1:n1 + 2] - self.beg_c[1]
for n2 in range(parsize_c[2]):
b2, e2 = self.n_cp[2][n2:n2 + 2] - self.beg_c[2]
if r != 0:
a_xg = B_xg[..., b0:e0, b1:e1, b2:e2].copy()
request = self.comm.send(a_xg, r, 42, NONBLOCKING)
# Remember to store a reference to the
# send buffer (a_xg) so that it isn't
# deallocated:
requests.append((request, a_xg))
else:
out[:] = B_xg[..., b0:e0, b1:e1, b2:e2]
r += 1
for request, a_xg in requests:
self.comm.wait(request)
return out
def zero_pad(self, a_xg, global_array=True):
"""Pad array with zeros as first element along non-periodic directions.
Array may either be local or in standard decomposition.
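
Example (a sketch; with pbc_c=(False, True, True), a global array of
shape (7, 8, 8) on an 8x8x8 grid is padded to (8, 8, 8))::

b_g = gd.zero_pad(a_g)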
"""
# We could infer what global_array should be from a_xg.shape.
# But as it is now, there is a bit of redundancy to avoid
# confusing errors
gshape = a_xg.shape[-3:]
padding_c = 1 - self.pbc_c
if global_array:
assert (gshape == self.N_c - padding_c).all(), gshape
bshape = tuple(self.N_c)
else:
assert (gshape == self.n_c).all()
parpos_c = self.get_processor_position_from_rank()
# Only pad where domain is on edge:
padding_c *= (parpos_c == 0)
bshape = tuple(self.n_c + padding_c)
if self.pbc_c.all():
return a_xg
npbx, npby, npbz = padding_c
b_xg = np.zeros(a_xg.shape[:-3] + tuple(bshape), dtype=a_xg.dtype)
b_xg[..., npbx:, npby:, npbz:] = a_xg
return b_xg
def dipole_moment(self,
rho_R: Array3D,
center_v: Vector = None) -> Array1D:
"""Calculate dipole moment of density.
Integration region will be centered on center_v. Default center
is center of unit cell.
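
The sign convention used here corresponds to (a sketch of the
discrete sum that is evaluated)::

d_v = -sum_g r_gv rho_R[g] dv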
"""
index_cr = [np.arange(self.beg_c[c], self.end_c[c], dtype=float)
for c in range(3)]
if center_v is not None:
corner_c = (np.linalg.solve(self.h_cv.T,
center_v) % self.N_c) - self.N_c / 2
for corner, index_r, N in zip(corner_c, index_cr, self.N_c):
index_r -= corner
index_r %= N
index_r += corner
rho_ijk = rho_R
rho_ij = rho_ijk.sum(axis=2)
rho_ik = rho_ijk.sum(axis=1)
rho_cr = [rho_ij.sum(axis=1), rho_ij.sum(axis=0), rho_ik.sum(axis=0)]
d_c = [np.dot(index_cr[c], rho_cr[c]) for c in range(3)]
d_v = -np.dot(d_c, self.h_cv) * self.dv
self.comm.sum(d_v)
return d_v
def calculate_dipole_moment(self, rho_g, center=False, origin_c=None):
"""Calculate dipole moment of density."""
r_cz = [np.arange(self.beg_c[c], self.end_c[c]) for c in range(3)]
if center:
assert origin_c is None
r_cz = [r_cz[c] - 0.5 * self.N_c[c] for c in range(3)]
elif origin_c is not None:
r_cz = [r_cz[c] - origin_c[c] for c in range(3)]
rho_01 = rho_g.sum(axis=2)
rho_02 = rho_g.sum(axis=1)
rho_cz = [rho_01.sum(axis=1), rho_01.sum(axis=0), rho_02.sum(axis=0)]
rhog_c = [np.dot(r_cz[c], rho_cz[c]) for c in range(3)]
d_c = -np.dot(rhog_c, self.h_cv) * self.dv
self.comm.sum(d_c)
return d_c
def wannier_matrix(self, psit_nG, psit_nG1, G_c, nbands=None):
"""Wannier localization integrals
The soft part of Z is given by (Eq. 27 ref1)::

~       ~     -i G.r  ~
Z   = <psi | e      |psi >
 nm       n             m

psit_nG and psit_nG1 are the set of wave functions for the two
different spin/kpoints in question.
ref1: Thygesen et al, Phys. Rev. B 72, 125119 (2005)
"""
if nbands is None:
nbands = len(psit_nG)
if nbands == 0:
return np.zeros((0, 0), complex)
e_G = np.exp(-2j * pi * np.dot(np.indices(self.n_c).T +
self.beg_c, G_c / self.N_c).T)
a_nG = (e_G * psit_nG[:nbands].conj()).reshape((nbands, -1))
return np.inner(a_nG,
psit_nG1[:nbands].reshape((nbands, -1))) * self.dv
def find_center(self, a_R):
"""Calculate center of positive function."""
assert self.orthogonal
r_vR = self.get_grid_point_coordinates()
a_R = a_R.astype(complex)
center = []
for L, r_R in zip(self.cell_cv.diagonal(), r_vR):
z = self.integrate(a_R, np.exp(2j * pi / L * r_R))
center.append(np.angle(z) / (2 * pi) * L % L)
return np.array(center)
def bytecount(self, dtype=float):
"""Get the number of bytes used by a grid of specified dtype."""
return int(np.prod(self.n_c)) * np.array(1, dtype).itemsize
def get_grid_point_coordinates(self, dtype=float, global_array=False):
"""Construct cartesian coordinates of grid points in the domain."""
r_vG = np.dot(np.indices(self.n_c, dtype).T + self.beg_c,
self.h_cv).T.copy()
if global_array:
return self.collect(r_vG, broadcast=True) # XXX waste!
else:
return r_vG
def get_grid_point_distance_vectors(self, r_v, mic=True, dtype=float):
"""Return distances to a given vector in the domain.
mic: if True, adopt the minimum image convention
procedure by W. Smith in 'The Minimum image convention in
Non-Cubic MD cells' March 29, 1989
"""
s_Gc = (np.indices(self.n_c, dtype).T + self.beg_c) / self.N_c
r_c = np.linalg.solve(self.cell_cv.T, r_v)
# doing the correction twice works better because of rounding errors
# e.g.: -1.56250000e-25 % 1.0 = 1.0,
# but (-1.56250000e-25 % 1.0) % 1.0 = 0.0
r_c = np.where(self.pbc_c, r_c % 1.0, r_c)
s_Gc -= np.where(self.pbc_c, r_c % 1.0, r_c)
if mic:
s_Gc -= self.pbc_c * (2 * s_Gc).astype(int)
# sanity check
assert (s_Gc * self.pbc_c >= -0.5).all()
assert (s_Gc * self.pbc_c <= 0.5).all()
return np.dot(s_Gc, self.cell_cv).T.copy()
def interpolate_grid_points(self, spos_nc, vt_g):
"""Return interpolated values.
Calculate interpolated values from array vt_g based on the
scaled coordinates given in spos_nc.
This doesn't work in parallel, since it would require
communication between neighbouring grids."""
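# Example (a sketch, serial calculations only): interpolate vt_g at two
# scaled positions:
#
#     v_n = gd.interpolate_grid_points(np.array([[0.1, 0.2, 0.3],
#                                                [0.4, 0.5, 0.6]]), vt_g)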
assert self.comm.size == 1
vt_g = self.zero_pad(vt_g)
return map_coordinates(vt_g,
(spos_nc * self.N_c).T,
order=3,
mode='wrap')
def is_my_grid_point(self, R_c: Sequence[int]) -> bool:
"""Check if grid point belongs to this domain."""
return ((self.beg_c <= R_c) & (R_c < self.end_c)).all()