Source code for vayesta.ewf.ewf

# --- Standard
import dataclasses

# --- External
import numpy as np
from vayesta.core.util import (
    NotCalculatedError,
    break_into_lines,
    cache,
    deprecated,
    dot,
    einsum,
    energy_string,
    log_method,
    log_time,
    time_string,
    timer,
)
from vayesta.core.qemb import Embedding
from vayesta.core.fragmentation import SAO_Fragmentation
from vayesta.core.fragmentation import IAOPAO_Fragmentation
from vayesta.mpi import mpi
from vayesta.ewf.fragment import Fragment
from vayesta.ewf.amplitudes import get_global_t1_rhf
from vayesta.ewf.amplitudes import get_global_t2_rhf
from vayesta.ewf.rdm import make_rdm1_ccsd
from vayesta.ewf.rdm import make_rdm1_ccsd_global_wf
from vayesta.ewf.rdm import make_rdm2_ccsd_global_wf
from vayesta.ewf.rdm import make_rdm1_ccsd_proj_lambda
from vayesta.ewf.rdm import make_rdm2_ccsd_proj_lambda
from vayesta.ewf.icmp2 import get_intercluster_mp2_energy_rhf


[docs]@dataclasses.dataclass class Options(Embedding.Options): """Options for EWF calculations.""" # --- Fragment settings iao_minao: str = "auto" # Minimal basis for IAOs # --- Bath settings bath_options: dict = Embedding.Options.change_dict_defaults("bath_options", bathtype="mp2", threshold=1e-8) # ewdmet_max_order: int = 1 # If multiple bno thresholds are to be calculated, we can project integrals and amplitudes from a previous larger cluster: project_eris: bool = False # Project ERIs from a pervious larger cluster (corresponding to larger eta), can result in a loss of accuracy especially for large basis sets! project_init_guess: bool = True # Project converted T1,T2 amplitudes from a previous larger cluster energy_functional: str = "wf" # Calculation modes calc_e_wf_corr: bool = True calc_e_dm_corr: bool = False # --- Solver settings t_as_lambda: bool = None # If True, use T-amplitudes inplace of Lambda-amplitudes store_wf_type: str = None # If set, fragment WFs will be converted to the respective type, before storing them # Counterpoise correction of BSSE bsse_correction: bool = True bsse_rmax: float = 5.0 # In Angstrom nelectron_target: int = None # --- Couple embedding problems (currently only CCSD) sc_mode: int = 0 coupled_iterations: bool = False # --- Debugging _debug_wf: str = None
[docs]class EWF(Embedding): Fragment = Fragment Options = Options def __init__(self, mf, solver="CCSD", log=None, **kwargs): t0 = timer() super().__init__(mf, solver=solver, log=log, **kwargs) # Logging with self.log.indent(): # Options self.log.info("Parameters of %s:", self.__class__.__name__) self.log.info(break_into_lines(str(self.opts), newline="\n ")) self.log.info("Time for %s setup: %s", self.__class__.__name__, time_string(timer() - t0)) def __repr__(self): keys = ["mf", "solver"] fmt = ("%s(" + len(keys) * "%s: %r, ")[:-2] + ")" values = [self.__dict__[k] for k in keys] return fmt % (self.__class__.__name__, *[x for y in zip(keys, values) for x in y]) def _reset(self, **kwargs): super()._reset(**kwargs) # TODO: Redo self-consistencies self.iteration = 0 self._make_rdm1_ccsd_global_wf_cached.cache_clear() # Default fragmentation
[docs] def fragmentation(self, *args, **kwargs): return self.iao_fragmentation(*args, **kwargs)
[docs] def tailor_all_fragments(self): for x in self.fragments: for y in self.fragments: if x == y: continue x.add_tailor_fragment(y)
[docs] def kernel(self): """Run EWF.""" t_start = timer() # Automatic fragmentation if len(self.fragments) == 0: self.log.debug("No fragments found. Adding all atomic IAO fragments.") with self.fragmentation() as frag: frag.add_all_atomic_fragments() self._check_fragment_nelectron() # Debug: calculate exact WF if self.opts._debug_wf is not None: self._debug_get_wf(self.opts._debug_wf) # --- Create bath and clusters if mpi: mpi.world.Barrier() self.log.info("") self.log.info("MAKING BATH AND CLUSTERS") self.log.info("========================") fragments = self.get_fragments(active=True, sym_parent=None, mpi_rank=mpi.rank) fragdict = {f.id: f for f in fragments} with log_time(self.log.timing, "Total time for bath and clusters: %s"): for x in fragments: if x._results is not None: self.log.debug("Resetting %s" % x) x.reset() msg = "Making bath and clusters for %s%s" % (x, (" on MPI process %d" % mpi.rank) if mpi else "") self.log.info(msg) self.log.info(len(msg) * "-") with self.log.indent(): if x._dmet_bath is None: # Make own bath: if x.flags.bath_parent_fragment_id is None: x.make_bath() # Copy bath (DMET, occupied, virtual) from other fragment: else: bath_parent = fragdict[x.flags.bath_parent_fragment_id] for attr in ("_dmet_bath", "_bath_factory_occ", "_bath_factory_vir"): setattr(x, attr, getattr(bath_parent, attr)) if x._cluster is None: x.make_cluster() if mpi: mpi.world.Barrier() if mpi: with log_time(self.log.timing, "Time for MPI communication of clusters: %s"): self.communicate_clusters() # --- Screened Coulomb interaction; either static or dynamic self.build_screened_interactions() # --- Loop over fragments with no symmetry parent and with own MPI rank self.log.info("") self.log.info("RUNNING SOLVERS") self.log.info("===============") with log_time(self.log.timing, "Total time for solvers: %s"): # Split fragments in auxiliary and regular, solve auxiliary fragments first fragments_aux = [x for x in fragments if x.opts.auxiliary] fragments_reg = [x for x in fragments if not x.opts.auxiliary] for frags in [fragments_aux, fragments_reg]: for x in frags: msg = "Solving %s%s" % (x, (" on MPI process %d" % mpi.rank) if mpi else "") self.log.info(msg) self.log.info(len(msg) * "-") with self.log.indent(): x.kernel() if mpi: mpi.world.Barrier() if self.solver.lower() == "dump": self.log.output("Clusters dumped to file '%s'", self.opts.solver_options["dumpfile"]) return # --- Check convergence of fragments conv = self._all_converged(fragments) if not conv: self.log.error("Some fragments did not converge!") self.converged = conv # --- Evaluate correlation energy and log information try: self.e_corr = self.get_e_corr() except AttributeError as e: self.log.error("Could not calculate correlation energy") self.e_corr = np.nan self.log.output("E(MF)= %s", energy_string(self.e_mf)) self.log.output("E(corr)= %s", energy_string(self.e_corr)) self.log.output("E(tot)= %s", energy_string(self.e_tot)) self.log.info("Total wall time: %s", time_string(timer() - t_start)) return self.e_tot
def _all_converged(self, fragments): conv = True for fx in fragments: conv = conv and fx.results.converged if mpi: conv = mpi.world.allreduce(conv, op=mpi.MPI.LAND) return conv # --- CC Amplitudes # ----------------- # T-amplitudes get_global_t1 = get_global_t1_rhf get_global_t2 = get_global_t2_rhf # Lambda-amplitudes
[docs] def get_global_l1(self, *args, t_as_lambda=None, **kwargs): get_lambda = True if not t_as_lambda else False return self.get_global_t1(*args, get_lambda=get_lambda, **kwargs)
[docs] def get_global_l2(self, *args, t_as_lambda=None, **kwargs): get_lambda = True if not t_as_lambda else False return self.get_global_t2(*args, get_lambda=get_lambda, **kwargs)
[docs] def t1_diagnostic(self, warntol=0.02): # Per cluster for fx in self.get_fragments(active=True, mpi_rank=mpi.rank): wfx = fx.results.wf.as_ccsd() t1 = wfx.t1 nelec = 2 * t1.shape[0] t1diag = np.linalg.norm(t1) / np.sqrt(nelec) if t1diag >= warntol: self.log.warning("T1 diagnostic for %-20s %.5f", str(fx) + ":", t1diag) else: self.log.info("T1 diagnostic for %-20s %.5f", str(fx) + ":", t1diag) # Global t1 = self.get_global_t1(mpi_target=0) if mpi.is_master: nelec = 2 * t1.shape[0] t1diag = np.linalg.norm(t1) / np.sqrt(nelec) if t1diag >= warntol: self.log.warning("Global T1 diagnostic: %.5f", t1diag) else: self.log.info("Global T1 diagnostic: %.5f", t1diag)
# --- Density-matrices # -------------------- # Defaults
[docs] def make_rdm1(self, *args, **kwargs): if "cc" in self.solver.lower(): return self._make_rdm1_ccsd_global_wf(*args, **kwargs) if self.solver.lower() == "mp2": return self._make_rdm1_mp2_global_wf(*args, **kwargs) if self.solver.lower() == "callback": try: return self._make_rdm1_ccsd_global_wf(*args, **kwargs) except AttributeError: return self.make_rdm1_demo(*args, **kwargs) if self.solver.lower() == "fci": return self.make_rdm1_demo(*args, **kwargs) raise NotImplementedError("make_rdm1 for solver '%s'" % self.solver)
[docs] def make_rdm2(self, *args, **kwargs): if self.solver.lower() == "ccsd": return self._make_rdm2_ccsd_proj_lambda(*args, **kwargs) # return self._make_rdm2_ccsd(*args, **kwargs) if self.solver.lower() == "mp2": return self._make_rdm2_ccsd_proj_lambda(*args, t_as_lambda=True, **kwargs) raise NotImplementedError("make_rdm2 for solver '%s'" % self.solver)
# DM1 @log_method() def _make_rdm1_mp2(self, *args, **kwargs): return make_rdm1_ccsd(self, *args, mp2=True, **kwargs) @log_method() def _make_rdm1_ccsd(self, *args, **kwargs): return make_rdm1_ccsd(self, *args, mp2=False, **kwargs) @log_method() def _make_rdm1_ccsd_global_wf(self, *args, ao_basis=False, with_mf=True, **kwargs): dm1 = self._make_rdm1_ccsd_global_wf_cached(*args, **kwargs) if with_mf: dm1[np.diag_indices(self.nocc)] += 2 if ao_basis: dm1 = dot(self.mo_coeff, dm1, self.mo_coeff.T) return dm1 @cache(copy=True) def _make_rdm1_ccsd_global_wf_cached(self, *args, **kwargs): return make_rdm1_ccsd_global_wf(self, *args, **kwargs) def _make_rdm1_mp2_global_wf(self, *args, **kwargs): return self._make_rdm1_ccsd_global_wf(*args, t_as_lambda=True, with_t1=False, **kwargs) @log_method() def _make_rdm1_ccsd_proj_lambda(self, *args, **kwargs): return make_rdm1_ccsd_proj_lambda(self, *args, **kwargs) # DM2 @log_method() def _make_rdm2_ccsd_global_wf(self, *args, **kwargs): return make_rdm2_ccsd_global_wf(self, *args, **kwargs) @log_method() def _make_rdm2_ccsd_proj_lambda(self, *args, **kwargs): return make_rdm2_ccsd_proj_lambda(self, *args, **kwargs) # --- Energy # ---------- # Correlation
[docs] def get_e_corr(self, functional=None, **kwargs): functional = functional or self.opts.energy_functional if functional == "projected": self.log.warning("functional='projected' is deprecated; use functional='wf' instead.") functional = "wf" if functional == "wf": # CCSD projected energy expression return self.get_wf_corr_energy(**kwargs) if functional == "dm-t2only": # Builds density matrices from projected amplitudes return self.get_dm_corr_energy(t_as_lambda=True, **kwargs) if functional == "dm": # Builds density matrices from projected amplitudes return self.get_dm_corr_energy(**kwargs) if functional == 'dmet': # Uses projected density matrices (democratic partitioning) return self.get_dmet_energy(**kwargs) - self.e_mf raise ValueError("Unknown energy functional: '%s'" % functional)
[docs] @mpi.with_allreduce() def get_wf_corr_energy(self): e_corr = 0.0 # Only loop over fragments of own MPI rank for x in self.get_fragments(contributes=True, sym_parent=None, mpi_rank=mpi.rank): if x.results.e_corr is not None: ex = x.results.e_corr else: wf = x.results.wf.as_cisd(c0=1.0) px = x.get_overlap("frag|cluster-occ") wf = wf.project(px) es, ed, ex = x.get_fragment_energy(wf.c1, wf.c2) self.log.debug( "%20s: E(S)= %s E(D)= %s E(tot)= %s", x, energy_string(es), energy_string(ed), energy_string(ex) ) e_corr += x.symmetry_factor * ex return e_corr / self.ncells
[docs] def get_dm_corr_energy(self, dm1="global-wf", dm2="projected-lambda", t_as_lambda=None, with_exxdiv=None): e1 = self.get_dm_corr_energy_e1(dm1=dm1, t_as_lambda=None, with_exxdiv=None) e2 = self.get_dm_corr_energy_e2(dm2=dm2, t_as_lambda=t_as_lambda) e_corr = e1 + e2 self.log.debug("Ecorr(1)= %s Ecorr(2)= %s Ecorr= %s", *map(energy_string, (e1, e2, e_corr))) return e_corr
[docs] def get_dm_corr_energy_e1(self, dm1="global-wf", t_as_lambda=None, with_exxdiv=None): # Correlation energy due to changes in 1-DM and non-cumulant 2-DM: if dm1 == "global-wf": dm1 = self._make_rdm1_ccsd_global_wf(with_mf=False, t_as_lambda=t_as_lambda, ao_basis=True) elif dm1 == "2p1l": dm1 = self._make_rdm1_ccsd(with_mf=False, t_as_lambda=t_as_lambda, ao_basis=True) elif dm1 == "1p1l": dm1 = self._make_rdm1_ccsd_1p1l(with_mf=False, t_as_lambda=t_as_lambda, ao_basis=True) else: raise ValueError if with_exxdiv is None: if self.has_exxdiv: with_exxdiv = np.all([x.solver == "MP2" for x in self.fragments]) any_mp2 = np.any([x.solver == "MP2" for x in self.fragments]) any_not_mp2 = np.any([x.solver != "MP2" for x in self.fragments]) if any_mp2 and any_not_mp2: self.log.warning("Both MP2 and not MP2 solvers detected - unclear usage of exxdiv!") else: with_exxdiv = False fock = self.get_fock_for_energy(with_exxdiv=with_exxdiv) e1 = np.sum(fock * dm1) return e1 / self.ncells
[docs] @mpi.with_allreduce() def get_dm_corr_energy_e2(self, dm2="projected-lambda", t_as_lambda=None): """Correlation energy due to cumulant""" if t_as_lambda is None: t_as_lambda = self.opts.t_as_lambda if dm2 == "global-wf": dm2 = self._make_rdm2_ccsd_global_wf(t_as_lambda=t_as_lambda, with_dm1=False) # TODO: AO basis, late DF contraction if self.spinsym == "restricted": g = self.get_eris_array(self.mo_coeff) e2 = einsum("pqrs,pqrs", g, dm2) / 2 else: dm2aa, dm2ab, dm2bb = dm2 gaa, gab, gbb = self.get_eris_array_uhf(self.mo_coeff) e2 = ( einsum("pqrs,pqrs", gaa, dm2aa) / 2 + einsum("pqrs,pqrs", gbb, dm2bb) / 2 + einsum("pqrs,pqrs", gab, dm2ab) ) elif dm2 == "projected-lambda": e2 = 0.0 for x in self.get_fragments(contributes=True, sym_parent=None, mpi_rank=mpi.rank): ex = x.results.e_corr_dm2cumulant if ex is None or (t_as_lambda is not None and (t_as_lambda != x.opts.t_as_lambda)): ex = x.make_fragment_dm2cumulant_energy(t_as_lambda=t_as_lambda) e2 += x.symmetry_factor * x.sym_factor * ex else: raise ValueError("Unknown value for dm2: '%s'" % dm2) return e2 / self.ncells
[docs] def get_ccsd_corr_energy(self, full_wf=False): """Get projected correlation energy from partitioned CCSD WF. This is the projected (T1, T2) energy expression, instead of the projected (C1, C2) expression used in PRX (the differences are very small). For testing only, UHF and MPI not implemented""" t0 = timer() t1 = self.get_global_t1() # E(singles) fock = self.get_fock_for_energy(with_exxdiv=False) fov = dot(self.mo_coeff_occ.T, fock, self.mo_coeff_vir) e_singles = 2 * np.sum(fov * t1) # E(doubles) if full_wf: c2 = self.get_global_t2() + einsum("ia,jb->ijab", t1, t1) mos = (self.mo_coeff_occ, self.mo_coeff_vir, self.mo_coeff_vir, self.mo_coeff_occ) eris = self.get_eris_array(mos) e_doubles = 2 * einsum("ijab,iabj", c2, eris) - einsum("ijab,ibaj", c2, eris) else: e_doubles = 0.0 for x in self.get_fragments(contributes=True, sym_parent=None, mpi_rank=mpi.rank): pwf = x.results.pwf.as_ccsd() ro = x.get_overlap("mo-occ|cluster-occ") rv = x.get_overlap("mo-vir|cluster-vir") t1x = dot(ro.T, t1, rv) # N(frag) * N^2 c2x = pwf.t2 + einsum("ia,jb->ijab", pwf.t1, t1x) noccx = x.cluster.nocc_active nvirx = x.cluster.nvir_active eris = x.hamil.get_eris_bare("ovvo") px = x.get_overlap("frag|cluster-occ") eris = einsum("xi,iabj->xabj", px, eris) wx = x.symmetry_factor * x.sym_factor e_doubles += wx * (2 * einsum("ijab,iabj", c2x, eris) - einsum("ijab,ibaj", c2x, eris)) if mpi: e_doubles = mpi.world.allreduce(e_doubles) self.log.timing("Time for E(CCSD)= %s", time_string(timer() - t0)) e_corr = e_singles + e_doubles return e_corr / self.ncells
# Total energy @property def e_tot(self): """Total energy.""" return self.e_mf + self.e_corr
[docs] def get_wf_energy(self, *args, **kwargs): e_corr = self.get_wf_corr_energy(*args, **kwargs) return self.e_mf + e_corr
[docs] def get_dm_energy(self, *args, **kwargs): e_corr = self.get_dm_corr_energy(*args, **kwargs) return self.e_mf + e_corr
[docs] def get_ccsd_energy(self, full_wf=False): return self.e_mf + self.get_ccsd_corr_energy(full_wf=full_wf)
# --- Energy corrections
[docs] @mpi.with_allreduce() @log_method() def get_fbc_energy(self, occupied=True, virtual=True): """Get finite-bath correction (FBC) energy. This correction consists of two independent contributions, one due to the finite occupied, and one due to the finite virtual space. The virtual correction for a given fragment x is calculated as "E(MP2)[occ=D,vir=F] - E(MP2)[occ=D,vir=C]", where D is the DMET cluster space, F is the full space, and C is the full cluster space. For the occupied correction, occ and vir spaces are swapped. Fragments which do not have a BNO bath are skipped. Parameters ---------- occupied: bool, optional If True, the FBC energy from the occupied space is included. Default: True. virtual: bool, optional If True, the FBC energy from the virtual space is included. Default: True. Returns ------- e_fbc: float Finite bath correction (FBC) energy. """ if not (occupied or virtual): raise ValueError e_fbc = 0.0 # Only loop over fragments of own MPI rank for fx in self.get_fragments(contributes=True, sym_parent=None, flags=dict(is_envelop=True), mpi_rank=mpi.rank): ex = 0 if occupied: get_fbc = getattr(fx._bath_factory_occ, "get_finite_bath_correction", False) if get_fbc: ex += get_fbc(fx.cluster.c_active_occ, fx.cluster.c_frozen_occ) else: self.log.warning("%s does not have occupied BNOs - skipping fragment for FBC energy.", fx) if virtual: get_fbc = getattr(fx._bath_factory_vir, "get_finite_bath_correction", False) if get_fbc: ex += get_fbc(fx.cluster.c_active_vir, fx.cluster.c_frozen_vir) else: self.log.warning("%s does not have virtual BNOs - skipping fragment for FBC energy.", fx) self.log.debug("FBC from %-30s dE= %s", fx, energy_string(ex)) e_fbc += fx.symmetry_factor * ex e_fbc /= self.ncells self.log.debug("E(FBC)= %s", energy_string(e_fbc)) return e_fbc
[docs] @log_method() def get_intercluster_mp2_energy(self, *args, **kwargs): return get_intercluster_mp2_energy_rhf(self, *args, **kwargs)
def _get_dm_energy_old(self, global_dm1=True, global_dm2=False): """Calculate total energy from reduced density-matrices. RHF ONLY! Parameters ---------- global_dm1 : bool Use 1DM calculated from global amplitutes if True, otherwise use in cluster approximation. Default: True global_dm2 : bool Use 2DM calculated from global amplitutes if True, otherwise use in cluster approximation. Default: False Returns ------- e_tot : float """ return self.e_mf + self._get_dm_corr_energy_old(global_dm1=global_dm1, global_dm2=global_dm2) def _get_dm_corr_energy_old(self, global_dm1=True, global_dm2=False): """Calculate correlation energy from reduced density-matrices. RHF ONLY! Parameters ---------- global_dm1 : bool Use 1DM calculated from global amplitutes if True, otherwise use in cluster approximation. Default: True global_dm2 : bool Use 2DM calculated from global amplitutes if True, otherwise use in cluster approximation. Default: False Returns ------- e_corr : float """ t_as_lambda = self.opts.t_as_lambda if global_dm1: dm1 = self._make_rdm1_ccsd_global_wf(t_as_lambda=t_as_lambda, ao_basis=True, with_mf=False) else: dm1 = self._make_rdm1_ccsd(t_as_lambda=t_as_lambda, ao_basis=True, with_mf=False) # Core Hamiltonian + Non Cumulant 2DM contribution e1 = np.sum(self.get_fock_for_energy(with_exxdiv=False) * dm1) # Cumulant 2DM contribution if global_dm2: # Calculate global 2RDM and contract with ERIs rdm2 = self._make_rdm2_ccsd_global_wf(t_as_lambda=t_as_lambda, with_dm1=False) eris = self.get_eris_array(self.mo_coeff) e2 = einsum("pqrs,pqrs", eris, rdm2) / 2 else: # Fragment Local 2DM cumulant contribution e2 = self.get_dm_corr_energy_e2(t_as_lambda=t_as_lambda) * self.ncells e_corr = (e1 + e2) / self.ncells return e_corr # --- Debugging def _debug_get_wf(self, kind): if kind == "random": return if kind == "exact": if self.solver == "CCSD": import pyscf import pyscf.cc from vayesta.core.types import WaveFunction cc = pyscf.cc.CCSD(self.mf) cc.kernel() if self.opts.solver_options["solve_lambda"]: cc.solve_lambda() wf = WaveFunction.from_pyscf(cc) else: raise NotImplementedError else: wf = self.opts._debug_wf self._debug_wf = wf
REWF = EWF