Source code for psi4.driver.task_base

#
# @BEGIN LICENSE
#
# Psi4: an open-source quantum chemistry software package
#
# Copyright (c) 2007-2023 The Psi4 Developers.
#
# The copyrights for code used from other parties are included in
# the corresponding files.
#
# This file is part of Psi4.
#
# Psi4 is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3.
#
# Psi4 is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with Psi4; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# @END LICENSE
#

__all__ = [
    "AtomicComputer",
    "BaseComputer",
    "EnergyGradientHessianWfnReturn",
]

import abc
import copy
import logging
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union

try:
    from pydantic.v1 import Field, validator
except ImportError:
    from pydantic import Field, validator

import qcelemental as qcel
from qcelemental.models import AtomicInput, AtomicResult, DriverEnum
from qcelemental.models.results import AtomicResultProtocols
qcel.models.molecule.GEOMETRY_NOISE = 13  # need more precision in geometries for high-res findif
import qcengine as qcng

from psi4 import core

from . import p4util

if TYPE_CHECKING:
    import qcportal

logger = logging.getLogger(__name__)

EnergyGradientHessianWfnReturn = Union[float, core.Matrix, Tuple[Union[float, core.Matrix], core.Wavefunction]]


[docs] class BaseComputer(qcel.models.ProtoModel): """Base class for "computers" that plan, run, and process QC tasks."""
[docs] @abc.abstractmethod def compute(self): pass
[docs] @abc.abstractmethod def plan(self): pass
class Config(qcel.models.ProtoModel.Config): extra = "allow" allow_mutation = True
[docs] class AtomicComputer(BaseComputer): """Computer for analytic single-geometry computations.""" molecule: Any = Field(..., description="The molecule to use in the computation.") basis: str = Field(..., description="The quantum chemistry basis set to evaluate (e.g., 6-31g, cc-pVDZ, ...).") method: str = Field(..., description="The quantum chemistry method to evaluate (e.g., B3LYP, MP2, ...).") driver: DriverEnum = Field(..., description="The resulting type of computation: energy, gradient, hessian, properties." "Note for finite difference that this should be the target driver, not the means driver.") keywords: Dict[str, Any] = Field(default_factory=dict, description="The keywords to use in the computation.") protocols: Optional[Union[AtomicResultProtocols, Dict[str, Any]]] = Field({"stdout": True}, description="Output modifications.") tag: str = Field("*", description="The tags to pass along to compute managers.") priority: str = Field(1, description="The priority of a Task; higher priority will be pulled first. {high:2, normal:1, low:0}") owner_group: Optional[str] = Field(None, description="group in the chown sense.") computed: bool = Field(False, description="Whether quantum chemistry has been run on this task.") result: Any = Field(default_factory=dict, description=":py:class:`~qcelemental.models.AtomicResult` return.") result_id: Optional[str] = Field(None, description="The optional ID for the computation.") class Config(qcel.models.ProtoModel.Config): pass
[docs] @validator("basis") def set_basis(cls, basis): return basis.lower()
[docs] @validator("method") def set_method(cls, method): return method.lower()
[docs] @validator("keywords") def set_keywords(cls, keywords): return copy.deepcopy(keywords)
[docs] def plan(self) -> AtomicInput: """Form QCSchema input from member data.""" atomic_model = AtomicInput(**{ "molecule": self.molecule.to_schema(dtype=2), "driver": self.driver, "model": { "method": self.method, "basis": self.basis }, "keywords": self.keywords, "protocols": self.protocols, "extras": { "psiapi": True, "wfn_qcvars_only": True, }, }) return atomic_model
[docs] def compute(self, client: Optional["qcportal.client.FractalClient"] = None): """Run quantum chemistry.""" from psi4.driver import pp if self.computed: return if client: self.computed = True try: # QCFractal v0.15.8 from qcportal.models import KeywordSet, Molecule qca_next_branch = False except ImportError: # QCFractal `next` from qcelemental.models import Molecule qca_next_branch = True # Build the molecule mol = Molecule(**self.molecule.to_schema(dtype=2)) if not qca_next_branch: # QCFractal v0.15.8 # Build the keywords keyword_id = client.add_keywords([KeywordSet(values=self.keywords)])[0] r = client.add_compute("psi4", self.method, self.basis, self.driver, keyword_id, [mol]) self.result_id = r.ids[0] # NOTE: The following will re-run errored jobs by default if self.result_id in r.existing: ret = client.query_tasks(base_result=self.result_id) if ret: if ret[0].status == "ERROR": client.modify_tasks("restart", base_result=self.result_id) logger.info("Resubmitting Errored Job {}".format(self.result_id)) elif ret[0].status == "COMPLETE": logger.debug("Job already completed {}".format(self.result_id)) else: logger.debug("Job already completed {}".format(self.result_id)) else: logger.debug("Submitting AtomicResult {}".format(self.result_id)) else: # QCFractal `next` meta, ids = client.add_singlepoints( molecules=mol, program="psi4", driver=self.driver, method=self.method, basis=self.basis, keywords=self.keywords, protocols=self.protocols, tag=self.tag, priority=self.priority, owner_group=self.owner_group, ) self.result_id = ids[0] # NOTE: The following will re-run errored jobs by default if meta.existing_idx: rec = client.get_singlepoints(self.result_id) if rec.status == "error": client.reset_records(self.result_id) logger.info("Resubmitting Errored Job {}".format(self.result_id)) elif rec.status == "complete": logger.debug("Job already completed {}".format(self.result_id)) else: logger.debug("Submitting AtomicResult {}".format(self.result_id)) return logger.info(f'<<< JSON launch ... {self.molecule.schoenflies_symbol()} {self.molecule.nuclear_repulsion_energy()}') gof = core.get_output_file() # EITHER ... # from psi4.driver import schema_wrapper # self.result = schema_wrapper.run_qcschema(self.plan()) # ... OR ... self.result = qcng.compute( self.plan(), "psi4", raise_error=True, # local_options below suitable for serial mode where each job takes all the resources of the parent Psi4 job. # distributed runs through QCFractal will likely need a different setup. task_config={ # B -> GiB "memory": core.get_memory() / (2 ** 30), "ncores": core.get_num_threads(), }, ) # ... END #pp.pprint(self.result.dict()) #print("... JSON returns >>>") core.set_output_file(gof, True) core.reopen_outfile() logger.debug(pp.pformat(self.result.dict())) if stdout := self.result.dict()["stdout"]: core.print_out(_drink_filter(stdout)) self.computed = True
[docs] def get_results(self, client: Optional["qcportal.FractalClient"] = None) -> AtomicResult: """Return results as Atomic-flavored QCSchema.""" if self.result: return self.result if client: try: # QCFractal/QCPortal v0.15.8 result = client.query_results(id=self.result_id) qca_next_branch = False except AttributeError: # QCFractal/QCPortal `next` record = client.get_singlepoints(record_ids=self.result_id) qca_next_branch = True logger.debug(f"Querying AtomicResult {self.result_id}") if not qca_next_branch: # QCFractal v0.15.8 if len(result) == 0: return self.result self.result = result[0] else: # QCFractal `next` if record.status != "complete": return self.result self.result = _singlepointrecord_to_atomicresult(record) return self.result
def _singlepointrecord_to_atomicresult(spr: "qcportal.singlepoint.SinglepointRecord") -> AtomicResult: atres = spr.to_qcschema_result() # QCFractal `next` database stores return_result, properties, and extras["qcvars"] merged # together and with lowercase keys. `to_qcschema_result` partitions properties back out, # but we need to restore qcvars keys, types, and dimensions. qcvars = atres.extras.pop("extra_properties") qcvars.pop("return_result") qcvars = {k.upper(): p4util.plump_qcvar(k, v) for k, v in qcvars.items()} atres.extras["qcvars"] = qcvars return atres def _drink_filter(stdout: str) -> str: """Don't mess up the widespread ``grep beer`` test of Psi4 doneness by printing multiple drinks per outfile.""" stdout = stdout.replace("\n*** Psi4 exiting successfully. Buy a developer a beer!", "") stdout = stdout.replace("\n*** Psi4 encountered an error. Buy a developer more coffee!", "") return stdout