Source code for ase2sprkkr.outputs.readers.scf

"""SCF (selfconsisten cycle) reader and result."""

import re
import pyparsing as pp
import unyt

from ..output_definitions import (
    OutputSectionDefinition as Section,
    OutputValueDefinition as V,
    OutputValueEqualDefinition as VE,
    OutputNonameValueDefinition as VN,
)
from ...common.grammar_types import Table, integer, string, Real, RealWithUnits, String, Sequence, Array
from ..task_result import TaskResult, ResultValue
from ...common.process_output_reader import readline, readline_until
from ..sprkkr_output_reader import SprKkrOutputParser
from ...common.decorators import cached_property
from ...sprkkr.calculator import SPRKKR
from ...common.formats import fortran_format
from ...common.grammar import replace_whitechars
from ..task_result import KkrOutputReader
from ...potentials.potentials import Potential


[docs] class IterationValue(ResultValue):
[docs] def __init__(self, name, value, plot): super().__init__(name, value) self._plot = plot
[docs] def plot(self): self._plot()
[docs] def actions(self): return ["plot"]
[docs] class DataValue(ResultValue):
[docs] def value_label(self): return "<data...>"
[docs] def actions(self): return ["data"]
[docs] def data(self): return self._value
[docs] def plot_iterations( iterations, what=["error", ("energy", "ETOT"), ("energy", "EMIN")], filename=None, logscale=None, **kwargs ): import matplotlib.pyplot as pyplot if logscale is None: logscale = {"error"} if not isinstance(what, list): what = [what] fig, axs = pyplot.subplots(len(what), sharex=True, squeeze=False) iterations_ids = iterations.values_of("iteration") for name, ax in zip(what, axs.ravel()): data = iterations.values_of(name) if name in logscale: ax.set_yscale("log") kwargs.update({"ls": None, "mfc": None, "mew": 2, "ms": 6}) ax.plot(iterations_ids, data, "+", **kwargs) ax.set_ylabel(iterations[0][name]._definition.name) axs[-1, 0].set_xlabel("Iteration") fig.tight_layout() if filename: fig.savefig(filename) else: pyplot.show()
[docs] class RealOrStars(Real): """A real value, where ``****`` means ``NaN``""" _grammar = Real._grammar | replace_whitechars(pp.Word("*")).set_parse_action(lambda x: float("NaN"))
[docs] class ScfResult(TaskResult): """Objects of this class holds the results of a self-consistent cycle and its iterations. It also allows to plot the convergence of values during iterations. """
[docs] @cached_property def calculator(self): """The calculator that has the new (output) potential assigned - i.e. that can be used for further calculations with the (hopefully) converged wavefunctions etc.""" if self._calculator: return self._calculator.copy_with_potential(self.potential_filename) else: SPRKKR(potential=self.potential_filename)
@property def potential_filename(self): fname = super().potential_filename + "_new" return fname @property def start_potential_filename(self): return super().potential_filename @property def start_potential(self): return Potential.from_file(self.potential_filename) @property def energy(self): """Total energy of the last iteration""" try: return self.last_iteration.energy.ETOT() except Exception as e: raise AttributeError("No iteration has been finished") from e @property def converged(self): """The calculation coverged or not?""" return self.last_iteration["converged"]()
[docs] def iteration_values(self, name): """Return the array of values of given name from the iterations (i.e. "the column of the table of the values") E.g. result.iteration_values(('energy', 'ETOT')) return list of floats - the total energies computed in each iteration. Parameters ---------- name: str Name of the parameter Return ------ values: list The values. """ if name not in self.iterations[0]: raise KeyError(f"No such iteration value: {name}") return self.iterations.values_of(name)
@property def last_iteration(self): """Return the data of the last iteration""" if not self.iterations: raise AttributeError("No iteration has been finished") return self.iterations[-1] @property def energies(self): return self.last_iteration.energy
[docs] @cached_property def output_values(self): return { "converged": ResultValue("Converged", str(self.last_iteration.converged())), "iterations": ResultValue("Number of iterations", len(self.iterations)), "fermi energy": IterationValue( "Fermi energy", self.last_iteration.energy.EF(), lambda: self.plot(("energy", "EF")) ), "error": IterationValue("Error", self.last_iteration.error(), lambda: self.plot("error")), "data": DataValue("Data", self), }
[docs] def plot(self, what=["error", ("energy", "ETOT"), ("energy", "EMIN")], filename=None, logscale=None, **kwargs): """Plot the development of the given value(s) during iterations. Parameters ---------- what: list[str] Names to be plotted filename: str or None If filename is given, the plot is rendered to the file, it is show on the screen otherwise. logscale: collections.abc.Set[str] Which values should be rendered in logscale **kwargs: dict All other arguments are passed to the matplotlib Axes.plot function """ return plot_iterations(self.iterations, what, filename, logscale, **kwargs)
""" This definition parses the part of the output file that contains the datas about atoms and computed data associated with them. """ atomic_types_definition = Section( "atoms", [ VN("IECURR", integer), VE("E", RealOrStars()), VN("L", float), VE("IT", integer), VN("symbol", string), VN( "orbitals", Table( { "l": string, "DOS": float, "NOS": float, "P_spin": float, "m_spin": float, "P_orb": float, "m_orb": float, "B_val": float, "B_val_desc": String(default_value=""), "B_core": Real(default_value=float("NaN"), nan=r"\*+"), }, free_header=True, default_values=True, ), ), V("E_band", RealWithUnits(units={"[Ry]": unyt.Ry}), is_required=False), V("dipole moment", Sequence(int, Array(float, length=3)), is_required=False), ], )
[docs] class PlottableValue(V):
[docs] def enrich(self, option): def plot(options, **kwargs): c = option._container._container plot_iterations(option._container._container, self.get_path[len(c.get_path) :], **kwargs) option.plot = lambda **kwargs: plot(option, **kwargs) super().enrich(option)
PV = PlottableValue """ This definition is not used for parsing, but just for the data returned from the reader. """ scf_section = Section( "iteration", [ V("system_name", str), V("iteration", int, info="Number of the iteration."), PV("error", float), V("b_error", float, is_required=False, info="RMS error of the exchange-correlation B-field at this iteration."), V("duration", float, is_required=False, info="Execution time of this iteration in seconds."), V("converged", bool, info="True, if the SCF cycle converged this iteration."), Section("moment", [PV("spin", float), PV("orbital", float)]), Section( "energy", [ PV(("EF", "fermi"), float, info="Fermi energy"), PV(("ETOT", "total"), float, info="Total energy"), PV(("EMIN", "band_states_min"), float, info="Bottom of energy contour for band states"), PV(("ESCBOT", "semi_core_min"), float, info="Lower limit for semi-core states", is_required=False), PV(("ECTOP", "core_max"), float, info="Upper limit for core states", is_required=False), ], ), Section("atomic_types", atomic_types_definition.members(), is_repeated=True), ], is_repeated=True, ) del PV
[docs] class ScfOutputParser(SprKkrOutputParser): """ This class reads and parses the output of the SCF task of the SPR-KKR. """
[docs] def set_print_output(self, print_output): self.print_info = print_output == "info" self.print_output = print_output and not self.print_info
[docs] async def read_output(self, stdout, result): await self.read_commons(stdout, result) result.files["converged"] = result.files["potential"] + "_new" iterations = scf_section.create_object() try: first = True _err_or_time = re.compile(rb"execution time for last iteration| ERR.*EF") while True: out = {} line = await readline_until(stdout, lambda line: b"EMIN = " in line) if not line: break out["energy"] = {"EMIN": float(line.split("=")[1])} line = await stdout.readline() if b"ESCBOT" in line: out["energy"]["ESCBOT"] = float(line.split(b"=")[1]) line = await stdout.readline() if b"ECTOP" in line: out["energy"]["ECTOP"] = float(line.split(b"=")[1]) line = await readline_until(stdout, lambda line: b"SPRKKR-run for: " in line) line = line.strip() if first and self.print_info: print(line) first = False run = line.replace("SPRKKR-run for:", "") out["system_name"] = run line = await readline_until(stdout, lambda line: b" E=" in line) atoms = [] while True: atoms.append( await atomic_types_definition.parse_from_stream( stdout, up_to=b"\n -------------------------------------------------------------------------------", start=line, ) ) line = await readline_until(stdout, lambda line: line != b"\n") if "E=" not in line: break out["atomic_types"] = atoms duration = None while True: line = await readline_until(stdout, _err_or_time.search) if "execution time for last iteration" in line: try: duration = float(line.split()[-2]) except (ValueError, IndexError): pass else: break items = line.split() out["iteration"] = int(items[0]) out["error"] = float(items[2]) out["b_error"] = float(items[3]) out["energy"]["EF"] = float(items[5]) out["moment"] = {"spin": float(items[10]), "orbital": float(items[11])} if duration is not None: out["duration"] = duration line = (await readline(stdout)).split() out["energy"]["ETOT"] = float((float(line[1]) * unyt.Ry).to(unyt.eV)) out["converged"] = line[5] == "converged" section = iterations.add() section.set(out) if self.print_info: error = fortran_format(out["error"], ":>12e") print( f"Iteration {out['iteration']:>5} error {error} " f"spin moment: {out['moment']['spin']:>13.6e} " f"orbital moment: {out['moment']['orbital']:>13.6e} " ) if self.print_output == "info": if not iterations: print( "ERROR: No iteration has been finished. There is probably an error in the input files, please, examine the SPR-KKR output file." ) elif iterations[-1]["converged"](): print("OK: The computation converged.") else: print("WARNING: The computation does not converged!!!") result.iterations = iterations except EOFError: raise Exception("The output ends unexpectedly")
[docs] class ScfOutputReader(KkrOutputReader): result_class = ScfResult parser_class = ScfOutputParser