# Source code for airsspy.jf.jobs

"""
Jobflow Makers for AIRSS searches and relaxations.

Both ``AirssSearchMaker`` and ``AirssRelaxMaker`` produce the same
``AirssJobDoc`` output type, supporting multi-structure jobs for
high-throughput scenarios.
"""

import logging
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

from jobflow import Maker, Response, job
from pymatgen.core import Structure

from .documents import AirssJobDoc, AirssResultDoc, RelaxOutcome
from .runners import (
    AirssAbacusRelaxRunner,
    AirssCastepRelaxRunner,
    AirssGulpRelaxRunner,
    AirssPp3RelaxRunner,
    AirssVaspRelaxRunner,
    compose_task_doc,
    run_buildcell,
)

logger = logging.getLogger(__name__)


def _get_hash(content: str) -> str:
    """Return a short hash of a string."""
    import hashlib

    return hashlib.md5(content.encode()).hexdigest()[:8]


@dataclass
class AirssSearchMaker(Maker):
    """
    Run N build+relax cycles as a single jobflow job.

    Generates random structures from a seed using buildcell, then relaxes
    each one with the backend selected by ``code`` (``"castep"``, ``"gulp"``,
    ``"pp3"``, ``"abacus"`` or ``"vasp"``). All results are collected into a
    single ``AirssJobDoc``.
    """

    name: str = "airss search"
    # Number of build+relax iterations attempted by ``make``.
    n_structures: int = 50
    # Forwarded to the relax runner as ``executable``.
    executable: str = "castep.mpi"
    # Forwarded to ``run_buildcell`` as ``build_timeout``.
    build_timeout: int = 60
    # Not read by any method of this class.
    cycles: int = 4
    # Forwarded to the CASTEP and VASP runners.
    max_fails: int = 2
    # Forwarded to the CASTEP, ABACUS and VASP runners.
    max_iterations: int = 200
    # Forwarded to ``run_buildcell`` as ``write_seed``.
    write_seed: bool = True
    # When true, child jobs are stopped if every recorded result is ERRORED.
    stop_if_not_converged: bool = False
    # Selects the relax backend: castep, gulp, pp3, abacus or vasp.
    code: str = "castep"

    @staticmethod
    def _as_text(obj) -> str:
        """Return ``obj.get_string()`` when available, otherwise ``str(obj)``."""
        return obj.get_string() if hasattr(obj, "get_string") else str(obj)

    def _new_runner(self):
        """Construct the relax runner matching ``self.code``.

        Raises:
            ValueError: If ``self.code`` is not a supported backend.
        """
        if self.code == "castep":
            return AirssCastepRelaxRunner(
                executable=self.executable,
                max_fails=self.max_fails,
                max_iterations=self.max_iterations,
            )
        if self.code == "gulp":
            return AirssGulpRelaxRunner(executable=self.executable)
        if self.code == "pp3":
            return AirssPp3RelaxRunner(executable=self.executable)
        if self.code == "abacus":
            return AirssAbacusRelaxRunner(
                executable=self.executable,
                max_iterations=self.max_iterations,
            )
        if self.code == "vasp":
            return AirssVaspRelaxRunner(
                executable=self.executable,
                max_fails=self.max_fails,
                max_iterations=self.max_iterations,
            )
        raise ValueError(f"Unknown code: {self.code}")

    def _compose_task_doc(self, struct_name, runner, return_code):
        """Parse the relaxation outputs of ``struct_name`` into a task document.

        Raises:
            RuntimeError: For VASP, when the run returned non-zero and the
                runner reports that its outputs are not fresh.
        """
        if self.code == "abacus":
            from ..abacustools import compose_abacus_task_doc

            return compose_abacus_task_doc(struct_name)
        if self.code == "vasp":
            from ..vasptools import compose_vasp_task_doc

            if return_code != 0 and not runner.last_outputs_fresh:
                raise RuntimeError("VASP did not produce fresh parseable output")
            return compose_vasp_task_doc(struct_name, metadata=runner.last_metadata)
        return compose_task_doc(struct_name)

    @staticmethod
    def _result_from_task_doc(
        struct_name, seed_name, project_name, task_doc, relax_status
    ) -> AirssResultDoc:
        """Copy the fields of ``task_doc`` into an ``AirssResultDoc``."""
        return AirssResultDoc(
            struct_name=struct_name,
            seed_name=seed_name,
            project_name=project_name,
            structure=task_doc.get("structure"),
            energy=task_doc.get("energy"),
            energy_per_atom=task_doc.get("energy_per_atom"),
            volume=task_doc.get("volume"),
            pressure=task_doc.get("pressure"),
            spin=task_doc.get("spin", 0.0),
            mod_spin=task_doc.get("mod_spin", 0.0),
            symmetry=task_doc.get("symmetry"),
            formula=task_doc.get("formula"),
            reduced_formula=task_doc.get("reduced_formula"),
            natoms=task_doc.get("natoms"),
            res_content=task_doc.get("res_content"),
            parallel_efficiency=task_doc.get("parallel_efficiency"),
            total_time=task_doc.get("total_time"),
            relax_status=relax_status,
            rem_lines=task_doc.get("rem_lines"),
        )

    @job
    def make(
        self,
        seed_name: str,
        seed_content: str,
        paraminput,
        project_name: str,
    ) -> Response:
        """
        Generate N random structures and relax them.

        Iterations for which ``run_buildcell`` returns ``None`` are skipped
        and leave no entry in the results. Any exception raised while
        building or relaxing a structure is logged and recorded as a
        ``RelaxOutcome.FAILED`` result; the loop then moves on.

        Args:
            seed_name: Name of the seed (without extension).
            seed_content: Content of the seed .cell file.
            paraminput: Parameter input. Passed as-is to the CASTEP runner;
                converted to text (``get_string()`` or ``str``) for the
                other backends.
            project_name: Project identifier for grouping results.

        Returns:
            A ``Response`` whose output is the ``AirssJobDoc`` and whose
            ``stop_children`` flag is set when ``stop_if_not_converged`` is
            true and every recorded result is ERRORED.
        """
        results: list[AirssResultDoc] = []
        n_failed = 0

        for i in range(self.n_structures):
            logger.info("Building structure %d/%d", i + 1, self.n_structures)
            struct_name = None
            runner = None
            try:
                build_output = run_buildcell(
                    seed_name,
                    seed_content,
                    build_timeout=self.build_timeout,
                    write_seed=self.write_seed,
                )
                if build_output is None:
                    logger.warning("Buildcell timed out for structure %d", i + 1)
                    continue
                struct_name = build_output["struct_name"]

                # Prepare the inputs before constructing the runner, so a
                # failure here leaves ``runner`` unset and skips cleanup.
                if self.code == "castep":
                    from castepinput.inputs import CellInput

                    structure_input = CellInput.from_file(struct_name + ".cell")
                    param_input = paraminput
                elif self.code in ("gulp", "pp3", "abacus", "vasp"):
                    structure_input = Path(struct_name + ".cell").read_text()
                    param_input = self._as_text(paraminput)
                else:
                    raise ValueError(f"Unknown code: {self.code}")

                runner = self._new_runner()
                # Only the GULP and pp3 runners are given the seed name.
                run_kwargs = (
                    {"seed_name": seed_name} if self.code in ("gulp", "pp3") else {}
                )
                return_code = runner.run(
                    struct_name, structure_input, param_input, **run_kwargs
                )

                relax_status = (
                    RelaxOutcome.FINISHED if return_code == 0 else RelaxOutcome.ERRORED
                )
                task_doc = self._compose_task_doc(struct_name, runner, return_code)
                results.append(
                    self._result_from_task_doc(
                        struct_name, seed_name, project_name, task_doc, relax_status
                    )
                )
            except Exception as e:
                logger.error(
                    "Structure %d/%d failed with exception: %s",
                    i + 1,
                    self.n_structures,
                    e,
                    exc_info=True,
                )
                if struct_name and runner:
                    try:
                        runner.clean_failed(struct_name)
                    except Exception:
                        # Cleanup stays best-effort, but is no longer silent.
                        logger.warning(
                            "Cleanup after failed structure %s raised an exception",
                            struct_name,
                            exc_info=True,
                        )
                n_failed += 1
                results.append(
                    AirssResultDoc(
                        struct_name=struct_name or f"unknown-{i}",
                        seed_name=seed_name,
                        project_name=project_name,
                        relax_status=RelaxOutcome.FAILED,
                        error_message=str(e),
                    )
                )

        n_finished = sum(1 for r in results if r.relax_status == RelaxOutcome.FINISHED)
        n_errored = sum(1 for r in results if r.relax_status == RelaxOutcome.ERRORED)

        search_doc = AirssJobDoc(
            project_name=project_name,
            seed_name=seed_name,
            job_type="search",
            seed_content=seed_content,
            seed_hash=_get_hash(seed_content),
            results=results,
            n_structures=len(results),
            n_finished=n_finished,
            n_errored=n_errored,
            n_failed=n_failed,
        )

        stop = bool(
            self.stop_if_not_converged
            and len(results) > 0
            and n_errored == len(results)
        )
        return Response(stop_children=stop, output=search_doc)
@dataclass
class AirssRelaxMaker(Maker):
    """
    Relax one or more provided structures as a single jobflow job.

    Accepts lists of structures, names, and cell inputs. All structures share
    the same param input and project/seed metadata. The backend is selected
    by ``code`` (``"castep"``, ``"gulp"``, ``"pp3"``, ``"abacus"`` or
    ``"vasp"``).
    """

    name: str = "airss relax"
    # Forwarded to the relax runner as ``executable``.
    executable: str = "castep.mpi"
    # Not read by any method of this class.
    cycles: int = 4
    # Forwarded to the CASTEP and VASP runners.
    max_fails: int = 2
    # Forwarded to the CASTEP, ABACUS and VASP runners.
    max_iterations: int = 200
    # When true, child jobs are stopped if every recorded result is ERRORED.
    stop_if_not_converged: bool = False
    # Selects the relax backend: castep, gulp, pp3, abacus or vasp.
    code: str = "castep"

    @staticmethod
    def _as_text(obj) -> str:
        """Return ``obj.get_string()`` when available, otherwise ``str(obj)``."""
        return obj.get_string() if hasattr(obj, "get_string") else str(obj)

    def _new_runner(self):
        """Construct the relax runner matching ``self.code``.

        Raises:
            ValueError: If ``self.code`` is not a supported backend.
        """
        if self.code == "castep":
            return AirssCastepRelaxRunner(
                executable=self.executable,
                max_fails=self.max_fails,
                max_iterations=self.max_iterations,
            )
        if self.code == "gulp":
            return AirssGulpRelaxRunner(executable=self.executable)
        if self.code == "pp3":
            return AirssPp3RelaxRunner(executable=self.executable)
        if self.code == "abacus":
            return AirssAbacusRelaxRunner(
                executable=self.executable,
                max_iterations=self.max_iterations,
            )
        if self.code == "vasp":
            return AirssVaspRelaxRunner(
                executable=self.executable,
                max_fails=self.max_fails,
                max_iterations=self.max_iterations,
            )
        raise ValueError(f"Unknown code: {self.code}")

    def _compose_task_doc(self, struct_name, runner, return_code):
        """Parse the relaxation outputs of ``struct_name`` into a task document.

        Raises:
            RuntimeError: For VASP, when the run returned non-zero and the
                runner reports that its outputs are not fresh.
        """
        if self.code == "abacus":
            from ..abacustools import compose_abacus_task_doc

            return compose_abacus_task_doc(struct_name)
        if self.code == "vasp":
            from ..vasptools import compose_vasp_task_doc

            if return_code != 0 and not runner.last_outputs_fresh:
                raise RuntimeError("VASP did not produce fresh parseable output")
            return compose_vasp_task_doc(struct_name, metadata=runner.last_metadata)
        return compose_task_doc(struct_name)

    @staticmethod
    def _result_from_task_doc(
        struct_name, seed_name, project_name, task_doc, relax_status
    ) -> AirssResultDoc:
        """Copy the fields of ``task_doc`` into an ``AirssResultDoc``."""
        return AirssResultDoc(
            struct_name=struct_name,
            seed_name=seed_name,
            project_name=project_name,
            structure=task_doc.get("structure"),
            energy=task_doc.get("energy"),
            energy_per_atom=task_doc.get("energy_per_atom"),
            volume=task_doc.get("volume"),
            pressure=task_doc.get("pressure"),
            spin=task_doc.get("spin", 0.0),
            mod_spin=task_doc.get("mod_spin", 0.0),
            symmetry=task_doc.get("symmetry"),
            formula=task_doc.get("formula"),
            reduced_formula=task_doc.get("reduced_formula"),
            natoms=task_doc.get("natoms"),
            res_content=task_doc.get("res_content"),
            parallel_efficiency=task_doc.get("parallel_efficiency"),
            total_time=task_doc.get("total_time"),
            relax_status=relax_status,
            rem_lines=task_doc.get("rem_lines"),
        )

    @job
    def make(
        self,
        structures: list[Structure],
        struct_names: list[str],
        cellinputs: list,
        paraminput,
        project_name: str,
        seed_name: str,
    ) -> Response:
        """
        Relax N structures in a single job.

        The three input lists are consumed in lockstep. If their lengths
        differ, a warning is logged and only the common leading entries are
        processed. Any exception raised while relaxing a structure is logged
        and recorded as a ``RelaxOutcome.FAILED`` result; the loop then
        moves on.

        Args:
            structures: List of pymatgen Structure objects. Species,
                Cartesian coordinates and lattice are written into the cell
                input for CASTEP only; the other backends take the structure
                text from the cell input itself.
            struct_names: Corresponding structure names.
            cellinputs: Corresponding CellInput instances. The CASTEP branch
                modifies them in place.
            paraminput: Shared ParamInput instance.
            project_name: Project identifier.
            seed_name: Seed name for metadata.

        Returns:
            A ``Response`` whose output is the ``AirssJobDoc`` and whose
            ``stop_children`` flag is set when ``stop_if_not_converged`` is
            true and every recorded result is ERRORED.
        """
        results: list[AirssResultDoc] = []
        n_failed = 0

        # ``zip`` stops at the shortest input; make that visible rather than
        # silently dropping the surplus entries.
        if not (len(structures) == len(struct_names) == len(cellinputs)):
            logger.warning(
                "Input lists differ in length (structures=%d, struct_names=%d, "
                "cellinputs=%d); only the first %d entries will be relaxed",
                len(structures),
                len(struct_names),
                len(cellinputs),
                min(len(structures), len(struct_names), len(cellinputs)),
            )

        for structure, struct_name, cellinput in zip(
            structures, struct_names, cellinputs
        ):
            runner = None
            try:
                if self.code == "castep":
                    # The runner exists before the cell input is updated, so
                    # a failure while updating still triggers cleanup.
                    runner = self._new_runner()
                    cellinput.set_positions(
                        [str(elem) for elem in structure.species],
                        structure.cart_coords,
                    )
                    cellinput.set_cell(structure.lattice.matrix)
                    return_code = runner.run(struct_name, cellinput, paraminput)
                elif self.code in ("gulp", "pp3", "abacus", "vasp"):
                    struct_content = self._as_text(cellinput)
                    param_content = self._as_text(paraminput)
                    runner = self._new_runner()
                    # Only the GULP and pp3 runners are given the seed name.
                    run_kwargs = (
                        {"seed_name": seed_name}
                        if self.code in ("gulp", "pp3")
                        else {}
                    )
                    return_code = runner.run(
                        struct_name, struct_content, param_content, **run_kwargs
                    )
                else:
                    raise ValueError(f"Unknown code: {self.code}")

                relax_status = (
                    RelaxOutcome.FINISHED if return_code == 0 else RelaxOutcome.ERRORED
                )
                task_doc = self._compose_task_doc(struct_name, runner, return_code)
                results.append(
                    self._result_from_task_doc(
                        struct_name, seed_name, project_name, task_doc, relax_status
                    )
                )
            except Exception as e:
                logger.error(
                    "Structure %s failed with exception: %s",
                    struct_name,
                    e,
                    exc_info=True,
                )
                if runner:
                    try:
                        runner.clean_failed(struct_name)
                    except Exception:
                        # Cleanup stays best-effort, but is no longer silent.
                        logger.warning(
                            "Cleanup after failed structure %s raised an exception",
                            struct_name,
                            exc_info=True,
                        )
                n_failed += 1
                results.append(
                    AirssResultDoc(
                        struct_name=struct_name,
                        seed_name=seed_name,
                        project_name=project_name,
                        relax_status=RelaxOutcome.FAILED,
                        error_message=str(e),
                    )
                )

        n_finished = sum(1 for r in results if r.relax_status == RelaxOutcome.FINISHED)
        n_errored = sum(1 for r in results if r.relax_status == RelaxOutcome.ERRORED)

        relax_doc = AirssJobDoc(
            project_name=project_name,
            seed_name=seed_name,
            job_type="relax",
            results=results,
            n_structures=len(results),
            n_finished=n_finished,
            n_errored=n_errored,
            n_failed=n_failed,
        )

        stop = bool(
            self.stop_if_not_converged
            and len(results) > 0
            and n_errored == len(results)
        )
        return Response(stop_children=stop, output=relax_doc)
@dataclass
class AirssValidateMaker(Maker):
    """Validate that required AIRSS executables are installed."""

    name: str = "airss validate"
    # Extra executable names checked in addition to ``required_exes``.
    additional_exes: tuple = ()
    # Executable names that are always checked.
    required_exes: tuple = ("buildcell", "castep_relax", "castep2res")

    @job
    def make(self) -> Optional[Response]:
        """
        Check that all required executables are on PATH.

        Returns:
            ``Response(stop_jobflow=True)`` when at least one executable
            cannot be located (the missing names are logged as an error),
            otherwise ``None``.
        """
        import shutil

        # Unpack rather than ``+`` so list-valued fields also work.
        exes_to_check = (*self.additional_exes, *self.required_exes)

        # ``shutil.which`` resolves names against PATH in-process; it does
        # not depend on an external ``which`` binary and prints nothing.
        not_found = [
            exe_name for exe_name in exes_to_check if shutil.which(exe_name) is None
        ]

        if not_found:
            logger.error(
                "AIRSS installation incomplete, executables not found: %s",
                not_found,
            )
            return Response(stop_jobflow=True)
        return None