Source code for airsspy.jf.runners

"""
Execution runners for AIRSS calculations.

Pure computation classes with no jobflow dependency. Each runner handles
one buildcell invocation or one calculation (a single point or a cyclic
relaxation) with CASTEP, GULP, pp3, VASP or ABACUS. They are usable
standalone or within jobflow Makers.
"""

import logging
import re
import shlex
import shutil
import subprocess
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)


[docs] def clean_files( struct_name: str, extensions: list[str], extra_paths: list[str] | None = None, ) -> None: """Remove files associated with a failed calculation. Args: struct_name: Structure name (without extension). extensions: File extensions to remove (e.g. ``[".castep", ".cell"]``). extra_paths: Additional paths to remove (files or directories). """ for ext in extensions: p = Path(struct_name + ext) if p.is_file(): p.unlink() for ep in extra_paths or []: p = Path(ep) if p.is_dir(): shutil.rmtree(p, ignore_errors=True) elif p.is_file(): p.unlink()
def run_buildcell(
    seed_name: str,
    seed_content: str,
    build_timeout: int = 30,
    write_seed: bool = True,
    seed_text_transform=None,
    max_attempts: int = 3,
) -> Optional[dict[str, str]]:
    """
    Run the buildcell executable to generate a random structure.

    The seed text is piped to ``buildcell`` on stdin and the generated
    structure is read from stdout. An attempt that exceeds
    ``build_timeout`` is killed and retried, up to ``max_attempts`` times.

    On success the generated structure is written to ``<struct_name>.cell``
    and ``<struct_name>-orig.cell`` in the current directory.

    Args:
        seed_name: Name of the seed (without extension).
        seed_content: Content of the seed .cell file.
        build_timeout: Timeout in seconds for each buildcell attempt.
        write_seed: Whether to write the seed .cell file to disk. The
            untransformed ``seed_content`` is what gets written.
        seed_text_transform: Optional callable that rewrites seed content
            before each buildcell attempt.
        max_attempts: Maximum number of buildcell attempts.

    Returns:
        Dictionary with ``struct_name``, ``seed_name`` and
        ``struct_content`` keys, or None if all attempts timed out.
    """
    from ..casteptools import get_rand_cell_name

    logger.info("Starting random structure generation...")
    stdout: Optional[str] = None
    for _attempt in range(max_attempts):
        input_content = (
            seed_text_transform(seed_content)
            if seed_text_transform is not None
            else seed_content
        )
        proc = subprocess.Popen(
            "buildcell",
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        try:
            out, _err = proc.communicate(input_content, timeout=build_timeout)
        except subprocess.TimeoutExpired:
            # communicate() does not kill the child on timeout: kill it and
            # finish the communication so the process is reaped and its
            # pipes are closed before the next attempt.
            proc.kill()
            proc.communicate()
            continue
        if proc.returncode != 0:
            # The output is still used as before; the warning only makes a
            # failing buildcell visible in the log.
            logger.warning("buildcell exited with return code %d", proc.returncode)
        stdout = out
        break

    if stdout is None:
        logger.error("Random structure generation timed out")
        return None

    logger.info("Random structure generation completed")
    cell_name = get_rand_cell_name(seed_name)
    struct_name = cell_name.replace(".cell", "")
    if write_seed:
        Path(seed_name + ".cell").write_text(seed_content)
    Path(cell_name).write_text(stdout)
    Path(struct_name + "-orig.cell").write_text(stdout)
    return {
        "struct_name": struct_name,
        "seed_name": seed_name,
        "struct_content": stdout,
    }
class AirssCastepSinglePointRunner:
    """Execute a CASTEP single-point calculation."""

    # Suffixes (appended to the structure name) removed by ``clean_failed``.
    _cleanup_extensions = [
        ".castep",
        ".cell",
        ".param",
        "-out.cell",
        "-orig.cell",
        ".res",
        ".err",
    ]

    def __init__(self, executable: str = "castep.mpi") -> None:
        """Store the command line used to launch CASTEP.

        Args:
            executable: Command used to start CASTEP. It may contain
                arguments (e.g. an MPI launcher); it is split with
                shell-like rules and the seed name is appended.
        """
        self.executable = executable

    def clean_failed(self, struct_name: str) -> None:
        """Remove the files of a failed calculation for ``struct_name``."""
        clean_files(struct_name, self._cleanup_extensions)

    def prepare_inputs(self, struct_name: str, cellinput, paraminput) -> None:
        """Write .cell and .param files to disk.

        Args:
            struct_name: Seed name (without extension).
            cellinput: A CastepInput/CellInput instance or string.
            paraminput: A CastepInput/ParamInput instance or string.
        """
        # castepinput is only needed to serialise non-string inputs, so
        # plain-text inputs work without importing it.
        if not (isinstance(cellinput, str) and isinstance(paraminput, str)):
            from castepinput.inputs import CastepInput

            if isinstance(cellinput, CastepInput):
                cellinput = cellinput.get_string()
            if isinstance(paraminput, CastepInput):
                paraminput = paraminput.get_string()

        Path(struct_name + ".cell").write_text(cellinput)
        Path(struct_name + ".param").write_text(paraminput)

    def run(self, struct_name: str, cellinput, paraminput) -> int:
        """
        Run a single-point calculation.

        Args:
            struct_name: Seed name (without extension).
            cellinput: Cell input (CellInput instance or string).
            paraminput: Param input supporting item assignment (e.g. a
                ParamInput instance). Its ``task`` entry is set to
                ``singlepoint`` in place, so the caller's object is
                modified.

        Returns:
            0 on success, non-zero on failure.

        Raises:
            TypeError: If ``paraminput`` is a plain string, because the
                task keyword cannot be set on it.
        """
        if isinstance(paraminput, str):
            raise TypeError(
                "paraminput must support item assignment (e.g. ParamInput), "
                "not str: the 'task' keyword has to be set"
            )
        paraminput["task"] = "singlepoint"
        self.prepare_inputs(struct_name, cellinput, paraminput)
        # shlex keeps quoted arguments of the launcher command together.
        output = subprocess.run(
            shlex.split(self.executable) + [struct_name], check=False
        )
        return output.returncode
class AirssCastepRelaxRunner(AirssCastepSinglePointRunner):
    """
    Execute a cyclic CASTEP geometry optimisation.

    Runs successive relaxations, copying the output cell back to the input
    between cycles. Requires two consecutive successful runs to declare
    convergence. Stops at max_iterations.
    """

    _GEOM_STATUS_RE = re.compile(r"Geometry optimization ([a-z]+)", re.IGNORECASE)
    _GEOM_ITER_RE = re.compile(r"Finished iteration +(\d+)", re.IGNORECASE)

    def __init__(
        self,
        executable: str = "castep.mpi",
        max_fails: int = 2,
        max_iterations: int = 200,
    ) -> None:
        """Configure the relaxation loop.

        Args:
            executable: Command used to start CASTEP (may include arguments).
            max_fails: Number of consecutive non-zero exits tolerated before
                the relaxation is abandoned.
            max_iterations: Budget of geometry iterations summed over all
                CASTEP invocations.
        """
        super().__init__(executable=executable)
        self.max_fails = max_fails
        self.max_iterations = max_iterations

    def run(self, struct_name: str, cellinput, paraminput) -> int:
        """
        Run cyclic CASTEP relaxation.

        Args:
            struct_name: Seed name (without extension).
            cellinput: Cell input (CellInput instance or string).
            paraminput: Param input supporting item assignment (e.g. a
                ParamInput instance). ``task`` and ``write_cell_structure``
                are set on it in place.

        Returns:
            0 if converged, 1 if not converged or failed.

        Raises:
            RuntimeError: If ``<struct_name>-out.cell`` has no lattice or no
                positions block.
        """
        paraminput["task"] = "geometryoptimization"
        paraminput["write_cell_structure"] = True
        self.prepare_inputs(struct_name, cellinput, paraminput)

        command = shlex.split(self.executable) + [struct_name]
        fail_counter = 0
        success_counter = 0
        iter_counter = 0
        while iter_counter < self.max_iterations:
            if fail_counter > self.max_fails:
                return 1
            output = subprocess.run(command, check=False)
            if output.returncode != 0:
                fail_counter += 1
                continue
            fail_counter = 0

            result, max_iter = self._read_castep_status(struct_name + ".castep")
            # Charge at least one iteration per invocation: a run that exits
            # cleanly without reporting any iteration would otherwise never
            # advance the budget and the loop could not terminate.
            iter_counter += max(max_iter, 1)
            if result is True:
                success_counter += 1
            if result is False:
                success_counter = 0
            if success_counter >= 2:
                break

            self._update_input_cell(struct_name)

        return 0 if success_counter >= 2 else 1

    def _read_castep_status(self, castep_path: str) -> tuple:
        """Return ``(result, last_iteration)`` scanned from a .castep file.

        ``result`` is True/False for the last "Geometry optimization
        completed/failed" line found, or None when neither appears.
        ``last_iteration`` is the number on the last "Finished iteration"
        line, or 0 when there is none.
        """
        result = None
        max_iter = 0
        with open(castep_path) as fhandle:
            for line in fhandle:
                match = self._GEOM_STATUS_RE.search(line)
                if match is not None:
                    status = match.group(1).lower()
                    if status == "completed":
                        result = True
                    elif status == "failed":
                        result = False
                match = self._GEOM_ITER_RE.search(line)
                if match is not None:
                    max_iter = int(match.group(1))
        return result, max_iter

    def _update_input_cell(self, struct_name: str) -> None:
        """Copy lattice and positions from ``-out.cell`` into ``.cell``.

        All non-structural content of the input .cell (kpoints, species,
        symmetry, etc.) is preserved. Does nothing when no ``-out.cell``
        file exists.
        """
        out_cell = Path(struct_name + "-out.cell")
        if not out_cell.is_file():
            return

        from castepinput.inputs import CellInput

        out = CellInput.from_file(str(out_cell))
        cell_path = Path(struct_name + ".cell")
        cell_in = CellInput.from_file(str(cell_path))
        self._copy_block(
            out, cell_in, ("lattice_cart", "lattice_abc"), "lattice", out_cell
        )
        self._copy_block(
            out, cell_in, ("positions_abs", "positions_frac"), "positions", out_cell
        )
        cell_in.save(str(cell_path))

    @staticmethod
    def _copy_block(source, target, candidates, description, origin) -> None:
        """Copy the first of ``candidates`` present in ``source`` to ``target``.

        The alternative representations are deleted from ``target`` so only
        the format used by the output remains.
        """
        chosen = next((key for key in candidates if key in source), None)
        if chosen is None:
            raise RuntimeError(f"No {description} block in {origin}")
        for key in candidates:
            if key != chosen and key in target:
                del target[key]
        target[chosen] = source[chosen]
def _parse_castep_summary(castep_file: str) -> dict:
    """Scan a .castep file for the scalar results used by the task document.

    Returns a dict with ``energy``, ``pressure``, ``efficiency``, ``spin``,
    ``modspin``, ``spin_moms`` and ``total_time``. Values keep their
    defaults (None, 0.0 or an empty list) when the file is missing or the
    corresponding line is absent. For repeated lines the last one wins.
    """
    summary: dict = {
        "energy": None,
        "pressure": None,
        "efficiency": None,
        "spin": 0.0,
        "modspin": 0.0,
        "spin_moms": [],
        "total_time": None,
    }
    if not Path(castep_file).is_file():
        return summary

    # Table delimiters are matched with flexible whitespace so that the
    # column spacing of the header does not have to be exact.
    spin_table_start = re.compile(r"\sTotal\s+Charge\(e\)\s+Spin\(hbar/2\)")
    spin_table_end = re.compile(r"\sLength\s+\(A\)")

    in_spin_group = False
    with open(castep_file) as fhandle:
        for line in fhandle:
            if "NB est. 0K energy" in line:
                summary["energy"] = float(line.split()[-2])
            if "Pressure: " in line:
                summary["pressure"] = float(line.split()[-2])
            if "Overall parallel efficiency" in line:
                match = re.search(r"(\d+)%", line)
                if match:
                    summary["efficiency"] = float(match.group(1)) / 100.0
            if "Total time" in line and "=" in line:
                match = re.search(r"=\s*([0-9.]+)", line)
                if match:
                    summary["total_time"] = float(match.group(1))
            # "|Spin den" contains "Spin den": test the modulus first so the
            # signed spin is not overwritten by the modulus line.
            if "|Spin den" in line:
                summary["modspin"] = float(line.split()[-2])
            elif "Spin den" in line:
                summary["spin"] = float(line.split()[-2])
            if spin_table_start.search(line):
                in_spin_group = True
                summary["spin_moms"] = []
                continue
            if spin_table_end.search(line):
                in_spin_group = False
            if in_spin_group and re.match(r"^ +[A-Za-z]+ ", line):
                summary["spin_moms"].append(float(line.split()[-1]))
    return summary


def compose_task_doc(struct_name: str) -> dict:
    """
    Extract results from a completed CASTEP calculation.

    Reads the .castep and .cell files, computes derived properties,
    writes a ``<struct_name>.res`` file to disk, and returns a dictionary
    suitable for constructing an ``AirssResultDoc``.

    The structure is taken from ``<struct_name>-out.cell`` when present,
    otherwise from ``<struct_name>.cell``.

    Args:
        struct_name: Seed name (without extension).

    Returns:
        Dictionary with energy, structure, volume, formula, etc.
        ``energy`` and ``energy_per_atom`` are None when no energy line was
        found in the .castep file.
    """
    from ase import Atoms
    from castepinput.inputs import CellInput
    from pymatgen.io.ase import AseAtomsAdaptor

    from ..restools import save_airss_res

    summary = _parse_castep_summary(struct_name + ".castep")
    energy = summary["energy"]
    pressure = summary["pressure"]
    spin_moms = summary["spin_moms"]

    out_cell_path = Path(struct_name + "-out.cell")
    if out_cell_path.is_file():
        # Strip ANG keyword that castepinput can't parse
        out_content_clean = "\n".join(
            line
            for line in out_cell_path.read_text().splitlines()
            if line.strip() != "ANG"
        )
        tmp_path = Path(struct_name + "-out-tmp.cell")
        tmp_path.write_text(out_content_clean)
        try:
            cell = CellInput.from_file(str(tmp_path))
        finally:
            # Never leave the scratch file behind, even if parsing fails.
            tmp_path.unlink(missing_ok=True)
    else:
        cell = CellInput.from_file(struct_name + ".cell")

    elements, positions, _tags = cell.get_positions()
    atoms = Atoms(
        symbols=elements, positions=positions, cell=cell.get_cell(), pbc=True
    )
    volume = atoms.get_volume()

    # Compute symmetry via spglib. This is best effort: spglib is optional
    # and any failure falls back to P1.
    try:
        import spglib

        sg = spglib.get_spacegroup(
            (
                atoms.get_cell().array,
                atoms.get_scaled_positions(),
                atoms.get_atomic_numbers(),
            ),
            symprec=0.1,
        )
        sym = sg.split()[0] if sg else "P1"
    except Exception:
        logger.debug(
            "Space group detection failed for %s; using P1",
            struct_name,
            exc_info=True,
        )
        sym = "P1"

    # Build REM lines from .castep and .cell metadata
    from ..casteptools import build_rem_lines

    rem_lines = build_rem_lines(struct_name)

    info = {
        "uid": struct_name,
        "H": energy if energy is not None else 0.0,
        "P": pressure if pressure is not None else 0.0,
        "V": volume,
        "nat": len(atoms),
        "sym": sym,
        "rem": rem_lines,
    }
    save_airss_res(atoms, info, fname=struct_name + ".res", force_write=True)

    structure = AseAtomsAdaptor.get_structure(atoms)
    if spin_moms:
        structure.add_site_property("spin", spin_moms)

    natoms = len(atoms)
    res_path = Path(struct_name + ".res")
    return {
        "structure": structure,
        "volume": structure.volume,
        "reduced_formula": structure.reduced_formula,
        "formula": structure.composition.formula.replace(" ", ""),
        "natoms": natoms,
        "label": struct_name,
        "energy": energy,
        # An energy of exactly 0.0 is a valid result, so test for None.
        "energy_per_atom": (
            energy / natoms if energy is not None and natoms else None
        ),
        "spin": summary["spin"],
        "mod_spin": summary["modspin"],
        "pressure": pressure,
        "parallel_efficiency": summary["efficiency"],
        "total_time": summary["total_time"],
        "res_content": res_path.read_text() if res_path.is_file() else None,
        "rem_lines": rem_lines,
    }
class AirssScriptRelaxRunner:
    """
    Common machinery for runners that delegate to an external AIRSS
    relaxation script (gulp_relax, pp3_relax).

    The script is invoked as a subprocess and its outcome is judged from
    the files it leaves behind. Subclasses must override ``_get_cmd`` and
    set ``_param_suffix``.
    """

    # Suffix of the code-specific parameter file written next to the .cell.
    _param_suffix: str = ".param"
    # Suffixes removed by ``clean_failed``; subclasses provide the real list.
    _cleanup_extensions: list[str] = []

    def __init__(
        self,
        executable: str = "gulp",
        timeout: int = 600,
        max_attempts: int = 3,
    ) -> None:
        """Remember the executable, the per-attempt timeout (seconds) and
        the number of attempts."""
        self.max_attempts = max_attempts
        self.timeout = timeout
        self.executable = executable

    def clean_failed(self, struct_name: str) -> None:
        """Delete the files of a failed run of ``struct_name``."""
        clean_files(struct_name, self._cleanup_extensions)

    def _get_cmd(self, struct_name: str) -> list[str]:
        """Return the argument list to execute. Subclasses must implement."""
        raise NotImplementedError

    def _check_success(self, struct_name: str, stdout: str) -> bool:
        """Tell whether the relaxation of ``struct_name`` finished properly."""
        from ..casteptools import gulp_relax_finish_ok

        castep_file = struct_name + ".castep"
        return gulp_relax_finish_ok(castep_file)

    def _prepare_inputs(
        self,
        struct_name: str,
        struct_content: str,
        param_content: str,
        seed_name: Optional[str] = None,
    ) -> None:
        """Dump the structure and the code-specific parameters to disk."""
        outputs = (
            (".cell", struct_content),
            (self._param_suffix, param_content),
        )
        for suffix, text in outputs:
            Path(struct_name + suffix).write_text(text)

    def run(
        self,
        struct_name: str,
        struct_content: str,
        param_content: str,
        seed_name: Optional[str] = None,
    ) -> int:
        """
        Relax a structure through the external script.

        Args:
            struct_name: Structure name (without extension).
            struct_content: Content of the .cell file.
            param_content: Content of the code-specific param file.
            seed_name: Seed name (needed by GULP for .lib file rename).

        Returns:
            0 on success, 1 on failure.
        """
        self._prepare_inputs(struct_name, struct_content, param_content, seed_name)
        cmd = self._get_cmd(struct_name)

        for attempt in range(1, self.max_attempts + 1):
            try:
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=self.timeout,
                    check=False,
                )
            except subprocess.TimeoutExpired:
                logger.warning(
                    "Relaxation attempt %d/%d timed out for %s",
                    attempt,
                    self.max_attempts,
                    struct_name,
                )
                continue

            if self._check_success(struct_name, result.stdout):
                return 0

            logger.warning(
                "Relaxation attempt %d/%d failed for %s",
                attempt,
                self.max_attempts,
                struct_name,
            )

        return 1
class AirssGulpRelaxRunner(AirssScriptRelaxRunner):
    """
    GULP relaxation driven by the external ``gulp_relax`` script.

    The command is ``gulp_relax <exe> <cluster> <pressure> <struct_name>``.
    A run counts as successful when ``gulp_relax_finish_ok()`` accepts the
    ``.castep`` file and ``"Volume"`` occurs in the script's stdout.
    """

    _param_suffix: str = ".lib"
    _cleanup_extensions = [
        ".cell",
        ".lib",
        ".castep",
        ".gout",
        "-orig.cell",
        ".res",
        ".err",
    ]

    def __init__(
        self,
        executable: str = "ggulp",
        timeout: int = 600,
        max_attempts: int = 3,
        cluster: bool = False,
        pressure: float = 0.0,
    ) -> None:
        """Store the GULP-specific ``cluster`` and ``pressure`` settings on
        top of the generic script-runner options."""
        super().__init__(
            executable=executable, timeout=timeout, max_attempts=max_attempts
        )
        self.pressure = pressure
        self.cluster = cluster

    def _get_cmd(self, struct_name: str) -> list[str]:
        """Assemble the ``gulp_relax`` argument list."""
        cluster_flag = str(int(self.cluster))
        pressure_arg = str(self.pressure)
        return ["gulp_relax", self.executable, cluster_flag, pressure_arg, struct_name]

    def _check_success(self, struct_name: str, stdout: str) -> bool:
        """Require both a clean finish marker and ``"Volume"`` in stdout."""
        from ..casteptools import gulp_relax_finish_ok

        finished = gulp_relax_finish_ok(struct_name + ".castep")
        if not finished:
            return finished
        return "Volume" in stdout

    def _prepare_inputs(
        self,
        struct_name: str,
        struct_content: str,
        param_content: str,
        seed_name: Optional[str] = None,
    ) -> None:
        """Write the inputs, then rename the library file for gulp_relax."""
        super()._prepare_inputs(struct_name, struct_content, param_content, seed_name)
        # gulp_relax looks for <seed_name>.lib, not <struct_name>.lib
        if seed_name is None or seed_name == struct_name:
            return
        shutil.move(struct_name + ".lib", seed_name + ".lib")
class AirssPp3RelaxRunner(AirssScriptRelaxRunner):
    """
    pp3 relaxation driven by the external ``pp3_relax`` script.

    The command is ``pp3_relax <exe> <struct_name>``; success is decided by
    the inherited ``gulp_relax_finish_ok()`` check.
    """

    _param_suffix: str = ".pp"
    _cleanup_extensions = [
        ".cell",
        ".pp",
        ".castep",
        "-orig.cell",
        ".res",
        ".err",
    ]

    def __init__(
        self,
        executable: str = "pp3",
        timeout: int = 600,
        max_attempts: int = 3,
    ) -> None:
        """Forward the options to the generic script runner; only the
        default executable differs."""
        options = {
            "executable": executable,
            "timeout": timeout,
            "max_attempts": max_attempts,
        }
        super().__init__(**options)

    def _get_cmd(self, struct_name: str) -> list[str]:
        """Assemble the ``pp3_relax`` argument list."""
        command = ["pp3_relax"]
        command.append(self.executable)
        command.append(struct_name)
        return command
[docs] class AirssVaspRelaxRunner: """Execute a local VASP relaxation in ``<struct_name>.vasp``.""" _cleanup_extensions = [".cell", ".INCAR", ".KPOINTS", "-orig.cell", ".res", ".err"] def __init__( self, executable: str = "vasp_std", pressure: float = 0.0, potcar_dir: str | None = None, potcar_map: dict[str, str] | None = None, max_fails: int = 2, max_iterations: int = 200, ) -> None: self.executable = executable self.pressure = pressure self.potcar_dir = potcar_dir self.potcar_map = potcar_map or {} self.max_fails = max_fails self.max_iterations = max_iterations self.last_metadata: dict | None = None self.last_outputs_fresh = False
[docs] def clean_failed(self, struct_name: str) -> None: clean_files( struct_name, self._cleanup_extensions, extra_paths=[f"{struct_name}.vasp"], )
[docs] def prepare_inputs( self, struct_name: str, cell_content: str, incar_content: str, kpoints_path: str | Path | None = None, ) -> dict: """Write top-level AIRSS inputs and prepare VASP input directory.""" from ..vasptools import prepare_vasp_inputs, structure_from_cell_text Path(struct_name + ".cell").write_text(cell_content) Path(struct_name + ".INCAR").write_text(incar_content) structure = structure_from_cell_text(cell_content) metadata = prepare_vasp_inputs( struct_name, structure, incar_content, mode="relax", pressure=self.pressure, potcar_dir=self.potcar_dir, potcar_map=self.potcar_map, kpoints_path=kpoints_path, ) self.last_metadata = metadata return metadata
def _run_vasp_process(self, workdir: Path, cycle: int) -> int: """Run one VASP process invocation in *workdir*.""" workdir.mkdir(parents=True, exist_ok=True) out_path = workdir / "vasp.out" mode = "w" if cycle == 1 else "a" with open(out_path, mode) as outf: if cycle > 1: outf.write(f"\n# airsspy VASP restart cycle {cycle}\n") output = subprocess.run( shlex.split(self.executable), stdout=outf, stderr=subprocess.STDOUT, cwd=workdir, check=False, ) return output.returncode def _output_mtimes(self, workdir: Path) -> dict[str, int | None]: """Return nanosecond mtimes for outputs that identify a VASP cycle.""" mtimes: dict[str, int | None] = {} for name in ("vasprun.xml", "CONTCAR"): path = workdir / name mtimes[name] = path.stat().st_mtime_ns if path.is_file() else None return mtimes def _vasprun_updated( self, workdir: Path, before: dict[str, int | None], ) -> bool: """Return True if the current VASP cycle produced a fresh vasprun.xml.""" after = self._output_mtimes(workdir) return ( after["vasprun.xml"] is not None and after["vasprun.xml"] != before.get("vasprun.xml") ) def _read_vasp_status( self, workdir: Path, before_mtimes: dict[str, int | None] | None = None, ) -> tuple[bool, int] | None: """Return ``(converged, ionic_steps)`` for the latest VASP run.""" if before_mtimes is not None and not self._vasprun_updated( workdir, before_mtimes ): logger.warning("VASP did not update vasprun.xml in %s", workdir) return None try: from ..vasptools import _parse_vasprun data = _parse_vasprun(workdir) except Exception as exc: logger.warning("Unable to parse VASP status in %s: %s", workdir, exc) return None if not data: logger.warning( "Unable to parse VASP status in %s: missing vasprun.xml", workdir ) return None if "parse_error" in data: logger.warning( "Unable to parse VASP status in %s: %s", workdir, data["parse_error"], ) return None return bool(data.get("converged")), int(data.get("ionic_steps") or 0) def _prepare_restart(self, workdir: Path) -> bool: """Restart VASP 
from the latest relaxed structure, if available.""" contcar = workdir / "CONTCAR" poscar = workdir / "POSCAR" if not contcar.is_file(): logger.error("Cannot restart VASP: missing %s", contcar) return False shutil.copyfile(contcar, poscar) return True
[docs] def run( self, struct_name: str, cell_content: str, incar_content: str, kpoints_path: str | Path | None = None, ) -> int: """Run a cyclic VASP relaxation. As with the CASTEP runner, two consecutive converged VASP runs are required before declaring success. Between cycles, ``CONTCAR`` is copied back to ``POSCAR`` so the next invocation continues from the last relaxed geometry. """ metadata = self.prepare_inputs( struct_name, cell_content, incar_content, kpoints_path=kpoints_path ) self.last_outputs_fresh = False workdir = Path(metadata["workdir"]) fail_counter = 0 success_counter = 0 iter_counter = 0 cycle = 0 while iter_counter < self.max_iterations: if fail_counter > self.max_fails: logger.error("VASP failed more than %d times", self.max_fails) return 1 cycle += 1 logger.info("Starting VASP relaxation cycle %d for %s", cycle, struct_name) before_mtimes = self._output_mtimes(workdir) return_code = self._run_vasp_process(workdir, cycle) if return_code != 0: if self._read_vasp_status(workdir, before_mtimes) is not None: self.last_outputs_fresh = True fail_counter += 1 logger.warning( "VASP cycle %d exited with return code %d", cycle, return_code, ) continue fail_counter = 0 status = self._read_vasp_status(workdir, before_mtimes) if status is None: fail_counter += 1 continue self.last_outputs_fresh = True converged, ionic_steps = status iter_counter += max(ionic_steps, 1) logger.info( "VASP cycle %d finished: converged=%s ionic_steps=%d total=%d/%d", cycle, converged, ionic_steps, iter_counter, self.max_iterations, ) if converged: success_counter += 1 else: success_counter = 0 if success_counter >= 2: return 0 if not self._prepare_restart(workdir): return 1 logger.error("VASP relaxation reached max_iterations=%d", self.max_iterations) return 1
class AirssVaspSinglePointRunner(AirssVaspRelaxRunner):
    """Run a single local VASP single-point calculation."""

    def prepare_inputs(
        self,
        struct_name: str,
        cell_content: str,
        incar_content: str,
        kpoints_path: str | Path | None = None,
    ) -> dict:
        """Write the AIRSS inputs and set up the VASP directory in ``sp`` mode.

        Returns:
            The metadata dict from ``prepare_vasp_inputs`` (also stored in
            ``last_metadata``).
        """
        from ..vasptools import prepare_vasp_inputs, structure_from_cell_text

        top_level_files = ((".cell", cell_content), (".INCAR", incar_content))
        for suffix, text in top_level_files:
            Path(struct_name + suffix).write_text(text)

        self.last_metadata = prepare_vasp_inputs(
            struct_name,
            structure_from_cell_text(cell_content),
            incar_content,
            mode="sp",
            pressure=self.pressure,
            potcar_dir=self.potcar_dir,
            potcar_map=self.potcar_map,
            kpoints_path=kpoints_path,
        )
        return self.last_metadata

    def run(
        self,
        struct_name: str,
        cell_content: str,
        incar_content: str,
        kpoints_path: str | Path | None = None,
    ) -> int:
        """Prepare the inputs, launch VASP once and return its exit code."""
        prepared = self.prepare_inputs(
            struct_name, cell_content, incar_content, kpoints_path=kpoints_path
        )
        workdir = Path(prepared["workdir"])
        return self._run_vasp_process(workdir, cycle=1)
class AirssAbacusRelaxRunner:
    """
    Execute a cyclic ABACUS geometry optimisation.

    Calls the ABACUS binary directly and manages the relaxation loop in
    Python, following the same two-phase pattern as ``abacus_relax``:

    1. Three short rough runs with ``relax_nmax=3``
    2. Full convergence loop until two successive convergences

    Between each ABACUS invocation, the structure is read from
    ``STRU_ION_D``, converted back to .cell format, and fed into the next
    iteration.
    """

    # Suffixes (appended to the structure name) removed by ``clean_failed``.
    _cleanup_extensions = [".cell", ".INPUT", "-orig.cell", ".res", ".err"]

    def __init__(
        self,
        executable: str = "abacus",
        max_fails: int = 2,
        max_iterations: int = 200,
        pressure: float = 0.0,
    ) -> None:
        """Configure the runner.

        Args:
            executable: Command used to start ABACUS (split with shell rules).
            max_fails: Number of consecutive crashed runs tolerated.
            max_iterations: Budget of ionic steps summed over all runs.
            pressure: Stored on the instance; not used by this class itself.
        """
        self.executable = executable
        self.max_fails = max_fails
        self.max_iterations = max_iterations
        self.pressure = pressure

    def clean_failed(self, struct_name: str) -> None:
        """Remove the input files and the ``<struct_name>.abacus`` directory."""
        clean_files(
            struct_name,
            self._cleanup_extensions,
            extra_paths=[f"{struct_name}.abacus"],
        )

    def _set_input_param(self, input_path: str, key: str, value: str) -> None:
        """Set or add a parameter in an ABACUS INPUT file.

        The first line whose keyword is exactly ``key`` is replaced; if
        there is none, ``key value`` is appended.
        """
        # The lookahead demands the keyword to end here, so that e.g.
        # "relax_nmax" does not hit a longer key sharing that prefix.
        pattern = re.compile(rf"^\s*{re.escape(key)}(?=\s|$)")
        lines = Path(input_path).read_text().splitlines()
        for i, line in enumerate(lines):
            if pattern.match(line):
                lines[i] = f"{key} {value}"
                break
        else:
            lines.append(f"{key} {value}")
        Path(input_path).write_text("\n".join(lines))

    def _detect_logfile(self, workdir: str, input_path: str) -> str:
        """Detect the ABACUS log file path based on calculation type.

        Returns an empty string when no log file is found.
        """
        from ..abacustools import detect_logfile

        result = detect_logfile(workdir, input_path)
        return result or ""

    def prepare_inputs(
        self,
        struct_name: str,
        cell_content: str,
        input_content: str,
    ) -> None:
        """Write .cell and .INPUT files, convert .cell to STRU.

        Args:
            struct_name: Structure name (without extension).
            cell_content: Content of the .cell file.
            input_content: Content of the ABACUS INPUT file.
        """
        workdir = f"{struct_name}.abacus"
        Path(workdir).mkdir(parents=True, exist_ok=True)

        # Write .cell file
        Path(struct_name + ".cell").write_text(cell_content)

        # Write INPUT file
        Path(struct_name + ".INPUT").write_text(input_content)

        # Convert .cell to STRU
        from ..abacustools import cell_to_stru

        stru_content = cell_to_stru(cell_content)
        Path(f"{workdir}/STRU").write_text(stru_content)

        # Copy INPUT to workdir
        Path(f"{workdir}/INPUT").write_text(input_content)

    def _run_single(
        self,
        struct_name: str,
        workdir: str,
        input_path: str,
    ) -> Optional[dict]:
        """Run a single ABACUS calculation and parse results.

        Returns:
            The dict produced by ``parse_abacus_log`` (read here for its
            ``converged``, ``n_ionic_steps`` and ``energy`` entries), or
            None if the calculation crashed or left no log file.
        """
        out_path = Path(f"{workdir}/abacus_out")
        with open(out_path, "w") as outf:
            subprocess.run(
                shlex.split(self.executable),
                stdout=outf,
                stderr=subprocess.STDOUT,
                cwd=workdir,
                check=False,
            )

        output = out_path.read_text()

        # Check for crash (ABACUS writes "TOTAL Time" on success)
        if "TOTAL Time" not in output:
            logger.warning("ABACUS crashed for %s", struct_name)
            for line in output.splitlines()[-5:]:
                logger.info(" | %s", line)
            return None

        logfile = self._detect_logfile(workdir, input_path)
        if not logfile:
            logger.warning("No ABACUS log file found for %s", struct_name)
            return None

        from ..abacustools import parse_abacus_log

        log_data = parse_abacus_log(logfile)

        converged = log_data.get("converged", False)
        n_steps = log_data.get("n_ionic_steps", 0)
        energy = log_data.get("energy")
        # The energy may be absent from the log; format it defensively so
        # the log call itself cannot fail.
        energy_text = f"{energy:.6f}" if energy is not None else "n/a"
        logger.info(
            "%s: %s, %d ionic steps, E=%s eV",
            struct_name,
            "converged" if converged else "not converged",
            n_steps,
            energy_text,
        )
        return log_data

    def run(
        self,
        struct_name: str,
        cell_content: str,
        input_content: str,
    ) -> int:
        """
        Run cyclic ABACUS relaxation.

        Phase 1: three rough runs with relax_nmax=3 for initial optimisation.
        Phase 2: full convergence loop until two consecutive convergences
        or max_iterations is reached.

        Args:
            struct_name: Structure name (without extension).
            cell_content: Content of the .cell file.
            input_content: Content of the ABACUS INPUT file.

        Returns:
            0 if converged, 1 if not converged or failed.
        """
        self.prepare_inputs(struct_name, cell_content, input_content)
        workdir = f"{struct_name}.abacus"
        input_path = f"{workdir}/INPUT"

        # Save original relax_nmax if set by user
        user_relax_nmax = None
        for line in Path(input_path).read_text().splitlines():
            m = re.match(r"^\s*relax_nmax\s+(\S+)", line)
            if m:
                user_relax_nmax = m.group(1)
                break

        fail_counter = 0
        success_counter = 0
        iter_counter = 0

        # Phase 1: three rough runs with relax_nmax=3
        logger.info("%s: phase 1 — 3 rough runs (relax_nmax=3)", struct_name)
        self._set_input_param(input_path, "relax_nmax", "3")

        for rough_i in range(3):
            if fail_counter > self.max_fails:
                logger.error(
                    "%s: too many failures (%d), aborting", struct_name, fail_counter
                )
                return 1

            logger.info(
                "%s: rough run %d/3 (iter=%d/%d)",
                struct_name,
                rough_i + 1,
                iter_counter,
                self.max_iterations,
            )
            result = self._run_single(struct_name, workdir, input_path)
            if result is None:
                fail_counter += 1
                continue

            fail_counter = 0
            iter_counter += result.get("n_ionic_steps", 0)
            if result.get("converged"):
                success_counter += 1
            else:
                success_counter = 0

            self._update_cell(struct_name, workdir)

        # Restore original relax_nmax for phase 2
        if user_relax_nmax is None:
            content = Path(input_path).read_text()
            lines = [
                line
                for line in content.splitlines()
                if not re.match(r"^\s*relax_nmax(?=\s|$)", line)
            ]
            Path(input_path).write_text("\n".join(lines))
        else:
            self._set_input_param(input_path, "relax_nmax", user_relax_nmax)

        # Phase 2: full convergence loop
        if success_counter < 2:
            cycle = 0
            while iter_counter < self.max_iterations:
                if fail_counter > self.max_fails:
                    logger.error(
                        "%s: too many failures (%d), aborting",
                        struct_name,
                        fail_counter,
                    )
                    return 1

                cycle += 1
                logger.info(
                    "%s: phase 2 cycle %d (iter=%d/%d, consecutive_ok=%d)",
                    struct_name,
                    cycle,
                    iter_counter,
                    self.max_iterations,
                    success_counter,
                )
                result = self._run_single(struct_name, workdir, input_path)
                if result is None:
                    fail_counter += 1
                    continue

                fail_counter = 0
                # Charge at least one step per run so that runs reporting
                # zero ionic steps cannot keep this loop going forever.
                iter_counter += max(result.get("n_ionic_steps", 0), 1)
                if result.get("converged"):
                    success_counter += 1
                else:
                    success_counter = 0

                if success_counter >= 2:
                    break

                self._update_cell(struct_name, workdir)

        converged = success_counter >= 2
        logger.info(
            "%s: finished — %s (%d total iterations)",
            struct_name,
            "converged" if converged else "not converged",
            iter_counter,
        )
        return 0 if converged else 1

    def _update_cell(self, struct_name: str, workdir: str) -> None:
        """Read STRU_ION_D output and update the input .cell file.

        The lattice block and the positions block of ``<struct_name>.cell``
        are rewritten from the relaxed structure, preserving other blocks
        (SPECIES_POT, KPOINTS, etc.), and ``STRU`` is regenerated for the
        next ABACUS run. Does nothing when ``STRU_ION_D`` or the .cell file
        is missing.
        """
        from ..abacustools import cell_to_stru, parse_abacus_stru

        stru_path = Path(workdir) / "OUT.ABACUS" / "STRU_ION_D"
        if not stru_path.is_file():
            return

        # positions are treated as fractional here (as the original code
        # did when converting them with ``positions @ cell``).
        elements, positions, cell = parse_abacus_stru(str(stru_path))

        cell_path = struct_name + ".cell"
        if not Path(cell_path).is_file():
            return

        def _position_lines(rows) -> str:
            return "\n".join(
                f"{e} {p[0]:.10f} {p[1]:.10f} {p[2]:.10f}"
                for e, p in zip(elements, rows)
            )

        flags = re.DOTALL | re.IGNORECASE
        lattice_re = r"%BLOCK\s+LATTICE_CART.*?%ENDBLOCK\s+LATTICE_CART"
        frac_re = r"%BLOCK\s+POSITIONS_FRAC.*?%ENDBLOCK\s+POSITIONS_FRAC"
        abs_re = r"%BLOCK\s+POSITIONS_ABS.*?%ENDBLOCK\s+POSITIONS_ABS"

        new_lattice = "\n".join(
            f" {v[0]:.10f} {v[1]:.10f} {v[2]:.10f}" for v in cell.tolist()
        )
        lattice_block = f"%BLOCK LATTICE_CART\n{new_lattice}\n%ENDBLOCK LATTICE_CART"

        # Callable replacements keep the generated text literal (no
        # backslash/group interpretation by re.sub).
        new_content = re.sub(
            lattice_re,
            lambda _m: lattice_block,
            Path(cell_path).read_text(),
            count=1,
            flags=flags,
        )

        if re.search(frac_re, new_content, flags=flags):
            # A fractional block must receive fractional coordinates.
            frac_block = (
                f"%BLOCK POSITIONS_FRAC\n{_position_lines(positions)}\n"
                "%ENDBLOCK POSITIONS_FRAC"
            )
            new_content = re.sub(
                frac_re, lambda _m: frac_block, new_content, count=1, flags=flags
            )
            # Remove positions_abs block (not needed, avoids #label pollution)
            new_content = re.sub(
                abs_re + r"\s*", "", new_content, count=1, flags=flags
            )
        elif re.search(abs_re, new_content, flags=flags):
            # Only an absolute block exists: update it in place with
            # Cartesian coordinates instead of deleting the positions.
            cart_positions = (positions @ cell).tolist()
            abs_block = (
                f"%BLOCK POSITIONS_ABS\n{_position_lines(cart_positions)}\n"
                "%ENDBLOCK POSITIONS_ABS"
            )
            new_content = re.sub(
                abs_re, lambda _m: abs_block, new_content, count=1, flags=flags
            )

        Path(cell_path).write_text(new_content)

        # Also regenerate STRU for next ABACUS run
        stru_content = cell_to_stru(new_content)
        Path(f"{workdir}/STRU").write_text(stru_content)
class AirssAbacusSinglePointRunner:
    """
    Run ABACUS once as a single-point (SCF) calculation.

    The INPUT file is rewritten to use ``calculation scf`` and ABACUS is
    launched a single time; there is no relaxation loop. The presence of
    ``TOTAL Time`` in the captured output decides success.
    """

    _cleanup_extensions = [".cell", ".INPUT", "-orig.cell", ".res", ".err"]

    def __init__(self, executable: str = "abacus") -> None:
        """Remember the command used to start ABACUS."""
        self.executable = executable

    def clean_failed(self, struct_name: str) -> None:
        """Delete the input files and the ``<struct_name>.abacus`` directory."""
        abacus_dir = f"{struct_name}.abacus"
        clean_files(struct_name, self._cleanup_extensions, extra_paths=[abacus_dir])

    def prepare_inputs(
        self,
        struct_name: str,
        cell_content: str,
        input_content: str,
    ) -> None:
        """Write the .cell and .INPUT files and generate STRU.

        Whatever ``calculation`` the user requested is overridden with
        ``calculation scf``; the line is appended when none is present.
        """
        from ..abacustools import cell_to_stru

        workdir = f"{struct_name}.abacus"
        Path(workdir).mkdir(parents=True, exist_ok=True)

        # Structure file in the current directory
        Path(struct_name + ".cell").write_text(cell_content)

        # Every "calculation ..." line becomes a single-point request
        calc_line = re.compile(r"^\s*calculation\s+")
        original_lines = input_content.splitlines()
        has_calc = any(calc_line.match(entry) for entry in original_lines)
        rewritten = [
            "calculation scf" if calc_line.match(entry) else entry
            for entry in original_lines
        ]
        if not has_calc:
            rewritten.append("calculation scf")
        input_content = "\n".join(rewritten)

        # The INPUT file lives both in cwd and in the work directory
        for destination in (struct_name + ".INPUT", f"{workdir}/INPUT"):
            Path(destination).write_text(input_content)

        # STRU is derived from the .cell content
        Path(f"{workdir}/STRU").write_text(cell_to_stru(cell_content))

    def run(
        self,
        struct_name: str,
        cell_content: str,
        input_content: str,
    ) -> int:
        """
        Perform one ABACUS single-point calculation.

        Args:
            struct_name: Structure name (without extension).
            cell_content: Content of the .cell file.
            input_content: Content of the ABACUS INPUT file.

        Returns:
            0 on success, 1 on failure.
        """
        self.prepare_inputs(struct_name, cell_content, input_content)
        workdir = f"{struct_name}.abacus"
        captured = Path(f"{workdir}/abacus_out")

        with open(captured, "w") as sink:
            subprocess.run(
                self.executable.split(),
                stdout=sink,
                stderr=subprocess.STDOUT,
                cwd=workdir,
                check=False,
            )

        output = captured.read_text()
        if "TOTAL Time" in output:
            logger.info("ABACUS single-point completed for %s", struct_name)
            return 0

        logger.warning("ABACUS single-point crashed for %s", struct_name)
        for tail_line in output.splitlines()[-5:]:
            logger.info(" | %s", tail_line)
        return 1