# --- Standard library
import dataclasses
import itertools
import os.path
import typing
# --- External
import numpy as np
import pyscf
import pyscf.lo
# --- Internal
from vayesta.core.util import (
OptionsBase,
cache,
deprecated,
dot,
einsum,
energy_string,
fix_orbital_sign,
hstack,
log_time,
time_string,
timer,
AbstractMethodError,
)
from vayesta.core import spinalg
from vayesta.core.types import Cluster, Orbitals
from vayesta.core.symmetry import SymmetryIdentity
from vayesta.core.symmetry import SymmetryTranslation
import vayesta.core.ao2mo
import vayesta.core.ao2mo.helper
from vayesta.core.types import WaveFunction
# Bath
from vayesta.core.bath import BNO_Threshold
from vayesta.core.bath import DMET_Bath
from vayesta.core.bath import EwDMET_Bath
from vayesta.core.bath import MP2_Bath
from vayesta.core.bath import Full_Bath
from vayesta.core.bath import R2_Bath
from vayesta.core.bath import RPA_Bath
# Bosonic Bath
from vayesta.core.bosonic_bath import RPA_Boson_Target_Space, RPA_QBA_Bath, Boson_Threshold
from vayesta.core.types.bosonic_orbitals import QuasiBosonOrbitals
# Other
from vayesta.misc.cubefile import CubeFile
from vayesta.mpi import mpi
from vayesta.solver import get_solver_class, check_solver_config, ClusterHamiltonian
from vayesta.core.screening.screening_crpa import get_frag_W
# Get MPI rank of fragment
get_fragment_mpi_rank = lambda *args: args[0].mpi_rank
[docs]@dataclasses.dataclass
class Options(OptionsBase):
# Inherited from Embedding
# ------------------------
# --- Bath options
bath_options: dict = None
bosonic_bath_options: dict = None
# --- Solver options
solver_options: dict = None
# --- Other
store_eris: bool = None # If True, ERIs will be cached in Fragment.hamil
dm_with_frozen: bool = None # TODO: is still used?
screening: typing.Optional[str] = None
match_cluster_fock: bool = None
# Fragment specific
# -----------------
# Auxiliary fragments are treated before non-auxiliary fragments, but do not contribute to expectation values
auxiliary: bool = False
coupled_fragments: list = dataclasses.field(default_factory=list)
sym_factor: float = 1.0
[docs]class Fragment:
Options = Options
[docs] @dataclasses.dataclass
class Flags:
# If multiple cluster exist for a given atom, the envelop fragment is
# the one with the largest (super) cluster space. This is used, for example,
# for the finite-bath and ICMP2 corrections
is_envelop: bool = True
is_secfrag: bool = False
# Secondary fragment parameter
bath_parent_fragment_id: typing.Optional[int] = None
[docs] @dataclasses.dataclass
class Results:
fid: int = None # Fragment ID
converged: bool = (
None # True, if solver reached convergence criterion or no convergence required (eg. MP2 solver)
)
# --- Energies
e_corr: float = None # Fragment correlation energy contribution
e_corr_rpa: float = None # Fragment correlation energy contribution at the level of RPA.
# --- Wave-function
wf: WaveFunction = None # WaveFunction object (MP2, CCSD,...)
pwf: WaveFunction = None # Fragment-projected wave function
moms: tuple = None
def __init__(
self,
base,
fid,
name,
c_frag,
c_env,
solver=None,
atoms=None,
aos=None,
active=True,
sym_parent=None,
sym_op=None,
mpi_rank=0,
flags=None,
log=None,
**kwargs,
):
"""Abstract base class for quantum embedding fragments.
The fragment may keep track of associated atoms or atomic orbitals, using
the `atoms` and `aos` attributes, respectively.
Parameters
----------
base : Embedding
Quantum embedding method the fragment is part of.
fid : int
Fragment ID.
name : str
Name of fragment.
c_frag : (nAO, nFrag) array
Fragment orbital coefficients.
c_env : (nAO, nEnv) array
Environment (non-fragment) orbital coefficients.
fragment_type : {'IAO', 'Lowdin-AO', 'AO'}
Fragment orbital type.
atoms : list or int, optional
Associated atoms. Default: None
aos : list or int, optional
Associated atomic orbitals. Default: None
sym_factor : float, optional
Symmetry factor (number of symmetry equivalent fragments). Default: 1.0.
sym_parent : Fragment, optional
Symmetry related parent fragment. Default: None.
sym_op : Callable, optional
Symmetry operation on AO basis function, representing the symmetry to the `sym_parent` object. Default: None.
log : logging.Logger
Logger object. If None, the logger of the `base` object is used. Default: None.
Attributes
----------
mol
mf
size
nelectron
id_name
log : logging.Logger
Logger object.
base : Embedding
Quantum embedding method, the fragment is part of.
id : int
Unique fragment ID.
name : str
Name of fragment.
c_frag : (nAO, nFrag) array
Fragment orbital coefficients.
c_env : (nAO, nEnv) array
Environment (non-fragment) orbital coefficients.
fragment_type : {'IAO', 'Lowdin-AO', 'AO'}
Fragment orbital type.
sym_factor : float
Symmetry factor (number of symmetry equivalent fragments).
atoms : list
Atoms in fragment.
aos : list
Atomic orbitals in fragment
coupled_fragments : list
List of fragments, the current fragment is coupled to.
"""
self.log = log or base.log
self.id = fid
self.name = name
self.base = base
# Options
self.opts = self.Options() # Default options
self.opts.update(**self.base.opts.asdict(deepcopy=True)) # Update with embedding class options
self.opts.replace(**kwargs) # Replace with keyword arguments
# Flags
self.flags = self.Flags(**(flags or {}))
solver = solver or self.base.solver
self.check_solver(solver)
self.solver = solver
self.c_frag = c_frag
self.c_env = c_env
self.sym_factor = self.opts.sym_factor
self.sym_parent = sym_parent
self.sym_op = sym_op
# For some embeddings, it may be necessary to keep track of any associated atoms or basis functions (AOs)
self.atoms = atoms
# TODO: Is aos still used?
self.aos = aos
self.active = active
# MPI
self.mpi_rank = mpi_rank
# This set of orbitals is used in the projection to evaluate expectation value contributions
# of the fragment. By default it is equal to `self.c_frag`.
self.c_proj = self.c_frag
# Initialize self.bath, self._cluster, self._results, self.hamil
self.reset()
self.log.debugv("Creating %r", self)
# self.log.info(break_into_lines(str(self.opts), newline='\n '))
def __repr__(self):
if mpi:
return "%s(id= %d, name= %s, mpi_rank= %d)" % (self.__class__.__name__, self.id, self.name, self.mpi_rank)
return "%s(id= %d, name= %s)" % (self.__class__.__name__, self.id, self.name)
def __str__(self):
return "%s %d: %s" % (self.__class__.__name__, self.id, self.name)
[docs] def log_info(self):
# Some output
fmt = " > %-24s "
self.log.info(fmt + "%d", "Fragment orbitals:", self.n_frag)
self.log.info(fmt + "%f", "Symmetry factor:", self.sym_factor)
self.log.info(fmt + "%.10f", "Number of electrons:", self.nelectron)
if self.atoms is not None:
self.log.info(fmt + "%r", "Associated atoms:", self.atoms)
if self.aos is not None:
self.log.info(fmt + "%r", "Associated AOs:", self.aos)
@property
def mol(self):
return self.base.mol
@property
def mf(self):
return self.base.mf
@property
def n_frag(self):
"""Number of fragment orbitals."""
return self.c_frag.shape[-1]
@property
def nelectron(self):
"""Number of mean-field electrons."""
sc = np.dot(self.base.get_ovlp(), self.c_frag)
ne = einsum("ai,ab,bi->", sc, self.mf.make_rdm1(), sc)
return ne
[docs] def trimmed_name(self, length=10, add_dots=True):
"""Fragment name trimmed to a given maximum length."""
if len(self.name) <= length:
return self.name
if add_dots:
return self.name[: (length - 3)] + "..."
return self.name[:length]
@property
def id_name(self):
"""Use this whenever a unique name is needed (for example to open a separate file for each fragment)."""
return "%d-%s" % (self.id, self.trimmed_name())
[docs] def change_options(self, **kwargs):
self.opts.replace(**kwargs)
@property
def hamil(self):
# Cache the hamiltonian object; note that caching or not of eris is handled inside the hamiltonian object.
if self._hamil is None:
self.hamil = self.get_frag_hamil()
return self._hamil
@hamil.setter
def hamil(self, value):
self._hamil = value
# --- Overlap matrices
# --------------------
def _csc_dot(self, c1, c2, ovlp=True, transpose_left=True, transpose_right=False):
if transpose_left:
c1 = c1.T
if transpose_right:
c2 = c2.T
if ovlp is True:
ovlp = self.base.get_ovlp()
if ovlp is None:
return dot(c1, c2)
return dot(c1, ovlp, c2)
[docs] @cache
def get_overlap(self, key):
"""Get overlap between cluster orbitals, fragment orbitals, or MOs.
The return value is cached but not copied; do not modify the array in place without
creating a copy!
Examples:
>>> s = self.get_overlap('cluster|mo')
>>> s = self.get_overlap('cluster|frag')
>>> s = self.get_overlap('mo[occ]|cluster[occ]')
>>> s = self.get_overlap('mo[vir]|cluster[vir]')
"""
if key.count("|") > 1:
left, center, right = key.rsplit("|", maxsplit=2)
overlap_left = self.get_overlap("|".join((left, center)))
overlap_right = self.get_overlap("|".join((center, right)))
return self._csc_dot(overlap_left, overlap_right, ovlp=None, transpose_left=False)
# Standardize key to reduce cache misses:
key_mod = key.lower().replace(" ", "")
if key_mod != key:
return self.get_overlap(key_mod)
def _get_coeff(key):
if "frag" in key:
return self.c_frag
if "proj" in key:
return self.c_proj
if "occ" in key:
part = "_occ"
elif "vir" in key:
part = "_vir"
else:
part = ""
if "mo" in key:
return getattr(self.base, "mo_coeff%s" % part)
if "cluster" in key:
return getattr(self.cluster, "c_active%s" % part)
raise ValueError("Invalid key: '%s'")
left, right = key.split("|")
c_left = _get_coeff(left)
c_right = _get_coeff(right)
return self._csc_dot(c_left, c_right)
[docs] def get_coeff_env(self):
if self.c_env is not None:
return self.c_env
return self.sym_op(self.sym_parent.get_coeff_env())
@property
def results(self):
return self.get_symmetry_parent()._results
@results.setter
def results(self, value):
if self.sym_parent is not None:
raise RuntimeError("Cannot set attribute results in symmetry derived fragment.")
self._results = value
@property
def cluster(self):
if self.sym_parent is not None:
return self.sym_parent.cluster.basis_transform(self.sym_op)
return self._cluster
@cluster.setter
def cluster(self, value):
if self.sym_parent is not None:
raise RuntimeError("Cannot set attribute cluster in symmetry derived fragment.")
self._cluster = value
[docs] def reset(self, reset_bath=True, reset_cluster=True, reset_eris=True, reset_inactive=True):
self.log.debugv(
"Resetting %s (reset_bath= %r, reset_cluster= %r, reset_eris= %r, reset_inactive= %r)",
self,
reset_bath,
reset_cluster,
reset_eris,
reset_inactive,
)
if not reset_inactive and not self.active:
return
if reset_bath:
self._dmet_bath = None
self._bath_factory_occ = None
self._bath_factory_vir = None
if reset_cluster:
self._cluster = None
self.get_overlap.cache_clear()
if reset_eris:
self.hamil = None
self._seris_ov = None
self._results = None
[docs] def get_fragments_with_overlap(self, tol=1e-8, **kwargs):
"""Get list of fragments which overlap both in occupied and virtual space."""
c_occ = self.get_overlap("mo[occ]|cluster[occ]")
c_vir = self.get_overlap("mo[vir]|cluster[vir]")
def svd(cx, cy):
rxy = np.dot(cx.T, cy)
return np.linalg.svd(rxy, compute_uv=False)
frags = []
for fx in self.base.get_fragments(**kwargs):
if fx.id == self.id:
continue
cx_occ = fx.get_overlap("mo[occ]|cluster[occ]")
s_occ = svd(c_occ, cx_occ)
if s_occ.max() < tol:
continue
cx_vir = fx.get_overlap("mo[vir]|cluster[vir]")
s_vir = svd(c_vir, cx_vir)
if s_vir.max() < tol:
continue
frags.append(fx)
return frags
[docs] def couple_to_fragment(self, frag):
if frag is self:
raise RuntimeError("Cannot couple fragment with itself.")
self.log.debugv("Coupling %s with %s", self, frag)
self.opts.coupled_fragments.append(frag)
[docs] def couple_to_fragments(self, frags):
for frag in frags:
self.couple_to_fragment(frag)
[docs] def get_fragment_mf_energy(self):
"""Calculate the part of the mean-field energy associated with the fragment.
Does not include nuclear-nuclear repulsion!
"""
px = self.get_fragment_projector(self.base.mo_coeff)
hveff = dot(px, self.base.mo_coeff.T, 2 * self.base.get_hcore() + self.base.get_veff(), self.base.mo_coeff)
occ = self.base.mo_occ > 0
e_mf = np.sum(np.diag(hveff)[occ])
return e_mf
@property
def contributes(self):
"""True if fragment contributes to expectation values, else False."""
return self.active and not self.opts.auxiliary
[docs] def get_fragment_projector(self, coeff, c_proj=None, inverse=False):
"""Projector for one index of amplitudes local energy expression.
Cost: N^2 if O(1) coeffs , N^3 if O(N) coeffs
Parameters
----------
coeff : ndarray, shape(n(AO), N)
Occupied or virtual orbital coefficients.
inverse : bool, optional
Return 1-p instead. Default: False.
Returns
-------
p : (n, n) array
Projection matrix.
"""
if c_proj is None:
c_proj = self.c_proj
r = dot(coeff.T, self.base.get_ovlp(), c_proj)
p = np.dot(r, r.T)
if inverse:
p = np.eye(p.shape[-1]) - p
return p
[docs] def get_mo_occupation(self, *mo_coeff, dm1=None):
"""Get mean-field occupation numbers (diagonal of 1-RDM) of orbitals.
Parameters
----------
mo_coeff : ndarray, shape(N, M)
Orbital coefficients.
Returns
-------
occup : ndarray, shape(M)
Occupation numbers of orbitals.
"""
mo_coeff = hstack(*mo_coeff)
if dm1 is None:
dm1 = self.mf.make_rdm1()
sc = np.dot(self.base.get_ovlp(), mo_coeff)
occup = einsum("ai,ab,bi->i", sc, dm1, sc)
return occup
# def check_mo_occupation(self, expected, *mo_coeff, tol=None):
# if tol is None: tol = 2*self.opts.dmet_threshold
# occup = self.get_mo_occupation(*mo_coeff)
# if not np.allclose(occup, expected, atol=tol):
# raise RuntimeError("Incorrect occupation of orbitals (expected %f):\n%r" % (expected, occup))
# return occup
[docs] def canonicalize_mo(self, *mo_coeff, fock=None, eigvals=False, sign_convention=True):
"""Diagonalize Fock matrix within subspace.
TODO: move to Embedding class
Parameters
----------
*mo_coeff : ndarrays
Orbital coefficients.
eigenvalues : ndarray
Return MO energies of canonicalized orbitals.
Returns
-------
mo_canon : ndarray
Canonicalized orbital coefficients.
rot : ndarray
Rotation matrix: np.dot(mo_coeff, rot) = mo_canon.
"""
if fock is None:
fock = self.base.get_fock()
mo_coeff = hstack(*mo_coeff)
fock = dot(mo_coeff.T, fock, mo_coeff)
mo_energy, rot = np.linalg.eigh(fock)
self.log.debugv("Canonicalized MO energies:\n%r", mo_energy)
mo_can = np.dot(mo_coeff, rot)
if sign_convention:
mo_can, signs = fix_orbital_sign(mo_can)
rot = rot * signs[np.newaxis]
assert np.allclose(np.dot(mo_coeff, rot), mo_can)
assert np.allclose(np.dot(mo_can, rot.T), mo_coeff)
if eigvals:
return mo_can, rot, mo_energy
return mo_can, rot
[docs] def diagonalize_cluster_dm(self, *mo_coeff, dm1=None, norm=2, tol=1e-4):
"""Diagonalize cluster (fragment+bath) DM to get fully occupied and virtual orbitals.
Parameters
----------
*mo_coeff: array or list of arrays
Orbital coefficients. If multiple are given, they will be stacked along their second dimension.
dm1: array, optional
Mean-field density matrix, used to separate occupied and virtual cluster orbitals.
If None, `self.mf.make_rdm1()` is used. Default: None.
tol: float, optional
If set, check that all eigenvalues of the cluster DM are close
to 0 or 2, with the tolerance given by tol. Default= 1e-4.
Returns
-------
c_cluster_occ: (n(AO), n(occ cluster)) array
Occupied cluster orbital coefficients.
c_cluster_vir: (n(AO), n(vir cluster)) array
Virtual cluster orbital coefficients.
"""
if dm1 is None:
dm1 = self.mf.make_rdm1()
c_cluster = hstack(*mo_coeff)
sc = np.dot(self.base.get_ovlp(), c_cluster)
dm = dot(sc.T, dm1, sc)
e, r = np.linalg.eigh(dm)
if tol and not np.allclose(np.fmin(abs(e), abs(e - norm)), 0, atol=2 * tol, rtol=0):
self.log.warn("Eigenvalues of cluster-DM not all close to 0 or %d:\n%s" % (norm, e))
e, r = e[::-1], r[:, ::-1]
c_cluster = np.dot(c_cluster, r)
c_cluster = fix_orbital_sign(c_cluster)[0]
nocc = np.count_nonzero(e >= (norm / 2))
c_cluster_occ, c_cluster_vir = np.hsplit(c_cluster, [nocc])
return c_cluster_occ, c_cluster_vir
[docs] def project_ref_orbitals(self, c_ref, c):
"""Project reference orbitals into available space in new geometry.
The projected orbitals will be ordered according to their eigenvalues within the space.
Parameters
----------
c : ndarray
Orbital coefficients.
c_ref : ndarray
Orbital coefficients of reference orbitals.
"""
nref = c_ref.shape[-1]
assert nref > 0
assert c.shape[-1] > 0
self.log.debug("Projecting %d reference orbitals into space of %d orbitals", nref, c.shape[-1])
s = self.base.get_ovlp()
# Diagonalize reference orbitals among themselves (due to change in overlap matrix)
c_ref_orth = pyscf.lo.vec_lowdin(c_ref, s)
assert c_ref_orth.shape == c_ref.shape
# Diagonalize projector in space
csc = np.linalg.multi_dot((c_ref_orth.T, s, c))
p = np.dot(csc.T, csc)
e, r = np.linalg.eigh(p)
e, r = e[::-1], r[:, ::-1]
c = np.dot(c, r)
return c, e
# --- Symmetry
# ============
[docs] def copy(self, fid=None, name=None, **kwargs):
"""Create copy of fragment, without adding it to the fragments list."""
if fid is None:
fid = self.base.register.get_next_id()
name = name or ("%s(copy)" % self.name)
kwargs_copy = self.opts.asdict().copy()
kwargs_copy.update(kwargs)
attrs = ["c_frag", "c_env", "solver", "atoms", "aos", "active", "sym_parent", "sym_op", "mpi_rank", "log"]
for attr in attrs:
if attr in kwargs_copy:
continue
kwargs_copy[attr] = getattr(self, attr)
frag = type(self)(self.base, fid, name, **kwargs_copy)
return frag
[docs] def add_tsymmetric_fragments(self, tvecs, symtol=1e-6):
"""
Parameters
----------
tvecs: array(3) of integers
Each element represent the number of translation vector corresponding to the a0, a1, and a2 lattice vectors of the cell.
symtol: float, optional
Tolerance for the error of the mean-field density matrix between symmetry related fragments.
If the largest absolute difference in the density-matrix is above this value,
and exception will be raised. Default: 1e-6.
Returns
-------
fragments: list
List of T-symmetry related fragments. These will be automatically added to base.fragments and
have the attributes `sym_parent` and `sym_op` set.
"""
ovlp = self.base.get_ovlp()
dm1 = self.mf.make_rdm1()
fragments = []
for i, (dx, dy, dz) in enumerate(itertools.product(range(tvecs[0]), range(tvecs[1]), range(tvecs[2]))):
if i == 0:
continue
tvec = (dx / tvecs[0], dy / tvecs[1], dz / tvecs[2])
sym_op = SymmetryTranslation(self.base.symmetry, tvec)
if sym_op is None:
self.log.error(
"No T-symmetric fragment found for translation (%d,%d,%d) of fragment %s", dx, dy, dz, self.name
)
continue
# Name for translationally related fragments
name = "%s_T(%d,%d,%d)" % (self.name, dx, dy, dz)
# Translated coefficients
c_frag_t = sym_op(self.c_frag)
c_env_t = None # Avoid expensive symmetry operation on environment orbitals
# Check that translated fragment does not overlap with current fragment:
fragovlp = self._csc_dot(self.c_frag, c_frag_t, ovlp=ovlp)
if self.base.spinsym == "restricted":
fragovlp = abs(fragovlp).max()
elif self.base.spinsym == "unrestricted":
fragovlp = max(abs(fragovlp[0]).max(), abs(fragovlp[1]).max())
if fragovlp > 1e-8:
self.log.critical(
"Translation (%d,%d,%d) of fragment %s not orthogonal to original fragment (overlap= %.3e)!",
dx,
dy,
dz,
self.name,
fragovlp,
)
raise RuntimeError("Overlapping fragment spaces.")
# Add fragment
frag_id = self.base.register.get_next_id()
frag = self.base.Fragment(
self.base,
frag_id,
name,
c_frag_t,
c_env_t,
sym_parent=self,
sym_op=sym_op,
mpi_rank=self.mpi_rank,
**self.opts.asdict(),
)
self.base.fragments.append(frag)
# Check symmetry
# (only for the primitive translations (1,0,0), (0,1,0), and (0,0,1) to reduce number of sym_op(c_env) calls)
if abs(dx) + abs(dy) + abs(dz) == 1:
charge_err, spin_err = self.get_symmetry_error(frag, dm1=dm1)
if max(charge_err, spin_err) > symtol:
self.log.critical(
"Mean-field DM1 not symmetric for translation (%d,%d,%d) of %s (errors: charge= %.3e, spin= %.3e)!",
dx,
dy,
dz,
self.name,
charge_err,
spin_err,
)
raise RuntimeError("MF not symmetric under translation (%d,%d,%d)" % (dx, dy, dz))
else:
self.log.debugv(
"Mean-field DM symmetry error for translation (%d,%d,%d) of %s: charge= %.3e, spin= %.3e",
dx,
dy,
dz,
self.name,
charge_err,
spin_err,
)
fragments.append(frag)
return fragments
[docs] def get_symmetry_parent(self):
if self.sym_parent is None:
return self
return self.sym_parent.get_symmetry_parent()
[docs] def get_symmetry_operation(self):
if self.sym_parent is None:
return SymmetryIdentity(self.base.symmetry)
return self.sym_op
[docs] def get_symmetry_generations(self, maxgen=None, **filters):
if maxgen == 0:
return []
generations = []
fragments = self.base.get_fragments(**filters)
# Direct children:
lastgen = self.base.get_fragments(fragments, sym_parent=self)
generations.append(lastgen)
# Children of children, etc:
for gen in range(1, maxgen or 1000):
newgen = []
for fx in lastgen:
newgen += self.base.get_fragments(fragments, sym_parent=fx)
if not newgen:
break
generations.append(newgen)
lastgen = newgen
return generations
[docs] def get_symmetry_children(self, maxgen=None, **filters):
gens = self.get_symmetry_generations(maxgen, **filters)
# Flatten list of lists:
children = list(itertools.chain.from_iterable(gens))
return children
[docs] def get_symmetry_tree(self, maxgen=None, **filters):
"""Returns a recursive tree:
[(x, [children of x]), (y, [children of y]), ...]
"""
if maxgen is None:
maxgen = 1000
if maxgen == 0:
return []
# Get direct children:
children = self.get_symmetry_children(maxgen=1, **filters)
# Build tree recursively:
tree = [(x, x.get_symmetry_tree(maxgen=maxgen - 1, **filters)) for x in children]
return tree
[docs] def loop_symmetry_children(self, arrays=None, axes=None, symtree=None, maxgen=None, include_self=False):
"""Loop over all symmetry related fragments, including children of children, etc.
Parameters
----------
arrays : ndarray or list[ndarray], optional
If arrays are passed, the symmetry operation of each symmetry related fragment will be
applied to this array along the axis given in `axes`.
axes : list[int], optional
List of axes, along which the symmetry operation is applied for each element of `arrays`.
If None, the first axis will be used.
"""
def get_yield(fragment, arrays):
if arrays is None or len(arrays) == 0:
return fragment
if len(arrays) == 1:
return fragment, arrays[0]
return fragment, arrays
if include_self:
yield get_yield(self, arrays)
if maxgen == 0:
return
elif maxgen is None:
maxgen = 1000
if arrays is None:
arrays = []
if axes is None:
axes = len(arrays) * [0]
if symtree is None:
symtree = self.get_symmetry_tree()
for child, grandchildren in symtree:
intermediates = [child.sym_op(arr, axis=axis) for (arr, axis) in zip(arrays, axes)]
yield get_yield(child, intermediates)
if grandchildren and maxgen > 1:
yield from child.loop_symmetry_children(
intermediates, axes=axes, symtree=grandchildren, maxgen=(maxgen - 1)
)
@property
def n_symmetry_children(self):
"""Includes children of children, etc."""
return len(self.get_symmetry_children())
@property
def symmetry_factor(self):
"""Includes children of children, etc."""
return self.n_symmetry_children + 1
[docs] def get_symmetry_error(self, frag, dm1=None):
"""Get translational symmetry error between two fragments."""
if dm1 is None:
dm1 = self.mf.make_rdm1()
ovlp = self.base.get_ovlp()
# This fragment (x)
cx = np.hstack((self.c_frag, self.get_coeff_env()))
dmx = dot(cx.T, ovlp, dm1, ovlp, cx)
# Other fragment (y)
cy = np.hstack((frag.c_frag, frag.get_coeff_env()))
dmy = dot(cy.T, ovlp, dm1, ovlp, cy)
err = abs(dmx - dmy).max()
return err, 0.0
# def check_mf_tsymmetry(self):
# """Check translational symmetry of the mean-field between fragment and its children."""
# ovlp = self.base.get_ovlp()
# sds = dot(ovlp, self.mf.make_rdm1(), ovlp)
# c0 = np.hstack((self.c_frag, self.c_env))
# dm0 = dot(c0.T, sds, c0)
# for frag in self.get_symmetry_children():
# c1 = np.hstack((frag.c_frag, frag.c_env))
# dm1 = dot(c1.T, sds, c1)
# err = abs(dm1 - dm0).max()
# if err > mf_tol:
# self.log.error("Mean-field not T-symmetric between %s and %s (error= %.3e)!",
# self.name, frag.name, err)
# continue
# else:
# self.log.debugv("Mean-field T-symmetry error between %s and %s = %.3e", self.name, frag.name, err)
# Bath and cluster
# ----------------
def _get_bath_option(self, key, occtype):
"""Get bath-option, checking if a occupied-virtual specific option has been set."""
opts = self.opts.bath_options
opt = opts.get("%s_%s" % (key, occtype[:3]), None)
if opt is not None:
return opt
return opts[key]
[docs] def make_bath(self):
# --- DMET bath
dmet = DMET_Bath(self, dmet_threshold=self.opts.bath_options["dmet_threshold"])
dmet.kernel()
self._dmet_bath = dmet
# --- Additional bath
def get_bath(occtype):
otype = occtype[:3]
assert otype in ("occ", "vir")
btype = self._get_bath_option("bathtype", occtype)
# DMET bath only
if btype == "dmet":
return None
# Full bath (for debugging)
if btype == "full":
return Full_Bath(self, dmet_bath=dmet, occtype=occtype)
# Energy-weighted DMET bath
if btype == "ewdmet":
threshold = self._get_bath_option("threshold", occtype)
max_order = self._get_bath_option("max_order", occtype)
return EwDMET_Bath(self, dmet_bath=dmet, occtype=occtype, threshold=threshold, max_order=max_order)
# Spatially close orbitals
if btype == "r2":
return R2_Bath(self, dmet, occtype=occtype)
# MP2 bath natural orbitals
if btype == "mp2":
project_dmet_order = self._get_bath_option("project_dmet_order", occtype)
project_dmet_mode = self._get_bath_option("project_dmet_mode", occtype)
addbuffer = self._get_bath_option("addbuffer", occtype) and occtype == "virtual"
if addbuffer:
other = "occ" if (otype == "vir") else "vir"
c_buffer = getattr(dmet, "c_env_%s" % other)
else:
c_buffer = None
return MP2_Bath(
self,
dmet_bath=dmet,
occtype=occtype,
c_buffer=c_buffer,
project_dmet_order=project_dmet_order,
project_dmet_mode=project_dmet_mode,
)
if btype == "rpa":
project_dmet_order = self._get_bath_option("project_dmet_order", occtype)
project_dmet_mode = self._get_bath_option("project_dmet_mode", occtype)
return RPA_Bath(
self,
dmet_bath=dmet,
occtype=occtype,
c_buffer=None,
project_dmet_order=project_dmet_order,
project_dmet_mode=project_dmet_mode,
)
raise NotImplementedError("bathtype= %s" % btype)
self._bath_factory_occ = get_bath(occtype="occupied")
self._bath_factory_vir = get_bath(occtype="virtual")
[docs] def make_cluster(self):
def get_orbitals(occtype):
factory = getattr(self, "_bath_factory_%s" % occtype[:3])
btype = self._get_bath_option("bathtype", occtype)
if btype == "dmet":
c_bath = None
c_frozen = getattr(self._dmet_bath, "c_env_%s" % occtype[:3])
elif btype == "full":
c_bath, c_frozen = factory.get_bath()
elif btype == "r2":
rcut = self._get_bath_option("rcut", occtype)
unit = self._get_bath_option("unit", occtype)
c_bath, c_frozen = factory.get_bath(rcut=rcut)
elif btype == "ewdmet":
order = self._get_bath_option("order", occtype)
c_bath, c_frozen = factory.get_bath(order)
elif btype in ["mp2", "rpa", "rpacorr"]:
threshold = self._get_bath_option("threshold", occtype)
truncation = self._get_bath_option("truncation", occtype)
bno_threshold = BNO_Threshold(truncation, threshold)
c_bath, c_frozen = factory.get_bath(bno_threshold)
else:
raise ValueError
return c_bath, c_frozen
c_bath_occ, c_frozen_occ = get_orbitals("occupied")
c_bath_vir, c_frozen_vir = get_orbitals("virtual")
c_active_occ = spinalg.hstack_matrices(self._dmet_bath.c_cluster_occ, c_bath_occ)
c_active_vir = spinalg.hstack_matrices(self._dmet_bath.c_cluster_vir, c_bath_vir)
# Canonicalize orbitals
if self._get_bath_option("canonicalize", "occupied"):
c_active_occ = self.canonicalize_mo(c_active_occ)[0]
if self._get_bath_option("canonicalize", "virtual"):
c_active_vir = self.canonicalize_mo(c_active_vir)[0]
cluster = Cluster.from_coeffs(c_active_occ, c_active_vir, c_frozen_occ, c_frozen_vir)
# Check occupations
def check_occup(mo_coeff, expected):
occup = self.get_mo_occupation(mo_coeff)
# RHF
atol = self.opts.bath_options["occupation_tolerance"]
if np.ndim(occup[0]) == 0:
assert np.allclose(occup, 2 * expected, rtol=0, atol=2 * atol)
else:
assert np.allclose(occup[0], expected, rtol=0, atol=atol)
assert np.allclose(occup[1], expected, rtol=0, atol=atol)
check_occup(cluster.c_total_occ, 1)
check_occup(cluster.c_total_vir, 0)
self.log.info("Orbitals for %s", self)
self.log.info("-------------%s", len(str(self)) * "-")
self.log.info(cluster.repr_size().replace("%", "%%"))
self.cluster = cluster
return cluster
[docs] def make_bosonic_bath_target(self):
"""Get the target space for bosonic bath orbitals. This can either be the DMET cluster or the full space, and
can include a projection onto the fragment."""
if self.opts.bosonic_bath_options["bathtype"] != "rpa":
return None, 0
target_space = RPA_Boson_Target_Space(
self,
target_orbitals=self.opts.bosonic_bath_options["target_orbitals"],
local_projection=self.opts.bosonic_bath_options["local_projection"],
)
target_excits = target_space.gen_target_excitation()
return target_excits, target_excits.shape[0]
[docs] def make_bosonic_cluster(self, m0_target):
"""Set bosonic component of the cluster."""
if self.opts.bosonic_bath_options["bathtype"] != "rpa":
return
self._boson_bath_factory = RPA_QBA_Bath(self, target_m0=m0_target)
boson_threshold = Boson_Threshold(
self.opts.bosonic_bath_options["truncation"], self.opts.bosonic_bath_options["threshold"]
)
c_bath, c_frozen = self._boson_bath_factory.get_bath(boson_threshold=boson_threshold)
fullorbs = Orbitals(coeff=self.base.mo_coeff, occ=self.base.mo_occ)
self.cluster.bosons = QuasiBosonOrbitals(fullorbs, coeff_ex=c_bath)
# --- Results
# ===========
[docs] def get_fragment_mo_energy(self, c_active=None, fock=None):
"""Returns approximate MO energies, using the the diagonal of the Fock matrix.
Parameters
----------
c_active: array, optional
fock: array, optional
"""
if c_active is None:
c_active = self.cluster.c_active
if fock is None:
fock = self.base.get_fock()
mo_energy = einsum("ai,ab,bi->i", c_active, fock, c_active)
return mo_energy
[docs] @mpi.with_send(source=get_fragment_mpi_rank)
def get_fragment_dmet_energy(
self, dm1=None, dm2=None, h1e_eff=None, hamil=None, part_cumulant=True, approx_cumulant=True
):
"""Get fragment contribution to whole system DMET energy from cluster DMs.
After fragment summation, the nuclear-nuclear repulsion must be added to get the total energy!
Parameters
----------
dm1: array, optional
Cluster one-electron reduced density-matrix in cluster basis. If `None`, `self.results.dm1` is used. Default: None.
dm2: array, optional
Cluster two-electron reduced density-matrix in cluster basis. If `None`, `self.results.dm2` is used. Default: None.
hamil : ClusterHamiltonian object.
Object representing cluster hamiltonian, possibly including cached ERIs.
part_cumulant: bool, optional
If True, the 2-DM cumulant will be partitioned to calculate the energy. If False,
the full 2-DM will be partitioned, as it is done in most of the DMET literature.
True is recommended, unless checking for agreement with literature results. Default: True.
approx_cumulant: bool, optional
If True, the approximate cumulant, containing (delta 1-DM)-squared terms, is partitioned,
instead of the true cumulant, if `part_cumulant=True`. Default: True.
Returns
-------
e_dmet: float
Electronic fragment DMET energy.
"""
assert mpi.rank == self.mpi_rank
if dm1 is None:
dm1 = self.results.dm1.copy()
if dm1 is None:
raise RuntimeError("DM1 not found for %s" % self)
c_act = self.cluster.c_active
t0 = timer()
if hamil is None:
hamil = self._hamil
if hamil is None:
hamil = self.get_frag_hamil()
eris = hamil.get_eris_bare()
if dm2 is None:
dm2 = self.results.wf.make_rdm2(with_dm1=not part_cumulant, approx_cumulant=approx_cumulant)
# Get effective core potential
if h1e_eff is None:
if part_cumulant:
h1e_eff = dot(c_act.T, self.base.get_hcore_for_energy(), c_act)
else:
# Use the original Hcore (without chemical potential modifications), but updated mf-potential!
h1e_eff = self.base.get_hcore_for_energy() + self.base.get_veff_for_energy(with_exxdiv=False) / 2
h1e_eff = dot(c_act.T, h1e_eff, c_act)
occ = np.s_[: self.cluster.nocc_active]
v_act = einsum("iipq->pq", eris[occ, occ, :, :]) - einsum("iqpi->pq", eris[occ, :, :, occ]) / 2
h1e_eff -= v_act
p_frag = self.get_fragment_projector(c_act)
# Check number of electrons
ne = einsum("ix,ij,jx->", p_frag, dm1, p_frag)
self.log.debugv("Number of electrons for DMET energy in fragment %12s: %.8f", self, ne)
# Evaluate energy
e1b = einsum("xj,xi,ij->", h1e_eff, p_frag, dm1)
e2b = einsum("xjkl,xi,ijkl->", eris, p_frag, dm2) / 2
self.log.debugv("E(DMET): E(1)= %s E(2)= %s", energy_string(e1b), energy_string(e2b))
e_dmet = self.opts.sym_factor * (e1b + e2b)
self.log.debugv("Fragment E(DMET)= %+16.8f Ha", e_dmet)
self.log.timingv("Time for DMET energy: %s", time_string(timer() - t0))
return e_dmet
# --- Counterpoise
# ================
[docs] def make_counterpoise_mol(self, rmax, nimages=1, unit="A", **kwargs):
"""Make molecule object for counterposise calculation.
WARNING: This has only been tested for periodic systems so far!
Parameters
----------
rmax : float
All atom centers within range `rmax` are added as ghost-atoms in the counterpoise correction.
nimages : int, optional
Number of neighboring unit cell in each spatial direction. Has no effect in open boundary
calculations. Default: 5.
unit : ['A', 'B']
Unit for `rmax`, either Angstrom (`A`) or Bohr (`B`).
**kwargs :
Additional keyword arguments for returned PySCF Mole/Cell object.
Returns
-------
mol_cp : pyscf.gto.Mole or pyscf.pbc.gto.Cell
Mole or Cell object with periodic boundary conditions removed
and with ghost atoms added depending on `rmax` and `nimages`.
"""
if len(self.atoms) != 1:
raise NotImplementedError
import vayesta.misc
return vayesta.misc.counterpoise.make_mol(
self.mol, self.atoms[1], rmax=rmax, nimages=nimages, unit=unit, **kwargs
)
# --- Orbital plotting
# --------------------
[docs] @mpi.with_send(source=get_fragment_mpi_rank)
def pop_analysis(self, cluster=None, dm1=None, **kwargs):
if cluster is None:
cluster = self.cluster
if dm1 is None:
dm1 = self.results.dm1
if dm1 is None:
raise ValueError("DM1 not found for %s" % self)
dm1 = cluster.make_frozen_rdm1() + spinalg.dot(cluster.coeff, dm1, spinalg.T(cluster.coeff))
return self.base.pop_analysis(dm1, **kwargs)
[docs] def plot3d(self, filename, gridsize=(100, 100, 100), **kwargs):
"""Write cube density data of fragment orbitals to file."""
nx, ny, nz = gridsize
directory = os.path.dirname(filename)
if directory:
os.makedirs(directory, exist_ok=True)
cube = CubeFile(self.mol, filename=filename, nx=nx, ny=ny, nz=nz, **kwargs)
cube.add_orbital(self.c_frag)
cube.write()
[docs] def check_solver(self, solver):
is_uhf = np.ndim(self.base.mo_coeff[1]) == 2
if self.opts.screening:
is_eb = "crpa_full" in self.opts.screening
else:
is_eb = False
check_solver_config(solver, is_uhf, is_eb, self.log)
[docs] def get_solver(self, solver=None):
if solver is None:
solver = self.solver
cl_ham = self.hamil
solver_cls = get_solver_class(cl_ham, solver)
solver_opts = self.get_solver_options(solver)
cluster_solver = solver_cls(cl_ham, **solver_opts)
if self.opts.screening is not None:
cluster_solver.hamil.add_screening(self._seris_ov)
return cluster_solver
[docs] def get_local_rpa_correction(self, hamil=None):
e_loc_rpa = None
if self.base.opts.ext_rpa_correction:
hamil = hamil or self.hamil
cumulant = self.base.opts.ext_rpa_correction == "cumulant"
if self.base.opts.ext_rpa_correction not in ["erpa", "cumulant"]:
raise ValueError("Unknown external rpa correction %s specified.")
e_loc_rpa = hamil.calc_loc_erpa(*self._seris_ov[1:], cumulant)
return e_loc_rpa
[docs] def get_frag_hamil(self):
if self.opts.screening is not None:
if "crpa_full" in self.opts.screening:
self.bos_freqs, self.couplings = get_frag_W(
self.mf,
self,
pcoupling=("pcoupled" in self.opts.screening),
only_ov_screened=("ov" in self.opts.screening),
log=self.log,
)
# This detects based on fragment what kind of Hamiltonian is appropriate (restricted and/or EB).
return ClusterHamiltonian(
self,
self.mf,
self.log,
screening=self.opts.screening,
cache_eris=self.opts.store_eris,
match_fock=self.opts.match_cluster_fock,
)
[docs] def get_solver_options(self, *args, **kwargs):
raise AbstractMethodError