Source code for vayesta.core.qemb.qemb

import logging
from datetime import datetime
import dataclasses
import copy
import itertools
import os
import os.path
from typing import Optional

import numpy as np

import pyscf
import pyscf.mp
import pyscf.ci
import pyscf.cc
import pyscf.pbc
import pyscf.pbc.tools
import pyscf.lib

from pyscf.mp.mp2 import _mo_without_core
from vayesta.core.foldscf import FoldedSCF, fold_scf
from vayesta.core.util import (
    OptionsBase,
    OrthonormalityError,
    SymmetryError,
    dot,
    einsum,
    energy_string,
    getattr_recursive,
    hstack,
    log_method,
    log_time,
    with_doc,
)
from vayesta.core import spinalg, eris
from vayesta.core.scmf import PDMET, Brueckner
from vayesta.core.screening.screening_moment import build_screened_eris
from vayesta.mpi import mpi
from vayesta.core.qemb.register import FragmentRegister
from vayesta.rpa import ssRIRPA
from vayesta.solver import check_solver_config

# Symmetry
from vayesta.core.symmetry import SymmetryGroup
from vayesta.core.symmetry import SymmetryInversion
from vayesta.core.symmetry import SymmetryReflection
from vayesta.core.symmetry import SymmetryRotation
from vayesta.core.symmetry import SymmetryTranslation

# Fragmentations
from vayesta.core.fragmentation import SAO_Fragmentation
from vayesta.core.fragmentation import IAO_Fragmentation
from vayesta.core.fragmentation import IAOPAO_Fragmentation
from vayesta.core.fragmentation import Site_Fragmentation
from vayesta.core.fragmentation import CAS_Fragmentation

from vayesta.misc.cptbisect import ChempotBisection

# Expectation values
from vayesta.core.qemb.corrfunc import get_corrfunc
from vayesta.core.qemb.corrfunc import get_corrfunc_mf

# --- This Package

from vayesta.core.qemb.fragment import Fragment

# from . import helper
from vayesta.core.qemb.rdm import make_rdm1_demo_rhf
from vayesta.core.qemb.rdm import make_rdm2_demo_rhf


[docs]@dataclasses.dataclass class Options(OptionsBase): store_eris: bool = ( True # If True, ERIs will be stored in Fragment.hamil; otherwise they will be recalculated whenever needed. ) global_frag_chempot: float = None # Global fragment chemical potential (e.g. for democratically partitioned DMs) dm_with_frozen: bool = False # Add frozen parts to cluster DMs # --- Bath options bath_options: dict = OptionsBase.dict_with_defaults( # General bathtype="dmet", canonicalize=True, occupation_tolerance=1e-6, # DMET bath dmet_threshold=1e-8, # R2 bath rcut=None, unit="Ang", # EwDMET bath order=None, max_order=20, # +threshold (same as MP2 bath) # MP2 bath threshold=None, truncation="occupation", project_dmet_order=2, project_dmet_mode="squared-entropy", addbuffer=False, # The following options can be set occupied/virtual-specific: bathtype_occ=None, bathtype_vir=None, rcut_occ=None, rcut_vir=None, unit_occ=None, unit_vir=None, order_occ=None, order_vir=None, max_order_occ=None, max_order_vir=None, threshold_occ=None, threshold_vir=None, truncation_occ=None, truncation_vir=None, project_dmet_order_occ=None, project_dmet_order_vir=None, project_dmet_mode_occ=None, project_dmet_mode_vir=None, addbuffer_occ=None, addbuffer_dmet_vir=None, canonicalize_occ=None, canonicalize_vir=None, ) # --- Bosonic bath options bosonic_bath_options: dict = OptionsBase.dict_with_defaults( # General bathtype=None, # construction options. target_orbitals="full", local_projection="fragment", # bath truncation options. threshold=1e-6, truncation="occupation", ) # --- Solver options solver_options: dict = OptionsBase.dict_with_defaults( # General conv_tol=None, n_moments=None, # CCSD solve_lambda=True, conv_tol_normt=None, level_shift=None, diis_space=None, diis_start_cycle=None, iterative_damping=None, # FCI threads=1, max_cycle=300, fix_spin=None, lindep=None, davidson_only=True, init_guess="default", # EBFCI/EBCCSD max_boson_occ=2, # EBCC ansatz=None, store_as_ccsd=None, # Dump dumpfile="clusters.h5", # Callback callback = None, # MP2 compress_cderi=False, ) # --- Other symmetry_tol: float = 1e-6 # Tolerance (in Bohr) for atomic positions symmetry_mf_tol: float = 1e-5 # Tolerance for mean-field solution screening: Optional[str] = None # What form of screening to use in clusters. ext_rpa_correction: Optional[str] = None match_cluster_fock: bool = False
[docs]class Embedding: """Base class for quantum embedding methods. Parameters ---------- mf : pyscf.scf.SCF PySCF mean-field object. solver : str, optional Default solver for cluster problems. The available solvers depend on the embedding class. Default: 'CCSD'. log : logging.Logger, optional Vayesta logger object. Default: None bath_options : dict, optional Bath specific options. The bath type is determined by the key `bathtype` (default: 'DMET'). The following bath specific options can be specified. All bath types: dmet_threshold : float, optional Threshold for DMET bath orbitals. Orbitals with eigenvalues larger than `dmet_threshold` or smaller than 1-`dmet_threshold` will be added as bath orbitals. Default: 1e-6. MP2 bath (`bathtype = 'MP2'`): threshold : float Threshold for MP2 natural orbital truncation. Orbitals with eigenvalues larger than `threshold` will be added as bath orbitals. R2 bath (`bathtype = 'R2'`): rcut : float Range cutoff for R2 bath. Orbitals with eigenvalues smaller than `rcut` will be added as bath orbitals. unit : {'Ang', 'Bohr'}, optional Unit of `rcut`. Default: 'Ang'. solver_options : dict, optional Solver specific options. The following solver specific options can be specified. conv_tol : float Energy convergence tolerance [valid for 'CISD', 'CCSD', 'TCCSD', 'FCI'] conv_tol_normt : float Amplitude convergence tolerance [valid for 'CCSD', 'TCCSD'] fix_spin : float Target specified spin state [valid for 'FCI'] solve_lambda : bool Solve Lambda-equations [valid for 'CCSD', 'TCCSD']. If False, T-amplitudes are used instead. dumpfile : str Dump cluster orbitals and integrals to file [valid for 'Dump'] Attributes ---------- mol nao ncells nmo nfrag e_mf log : logging.Logger Logger object. self.mf : pyscf.scf.SCF PySCF mean-field object. self.mo_energy : (nMO) array MO energies. self.mo_occ : (nMO) array MO occupation numbers. self.mo_coeff : (nAO, nMO) array MO coefficients. self.fragments : list List of fragments for embedding calculation. self.kcell : pyscf.pbc.gto.Cell For k-point sampled mean-field calculation, which have been folded to the supercell, this will hold the original primitive unit cell. self.kpts : (nK, 3) array For k-point sampled mean-field calculation, which have been folded to the supercell, this will hold the original k-points. self.kdf : pyscf.pbc.df.GDF For k-point sampled mean-field calculation, which have been folded to the supercell, this will hold the original Gaussian density-fitting object. """ # Shadow these in inherited methods: Fragment = Fragment Options = Options # Deprecated: is_rhf = True is_uhf = False # Use instead: spinsym = "restricted" def __init__(self, mf, solver="CCSD", log=None, overwrite=None, **kwargs): # 1) Logging # ---------- self.log = log or logging.getLogger(__name__) self.log.info("") self.log.info("INITIALIZING %s" % self.__class__.__name__.upper()) self.log.info("=============%s" % (len(str(self.__class__.__name__)) * "=")) with self.log.indent(): # 2) Options # ---------- self.opts = self.Options().replace(**kwargs) # 3) Overwrite methods/attributes # ------------------------------- if overwrite is not None: for name, attr in overwrite.items(): if callable(attr): self.log.info("Overwriting method %s of %s", name, self.__class__.__name__) setattr(self, name, attr.__get__(self)) else: self.log.info("Overwriting attribute %s of %s", name, self.__class__.__name__) setattr(self, name, attr) # 4) Mean-field # ------------- self.mf = None self.kcell = None self.kpts = None self.kdf = None self.madelung = None with log_time(self.log.timing, "Time for mean-field setup: %s"): self.init_mf(mf) # 5) Other # -------- self.check_solver(solver) self.solver = solver self.symmetry = SymmetryGroup(self.mol, xtol=self.opts.symmetry_tol) nimages = getattr(self.mf, "subcellmesh", None) if nimages: self.symmetry.set_translations(nimages) # Rotations need to be added manually! self.register = FragmentRegister() self.fragments = [] self.with_scmf = None # Self-consistent mean-field # Initialize results self._reset() def _mpi_bcast_mf(self, mf): """Use mo_energy and mo_coeff from master MPI rank only.""" # If vayesta.misc.scf_with_mpi was used, we do not need broadcast # as the MO coefficients will already be the same if getattr(mf, "with_mpi", False): return with log_time(self.log.timing, "Time to broadcast mean-field to all MPI ranks: %s"): # Check if all MPI ranks have the same mean-field MOs # mo_energy = mpi.world.gather(mf.mo_energy) # if mpi.is_master: # moerr = np.max([abs(mo_energy[i] - mo_energy[0]).max() for i in range(len(mpi))]) # if moerr > 1e-6: # self.log.warning("Large difference of MO energies between MPI ranks= %.2e !", moerr) # else: # self.log.debugv("Largest difference of MO energies between MPI ranks= %.2e", moerr) # Use MOs of master process mf.mo_energy = mpi.world.bcast(mf.mo_energy, root=0) mf.mo_coeff = mpi.world.bcast(mf.mo_coeff, root=0)
[docs] def init_mf(self, mf): self._mf_orig = ( mf # Keep track of original mean-field object - be careful not to modify in any way, to avoid side effects! ) # Create shallow copy of mean-field object; this way it can be updated without side effects outside the quantum # embedding method if attributes are replaced in their entirety # (eg. `mf.mo_coeff = mo_new` instead of `mf.mo_coeff[:] = mo_new`). mf = copy.copy(mf) self.log.debugv("type(mf)= %r", type(mf)) # If the mean-field has k-points, automatically fold to the supercell: if isinstance(mf, pyscf.pbc.scf.khf.KSCF): with log_time(self.log.timing, "Time for k->G folding of MOs: %s"): mf = fold_scf(mf) if isinstance(mf, FoldedSCF): self.kcell, self.kpts, self.kdf = mf.kmf.mol, mf.kmf.kpts, mf.kmf.with_df # Make sure that all MPI ranks use the same MOs`: if mpi: self._mpi_bcast_mf(mf) self.mf = mf if not (self.is_rhf or self.is_uhf): raise ValueError("Cannot deduce RHF or UHF!") # Evaluating the Madelung constant is expensive - cache result if self.has_exxdiv: self.madelung = pyscf.pbc.tools.madelung(self.mol, self.mf.kpt) # Original mean-field integrals - do not change these! self._ovlp_orig = self.mf.get_ovlp() self._hcore_orig = self.mf.get_hcore() self._veff_orig = self.mf.get_veff() # Cached integrals - these can be changed! self._ovlp = self._ovlp_orig self._hcore = self._hcore_orig self._veff = self._veff_orig # Hartree-Fock energy - this can be different from mf.e_tot, when the mean-field # is not a (converged) HF calculations e_mf = mf.e_tot / self.ncells e_hf = self.e_mf de = e_mf - e_hf rde = de / e_mf if not self.mf.converged: self.log.warning("Mean-field not converged!") self.log.info("Initial E(mean-field)= %s", energy_string(e_mf)) self.log.info("Calculated E(HF)= %s", energy_string(e_hf)) self.log.info("Difference dE= %s ( %.1f%%)", energy_string(de), rde) if (abs(de) > 1e-3) or (abs(rde) > 1e-6): self.log.warning("Large difference between initial E(mean-field) and calculated E(HF)!") # FIXME (no RHF/UHF dependent code here) if self.is_rhf: self.log.info("n(AO)= %4d n(MO)= %4d n(linear dep.)= %4d", self.nao, self.nmo, self.nao - self.nmo) else: self.log.info( "n(AO)= %4d n(alpha/beta-MO)= (%4d, %4d) n(linear dep.)= (%4d, %4d)", self.nao, *self.nmo, self.nao - self.nmo[0], self.nao - self.nmo[1], ) self._check_orthonormal(self.mo_coeff, mo_name="MO") if self.mo_energy is not None: if self.is_rhf: self.log.debugv("MO energies (occ):\n%r", self.mo_energy[self.mo_occ > 0]) self.log.debugv("MO energies (vir):\n%r", self.mo_energy[self.mo_occ == 0]) else: self.log.debugv("alpha-MO energies (occ):\n%r", self.mo_energy[0][self.mo_occ[0] > 0]) self.log.debugv("beta-MO energies (occ):\n%r", self.mo_energy[1][self.mo_occ[1] > 0]) self.log.debugv("alpha-MO energies (vir):\n%r", self.mo_energy[0][self.mo_occ[0] == 0]) self.log.debugv("beta-MO energies (vir):\n%r", self.mo_energy[1][self.mo_occ[1] == 0])
[docs] def change_options(self, **kwargs): self.opts.replace(**kwargs) for fx in self.fragments: fkwds = {key: kwargs[key] for key in [key for key in kwargs if hasattr(fx.opts, key)]} fx.change_options(**fkwds)
# --- Basic properties and methods # ================================ def __repr__(self): keys = ["mf"] fmt = ("%s(" + len(keys) * "%s: %r, ")[:-2] + ")" values = [self.__dict__[k] for k in keys] return fmt % (self.__class__.__name__, *[x for y in zip(keys, values) for x in y]) # Mol/Cell properties @property def mol(self): """Mole or Cell object.""" return self.mf.mol @property def has_exxdiv(self): """Correction for divergent exact-exchange potential.""" return hasattr(self.mf, "exxdiv") and self.mf.exxdiv is not None
[docs] def get_exxdiv(self): """Get divergent exact-exchange (exxdiv) energy correction and potential. Returns ------- e_exxdiv: float Divergent exact-exchange energy correction per unit cell. v_exxdiv: array Divergent exact-exchange potential correction in AO basis. """ if not self.has_exxdiv: return 0, None sc = np.dot(self.get_ovlp(), self.mo_coeff[:, : self.nocc]) e_exxdiv = -self.madelung * self.nocc v_exxdiv = -self.madelung * np.dot(sc, sc.T) self.log.debugv("Divergent exact-exchange (exxdiv) correction= %+16.8f Ha", e_exxdiv) return e_exxdiv / self.ncells, v_exxdiv
@property def pbc_dimension(self): return getattr(self.mol, "dimension", 0) @property def nao(self): """Number of atomic orbitals.""" return self.mol.nao_nr() @property def ncells(self): """Number of primitive cells within supercell.""" if self.kpts is None: return 1 return len(self.kpts) @property def has_df(self): return (self.df is not None) or (self.kdf is not None) @property def df(self): if hasattr(self.mf, "with_df") and self.mf.with_df is not None: return self.mf.with_df return None # Mean-field properties # def init_vhf_ehf(self): # """Get Hartree-Fock potential and energy.""" # if self.opts.recalc_vhf: # self.log.debug("Calculating HF potential from mean-field object.") # vhf = self.mf.get_veff() # else: # self.log.debug("Calculating HF potential from MOs.") # cs = np.dot(self.mo_coeff.T, self.get_ovlp()) # fock = np.dot(cs.T*self.mo_energy, cs) # vhf = (fock - self.get_hcore()) # h1e = self.get_hcore_for_energy() # ehf = self.mf.energy_tot(h1e=h1e, vhf=vhf) # return vhf, ehf @property def mo_energy(self): """Molecular orbital energies.""" return self.mf.mo_energy @property def mo_coeff(self): """Molecular orbital coefficients.""" return self.mf.mo_coeff @property def mo_occ(self): """Molecular orbital occupations.""" return self.mf.mo_occ # MOs setters: # @mo_energy.setter # def mo_energy(self, mo_energy): # """Updating the MOs resets the effective potential cache `_veff`.""" # self.log.debugv("MF attribute 'mo_energy' is updated; deleting cached _veff.") # #self._veff = None # self.mf.mo_energy = mo_energy # @mo_coeff.setter # def mo_coeff(self, mo_coeff): # """Updating the MOs resets the effective potential cache `_veff`.""" # self.log.debugv("MF attribute 'mo_coeff' is updated; deleting chached _veff.") # #self._veff = None # self.mf.mo_coeff = mo_coeff # @mo_occ.setter # def mo_occ(self, mo_occ): # """Updating the MOs resets the effective potential cache `_veff`.""" # self.log.debugv("MF attribute 'mo_occ' is updated; deleting chached _veff.") # #self._veff = None # self.mf.mo_occ = mo_occ @property def nmo(self): """Total number of molecular orbitals (MOs).""" return self.mo_coeff.shape[-1] @property def nocc(self): """Number of occupied MOs.""" return np.count_nonzero(self.mo_occ > 0) @property def nvir(self): """Number of virtual MOs.""" return np.count_nonzero(self.mo_occ == 0) @property def mo_energy_occ(self): """Occupied MO energies.""" return self.mo_energy[: self.nocc] @property def mo_energy_vir(self): """Virtual MO coefficients.""" return self.mo_energy[self.nocc :] @property def mo_coeff_occ(self): """Occupied MO coefficients.""" return self.mo_coeff[:, : self.nocc] @property def mo_coeff_vir(self): """Virtual MO coefficients.""" return self.mo_coeff[:, self.nocc :] @property def e_mf(self): """Total mean-field energy per unit cell (not folded supercell). Note that the input unit cell itself can be a supercell, in which case `e_mf` refers to this cell. """ h1e = self.get_hcore_for_energy() vhf = self.get_veff_for_energy() e_mf = self.mf.energy_tot(h1e=h1e, vhf=vhf) return e_mf / self.ncells @property def e_nuc(self): """Nuclear-repulsion energy per unit cell (not folded supercell).""" return self.mol.energy_nuc() / self.ncells @property def e_nonlocal(self): if self.opts.ext_rpa_correction is None: return 0.0 e_local = sum([x.results.e_corr_rpa * x.symmetry_factor for x in self.get_fragments(sym_parent=None)]) return self.e_rpa - (e_local / self.ncells) # Embedding properties @property def nfrag(self): """Number of fragments.""" return len(self.fragments)
[docs] def loop(self): """Loop over fragments.""" for frag in self.fragments: yield frag
# --- Integral methods # ==================== # Integrals of the original mean-field object - these cannot be changed: def _get_ovlp_orig(self): return self._ovlp_orig def _get_hcore_orig(self): return self._hcore_orig def _get_veff_orig(self, with_exxdiv=True): if not with_exxdiv and self.has_exxdiv: v_exxdiv = self.get_exxdiv()[1] return self._get_veff_orig() - v_exxdiv return self._veff_orig def _get_fock_orig(self, with_exxdiv=True): return self._get_hcore_orig() + self._get_veff_orig(with_exxdiv=with_exxdiv) # Integrals which change with mean-field updates or chemical potential shifts:
[docs] def get_ovlp(self): """AO-overlap matrix.""" return self._ovlp
[docs] def get_hcore(self): """Core Hamiltonian (kinetic energy plus nuclear-electron attraction).""" return self._hcore
[docs] def get_veff(self, dm1=None, with_exxdiv=True): """Hartree-Fock Coulomb and exchange potential in AO basis.""" if not with_exxdiv and self.has_exxdiv: v_exxdiv = self.get_exxdiv()[1] return self.get_veff(dm1=dm1) - v_exxdiv if dm1 is None: return self._veff return self.mf.get_veff(dm=dm1)
[docs] def get_fock(self, dm1=None, with_exxdiv=True): """Fock matrix in AO basis.""" return self.get_hcore() + self.get_veff(dm1=dm1, with_exxdiv=with_exxdiv)
[docs] def set_ovlp(self, value): self.log.debug("Changing ovlp matrix.") self._ovlp = value
[docs] def set_hcore(self, value): self.log.debug("Changing hcore matrix.") self._hcore = value
[docs] def set_veff(self, value): self.log.debug("Changing veff matrix.") self._veff = value
# Integrals for energy evaluation # Overwriting these allows using different integrals for the energy evaluation
[docs] def get_hcore_for_energy(self): """Core Hamiltonian used for energy evaluation.""" return self.get_hcore()
[docs] def get_veff_for_energy(self, dm1=None, with_exxdiv=True): """Hartree-Fock potential used for energy evaluation.""" return self.get_veff(dm1=dm1, with_exxdiv=with_exxdiv)
[docs] def get_fock_for_energy(self, dm1=None, with_exxdiv=True): """Fock matrix used for energy evaluation.""" return self.get_hcore_for_energy() + self.get_veff_for_energy(dm1=dm1, with_exxdiv=with_exxdiv)
[docs] def get_fock_for_bath(self, dm1=None, with_exxdiv=True): """Fock matrix used for bath orbitals.""" return self.get_fock(dm1=dm1, with_exxdiv=with_exxdiv)
# Other integral methods:
[docs] def get_ovlp_power(self, power): """get power of AO overlap matrix. For folded calculations, this uses the k-point sampled overlap, for better performance and accuracy. Parameters ---------- power : float Matrix power. Returns ------- spow : (n(AO), n(AO)) array Matrix power of AO overlap matrix """ if power == 1: return self.get_ovlp() if self.kcell is None: e, v = np.linalg.eigh(self.get_ovlp()) return np.dot(v * (e**power), v.T.conj()) sk = self.kcell.pbc_intor("int1e_ovlp", hermi=1, kpts=self.kpts, pbcopt=pyscf.lib.c_null_ptr()) ek, vk = np.linalg.eigh(sk) spowk = einsum("kai,ki,kbi->kab", vk, ek**power, vk.conj()) spow = pyscf.pbc.tools.k2gamma.to_supercell_ao_integrals(self.kcell, self.kpts, spowk) return spow
get_cderi = eris.get_cderi get_cderi_exspace = eris.get_cderi_exspace get_eris_array = eris.get_eris_array get_eris_object = eris.get_eris_object
[docs] def build_screened_interactions(self, *args, **kwargs): """Build screened interactions, be they dynamic or static.""" have_static_screening = any([x.opts.screening is not None for x in self.fragments]) have_dynamic_screening = any([x.opts.bosonic_bath_options["bathtype"] is not None for x in self.fragments]) if have_static_screening and have_dynamic_screening: raise ValueError( "Cannot currently use both static screened coulomb interaction and bosonic baths at the same time." ) if have_static_screening: self.build_screened_eris(*args, **kwargs) if have_dynamic_screening: self.build_bosonic_bath(*args, **kwargs)
[docs] def build_bosonic_bath(self): if self.spinsym != "restricted": raise NotImplementedError("Bosonic baths are currently only compatible with a restricted formalism.") self.log.info("") self.log.info("BOSONIC BATH SETUP") self.log.info("==================") fragments = self.get_fragments(active=True, sym_parent=None, mpi_rank=mpi.rank) with log_time(self.log.timing, "Total time for bath and clusters: %s"): msg = "Generating required information for bosonic bath generation" self.log.info(msg) self.log.info(len(msg) * "-") # Generate list of all required target information. targets, ntarget = zip(*[x.make_bosonic_bath_target() for x in fragments]) target_rot = np.vstack([x for x in targets if x is not None]) rpa = ssRIRPA(self.mf) moments = rpa.kernel_moms(max_moment=0, target_rot=target_rot)[0][0] # Split this into individual fragment contributions. moments = np.vsplit(moments, np.cumsum(ntarget)[:-1]) for x, moment in zip(fragments, moments): msg = "Making bosonic bath for %s%s" % (x, (" on MPI process %d" % mpi.rank) if mpi else "") self.log.info(msg) self.log.info(len(msg) * "-") with self.log.indent(): x.make_bosonic_cluster(moment)
[docs] @log_method() @with_doc(build_screened_eris) def build_screened_eris(self, *args, **kwargs): self.log.info("") self.log.info("SCREENED INTERACTION SETUP") self.log.info("==========================") nmomscr = len([x.opts.screening for x in self.fragments if x.opts.screening == "mrpa"]) lov = None if self.opts.ext_rpa_correction: cumulant = self.opts.ext_rpa_correction == "cumulant" if nmomscr < self.nfrag: raise NotImplementedError( "External dRPA correction currently requires all fragments use mrpa screening." ) if self.opts.ext_rpa_correction not in ["erpa", "cumulant"]: raise ValueError("Unknown external rpa correction %s specified.") lov = self.get_cderi((self.mo_coeff_occ, self.mo_coeff_vir)) rpa = ssRIRPA(self.mf, log=self.log, lov=lov) if cumulant: lp = lov[0] lp = lp.reshape((lp.shape[0], -1)) l_ = l_2 = lp if lov[1] is not None: ln = lov[1].reshape((lov[1].shape[0], -1)) l_ = np.concatenate([lp, ln], axis=0) l_2 = np.concatenate([lp, -ln], axis=0) l_ = np.concatenate([l_, l_], axis=1) l_2 = np.concatenate([l_2, l_2], axis=1) m0 = rpa.kernel_moms(0, target_rot=l_)[0][0] # Deduct effective mean-field contribution and project the RHS and we're done. self.e_rpa = 0.5 * einsum("pq,pq->", m0 - l_, l_2) else: # Calculate total dRPA energy in N^4 time; this is cheaper than screening calculations. self.e_rpa, energy_error = rpa.kernel_energy(correction="linear") self.e_rpa = self.e_rpa / self.ncells self.log.info("Set total RPA correlation energy contribution as %s", energy_string(self.e_rpa)) if nmomscr > 0: self.log.info("") self.log.info("SCREENED INTERACTION SETUP") self.log.info("==========================") with log_time(self.log.timing, "Time for screened interation setup: %s"): build_screened_eris(self, *args, store_m0=self.opts.ext_rpa_correction, cderi_ov=lov, **kwargs)
# Symmetry between fragments # --------------------------
[docs] def create_symmetric_fragments(self, symmetry, fragments=None, symbol=None, mf_tol=None, check_mf=True): """Add rotationally or translationally symmetric fragments. Parameters ---------- mf_tol: float, optional Tolerance for the error of the mean-field density matrix between symmetry related fragments. If the largest absolute difference in the density-matrix is above this value, and exception will be raised. Default: self.opts.symmetry_mf_tol. Returns ------- fragments: list List of T-symmetry related fragments. These will have the attributes `sym_parent` and `sym_op` set. """ default_axes = {"x": (1, 0, 0), "y": (0, 1, 0), "z": (0, 0, 1)} def catch_default_axes(axis): if isinstance(axis, str): return default_axes[axis.lower()] return axis symtype = symmetry["type"] def to_bohr(point, unit): unit = unit.lower() point = np.asarray(point, dtype=float) if unit.startswith("ang"): return point / 0.529177210903 if unit == "latvec": # kcell = self.kcell if self.kcell is not None else self.mol return np.dot(point, (self.kcell or self.mol).lattice_vectors()) if unit.startswith("bohr"): return point raise ValueError("unit= %s" % unit) def shift_point_to_supercell(point): """Shift point in primitive cell to equivalent, scaled point in supercell.""" if self.kcell is None: # No PBC or no supercell return point ak = self.kcell.lattice_vectors() # primtive cell lattice vectors bk = np.linalg.inv(ak) a = self.mol.lattice_vectors() # supercell lattice vectors shift = (np.diag(a) / np.diag(ak) - 1) / 2 # Shift in internal coordinates, then transform back point = np.dot(np.dot(point, bk) + shift, ak) return point if symtype == "inversion": center = to_bohr(symmetry["center"], symmetry["unit"]) center = shift_point_to_supercell(center) symbol = symbol or "I" symlist = [1] elif symtype == "reflection": center = to_bohr(symmetry["center"], symmetry["unit"]) center = shift_point_to_supercell(center) axis = symmetry["axis"] axis = np.asarray(catch_default_axes(axis), dtype=float) axis = to_bohr(axis, symmetry["unit"]) symbol = symbol or "M" symlist = [1] elif symtype == "rotation": order = symmetry["order"] axis = symmetry["axis"] axis = np.asarray(catch_default_axes(axis), dtype=float) axis = to_bohr(axis, symmetry["unit"]) center = to_bohr(symmetry["center"], symmetry["unit"]) center = shift_point_to_supercell(center) symlist = range(1, order) symbol = symbol or "R" elif symtype == "translation": translation = np.asarray(symmetry["translation"]) symlist = list(itertools.product(range(translation[0]), range(translation[1]), range(translation[2])))[1:] symbol = symbol or "T" else: raise ValueError("Symmetry type= %s" % symtype) ovlp = self.get_ovlp() if check_mf: dm1 = self.mf.make_rdm1() if fragments is None: fragments = self.get_fragments() ftree = [[fx] for fx in fragments] for i, sym in enumerate(symlist): if symtype == "inversion": sym_op = SymmetryInversion(self.symmetry, center=center) elif symtype == "inversion": sym_op = SymmetryReflection(self.symmetry, axis=axis, center=center) elif symtype == "reflection": sym_op = SymmetryReflection(self.symmetry, axis=axis, center=center) elif symtype == "rotation": rotvec = 2 * np.pi * (sym / order) * axis / np.linalg.norm(axis) sym_op = SymmetryRotation(self.symmetry, rotvec, center=center) elif symtype == "translation": transvec = np.asarray(sym) / translation sym_op = SymmetryTranslation(self.symmetry, transvec) for flist in ftree: parent = flist[0] # Name for symmetry related fragment if symtype == "inversion": name = "%s_%s" % (parent.name, symbol) elif symtype == "reflection": name = "%s_%s" % (parent.name, symbol) elif symtype == "rotation": name = "%s_%s(%d)" % (parent.name, symbol, sym) elif symtype == "translation": name = "%s_%s(%d,%d,%d)" % (parent.name, symbol, *sym) # Translated coefficients c_frag_t = sym_op(parent.c_frag) c_env_t = None # Avoid expensive symmetry operation on environment orbitals # Check that translated fragment does not overlap with current fragment: fragovlp = parent._csc_dot(parent.c_frag, c_frag_t, ovlp=ovlp) if self.spinsym == "restricted": fragovlp = abs(fragovlp).max() elif self.spinsym == "unrestricted": fragovlp = max(abs(fragovlp[0]).max(), abs(fragovlp[1]).max()) if fragovlp > 1e-6: self.log.critical( "%s of fragment %s not orthogonal to original fragment (overlap= %.1e)!", sym_op, parent.name, fragovlp, ) raise RuntimeError("Overlapping fragment spaces (overlap= %.1e)" % fragovlp) # Add fragment frag_id = self.register.get_next_id() frag = self.Fragment( self, frag_id, name, c_frag_t, c_env_t, solver=parent.solver, sym_parent=parent, sym_op=sym_op, mpi_rank=parent.mpi_rank, flags=dataclasses.asdict(parent.flags), **parent.opts.asdict(), ) # Check symmetry # (only for the first rotation or primitive translations (1,0,0), (0,1,0), and (0,0,1) # to reduce number of sym_op(c_env) calls) if check_mf and (abs(np.asarray(sym)).sum() == 1): charge_err, spin_err = parent.get_symmetry_error(frag, dm1=dm1) if max(charge_err, spin_err) > (mf_tol or self.opts.symmetry_mf_tol): self.log.critical( "Mean-field DM1 not symmetric for %s of %s (errors: charge= %.1e, spin= %.1e)!", sym_op, parent.name, charge_err, spin_err, ) raise RuntimeError("MF not symmetric under %s" % sym_op) else: self.log.debugv( "Mean-field DM symmetry error for %s of %s: charge= %.1e, spin= %.1e", sym_op, parent.name, charge_err, spin_err, ) # Insert after parent fragment flist.append(frag) fragments_sym = [fx for flist in ftree for fx in flist[1:]] return fragments_sym
[docs] def create_invsym_fragments(self, center, fragments=None, unit="Ang", **kwargs): """Create inversion symmetric fragments. Parameters ---------- mf_tol: float, optional Tolerance for the error of the mean-field density matrix between symmetry related fragments. If the largest absolute difference in the density-matrix is above this value, and exception will be raised. Default: 1e-6. Returns ------- fragments: list List of inversion-symmetry related fragments. These will have have the attributes `sym_parent` and `sym_op` set. """ symmetry = dict(type="inversion", center=center, unit=unit) return self.create_symmetric_fragments(symmetry, fragments=fragments, **kwargs)
[docs] def create_mirrorsym_fragments(self, axis, center, fragments=None, unit="Ang", **kwargs): """Create mirror symmetric fragments. Parameters ---------- mf_tol: float, optional Tolerance for the error of the mean-field density matrix between symmetry related fragments. If the largest absolute difference in the density-matrix is above this value, and exception will be raised. Default: 1e-6. Returns ------- fragments: list List of mirror-symmetry related fragments. These will have have the attributes `sym_parent` and `sym_op` set. """ symmetry = dict(type="reflection", axis=axis, center=center, unit=unit) return self.create_symmetric_fragments(symmetry, fragments=fragments, **kwargs)
[docs] def create_rotsym_fragments(self, order, axis, center, fragments=None, unit="Ang", **kwargs): """Create rotationally symmetric fragments. Parameters ---------- mf_tol: float, optional Tolerance for the error of the mean-field density matrix between symmetry related fragments. If the largest absolute difference in the density-matrix is above this value, and exception will be raised. Default: 1e-6. Returns ------- fragments: list List of rotationally-symmetry related fragments. These will have have the attributes `sym_parent` and `sym_op` set. """ symmetry = dict(type="rotation", order=order, axis=axis, center=center, unit=unit) return self.create_symmetric_fragments(symmetry, fragments=fragments, **kwargs)
[docs] def create_transsym_fragments(self, translation, fragments=None, **kwargs): """Create translationally symmetric fragments. Parameters ---------- translation: array(3) of integers Each element represent the number of translation vector corresponding to the a0, a1, and a2 lattice vectors of the cell. mf_tol: float, optional Tolerance for the error of the mean-field density matrix between symmetry related fragments. If the largest absolute difference in the density-matrix is above this value, and exception will be raised. Default: 1e-6. Returns ------- fragments: list List of T-symmetry related fragments. These will have the attributes `sym_parent` and `sym_op` set. """ symmetry = dict(type="translation", translation=translation) return self.create_symmetric_fragments(symmetry, fragments=fragments, **kwargs)
[docs] def get_symmetry_parent_fragments(self): """Returns a list of all fragments, which are parents to symmetry related child fragments. Returns ------- parents: list A list of all parent fragments, ordered in the same way as they appear in `self.fragments`. """ parents = [] for f in self.fragments: if f.sym_parent is None: parents.append(f) return parents
[docs] def get_symmetry_child_fragments(self, include_parents=False): """Returns a list of all fragments, which are children to symmetry related parent fragments. Parameters ---------- include_parents: bool, optional If true, the parent fragment of each symmetry group is prepended to each symmetry sublist. Returns ------- children: list of lists A list with the length of the number of parent fragments in the system, each element being another list containing all the children fragments of the given parent fragment. Both the outer and inner lists are ordered in the same way that the fragments appear in `self.fragments`. """ parents = self.get_symmetry_parent_fragments() if include_parents: children = [[p] for p in parents] else: children = [[] for p in parents] parent_ids = [p.id for p in parents] for f in self.fragments: if f.sym_parent is None: continue pid = f.sym_parent.id assert pid in parent_ids idx = parent_ids.index(pid) children[idx].append(f) return children
[docs] def get_fragments(self, fragments=None, options=None, flags=None, **filters): """Return all fragments which obey the specified conditions. Arguments --------- **filters: List of returned fragments will be filtered according to specified keyword arguments. Returns ------- fragments: list List of fragments. Examples -------- Only returns fragments with mpi_rank 0, 1, or 2: >>> self.get_fragments(mpi_rank=[0,1,2]) Only returns fragments with no symmetry parent: >>> self.get_fragments(sym_parent=None) """ if fragments is None: fragments = self.fragments options = options or {} flags = flags or {} if not (filters or options or flags): return fragments def _values_atleast_1d(d): return {k: (v if callable(v) else np.atleast_1d(v)) for k, v in d.items()} filters = _values_atleast_1d(filters) options = _values_atleast_1d(options) flags = _values_atleast_1d(flags) def _skip(attr, filt): if callable(filt): return not filt(attr) return attr not in filt filtered_fragments = [] for frag in fragments: skip = False # Check filters: for key, filt in filters.items(): attr = getattr(frag, key) skip = _skip(attr, filt) if skip: break if skip: continue # Check options: for key, filt in options.items(): attr = getattr_recursive(frag.opts, key) skip = _skip(attr, filt) if skip: break # Check flags: for key, filt in flags.items(): attr = getattr_recursive(frag.flags, key) skip = _skip(attr, filt) if skip: break if skip: continue filtered_fragments.append(frag) return filtered_fragments
[docs] def get_fragment_overlap_norm(self, fragments=None, occupied=True, virtual=True, norm=2): """Get matrix of overlap norms between fragments.""" if fragments is None: fragments = self.get_fragments() if isinstance(fragments[0], self.Fragment): fragments = 2 * [fragments] if not (occupied or virtual): raise ValueError overlap = np.zeros((len(fragments[0]), len(fragments[1]))) ovlp = self.get_ovlp() nxy_occ = nxy_vir = np.inf for i, fx in enumerate(fragments[0]): if occupied: cxs_occ = spinalg.dot(spinalg.T(fx.cluster.c_occ), ovlp) if virtual: cxs_vir = spinalg.dot(spinalg.T(fx.cluster.c_vir), ovlp) for j, fy in enumerate(fragments[1]): if occupied: rxy_occ = spinalg.dot(cxs_occ, fy.cluster.c_occ) nxy_occ = np.amax(spinalg.norm(rxy_occ, ord=norm)) if virtual: rxy_vir = spinalg.dot(cxs_vir, fy.cluster.c_vir) nxy_vir = np.amax(spinalg.norm(rxy_vir, ord=norm)) overlap[i, j] = np.amin((nxy_occ, nxy_vir)) return overlap
def _absorb_fragments(self, tol=1e-10): """TODO""" for fx in self.get_fragments(active=True): for fy in self.get_fragments(active=True): if fx.id == fy.id: continue if not (fx.active and fy.active): continue def svd(cx, cy): rxy = np.dot(cx.T, cy) u, s, v = np.linalg.svd(rxy, full_matrices=False) if s.min() >= (1 - tol): nx = cx.shape[-1] ny = cy.shape[-1] swap = False if (nx >= ny) else True return swap return None cx_occ = fx.get_overlap("mo[occ]|cluster[occ]") cy_occ = fy.get_overlap("mo[occ]|cluster[occ]") swocc = svd(cx_occ, cy_occ) if swocc is None: continue cx_vir = fx.get_overlap("mo[vir]|cluster[vir]") cy_vir = fy.get_overlap("mo[vir]|cluster[vir]") swvir = svd(cx_vir, cy_vir) if swocc != swvir: continue # Absorb smaller if swocc: fx, fy = fy, fx c_frag = hstack(fx.c_frag, fy.c_frag) fx.c_frag = c_frag name = "/".join((fx.name, fy.name)) fy.active = False self.log.info("Subspace found: adding %s to %s (new name= %s)!", fy, fx, name) # Update fx fx.name = name fx.c_env = None fx._dmet_bath = None fx._occ_bath_factory = None fx._vir_bath_factory = None # Results # -------
[docs] def communicate_clusters(self): """Communicate cluster orbitals between MPI ranks.""" if not mpi: return with log_time(self.log.timing, "Time to communicate clusters: %s"): for x in self.get_fragments(sym_parent=None): source = x.mpi_rank if mpi.rank == source: x.cluster.orig_mf = None cluster = mpi.world.bcast(x.cluster, root=source) if mpi.rank != source: x.cluster = cluster x.cluster.orig_mf = self.mf
[docs] @log_method() @with_doc(make_rdm1_demo_rhf) def make_rdm1_demo(self, *args, **kwargs): return make_rdm1_demo_rhf(self, *args, **kwargs)
[docs] @log_method() @with_doc(make_rdm2_demo_rhf) def make_rdm2_demo(self, *args, **kwargs): return make_rdm2_demo_rhf(self, *args, **kwargs)
[docs] def get_dmet_elec_energy(self, part_cumulant=True, approx_cumulant=True, mpi_target=None): """Calculate electronic DMET energy via democratically partitioned density-matrices. Parameters ---------- part_cumulant: bool, optional If True, the 2-DM cumulant will be partitioned to calculate the energy. If False, the full 2-DM will be partitioned, as it is done in most of the DMET literature. True is recommended, unless checking for agreement with literature results. Default: True. approx_cumulant: bool, optional If True, the approximate cumulant, containing (delta 1-DM)-squared terms, is partitioned, instead of the true cumulant, if `part_cumulant=True`. Default: True. mpi_target: int or None, optional If set to an integer, the result will only be available at the specified MPI rank. If set to None, an MPI allreduce will be performed and the result will be available at all MPI ranks. Default: None. Returns ------- e_dmet: float Electronic DMET energy. """ e_dmet = 0.0 for x in self.get_fragments(active=True, mpi_rank=mpi.rank, sym_parent=None): wx = x.symmetry_factor e_dmet += wx * x.get_fragment_dmet_energy(part_cumulant=part_cumulant, approx_cumulant=approx_cumulant) if mpi: e_dmet = mpi.nreduce(e_dmet, target=mpi_target, logfunc=self.log.timingv) if part_cumulant: dm1 = self.make_rdm1_demo(ao_basis=True) if not approx_cumulant: vhf = self.get_veff_for_energy(dm1=dm1, with_exxdiv=False) elif int(approx_cumulant) == 1: dm1 = 2 * np.asarray(dm1) - self.mf.make_rdm1() vhf = self.get_veff_for_energy(with_exxdiv=False) else: raise ValueError e_dmet += np.sum(np.asarray(vhf) * dm1) / 2 self.log.debugv("E_elec(DMET)= %s", energy_string(e_dmet)) return e_dmet / self.ncells
[docs] def get_dmet_energy(self, part_cumulant=True, approx_cumulant=True, with_nuc=True, with_exxdiv=True): """Calculate DMET energy via democratically partitioned density-matrices. Parameters ---------- part_cumulant: bool, optional If True, the 2-DM cumulant will be partitioned to calculate the energy. If False, the full 2-DM will be partitioned, as it is done in most of the DMET literature. True is recommended, unless checking for agreement with literature results. Default: True. approx_cumulant: bool, optional If True, the approximate cumulant, containing (delta 1-DM)-squared terms, is partitioned, instead of the true cumulant, if `part_cumulant=True`. Default: True. with_nuc: bool, optional Include nuclear-repulsion energy. Default: True. with_exxdiv: bool, optional Include divergent exact-exchange correction. Default: True. Returns ------- e_dmet: float DMET energy. """ e_dmet = self.get_dmet_elec_energy(part_cumulant=part_cumulant, approx_cumulant=approx_cumulant) if with_nuc: e_dmet += self.e_nuc if with_exxdiv and self.has_exxdiv: e_dmet += self.get_exxdiv()[0] return e_dmet
get_corrfunc_mf = log_method()(get_corrfunc_mf) get_corrfunc = log_method()(get_corrfunc) # Utility # ------- def _check_orthonormal(self, *mo_coeff, mo_name="", crit_tol=1e-2, err_tol=1e-7): """Check orthonormality of mo_coeff.""" mo_coeff = hstack(*mo_coeff) err = dot(mo_coeff.T, self.get_ovlp(), mo_coeff) - np.eye(mo_coeff.shape[-1]) l2 = np.linalg.norm(err) linf = abs(err).max() if mo_name: mo_name = " of %ss" % mo_name if max(l2, linf) > crit_tol: self.log.critical("Orthonormality error%s: L(2)= %.2e L(inf)= %.2e !", mo_name, l2, linf) raise OrthonormalityError("Orbitals not orhonormal!") elif max(l2, linf) > err_tol: self.log.error("Orthonormality error%s: L(2)= %.2e L(inf)= %.2e !", mo_name, l2, linf) else: self.log.debugv("Orthonormality error%s: L(2)= %.2e L(inf)= %.2e", mo_name, l2, linf) return l2, linf
[docs] def get_mean_cluster_size(self): return np.mean([x.cluster.norb_active for x in self.fragments])
[docs] def get_average_cluster_size(self, average="mean"): if average == "mean": return self.get_mean_cluster_size() raise ValueError
[docs] def get_min_cluster_size(self): return np.min([x.cluster.norb_active for x in self.fragments])
[docs] def get_max_cluster_size(self): return np.max([x.cluster.norb_active for x in self.fragments])
# --- Population analysis # ----------------------- def _get_atom_projectors(self, atoms=None, projection="sao", orbital_filter=None): if atoms is None: atoms2 = list(range(self.mol.natm)) # For supercell systems, we do not want all supercell-atom pairs, # but only primitive-cell -- supercell pairs: atoms1 = atoms2 if (self.kcell is None) else list(range(self.kcell.natm)) elif isinstance(atoms[0], (int, np.integer)): atoms1 = atoms2 = atoms else: atoms1, atoms2 = atoms # Get atomic projectors: projection = projection.lower() if projection == "sao": frag = SAO_Fragmentation(self) elif projection.replace("+", "").replace("/", "") == "iaopao": frag = IAOPAO_Fragmentation(self) elif projection == "iao": frag = IAO_Fragmentation(self) self.log.warning("IAO projection is not recommended for population analysis! Use IAO+PAO instead.") else: raise ValueError("Invalid projection: %s" % projection) frag.kernel() ovlp = self.get_ovlp() projectors = {} cs = spinalg.dot(spinalg.transpose(self.mo_coeff), ovlp) for atom in sorted(set(atoms1).union(atoms2)): name, indices = frag.get_atomic_fragment_indices(atom, orbital_filter=orbital_filter) c_atom = frag.get_frag_coeff(indices) if isinstance(c_atom, tuple): no_orbs = c_atom[0].shape[1] == 0 and c_atom[1].shape[1] == 0 else: no_orbs = c_atom.shape[1] == 0 if no_orbs and orbital_filter is not None: self.log.error("No orbitals found for atom %d when filtered!" % atom) raise ValueError("No orbitals found for atom %d when filtered" % atom) r = spinalg.dot(cs, c_atom) projectors[atom] = spinalg.dot(r, spinalg.transpose(r)) return atoms1, atoms2, projectors
[docs] def get_lo_coeff(self, local_orbitals="lowdin", minao="auto"): if local_orbitals.lower() == "lowdin": # Avoid pre_orth_ao step! # self.c_lo = c_lo = pyscf.lo.orth_ao(self.mol, 'lowdin') # self.c_lo = c_lo = pyscf.lo.orth_ao(self.mol, 'meta-lowdin', pre_orth_ao=None) return self.get_ovlp_power(power=-0.5) elif local_orbitals.lower() == "iao+pao": return IAOPAO_Fragmentation(self, minao=minao).get_coeff() raise ValueError("Unknown local orbitals: %r" % local_orbitals)
[docs] def pop_analysis( self, dm1, mo_coeff=None, local_orbitals="lowdin", minao="auto", write=True, filename=None, filemode="a", orbital_resolved=False, mpi_rank=0, ): """ Parameters ---------- dm1 : (N, N) array If `mo_coeff` is None, AO representation is assumed. local_orbitals : {'lowdin', 'mulliken', 'iao+pao'} or array Kind of population analysis. Default: 'lowdin'. Returns ------- pop : (N) array Population of atomic orbitals. """ if mo_coeff is not None: dm1 = einsum("ai,ij,bj->ab", mo_coeff, dm1, mo_coeff) ovlp = self.get_ovlp() if isinstance(local_orbitals, str): lo = local_orbitals.lower() if lo == "mulliken": c_lo = None else: c_lo = self.get_lo_coeff(lo, minao=minao) else: c_lo = local_orbitals if c_lo is None: pop = einsum("ab,ba->a", dm1, ovlp) else: cs = np.dot(c_lo.T, ovlp) pop = einsum("ia,ab,ib->i", cs, dm1, cs) if write and (mpi.rank == mpi_rank): self.write_population(pop, filename=filename, filemode=filemode, orbital_resolved=orbital_resolved) return pop
[docs] def get_atomic_charges(self, pop): charges = np.zeros(self.mol.natm) spins = np.zeros(self.mol.natm) if len(pop) != self.mol.nao: raise ValueError("n(AO)= %d n(Pop)= %d" % (self.mol.nao, len(pop))) for i, label in enumerate(self.mol.ao_labels(None)): charges[label[0]] -= pop[i] charges += self.mol.atom_charges() return charges, spins
[docs] def write_population(self, pop, filename=None, filemode="a", orbital_resolved=False): charges, spins = self.get_atomic_charges(pop) if orbital_resolved: aoslices = self.mol.aoslice_by_atom()[:, 2:] aolabels = self.mol.ao_labels() if filename is None: write = lambda *args: self.log.info(*args) write("Population analysis") write("-------------------") else: dirname = os.path.dirname(filename) if dirname: os.makedirs(dirname, exist_ok=True) f = open(filename, filemode) write = lambda fmt, *args: f.write((fmt + "\n") % args) tstamp = datetime.now() self.log.info('Writing population analysis to file "%s". Time-stamp: %s', filename, tstamp) write("# Time-stamp: %s", tstamp) write("# Population analysis") write("# -------------------") for atom, charge in enumerate(charges): write("%3d %-7s q= % 11.8f s= % 11.8f", atom, self.mol.atom_symbol(atom) + ":", charge, spins[atom]) if orbital_resolved: aos = aoslices[atom] for ao in range(aos[0], aos[1]): label = aolabels[ao] if np.ndim(pop[0]) == 1: write(" %4d %-16s= % 11.8f % 11.8f" % (ao, label, pop[0][ao], pop[1][ao])) else: write(" %4d %-16s= % 11.8f" % (ao, label, pop[ao])) if filename is not None: f.close()
def _check_fragment_nelectron(self, fragments=None, nelec=None): if fragments is None: fragments = self.get_fragments(flags=dict(is_envelop=True)) if nelec is None: nelec = self.mol.nelectron nelec_frags = sum([f.sym_factor * f.nelectron for f in fragments]) self.log.info("Number of electrons over %d fragments= %.8f target= %.8f", len(fragments), nelec_frags, nelec) if abs(nelec_frags - nelec) > 1e-6: self.log.warning("Number of mean-field electrons in fragments not equal to the target number of electrons.") return nelec_frags # --- Fragmentation methods
[docs] def sao_fragmentation(self, **kwargs): """Initialize the quantum embedding method for the use of SAO (Lowdin-AO) fragments.""" return SAO_Fragmentation(self, **kwargs)
[docs] def site_fragmentation(self, **kwargs): """Initialize the quantum embedding method for the use of site fragments.""" return Site_Fragmentation(self, **kwargs)
[docs] def iao_fragmentation(self, minao="auto", **kwargs): """Initialize the quantum embedding method for the use of IAO fragments. Parameters ---------- minao: str, optional IAO reference basis set. Default: 'auto' """ return IAO_Fragmentation(self, minao=minao, **kwargs)
[docs] def iaopao_fragmentation(self, minao="auto", **kwargs): """Initialize the quantum embedding method for the use of IAO+PAO fragments. Parameters ---------- minao: str, optional IAO reference basis set. Default: 'auto' """ return IAOPAO_Fragmentation(self, minao=minao, **kwargs)
[docs] def cas_fragmentation(self, **kwargs): """Initialize the quantum embedding method for the use of site fragments.""" return CAS_Fragmentation(self, **kwargs)
def _check_fragmentation(self, complete_occupied=True, complete_virtual=True, tol=1e-7): """Check if union of fragment spaces is orthonormal and complete.""" if self.spinsym == "restricted": nspin = 1 tspin = lambda x, s: x nelec = self.mol.nelectron elif self.spinsym == "unrestricted": nspin = 2 tspin = lambda x, s: x[s] nelec = self.mol.nelec ovlp = self.get_ovlp() dm1 = self.mf.make_rdm1() for s in range(nspin): nmo_s = tspin(self.nmo, s) nelec_s = tspin(nelec, s) fragments = self.get_fragments(contributes=True, flags=dict(is_secfrag=False)) if not fragments: return False c_frags = np.hstack([tspin(x.c_frag, s) for x in fragments]) nfrags = c_frags.shape[-1] csc = dot(c_frags.T, ovlp, c_frags) if not np.allclose(csc, np.eye(nfrags), rtol=0, atol=tol): self.log.debug("Non-orthogonal error= %.3e", abs(csc - np.eye(nfrags)).max()) return False if complete_occupied and complete_virtual: if nfrags != nmo_s: return False elif complete_occupied or complete_virtual: cs = np.dot(c_frags.T, ovlp) ne = einsum("ia,ab,ib->", cs, tspin(dm1, s), cs) if complete_occupied and (abs(ne - nelec_s) > tol): return False if complete_virtual and (abs((nfrags - ne) - (nmo_s - nelec_s)) > tol): return False return True
[docs] def has_orthonormal_fragmentation(self, **kwargs): """Check if union of fragment spaces is orthonormal.""" return self._check_fragmentation(complete_occupied=False, complete_virtual=False, **kwargs)
[docs] def has_complete_fragmentation(self, **kwargs): """Check if union of fragment spaces is orthonormal and complete.""" return self._check_fragmentation(complete_occupied=True, complete_virtual=True, **kwargs)
[docs] def has_complete_occupied_fragmentation(self, **kwargs): """Check if union of fragment spaces is orthonormal and complete in the occupied space.""" return self._check_fragmentation(complete_occupied=True, complete_virtual=False, **kwargs)
[docs] def has_complete_virtual_fragmentation(self, **kwargs): """Check if union of fragment spaces is orthonormal and complete in the virtual space.""" return self._check_fragmentation(complete_occupied=False, complete_virtual=True, **kwargs)
[docs] def require_complete_fragmentation(self, message=None, incl_virtual=True, **kwargs): if incl_virtual: complete = self.has_complete_fragmentation(**kwargs) else: complete = self.has_complete_occupied_fragmentation(**kwargs) if complete: return if message: message = " %s" % message else: message = "" self.log.error("Fragmentation is not orthogonal and complete.%s", message)
# --- Reset def _reset_fragments(self, *args, **kwargs): for fx in self.fragments: fx.reset(*args, **kwargs) def _reset(self): self.e_corr = None self.converged = False self.e_rpa = None
[docs] def reset(self, *args, **kwargs): self._reset() self._reset_fragments(*args, **kwargs)
# --- Mean-field updates
[docs] def update_mf(self, mo_coeff, mo_energy=None, veff=None): """Update underlying mean-field object.""" # Chech orthonormal MOs if not np.allclose(dot(mo_coeff.T, self.get_ovlp(), mo_coeff) - np.eye(mo_coeff.shape[-1]), 0): raise ValueError("MO coefficients not orthonormal!") self.mf.mo_coeff = mo_coeff dm = self.mf.make_rdm1(mo_coeff=mo_coeff) if veff is None: veff = self.mf.get_veff(dm=dm) self.set_veff(veff) if mo_energy is None: # Use diagonal of Fock matrix as MO energies mo_energy = einsum("ai,ab,bi->i", mo_coeff, self.get_fock(), mo_coeff) self.mf.mo_energy = mo_energy self.mf.e_tot = self.mf.energy_tot(dm=dm, h1e=self.get_hcore(), vhf=veff)
[docs] def check_fragment_symmetry(self, dm1, symtol=1e-6): """Check that the mean-field obeys the symmetry between fragments.""" frags = self.get_symmetry_child_fragments(include_parents=True) for group in frags: parent, children = group[0], group[1:] for child in children: charge_err, spin_err = parent.get_symmetry_error(child, dm1=dm1) if max(charge_err, spin_err) > symtol: raise SymmetryError( "%s and %s not symmetric! Errors: charge= %.2e spin= %.2e" % (parent.name, child.name, charge_err, spin_err) ) else: self.log.debugv( "Symmetry between %s and %s: Errors: charge= %.2e spin= %.2e", parent.name, child.name, charge_err, spin_err, )
# --- Decorators # These replace the qemb.kernel method!
[docs] def optimize_chempot(self, cpt_init=0.0, dm1func=None, dm1kwds=None, robust=False): if dm1func is None: dm1func = self.make_rdm1_demo if dm1kwds is None: dm1kwds = {} kernel_orig = self.kernel iters = [] result = None def func(cpt, *args, **kwargs): nonlocal iters, result self.opts.global_frag_chempot = cpt result = kernel_orig(*args, **kwargs) dm1 = dm1func(**dm1kwds) if self.is_rhf: ne = np.trace(dm1) else: ne = np.trace(dm1[0]) + np.trace(dm1[1]) err = ne - self.mol.nelectron iters.append((cpt, err, self.converged, self.e_tot)) return err bisect = ChempotBisection(func, cpt_init=cpt_init, robust=robust, log=self.log) def kernel(self, *args, **kwargs): nonlocal iters, result cpt = bisect.kernel(*args, **kwargs) # Print info: self.log.info("Chemical potential optimization") self.log.info("-------------------------------") self.log.info(" Iteration Chemical potential N(elec) error Converged Total Energy") for i, (cpt, err, conv, etot) in enumerate(iters): self.log.info( " %9d %19s %+14.8f %9r %19s", i + 1, energy_string(cpt), err, conv, energy_string(etot) ) if not bisect.converged: self.log.error("Chemical potential not found!") return result self.kernel = kernel.__get__(self)
[docs] def pdmet_scmf(self, *args, **kwargs): """Decorator for p-DMET.""" self.with_scmf = PDMET(self, *args, **kwargs) self.kernel = self.with_scmf.kernel
[docs] def brueckner_scmf(self, *args, **kwargs): """Decorator for Brueckner-DMET.""" self.with_scmf = Brueckner(self, *args, **kwargs) self.kernel = self.with_scmf.kernel
[docs] def qpewdmet_scmf(self, *args, **kwargs): """Decorator for QP-EWDMET.""" try: from vayesta.core.scmf import QPEWDMET except ImportError: self.log.error("QP-EWDMET requires Dyson installed") return self.with_scmf = QPEWDMET(self, *args, **kwargs) self.kernel = self.with_scmf.kernel
[docs] def check_solver(self, solver): is_uhf = np.ndim(self.mo_coeff[1]) == 2 if self.opts.screening: is_eb = "crpa_full" in self.opts.screening else: is_eb = False check_solver_config(is_uhf, is_eb, solver, self.log)