From 95b2b134e695e4c7242aacd6c8644af4e9cf14ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20K=C3=A1kona?= Date: Sat, 18 Oct 2025 00:28:43 +0200 Subject: [PATCH] Generalize parser data-driven tests --- .github/workflows/tests.yml | 22 ++++ data/.gitkeep | 0 dosview/__init__.py | 223 ++--------------------------------- dosview/parsers.py | 227 ++++++++++++++++++++++++++++++++++++ tests/test_parser.py | 69 +++++++++++ 5 files changed, 326 insertions(+), 215 deletions(-) create mode 100644 .github/workflows/tests.yml create mode 100644 data/.gitkeep create mode 100644 dosview/parsers.py create mode 100644 tests/test_parser.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..4f84785 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,22 @@ +name: Python tests + +on: + push: + pull_request: + +jobs: + pytest: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install . 
pytest + - name: Run pytest + run: pytest diff --git a/data/.gitkeep b/data/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/dosview/__init__.py b/dosview/__init__.py index fd28428..00cc2d7 100644 --- a/dosview/__init__.py +++ b/dosview/__init__.py @@ -11,10 +11,9 @@ import pyqtgraph as pg import pandas as pd -from PyQt5.QtWidgets import QSplitter import datetime -import time +import time from PyQt5.QtCore import * from PyQt5.QtGui import * @@ -27,219 +26,13 @@ from .version import __version__ from pyqtgraph import ImageView - - -import sys -import argparse - -from PyQt5 import QtNetwork -from PyQt5.QtNetwork import QLocalSocket, QLocalServer -from PyQt5.QtCore import QThread, pyqtSignal, QSettings -from PyQt5.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QHBoxLayout, QFormLayout -from PyQt5.QtWidgets import QPushButton, QFileDialog, QTreeWidget, QTreeWidgetItem, QAction, QSplitter, QTableWidgetItem -from PyQt5.QtGui import QIcon -import pyqtgraph as pg -import pandas as pd -import datetime -import time -from PyQt5.QtCore import * -from PyQt5.QtGui import * -from PyQt5.QtWidgets import * -import hid -import numpy as np -import os -from .version import __version__ -from pyqtgraph import ImageView - -# ---- PARSER INFRA ---- - -class BaseLogParser: - """Základní třída parseru.""" - def __init__(self, file_path): - self.file_path = file_path - - @staticmethod - def detect(file_path): - """Vrací True pokud tento parser umí parsovat daný soubor.""" - raise NotImplementedError - - def parse(self): - """Vrací rozparsovaná data.""" - raise NotImplementedError - - -class Airdos04CLogParser(BaseLogParser): - """Parser pro logy typu AIRDOS04C.""" - @staticmethod - def detect(file_path): - with open(file_path, "r") as f: - for line in f: - if line.startswith("$DOS") and "AIRDOS04C" in line: - return True - return False - - def parse(self): - start_time = time.time() - print("AIRDOS04C parser start") - metadata = { - 'log_runs_count': 0, - 
'log_device_info': {}, - 'log_info': {} - } - hist = np.zeros(1024, dtype=int) - total_counts = 0 - - - sums = [] - time_axis = [] - - inside_run = False - current_hist = None - current_counts = 0 - - with open(self.file_path, 'r') as file: - for line in file: - parts = line.strip().split(",") - match parts[0]: - case "$DOS": - metadata['log_device_info']['DOS'] = { - "type": parts[0], - "hw-model": parts[1], - "fw-version": parts[2], - "eeprom": parts[3], - "fw-commit": parts[4], - "fw-build_info": parts[5], - 'hw-sn': parts[6].strip(), - } - metadata['log_runs_count'] += 1 - case "$START": - inside_run = True - current_hist = np.zeros_like(hist) - current_counts = 0 - case "$E": - if inside_run and len(parts) >= 3: - channel = int(parts[2]) - if 0 <= channel < current_hist.shape[0]: - current_hist[channel] += 1 - current_counts += 1 - case "$STOP": - if inside_run: - # Přičti hodnoty z $STOP (kanálové stavy na konci expozice) - if len(parts) > 4: - for idx, val in enumerate(parts[4:]): - try: - current_hist[idx] += int(val) - except Exception: - pass - hist += current_hist - total_counts += current_counts - sums.append(current_counts) - time_axis.append(float(parts[2])) - inside_run = False - current_hist = None - case _: - continue - - metadata['log_info']['histogram_channels'] = hist.shape[0] - metadata['log_info']['events_total'] = int(total_counts) # pouze součet všech E! 
- metadata['log_info']['log_type_version'] = "2.0" - metadata['log_info']['log_type'] = 'xDOS_SPECTRAL' - metadata['log_info']['detector_type'] = "AIRDOS04C" - print("Parsed AIRDOS04C format in", time.time() - start_time, "s") - - return [np.array(time_axis), np.array(sums), hist, metadata] - - - -class OldLogParser(BaseLogParser): - """Parser pro starší logy (ne-AIRDOS04C).""" - @staticmethod - def detect(file_path): - with open(file_path, "r") as f: - for line in f: - if line.startswith("$DOS") and "AIRDOS04C" not in line: - return True - return False - - def parse(self): - start_time = time.time() - print("OLD parser start") - metadata = { - 'log_runs_count': 0, - 'log_device_info': {}, - 'log_info': {} - } - df_lines = [] # $HIST - df_metadata = [] - unique_events = [] # $HITS - with open(self.file_path, 'r') as file: - for line in file: - parts = line.strip().split(",") - match parts[0]: - case "$DOS": - metadata['log_device_info']['DOS'] = { - "type": parts[0], - "hw-model": parts[1], - "fw-version": parts[2], - "eeprom": parts[3], - "fw-commit": parts[4], - "fw-build_info": parts[5], - 'hw-sn': parts[6].strip(), - } - metadata['log_runs_count'] += 1 - case "$ENV": - df_metadata.append(parts[2:]) - case "$HIST": - df_lines.append(parts[1:]) - case "$HITS": - unique_events += [(float(parts[i]), int(parts[i+1])) for i in range(2, len(parts), 2)] - case _: - continue - np_spectrum = np.array(df_lines, dtype=float) - zero_columns = np.zeros((np_spectrum.shape[0], 1000)) - np_spectrum = np.hstack((np_spectrum, zero_columns)) - time_column = np_spectrum[:, 1] - np_spectrum = np_spectrum[:, 7:] - for event in unique_events: - t, ch = event - time_index = np.searchsorted(time_column, t) - if 0 <= time_index < np_spectrum.shape[0] and 0 <= ch < np_spectrum.shape[1]: - np_spectrum[time_index, ch] += 1 - hist = np.sum(np_spectrum[:, 1:], axis=0) - sums = np.sum(np_spectrum[:, 1:], axis=1) - metadata['log_info'].update({ - 'internal_time_min': time_column.min(), - 
'internal_time_max': time_column.max(), - 'log_duration': time_column.max() - time_column.min(), - 'spectral_count': sums.shape[0], - 'channels': hist.shape[0], - 'hits_count': len(unique_events), - 'log_type_version': "1.0", - 'log_type': 'xDOS_SPECTRAL', - 'detector_type': metadata['log_device_info']['DOS'].get('hw-model', 'unknown'), - }) - print("Parsed OLD format in", time.time() - start_time, "s") - return [time_column, sums, hist, metadata] - - -class dosparser(): - def __init__(self): - pass - - def load_file(self, datafile : str , detector = None): - pass - -LOG_PARSERS = [Airdos04CLogParser, OldLogParser] - -def get_parser_for_file(file_path): - for parser_cls in LOG_PARSERS: - if parser_cls.detect(file_path): - return parser_cls(file_path) - raise ValueError("Neznámý typ logu nebo žádný vhodný parser.") - -def parse_file(file_path): - parser = get_parser_for_file(file_path) - return parser.parse() +from .parsers import ( + BaseLogParser, + Airdos04CLogParser, + OldLogParser, + get_parser_for_file, + parse_file, +) class LoadDataThread(QThread): diff --git a/dosview/parsers.py b/dosview/parsers.py new file mode 100644 index 0000000..a050d08 --- /dev/null +++ b/dosview/parsers.py @@ -0,0 +1,227 @@ +"""Parsers for different dosview log formats. + +This module isolates the parsing logic from the GUI stack so that it can be +imported and tested without initializing PyQt. 
+""" + +from __future__ import annotations + +from typing import List, Sequence, Tuple + +import time +from pathlib import Path + +import numpy as np + + +class BaseLogParser: + """Base parser class.""" + + def __init__(self, file_path: str | Path): + self.file_path = str(file_path) + + @staticmethod + def detect(file_path: str | Path) -> bool: + """Return True if this parser can handle the supplied file.""" + raise NotImplementedError + + def parse(self): # pragma: no cover - concrete classes implement + raise NotImplementedError + + +class Airdos04CLogParser(BaseLogParser): + """Parser for AIRDOS04C log files.""" + + @staticmethod + def detect(file_path: str | Path) -> bool: + with open(file_path, "r") as f: + for line in f: + if line.startswith("$DOS") and "AIRDOS04C" in line: + return True + return False + + def parse(self): + start_time = time.time() + print("AIRDOS04C parser start") + metadata = { + "log_runs_count": 0, + "log_device_info": {}, + "log_info": {}, + } + hist = np.zeros(1024, dtype=int) + total_counts = 0 + sums: List[int] = [] + time_axis: List[float] = [] + inside_run = False + current_hist = None + current_counts = 0 + + with open(self.file_path, "r") as file: + for line in file: + parts = line.strip().split(",") + match parts[0]: + case "$DOS": + metadata["log_device_info"]["DOS"] = { + "type": parts[0], + "hw-model": parts[1], + "fw-version": parts[2], + "eeprom": parts[3], + "fw-commit": parts[4], + "fw-build_info": parts[5], + "hw-sn": parts[6].strip(), + } + metadata["log_runs_count"] += 1 + case "$START": + inside_run = True + current_hist = np.zeros_like(hist) + current_counts = 0 + case "$E": + if inside_run and len(parts) >= 3: + channel = int(parts[2]) + if 0 <= channel < current_hist.shape[0]: + current_hist[channel] += 1 + current_counts += 1 + case "$STOP": + if inside_run: + if len(parts) > 4: + for idx, val in enumerate(parts[4:]): + try: + current_hist[idx] += int(val) + except ValueError: + continue + hist += current_hist + 
total_counts += current_counts + sums.append(current_counts) + time_axis.append(float(parts[2])) + inside_run = False + current_hist = None + case _: + continue + + metadata["log_info"]["histogram_channels"] = hist.shape[0] + metadata["log_info"]["events_total"] = int(total_counts) + metadata["log_info"]["log_type_version"] = "2.0" + metadata["log_info"]["log_type"] = "xDOS_SPECTRAL" + metadata["log_info"]["detector_type"] = "AIRDOS04C" + print("Parsed AIRDOS04C format in", time.time() - start_time, "s") + + return [np.array(time_axis), np.array(sums), hist, metadata] + + +class OldLogParser(BaseLogParser): + """Parser for legacy (pre-AIRDOS04C) log files.""" + + @staticmethod + def detect(file_path: str | Path) -> bool: + with open(file_path, "r") as f: + for line in f: + if line.startswith("$DOS") and "AIRDOS04C" not in line: + return True + if line.startswith("$AIRDOS"): + return True + if line.startswith("$HIST"): + return True + return False + + def parse(self): + start_time = time.time() + print("OLD parser start") + metadata = { + "log_runs_count": 0, + "log_device_info": {}, + "log_info": {}, + } + df_lines: List[Sequence[str]] = [] + df_metadata: List[Sequence[str]] = [] + unique_events: List[Tuple[float, int]] = [] + with open(self.file_path, "r") as file: + for line in file: + parts = line.strip().split(",") + match parts[0]: + case "$DOS": + metadata["log_device_info"]["DOS"] = { + "type": parts[0], + "hw-model": parts[1], + "fw-version": parts[2], + "eeprom": parts[3], + "fw-commit": parts[4], + "fw-build_info": parts[5], + "hw-sn": parts[6].strip(), + } + metadata["log_runs_count"] += 1 + case "$AIRDOS": + metadata["log_device_info"]["AIRDOS"] = { + "type": parts[0], + "hw-model": parts[1] if len(parts) > 1 else "", + "detector": parts[2] if len(parts) > 2 else "", + "hw-sn": parts[3].strip() if len(parts) > 3 else "", + } + metadata["log_runs_count"] += 1 + case "$ENV": + df_metadata.append(parts[2:]) + case "$HIST": + df_lines.append(parts[1:]) + 
case "$HITS": + for i in range(2, len(parts) - 1, 2): + try: + unique_events.append((float(parts[i]), int(parts[i + 1]))) + except ValueError: + continue + case _: + continue + if not df_lines: + raise ValueError("Soubor neobsahuje žádné záznamy $HIST pro starší log.") + np_spectrum = np.array(df_lines, dtype=float) + zero_columns = np.zeros((np_spectrum.shape[0], 1000)) + np_spectrum = np.hstack((np_spectrum, zero_columns)) + time_column = np_spectrum[:, 1] + np_spectrum = np_spectrum[:, 7:] + for event in unique_events: + t, ch = event + time_index = np.searchsorted(time_column, t) + if 0 <= time_index < np_spectrum.shape[0] and 0 <= ch < np_spectrum.shape[1]: + np_spectrum[time_index, ch] += 1 + hist = np.sum(np_spectrum[:, 1:], axis=0) + sums = np.sum(np_spectrum[:, 1:], axis=1) + metadata["log_info"].update( + { + "internal_time_min": float(time_column.min()), + "internal_time_max": float(time_column.max()), + "log_duration": float(time_column.max() - time_column.min()), + "spectral_count": int(sums.shape[0]), + "channels": int(hist.shape[0]), + "hits_count": len(unique_events), + "log_type_version": "1.0", + "log_type": "xDOS_SPECTRAL", + "detector_type": metadata["log_device_info"].get("DOS", {}).get( + "hw-model", + metadata["log_device_info"].get("AIRDOS", {}).get("hw-model", "unknown"), + ), + } + ) + print("Parsed OLD format in", time.time() - start_time, "s") + return [time_column, sums, hist, metadata] + + +LOG_PARSERS: Sequence[type[BaseLogParser]] = [Airdos04CLogParser, OldLogParser] + + +def get_parser_for_file(file_path: str | Path) -> BaseLogParser: + for parser_cls in LOG_PARSERS: + if parser_cls.detect(file_path): + return parser_cls(file_path) + raise ValueError("Neznámý typ logu nebo žádný vhodný parser.") + + +def parse_file(file_path: str | Path): + parser = get_parser_for_file(file_path) + return parser.parse() + + +__all__ = [ + "BaseLogParser", + "Airdos04CLogParser", + "OldLogParser", + "get_parser_for_file", + "parse_file", +] diff 
"""Data-driven tests for the dosview log parsers.

Every regular, non-hidden file in the repository ``data`` directory is used
as a fixture.  The parser module is loaded straight from its source file
instead of through the ``dosview`` package.
"""

from pathlib import Path
import importlib.util

import numpy as np
import pytest

ROOT = Path(__file__).resolve().parent.parent
PARSERS_PATH = ROOT / "dosview" / "parsers.py"

spec = importlib.util.spec_from_file_location("dosview_parsers", PARSERS_PATH)
# Validate the spec before it is used, and with a real exception rather than
# an ``assert`` so the check also runs under ``python -O``.
if spec is None or spec.loader is None:
    raise ImportError(f"Cannot load parser module from {PARSERS_PATH}")
parsers = importlib.util.module_from_spec(spec)
spec.loader.exec_module(parsers)

LOG_PARSERS = parsers.LOG_PARSERS
get_parser_for_file = parsers.get_parser_for_file
parse_file = parsers.parse_file

DATA_DIR = ROOT / "data"

if not DATA_DIR.exists():
    pytest.skip("Data fixture directory is missing", allow_module_level=True)

# Hidden files such as ``.gitkeep`` are not fixtures.
LOG_FIXTURES = sorted(
    path
    for path in DATA_DIR.iterdir()
    if path.is_file() and not path.name.startswith(".")
)

if not LOG_FIXTURES:
    pytest.skip("No data fixtures found for parser tests", allow_module_level=True)


@pytest.mark.parametrize("log_path", LOG_FIXTURES, ids=lambda p: p.name)
def test_any_parser_detects_fixture(log_path):
    """At least one parser detects the fixture and dispatch picks the first."""
    assert log_path.exists(), f"Fixture {log_path.name} is missing"
    detected = [parser for parser in LOG_PARSERS if parser.detect(log_path)]
    assert detected, f"No parser detected {log_path.name}"
    parser_instance = get_parser_for_file(log_path)
    # ``detected`` keeps LOG_PARSERS order and get_parser_for_file returns the
    # first match, so the instance must come from the first detecting class.
    assert isinstance(parser_instance, detected[0])


@pytest.mark.parametrize("log_path", LOG_FIXTURES, ids=lambda p: p.name)
def test_parse_fixture_returns_consistent_shapes(log_path):
    """Parsed arrays are 1-D, non-empty, finite and mutually consistent."""
    time_axis, sums, hist, metadata = parse_file(log_path)

    for array in (time_axis, sums, hist):
        np_array = np.asarray(array, dtype=float)
        assert np_array.ndim == 1
        assert np_array.size > 0
        assert np.all(np.isfinite(np_array))

    assert time_axis.shape[0] == sums.shape[0]
    assert hist.shape[0] > 0
    assert np.all(np.asarray(hist) >= 0)

    # The time axis must not go backwards.
    if time_axis.shape[0] > 1:
        assert np.all(np.diff(np.asarray(time_axis, dtype=float)) >= 0)

    assert isinstance(metadata, dict)
    assert "log_info" in metadata and isinstance(metadata["log_info"], dict)
    assert "log_type" in metadata["log_info"]

    # Fixture-specific expectations for the legacy ``$AIRDOS`` header.
    if log_path.name == "legacy_airdos_log.txt":
        assert metadata["log_device_info"]["AIRDOS"]["detector"] == "NaI(Tl)-D16x30"
        assert metadata["log_info"]["detector_type"] == "GEO_1024_v1"

    if metadata["log_info"].get("detector_type") == "AIRDOS04C":
        assert metadata["log_info"].get("histogram_channels") == hist.shape[0]