diff --git a/test.py b/test.py
index 195ddaa..08f89c7 100755
--- a/test.py
+++ b/test.py
@@ -29,12 +29,11 @@
import shutil
import subprocess
import threading
-from glob import glob
-from dataclasses import dataclass
+from collections.abc import Callable
from xml.sax.saxutils import escape as xmlescape, quoteattr as xmlquoteattr
from pathlib import Path
+from functools import partial
from concurrent.futures import ThreadPoolExecutor, as_completed
-from typing import List, Optional, Tuple
from http.server import HTTPServer, SimpleHTTPRequestHandler
@@ -43,46 +42,56 @@
YELLOW = "\033[33m"
RESET = "\033[0m"
+COMPILER_NAME = "c_compiler"
+
if not sys.stdout.isatty():
# Don't output colours when we're not in a TTY
RED, GREEN, YELLOW, RESET = "", "", "", ""
-@dataclass
class Result:
"""Class for keeping track of each test case result"""
- test_case_name: str
- return_code: int
- timeout: bool
- error_log: Optional[str]
+
+ def __init__(self, test_case_name: str, return_code: int, timeout: bool, error_log: str | None):
+ self._test_case_name = test_case_name
+ self._return_code = return_code
+ self._timeout = timeout
+ self._error_log = error_log
+ self._timeout = "[TIMED OUT] " if self._timeout else ""
def passed(self) -> bool:
- return self.return_code == 0
+ return self._return_code == 0
def to_xml(self) -> str:
if self.passed():
- system_out = f"{self.error_log}\n" if self.error_log else ""
+ system_out = f"{self._error_log}\n" if self._error_log else ""
return (
- f"\n"
+ f"\n"
f"{system_out}"
f"\n"
)
- timeout = "[TIMED OUT] " if self.timeout else ""
- attribute = xmlquoteattr(timeout + self.error_log)
- xml_tag_body = xmlescape(timeout + self.error_log)
+ attribute = xmlquoteattr(self._timeout + self._error_log)
+ xml_tag_body = xmlescape(self._timeout + self._error_log)
return (
- f"\n"
+ f"\n"
f"\n{xml_tag_body}\n"
f"\n"
)
def to_log(self) -> str:
- timeout = "[TIMED OUT] " if self.timeout else ""
- if self.return_code != 0:
- return f"{self.test_case_name}\n{RED}{timeout + self.error_log}{RESET}\n"
- if self.error_log is None:
- return f"{self.test_case_name}\n\t> {GREEN}Pass{RESET}\n"
- return f"{self.test_case_name}\n\t> {YELLOW}{self.error_log}{RESET}\n"
+ if self._return_code != 0:
+ msg = f"{RED}{self._timeout + self._error_log}"
+ elif self._error_log is None:
+ msg = f"{GREEN}Pass"
+ else:
+ msg = f"{YELLOW}{self._error_log}"
+
+ return f"{self._test_case_name}\n\t{msg}{RESET}\n"
+
+class TestFailed(Exception):
+ def __init__(self, result: Result):
+ self.result = result
+ super().__init__(str(result._error_log))
class JUnitXMLFile():
def __init__(self, path: Path):
@@ -169,17 +178,19 @@ def update_with_value(self, passed: bool):
with self._lock:
self.update()
+type subprocess_status = tuple[int, str, bool]
+
def run_subprocess(
- cmd: List[str],
+ cmd: list[str],
timeout: int,
- env: Optional[dict] = None,
- log_path: Optional[str] = None,
+ env: dict | None = None,
+ log_path: str | None = None,
verbose: bool = True,
-) -> tuple[int, str, bool]:
+) -> subprocess_status:
"""
Wrapper for subprocess.run(...) with common arguments and error handling.
- Returns tuple of (return_code: int, error_message: str, timed_out: bool)
+ Returns a tuple of (return_code: int, error_message: str, timed_out: bool)
"""
timeout_returncode = 124
@@ -220,7 +231,7 @@ def clean(top_dir: Path, timeout: int = 15) -> bool:
return False
return True
-def make(top_dir: Path, build_dir: Path, multithreading: int, verbose: bool, log_path: Optional[str] = None, timeout: int = 60) -> bool:
+def make(top_dir: Path, build_dir: Path, multithreading: int, verbose: bool, log_path: str | None = None, timeout: int = 60) -> bool:
"""
Wrapper for make build/c_compiler.
@@ -233,7 +244,7 @@ def make(top_dir: Path, build_dir: Path, multithreading: int, verbose: bool, log
cmd = ["make", "-C", str(top_dir)]
if multithreading > 1:
cmd += ["-j", str(multithreading)]
- cmd += [f"{build_dir.name}/c_compiler"]
+ cmd += [f"{build_dir.name}/{COMPILER_NAME}"]
return_code, error_msg, _ = run_subprocess(
cmd=cmd, timeout=timeout, verbose=verbose, env=custom_env, log_path=log_path
@@ -287,6 +298,8 @@ def build(
):
"""
Wrapper for building the student compiler. Assumes output folder exists.
+
+ Return True if successful, False otherwise
"""
# Prepare the build folder
build_dir = top_dir / "build"
@@ -351,19 +364,15 @@ def process_result(
if verbose:
print(result.to_log())
- return
- if progress_bar:
+ elif progress_bar:
progress_bar.update_with_value(result.passed())
- return
-
def run_test(
- build_dir: Path,
+ compiler: Callable[[Path, Path, int], subprocess_status],
output_dir: Path,
tests_dir: Path,
driver: Path,
- validate_tests: bool = False,
timeout: int = 30
) -> Result:
"""
@@ -392,89 +401,89 @@ def run_test(
shutil.rmtree(log_path.parent, ignore_errors=True)
log_path.parent.mkdir(parents=True, exist_ok=True)
- def relevant_files(component):
- return f"\n\t {log_path}.{component}.stderr.log \n\t {log_path}.{component}.stdout.log"
-
- # Modifying environment to combat errors on memory leak
- custom_env = os.environ.copy()
- custom_env["ASAN_OPTIONS"] = f"log_path={log_path}.asan.log"
- custom_env["UBSAN_OPTIONS"] = f"log_path={log_path}.ubsan.log"
+ def get_relevant_files(component: str):
+ return "\n".join(f"\t{log_path}.{component}.{suffix}" for suffix in ["stderr.log", "stdout.log"])
+
+ sanitizer_file_list = list(log_path.parent.glob(".*san.log.*"))
+ compiler_log_file_str = "\n".join([
+ get_relevant_files("compiler"),
+ f"\t{log_path}.s",
+ f"\t{log_path}.s.printed",
+ *(f"\t{p}" for p in sanitizer_file_list),
+ ])
+
+ def get_msg(component: str):
+ msg = f"{component.capitalize()} failed:\n{compiler_log_file_str}"
+ if component != "compiler":
+ msg += f"\n{get_relevant_files(component)}"
+ return msg
+
+ def fail(component: str, return_code: int, timed_out: bool):
+ raise TestFailed(Result(
+ test_case_name=test_name,
+ return_code=return_code,
+ timeout=timed_out,
+ error_log=get_msg(component),
+ ))
+
+ def run_component(component: str, cmd: list[str]):
+ return_code, _, timed_out = run_subprocess(
+ cmd=cmd,
+ timeout=timeout,
+ log_path=f"{log_path}.{component}",
+ )
+ if return_code != 0:
+ fail(component, return_code, timed_out)
- # Compile the test case into assembly using the custom compiler or GCC for self validation
- if validate_tests:
- compile_cmd = [gcc, "-std=c90", "-pedantic-errors", "-ansi", "-O0", gcc_arch, gcc_abi, "-S", to_assemble, "-o", f"{log_path}.s"]
- else:
- compile_cmd = [build_dir / "c_compiler", "-S", to_assemble, "-o", f"{log_path}.s"]
+ try:
+ # GCC Reference Output
+ run_component(
+ component="reference",
+ cmd=[gcc, "-std=c90", "-pedantic", "-ansi", "-O0", gcc_arch, gcc_abi, "-S", to_assemble, "-o", f"{log_path}.gcc.s"]
+ )
- # Compile
- return_code, _, timed_out = run_subprocess(
- cmd=compile_cmd,
- timeout=timeout,
- env=custom_env,
- log_path=f"{log_path}.compiler",
- )
- sanitizer_file_list = glob(f"{log_path}.*san.log.*")
- compiler_log_file_str = f"{relevant_files('compiler')} \n\t {log_path}.s \n\t {log_path}.s.printed" \
- + "".join("\n\t " + p for p in sanitizer_file_list)
- if return_code != 0:
- msg = f"\t> Failed to compile testcase: {compiler_log_file_str}"
- return Result(test_case_name=test_name, return_code=return_code, timeout=timed_out, error_log=msg)
+ # Compile
+ return_code, _, timed_out = compiler(to_assemble, log_path, timeout)
+ if return_code != 0:
+ fail("compiler", return_code, timed_out)
- # GCC Reference Output
- return_code, _, timed_out = run_subprocess(
- cmd=[gcc, "-std=c90", "-pedantic", "-ansi", "-O0", gcc_arch, gcc_abi, "-S", to_assemble, "-o", f"{log_path}.gcc.s"],
- timeout=timeout,
- log_path=f"{log_path}.reference",
- )
- if return_code != 0:
- msg = f"\t> Failed to generate reference: {compiler_log_file_str} {relevant_files('reference')}"
- return Result(test_case_name=test_name, return_code=return_code, timeout=timed_out, error_log=msg)
+ # Assemble
+ run_component(
+ component="assembler",
+ cmd=[gcc, gcc_arch, gcc_abi, "-c", f"{log_path}.s", "-o", f"{log_path}.o"]
+ )
- # Assemble
- return_code, _, timed_out = run_subprocess(
- cmd=[gcc, gcc_arch, gcc_abi, "-c", f"{log_path}.s", "-o", f"{log_path}.o"],
- timeout=timeout,
- log_path=f"{log_path}.assembler",
- )
- if return_code != 0:
- msg = f"\t> Failed to assemble: {compiler_log_file_str} {relevant_files('assembler')}"
- return Result(test_case_name=test_name, return_code=return_code, timeout=timed_out, error_log=msg)
+ # Link
+ run_component(
+ component="linker",
+ cmd=[gcc, gcc_arch, gcc_abi, "-static", f"{log_path}.o", str(driver), "-o", f"{log_path}"]
+ )
- # Link
- return_code, _, timed_out = run_subprocess(
- cmd=[gcc, gcc_arch, gcc_abi, "-static", f"{log_path}.o", str(driver), "-o", f"{log_path}"],
- timeout=timeout,
- log_path=f"{log_path}.linker",
- )
- if return_code != 0:
- msg = f"\t> Failed to link driver: {compiler_log_file_str} {relevant_files('linker')}"
- return Result(test_case_name=test_name, return_code=return_code, timeout=timed_out, error_log=msg)
+ # Simulate
+ run_component(
+ component="simulation",
+ cmd=["spike", "--isa=rv32gc", "pk", log_path]
+ )
- # Simulate
- return_code, _, timed_out = run_subprocess(
- cmd=["spike", "--isa=rv32gc", "pk", log_path],
- timeout=timeout,
- log_path=f"{log_path}.simulation",
- )
- if return_code != 0:
- msg = f"\t> Failed to simulate: {compiler_log_file_str} {relevant_files('simulation')}"
- return Result(test_case_name=test_name, return_code=return_code, timeout=timed_out, error_log=msg)
+ except TestFailed as e:
+ return e.result
- msg = "Sanitizer warnings: " + " ".join(sanitizer_file_list) if len(sanitizer_file_list) != 0 else None
- return Result(test_case_name=test_name, return_code=return_code, timeout=False, error_log=msg)
+ msg = f"Sanitizer warnings: {" ".join(sanitizer_file_list)}" if len(sanitizer_file_list) != 0 else None
+ return Result(test_case_name=test_name, return_code=0, timeout=False, error_log=msg)
def run_tests(
- build_dir: Path,
+ compiler: Callable[[Path, Path, int], subprocess_status],
output_dir: Path,
tests_dir: Path,
xml_file: JUnitXMLFile,
multithreading: int,
verbose: bool,
- validate_tests: bool = False,
timeout: int = 30
-) -> Tuple[int, int]:
+) -> tuple[int, int]:
"""
Runs tests against compiler.
+
+ Returns a tuple of (passing: int, total: int) tests
"""
drivers = list(tests_dir.rglob("*_driver.c"))
drivers = sorted(drivers, key=lambda p: (p.parent.name, p.name))
@@ -491,11 +500,10 @@ def run_tests(
with ThreadPoolExecutor(max_workers=multithreading) as executor:
futures = [executor.submit(
run_test,
- build_dir=build_dir,
+ compiler=compiler,
output_dir=output_dir,
tests_dir=tests_dir,
driver=driver,
- validate_tests=validate_tests,
timeout=timeout
) for driver in drivers]
@@ -507,11 +515,10 @@ def run_tests(
else:
for driver in drivers:
result = run_test(
- build_dir=build_dir,
+ compiler=compiler,
output_dir=output_dir,
tests_dir=tests_dir,
driver=driver,
- validate_tests=validate_tests,
timeout=timeout
)
results.append(result.passed())
@@ -525,6 +532,29 @@ def run_tests(
return passing, total
+def student_compiler(compiler_path: Path, to_assemble: Path, log_path: Path, timeout: int) -> subprocess_status:
+ """
+ Wrapper for `build/c_compiler -S -o .s`.
+
+ Return None if successful, a Result otherwise
+ """
+ # Modifying environment to combat errors on memory leak
+ custom_env = os.environ.copy()
+ custom_env["ASAN_OPTIONS"] = f"log_path={log_path}.asan.log"
+ custom_env["UBSAN_OPTIONS"] = f"log_path={log_path}.ubsan.log"
+
+ # Compile
+ return run_subprocess(
+ cmd=[compiler_path, "-S", to_assemble, "-o", f"{log_path}.s"],
+ timeout=timeout,
+ env=custom_env,
+ log_path=f"{log_path}.compiler",
+ )
+
+def symlink_reference_compiler(to_assemble: Path, log_path: Path, timeout: int) -> subprocess_status:
+ Path(f"{log_path}.s").symlink_to(f"{log_path}.gcc.s")
+ return 0, "", False
+
def parse_args(tests_dir: Path) -> argparse.Namespace:
"""
Wrapper for argument parsing.
@@ -538,10 +568,11 @@ def parse_args(tests_dir: Path) -> argparse.Namespace:
help="(Optional) paths to the compiler test folders. Use this to select "
"certain tests. Leave blank to run all tests."
)
+ CPUs = os.cpu_count()
parser.add_argument(
"-m", "--multithreading",
nargs="?",
- const=8,
+ const=8 if CPUs is None else CPUs,
default=1,
type=int,
metavar="N",
@@ -624,13 +655,13 @@ def main():
# Run the tests and save the results into JUnit XML file
with JUnitXMLFile(build_dir / "junit_results.xml") as xml_file:
passing, total = run_tests(
- build_dir=build_dir,
+ compiler=symlink_reference_compiler if args.validate_tests \
+ else partial(student_compiler, build_dir / COMPILER_NAME),
output_dir=output_dir,
tests_dir=Path(args.dir),
xml_file=xml_file,
multithreading=args.multithreading,
- verbose=not args.silent,
- validate_tests=args.validate_tests
+ verbose=not args.silent
)
# Skip unavailable coverage and exit immediately for test validation