Skip to content

Commit

Permalink
ci: testcase_matrix parser
Browse files Browse the repository at this point in the history
  • Loading branch information
Tiphereth-A committed Feb 9, 2025
1 parent 37af81b commit a5f862f
Show file tree
Hide file tree
Showing 783 changed files with 4,323 additions and 2,983 deletions.
5 changes: 4 additions & 1 deletion libs/classes/config_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@


class ConfigBase:
def __init__(self, conf_path: str):
def __init__(self, conf_path: str, readonly: bool = False):
self.__conf_path = conf_path
self._config: dict = {}
self._readonly = readonly
self.reload()

def __str__(self) -> str:
Expand All @@ -18,6 +19,8 @@ def reload(self):
self._config = yaml.safe_load(f)

def output(self):
if self._readonly:
raise AssertionError(f"{self.__conf_path} is readonly")
if not os.access(self.__conf_path, os.W_OK):
raise PermissionError(f"{self.__conf_path} is inaccessible")
with open(self.__conf_path, 'w', encoding='utf8') as f:
Expand Down
89 changes: 78 additions & 11 deletions libs/classes/config_tcgen.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,91 @@
import os


from copy import deepcopy
from libs.classes.config_base import ConfigBase
from libs.decorator import withlog
from multipledispatch import dispatch


class ConfigTCGen(ConfigBase):
def __init__(self, conf_path: str):
super().__init__(conf_path)
super().__init__(conf_path, readonly=True)

def __check_categories_priorities(self, categories: list[str]):
if not categories:
return
priorities = [self.get_priority(cat) for cat in categories]
if len(set(priorities)) != 1:
_ = RuntimeError('Priorities not in same')
_.add_note(f"categories: '{categories}'")
_.add_note(f"priorities: '{priorities}'")
raise _

def _get_keys_raw(self):
def _get_categories_raw(self) -> list[str]:
return self._config.keys()

def _get_content_raw(self, key: str):
return self._config[key]
def _get_priority_raw(self, category: str) -> int:
return self.items(category, 'priority')

def _get_categories_by_priority_raw(self, priority: int) -> list[str]:
return list(filter(lambda cat: self._get_priority_raw(cat) == priority, self._get_categories_raw()))

@dispatch(str)
def _get_memberlist_raw(self, category: str) -> list[tuple[str, str]]:
return [(category, mem) for mem in self.items(category, 'member').keys()]

@dispatch(list)
def _get_memberlist_raw(self, categories: list[str]) -> list[tuple[str, str]]:
self.__check_categories_priorities(categories)
result = []
for cat in categories:
result.extend(self._get_memberlist_raw(cat))
return result

@dispatch(str, str)
def _get_member_content_raw(self, category: str, member: str) -> dict:
content = deepcopy(self.items(category, 'default_content'))
member_content = deepcopy(self.items(category, 'member', member))
if not content:
content = deepcopy(member_content)
if not content:
raise RuntimeError(
f"member '{member}' is invalid in category '{category}'")

content.update(category_name=category)
content.update(member_name=member)
if member_content:
for (k, v) in member_content.items():
content[k] = v
return content

@dispatch(list, str)
def _get_member_content_raw(self, categories: list[str], member: str) -> dict:
for category in categories:
try:
return self._get_member_content_raw(category, member)
except:
pass
raise RuntimeError(
f"member '{member}' is invalid in categories '{categories}'")

@withlog
def get_categories(self, **kwargs) -> list[str]:
return self._get_categories_raw()

@withlog
def get_priority(self, category: str, **kwargs) -> int:
return self._get_priority_raw(category)

@withlog
def get_categories_by_priority(self, priority: int, **kwargs) -> int:
return self._get_categories_by_priority_raw(priority)

@withlog
def get_categories_with_same_priority(self, category: str, **kwargs) -> int:
return self._get_categories_by_priority_raw(self._get_priority_raw(category))

@withlog
def get_keys(self, **kwargs):
return self._get_keys_raw()
def get_memberlist(self, category: str | list[str], **kwargs) -> list[tuple[str, str]]:
return self._get_memberlist_raw(category)

@withlog
def get_content(self, key: str, **kwargs) -> str:
return self._get_content_raw(key)
def get_member_content(self, category: str, member: str, **kwargs) -> dict:
return self._get_member_content_raw(category, member)
4 changes: 2 additions & 2 deletions libs/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
CONTENTS_CS: str = os.path.join(CONTENTS_DIR, 'contents_cheatsheet.tex')
CONTENTS_NB: str = os.path.join(CONTENTS_DIR, 'contents_notebook.tex')

CONFIG: Config = Config(os.path.join('.', 'config.yml'))
CONFIG_TCGEN: ConfigTCGen = ConfigTCGen(os.path.join('.', 'tcgen.yml'))
CONFIG = Config(os.path.join('.', 'config.yml'))
CONFIG_TCGEN = ConfigTCGen(os.path.join('.', 'tcgen.yml'))

CLEAN_EXT_NAME: list[str] = ['.aux', '.bbl', '.blg', '.dvi', 'fdb_latexmk', '.fls',
'.log', '.nav', '.out', '.pyg', '.snm', '.synctex.gz', '.toc', '.vrb', '.xdv']
260 changes: 260 additions & 0 deletions libs/testcase_matrix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
from copy import deepcopy
import itertools
import os
from typing import Iterable

from libs.consts import CONFIG_TCGEN
from libs.decorator import withlog


class case_parser:
def __init__(self, case_list: Iterable[tuple[str, str]]):
self._case_list: list[tuple[str, str]] = sorted(case_list,
key=lambda x: CONFIG_TCGEN.get_priority(x[0]))
self._case_dict: list[dict] = [CONFIG_TCGEN.get_member_content(cat, mem)
for cat, mem in self._case_list]
self._requirement_members_list: list[list[str]] = []
for case in self._case_dict:
req, cat = deepcopy(case['requirements']), case['category_name']
req_members: list[str] = []
if req:
for case2 in self._case_dict:
cat2 = case2['category_name']
if cat2 == cat:
continue
for idx, single_req in enumerate(req):
if cat2 not in single_req:
continue
req_members.append(case2['member_name'])
req[idx].pop(single_req.index(cat2))
break
self._requirement_members_list.append(req_members)

def __str__(self):
return str(self._case_dict)

def _apply_format_specifiers(self, content: str, member: str, requirement_members: list[str]) -> str:
if not content:
return ''
for idx, mem in enumerate(requirement_members):
content = content.replace(f"$mb{idx}b", mem)
content = content.replace('$m', member)
return content

def _get_content_ignore(self, type: str) -> str:
cat_list: list[str] = []
result: str = ''
for case, requirements in zip(self._case_dict, self._requirement_members_list):
category_name = case['category_name']
if category_name in cat_list:
continue
cat_list.extend(
CONFIG_TCGEN.get_categories_with_same_priority(category_name))
result += self._apply_format_specifiers(
case[type], case['member_name'], requirements)
return result

@withlog
def get_include_list(self, **kwargs) -> list[str]:
result: list[str] = []
for case, req in zip(self._case_dict, self._requirement_members_list):
if not case['include']:
continue
result.extend([self._apply_format_specifiers(
i, case['member_name'], req) for i in case['include']])
return sorted(list(set(result)))

@withlog
def get_content_after_include(self, **kwargs) -> str:
return self._get_content_ignore('after_include')

@withlog
def get_content_main_begin(self, **kwargs) -> str:
return self._get_content_ignore('main_begin')

@withlog
def get_label(self, **kwargs) -> str:
return '.'.join(f"{cat}-{mem}" for cat, mem in self._case_list)


class testcase_matrix:
def __init__(self):
self._cat_list: list[list[str]] = []
self._all_cases: list[case_parser] = []

def _get_member_list(self) -> list[list[tuple[str, str]]]:
categories = sorted(self._cat_list,
key=lambda x: CONFIG_TCGEN.get_priority(x[0]))
result: list = []
for cat in categories:
result.append(CONFIG_TCGEN.get_memberlist(cat))
return result

@withlog
def append(self, categories: Iterable[str], **kwargs):
self._cat_list.append(categories)
return self

@withlog
def make_all_cases(self, **kwargs):
def _single_iteration(all_cases, all_cases_new):
result: list[list[tuple[str, str]]] = []
result_new: list[list[tuple[str, str]]] = []
for now_case, now_case_new in zip(all_cases, all_cases_new):
required_cats: list[tuple[str, ...]] = []
for cat, mem in now_case_new:
req: list[list[str]] | None = CONFIG_TCGEN.get_member_content(cat, mem)[
'requirements']
if req:
required_cats.extend(itertools.product(*req))
if not required_cats:
result.append(now_case)
result_new.append([])
continue
for cats in required_cats:
ls = [CONFIG_TCGEN.get_memberlist(cat) for cat in cats]
prod = list(itertools.product(*ls))
result.extend([list(now_case) + list(req)
for req in prod])
result_new.extend([list(req) for req in prod])
return result, result_new

all_cases = [list(i)
for i in itertools.product(*self._get_member_list())]
all_cases_new = deepcopy(all_cases)
kwargs.get('logger').debug(f'Initial:')
kwargs.get('logger').debug(' '*2+f'all_cases:')
kwargs.get('logger').debug('\n'.join(str(i)
for i in all_cases))
kwargs.get('logger').debug(' '*2+f'all_cases_new:')
kwargs.get('logger').debug('\n'.join(str(i)
for i in all_cases_new))
iter_limit = len(CONFIG_TCGEN.get_categories())
cnt = 0
while True:
all_cases, all_cases_new = _single_iteration(
all_cases, all_cases_new)
kwargs.get('logger').debug(f'Iteration {cnt}:')
kwargs.get('logger').debug(' '*2+f'all_cases:')
kwargs.get('logger').debug('\n'.join(str(i)
for i in all_cases))
if not sum([len(i) for i in all_cases_new]):
break
cnt += 1
if cnt > iter_limit:
raise RuntimeError('Max iteration limit exceed!')
self._all_cases = [deepcopy(case_parser(now_case))
for now_case in all_cases]

@withlog
def get_all_cases(self, **kwargs):
if not self._all_cases:
self.make_all_cases()
return self._all_cases

@withlog
def exclude(self, case: list[tuple[str, str]], **kwargs):
if not self._all_cases:
self.make_all_cases()
case_list = [i._case_list for i in self._all_cases]
self._all_cases.pop(case_list.index(
sorted(case, key=lambda x: CONFIG_TCGEN.get_priority(x[0]))))
return self


_GENTC_BEGIN = '// ---<GENTC>--- begin\n'
_GENTC_END = '// ---<GENTC>--- end\n'
_GENTC_MAIN = 'int main() {\n'
_GENTC_COMMAND_APPEND = '// ---<GENTC>--- append '
_GENTC_COMMAND_EXCLIDE = '// ---<GENTC>--- exclude '


class cppmeta_parser:
def __init__(self, cppmeta_filename_noext: str, target_dir: str, cppmeta_code_lines: list[str]):
self._filename_noext = cppmeta_filename_noext
self._target_dir = target_dir
self._code_lines = cppmeta_code_lines
self._testcase_mat = testcase_matrix()

def _get_all_cases(self):
block_begin, block_end = -1, -1
inblock, appended, excluded = False, False, False
main_index = self._code_lines.index(_GENTC_MAIN)

for index, codeline in enumerate(self._code_lines):
if codeline == _GENTC_BEGIN:
if inblock:
raise RuntimeError(
f'Parse error: `GENTC begin` can not appear in a GENTC block')
block_begin = index
inblock = True
continue

if codeline == _GENTC_END:
if not inblock:
raise RuntimeError(f'Parse error: `GENTC end` mismatched')
block_end = index+1
inblock = False
break
if not inblock:
continue

if codeline.startswith(_GENTC_COMMAND_APPEND):
if excluded:
raise RuntimeError(
'Parse error: `GENTC append` found after `GENTC exclude`')
appended = True
self._testcase_mat.append(list(filter(
lambda x: x, codeline.removeprefix(_GENTC_COMMAND_APPEND).strip().split())))
continue
if codeline.startswith(_GENTC_COMMAND_EXCLIDE):
if not appended:
raise RuntimeError(
'Parse error: `GENTC exclude` found before `GENTC append`')
excluded = True
self._testcase_mat.exclude([tuple(x.split('-')) for x in filter(
lambda x: x, codeline.removeprefix(_GENTC_COMMAND_EXCLIDE).strip().split('.'))])
continue

raise RuntimeError(
f"Parse error: unknown GENTC command '{codeline.rstrip()}'")

if block_begin < 0:
raise RuntimeError('Parse error: GENTC block not found')
if inblock or block_end < 0:
raise RuntimeError('Parse error: `GENTC begin` mismatched')
if not (block_begin < block_end <= main_index):
raise RuntimeError('GENTC block must be before main function')
return block_begin, block_end, main_index, self._testcase_mat.get_all_cases()

def _get_include_relpath(self, include_filepath: str) -> str:
return os.path.relpath(include_filepath, self._target_dir).replace("\\", '/')

def _get_all_target_content(self) -> list[tuple[str, list[str]]]:
block_begin, block_end, main_index, all_cases = self._get_all_cases()
result = []
for case in all_cases:
target_filepath = os.path.join(
self._target_dir, f"{self._filename_noext}.{case.get_label()}.test.cpp")
now_codelines: list[str] = ['#define AUTO_GENERATED\n']
# before include
now_codelines += self._code_lines[0:block_begin]
# include
now_codelines += [f'#include "{self._get_include_relpath(include)}"\n'
for include in case.get_include_list()]
# after include
now_codelines += ['\n'] + \
case.get_content_after_include().splitlines(True)
# before main
now_codelines += self._code_lines[block_end:main_index+1]
# main begin
now_codelines += [i if i.startswith(' ') else ' '+i
for i in case.get_content_main_begin().splitlines(True)]
# remains
now_codelines += self._code_lines[main_index+1:]
result.append((target_filepath, now_codelines))
return result

@withlog
def get_results(self, **kwargs) -> list[tuple[str, list[str]]]:
return self._get_all_target_content()
Loading

0 comments on commit a5f862f

Please sign in to comment.