Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions interfaces/ASE_interface/abacuslite/io/latestio.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import shutil
import unittest
from pathlib import Path
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Tuple

import numpy as np
from ase.atoms import Atoms
Expand All @@ -22,6 +22,7 @@
read_traj_from_md_dump,
read_magmom_from_running_log,
is_invalid_arr,
find_final_info_with_iter_header
)

def read_esolver_type_from_running_log(src: str | Path | List[str]) \
Expand Down Expand Up @@ -359,6 +360,30 @@ def read_stress_from_running_log(src: str | Path | List[str]) \

return stresses

def read_iter_header_from_running_log(src: str | Path | List[str]) \
-> List[Tuple[int, int]]:
'''
read the "iteration header" from the running log, useful for determining
which is the final iteration. The "iteration header" is defined as:
```
--> #ION MOVE# 1 #ELEC ITER# 3
```
'''
if isinstance(src, (str, Path)):
with open(src) as f:
raw = f.readlines()
else: # assume the src is the return of the readlines()
raw = src
# with open(fn) as f:
# raw = f.readlines()
raw = [l.strip() for l in raw]
raw = [l for l in raw if l] # remove empty lines

HEADER_PAT = r'-->\s+#ION MOVE#\s+(\d+)\s+#ELEC ITER#\s+(\d+)'
res = [re.findall(HEADER_PAT, l) for l in raw]

return [(int(pack[0][0]), int(pack[0][1])) for pack in res if pack]

def read_abacus_out(fileobj,
index=slice(None),
results_required=True,
Expand Down Expand Up @@ -439,9 +464,11 @@ def read_abacus_out(fileobj,
# the simulation, which is, not exactly to be true for the NPT-MD
# runs.

_, energies = read_energies_from_running_log(abacus_lines)
# only keep the SCF converged energies
energies = [edct for edct in energies if 'E_KS(sigma->0)' in edct]
# only keep the energies of the final iteration for each ion step
energies = find_final_info_with_iter_header(
read_energies_from_running_log(abacus_lines)[1],
read_iter_header_from_running_log(abacus_lines)
)

# read the magmom
magmom = read_magmom_from_running_log(abacus_lines)
Expand Down Expand Up @@ -639,5 +666,12 @@ def test_read_pw_symm0_nspin4_gamma_md(self):
# (self.testfiles / 'eig_occ.txt').unlink()
(self.testfiles / 'MD_dump').unlink()

def test_read_iter_header_from_running_log(self):
header = read_iter_header_from_running_log(
self.testfiles / 'lcao-symm0-nspin2-multik-cellrelax')
self.assertTupleEqual(header[0], (1, 1))
self.assertTupleEqual(header[1], (1, 2))
self.assertTupleEqual(header[2], (2, 1))

if __name__ == '__main__':
unittest.main()
87 changes: 83 additions & 4 deletions interfaces/ASE_interface/abacuslite/io/legacyio.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import unittest
from io import TextIOWrapper
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from typing import Dict, List, Tuple, Optional, Any

import numpy as np
from ase.atoms import Atoms
Expand Down Expand Up @@ -706,6 +706,63 @@ def read_magmom_from_running_log(src: str | Path | List[str]) \

return magmom

def read_iter_header_from_running_log(src: str | Path | List[str]) \
-> List[Tuple[int, int]]:
'''
read the "iteration header" from the running log, useful for determining
which is the final iteration. The "iteration header" is defined as:
```
LCAO ALGORITHM --------------- ION= 1 ELEC= 100---------------------
```
or for PW:
```
PW ALGORITHM --------------- ION=2 ELEC=1 -----------------------
```
'''
if isinstance(src, (str, Path)):
with open(src) as f:
raw = f.readlines()
else: # assume the src is the return of the readlines()
raw = src
# with open(fn) as f:
# raw = f.readlines()
raw = [l.strip() for l in raw]
raw = [l for l in raw if l] # remove empty lines

HEADER_PAT = r'(LCAO|PW)\s+ALGORITHM\s+-+\s+ION=\s+([0-9]+)\s+ELEC=\s+([0-9]+)'
res = [re.findall(HEADER_PAT, l) for l in raw]

return [(int(pack[0][1]), int(pack[0][2])) for pack in res if pack]

def find_final_info_with_iter_header(
info: List[Any],
headers: List[Tuple[int, int]]
) -> List[Any]:
'''find the energies of the final iteration

Parameters
----------
info: List[Dict[str, float]]
The information that is printed after each iteration.
headers: List[Tuple[int, int]]
The "iteration header" returned from the function
`read_iter_header_from_running_log`

Returns
-------
List[Any]
The information of the final iteration.
'''
headers = np.array(headers) # because indices should start from 0
assert headers.ndim == 2
assert headers.shape[1] == 2

ion = headers[:, 0].flatten()
ion, nelec = np.unique(ion, return_counts=True)
ielec = np.cumsum(nelec) - 1

return [info[i] for i in ielec]

def is_invalid_arr(arr) -> bool:
'''Check if the array is invalid, including the cases of None,
empty array, and array with NaN values.
Expand Down Expand Up @@ -797,9 +854,11 @@ def read_abacus_out(fileobj,
# the simulation, which is, not exactly to be true for the NPT-MD
# runs.

_, energies = read_energies_from_running_log(abacus_lines)
# only keep the SCF converged energies
energies = [edct for edct in energies if 'E_KS(sigma->0)' in edct]
# only keep the energies of the final iteration for each ion step
energies = find_final_info_with_iter_header(
read_energies_from_running_log(abacus_lines)[1],
read_iter_header_from_running_log(abacus_lines)
)

# read the magmom
magmom = read_magmom_from_running_log(abacus_lines)
Expand Down Expand Up @@ -1107,5 +1166,25 @@ def test_read_magmom_from_running_log(self):
np.array([[0. , 0. , 3.62032142],
[0. , 0. , 3.62032142]])))

def test_read_iter_header_from_running_log(self):
fn = self.testfiles / 'lcao-symm0-nspin2-multik-cellrelax_'
header = read_iter_header_from_running_log(fn)
self.assertEqual(header[0], (1,1))
self.assertEqual(header[1], (1,2))
self.assertEqual(header[2], (2,1))
fn = self.testfiles / 'pw-symm0-nspin4-gamma-md_'
header = read_iter_header_from_running_log(fn)
self.assertEqual(header[0], (1,1))
self.assertEqual(header[1], (1,2))
self.assertEqual(header[2], (1,3))
self.assertEqual(header[3], (3,2))
self.assertEqual(header[4], (3,3))

def test_find_final_info_with_iter_header(self):
info = [1, 2, 3, 4, 5, 6]
header = [[1,1], [1,2], [2,1], [3,1], [3,2], [3,3]]
final_info = find_final_info_with_iter_header(info, header)
self.assertListEqual(final_info, [info[1], info[2], info[5]])

if __name__ == '__main__':
unittest.main()
Loading