diff --git a/interfaces/ASE_interface/abacuslite/io/latestio.py b/interfaces/ASE_interface/abacuslite/io/latestio.py index cba0fce201..b658c0dc74 100644 --- a/interfaces/ASE_interface/abacuslite/io/latestio.py +++ b/interfaces/ASE_interface/abacuslite/io/latestio.py @@ -3,7 +3,7 @@ import shutil import unittest from pathlib import Path -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple import numpy as np from ase.atoms import Atoms @@ -22,6 +22,7 @@ read_traj_from_md_dump, read_magmom_from_running_log, is_invalid_arr, + find_final_info_with_iter_header ) def read_esolver_type_from_running_log(src: str | Path | List[str]) \ @@ -359,6 +360,30 @@ def read_stress_from_running_log(src: str | Path | List[str]) \ return stresses +def read_iter_header_from_running_log(src: str | Path | List[str]) \ + -> List[Tuple[int, int]]: + ''' + read the "iteration header" from the running log, useful for determining + which is the final iteration. The "iteration header" is defined as: + ``` + --> #ION MOVE# 1 #ELEC ITER# 3 + ``` + ''' + if isinstance(src, (str, Path)): + with open(src) as f: + raw = f.readlines() + else: # assume the src is the return of the readlines() + raw = src + # with open(fn) as f: + # raw = f.readlines() + raw = [l.strip() for l in raw] + raw = [l for l in raw if l] # remove empty lines + + HEADER_PAT = r'-->\s+#ION MOVE#\s+(\d+)\s+#ELEC ITER#\s+(\d+)' + res = [re.findall(HEADER_PAT, l) for l in raw] + + return [(int(pack[0][0]), int(pack[0][1])) for pack in res if pack] + def read_abacus_out(fileobj, index=slice(None), results_required=True, @@ -439,9 +464,11 @@ def read_abacus_out(fileobj, # the simulation, which is, not exactly to be true for the NPT-MD # runs. - _, energies = read_energies_from_running_log(abacus_lines) - # only keep the SCF converged energies - energies = [edct for edct in energies if 'E_KS(sigma->0)' in edct] + # only keep the energies of the final iteration for each ion step + energies = find_final_info_with_iter_header( + read_energies_from_running_log(abacus_lines)[1], + read_iter_header_from_running_log(abacus_lines) + ) # read the magmom magmom = read_magmom_from_running_log(abacus_lines) @@ -639,5 +666,12 @@ def test_read_pw_symm0_nspin4_gamma_md(self): # (self.testfiles / 'eig_occ.txt').unlink() (self.testfiles / 'MD_dump').unlink() + def test_read_iter_header_from_running_log(self): + header = read_iter_header_from_running_log( + self.testfiles / 'lcao-symm0-nspin2-multik-cellrelax') + self.assertTupleEqual(header[0], (1, 1)) + self.assertTupleEqual(header[1], (1, 2)) + self.assertTupleEqual(header[2], (2, 1)) + if __name__ == '__main__': unittest.main() \ No newline at end of file diff --git a/interfaces/ASE_interface/abacuslite/io/legacyio.py b/interfaces/ASE_interface/abacuslite/io/legacyio.py index 252ff3e3f4..2fb3640b17 100644 --- a/interfaces/ASE_interface/abacuslite/io/legacyio.py +++ b/interfaces/ASE_interface/abacuslite/io/legacyio.py @@ -3,7 +3,7 @@ import unittest from io import TextIOWrapper from pathlib import Path -from typing import Dict, List, Tuple, Optional +from typing import Dict, List, Tuple, Optional, Any import numpy as np from ase.atoms import Atoms @@ -706,6 +706,63 @@ def read_magmom_from_running_log(src: str | Path | List[str]) \ return magmom +def read_iter_header_from_running_log(src: str | Path | List[str]) \ + -> List[Tuple[int, int]]: + ''' + read the "iteration header" from the running log, useful for determining + which is the final iteration. The "iteration header" is defined as: + ``` + LCAO ALGORITHM --------------- ION= 1 ELEC= 100--------------------- + ``` + or for PW: + ``` + PW ALGORITHM --------------- ION=2 ELEC=1 ----------------------- + ``` + ''' + if isinstance(src, (str, Path)): + with open(src) as f: + raw = f.readlines() + else: # assume the src is the return of the readlines() + raw = src + # with open(fn) as f: + # raw = f.readlines() + raw = [l.strip() for l in raw] + raw = [l for l in raw if l] # remove empty lines + + HEADER_PAT = r'(LCAO|PW)\s+ALGORITHM\s+-+\s+ION=\s+([0-9]+)\s+ELEC=\s+([0-9]+)' + res = [re.findall(HEADER_PAT, l) for l in raw] + + return [(int(pack[0][1]), int(pack[0][2])) for pack in res if pack] + +def find_final_info_with_iter_header( + info: List[Any], + headers: List[Tuple[int, int]] + ) -> List[Any]: + '''find the energies of the final iteration + + Parameters + ---------- + info: List[Dict[str, float]] + The information that is printed after each iteration. + headers: List[Tuple[int, int]] + The "iteration header" returned from the function + `read_iter_header_from_running_log` + + Returns + ------- + List[Any] + The information of the final iteration. + ''' + headers = np.array(headers) # because indices should start from 0 + assert headers.ndim == 2 + assert headers.shape[1] == 2 + + ion = headers[:, 0].flatten() + ion, nelec = np.unique(ion, return_counts=True) + ielec = np.cumsum(nelec) - 1 + + return [info[i] for i in ielec] + def is_invalid_arr(arr) -> bool: '''Check if the array is invalid, including the cases of None, empty array, and array with NaN values. @@ -797,9 +854,11 @@ def read_abacus_out(fileobj, # the simulation, which is, not exactly to be true for the NPT-MD # runs. - _, energies = read_energies_from_running_log(abacus_lines) - # only keep the SCF converged energies - energies = [edct for edct in energies if 'E_KS(sigma->0)' in edct] + # only keep the energies of the final iteration for each ion step + energies = find_final_info_with_iter_header( + read_energies_from_running_log(abacus_lines)[1], + read_iter_header_from_running_log(abacus_lines) + ) # read the magmom magmom = read_magmom_from_running_log(abacus_lines) @@ -1107,5 +1166,25 @@ def test_read_magmom_from_running_log(self): np.array([[0. , 0. , 3.62032142], [0. , 0. , 3.62032142]]))) + def test_read_iter_header_from_running_log(self): + fn = self.testfiles / 'lcao-symm0-nspin2-multik-cellrelax_' + header = read_iter_header_from_running_log(fn) + self.assertEqual(header[0], (1,1)) + self.assertEqual(header[1], (1,2)) + self.assertEqual(header[2], (2,1)) + fn = self.testfiles / 'pw-symm0-nspin4-gamma-md_' + header = read_iter_header_from_running_log(fn) + self.assertEqual(header[0], (1,1)) + self.assertEqual(header[1], (1,2)) + self.assertEqual(header[2], (1,3)) + self.assertEqual(header[3], (3,2)) + self.assertEqual(header[4], (3,3)) + + def test_find_final_info_with_iter_header(self): + info = [1, 2, 3, 4, 5, 6] + header = [[1,1], [1,2], [2,1], [3,1], [3,2], [3,3]] + final_info = find_final_info_with_iter_header(info, header) + self.assertListEqual(final_info, [info[1], info[2], info[5]]) + if __name__ == '__main__': unittest.main()