Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
name: Python tests

on:
push:
pull_request:

jobs:
pytest:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install . pytest
- name: Run pytest
run: pytest
Empty file added data/.gitkeep
Empty file.
223 changes: 8 additions & 215 deletions dosview/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
import pyqtgraph as pg

import pandas as pd
from PyQt5.QtWidgets import QSplitter

import datetime
import time
import time

from PyQt5.QtCore import *
from PyQt5.QtGui import *
Expand All @@ -27,219 +26,13 @@
from .version import __version__
from pyqtgraph import ImageView



import sys
import argparse

from PyQt5 import QtNetwork
from PyQt5.QtNetwork import QLocalSocket, QLocalServer
from PyQt5.QtCore import QThread, pyqtSignal, QSettings
from PyQt5.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QHBoxLayout, QFormLayout
from PyQt5.QtWidgets import QPushButton, QFileDialog, QTreeWidget, QTreeWidgetItem, QAction, QSplitter, QTableWidgetItem
from PyQt5.QtGui import QIcon
import pyqtgraph as pg
import pandas as pd
import datetime
import time
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import hid
import numpy as np
import os
from .version import __version__
from pyqtgraph import ImageView

# ---- PARSER INFRA ----

class BaseLogParser:
"""Základní třída parseru."""
def __init__(self, file_path):
self.file_path = file_path

@staticmethod
def detect(file_path):
"""Vrací True pokud tento parser umí parsovat daný soubor."""
raise NotImplementedError

def parse(self):
"""Vrací rozparsovaná data."""
raise NotImplementedError


class Airdos04CLogParser(BaseLogParser):
"""Parser pro logy typu AIRDOS04C."""
@staticmethod
def detect(file_path):
with open(file_path, "r") as f:
for line in f:
if line.startswith("$DOS") and "AIRDOS04C" in line:
return True
return False

def parse(self):
start_time = time.time()
print("AIRDOS04C parser start")
metadata = {
'log_runs_count': 0,
'log_device_info': {},
'log_info': {}
}
hist = np.zeros(1024, dtype=int)
total_counts = 0


sums = []
time_axis = []

inside_run = False
current_hist = None
current_counts = 0

with open(self.file_path, 'r') as file:
for line in file:
parts = line.strip().split(",")
match parts[0]:
case "$DOS":
metadata['log_device_info']['DOS'] = {
"type": parts[0],
"hw-model": parts[1],
"fw-version": parts[2],
"eeprom": parts[3],
"fw-commit": parts[4],
"fw-build_info": parts[5],
'hw-sn': parts[6].strip(),
}
metadata['log_runs_count'] += 1
case "$START":
inside_run = True
current_hist = np.zeros_like(hist)
current_counts = 0
case "$E":
if inside_run and len(parts) >= 3:
channel = int(parts[2])
if 0 <= channel < current_hist.shape[0]:
current_hist[channel] += 1
current_counts += 1
case "$STOP":
if inside_run:
# Přičti hodnoty z $STOP (kanálové stavy na konci expozice)
if len(parts) > 4:
for idx, val in enumerate(parts[4:]):
try:
current_hist[idx] += int(val)
except Exception:
pass
hist += current_hist
total_counts += current_counts
sums.append(current_counts)
time_axis.append(float(parts[2]))
inside_run = False
current_hist = None
case _:
continue

metadata['log_info']['histogram_channels'] = hist.shape[0]
metadata['log_info']['events_total'] = int(total_counts) # pouze součet všech E!
metadata['log_info']['log_type_version'] = "2.0"
metadata['log_info']['log_type'] = 'xDOS_SPECTRAL'
metadata['log_info']['detector_type'] = "AIRDOS04C"
print("Parsed AIRDOS04C format in", time.time() - start_time, "s")

return [np.array(time_axis), np.array(sums), hist, metadata]



class OldLogParser(BaseLogParser):
"""Parser pro starší logy (ne-AIRDOS04C)."""
@staticmethod
def detect(file_path):
with open(file_path, "r") as f:
for line in f:
if line.startswith("$DOS") and "AIRDOS04C" not in line:
return True
return False

def parse(self):
start_time = time.time()
print("OLD parser start")
metadata = {
'log_runs_count': 0,
'log_device_info': {},
'log_info': {}
}
df_lines = [] # $HIST
df_metadata = []
unique_events = [] # $HITS
with open(self.file_path, 'r') as file:
for line in file:
parts = line.strip().split(",")
match parts[0]:
case "$DOS":
metadata['log_device_info']['DOS'] = {
"type": parts[0],
"hw-model": parts[1],
"fw-version": parts[2],
"eeprom": parts[3],
"fw-commit": parts[4],
"fw-build_info": parts[5],
'hw-sn': parts[6].strip(),
}
metadata['log_runs_count'] += 1
case "$ENV":
df_metadata.append(parts[2:])
case "$HIST":
df_lines.append(parts[1:])
case "$HITS":
unique_events += [(float(parts[i]), int(parts[i+1])) for i in range(2, len(parts), 2)]
case _:
continue
np_spectrum = np.array(df_lines, dtype=float)
zero_columns = np.zeros((np_spectrum.shape[0], 1000))
np_spectrum = np.hstack((np_spectrum, zero_columns))
time_column = np_spectrum[:, 1]
np_spectrum = np_spectrum[:, 7:]
for event in unique_events:
t, ch = event
time_index = np.searchsorted(time_column, t)
if 0 <= time_index < np_spectrum.shape[0] and 0 <= ch < np_spectrum.shape[1]:
np_spectrum[time_index, ch] += 1
hist = np.sum(np_spectrum[:, 1:], axis=0)
sums = np.sum(np_spectrum[:, 1:], axis=1)
metadata['log_info'].update({
'internal_time_min': time_column.min(),
'internal_time_max': time_column.max(),
'log_duration': time_column.max() - time_column.min(),
'spectral_count': sums.shape[0],
'channels': hist.shape[0],
'hits_count': len(unique_events),
'log_type_version': "1.0",
'log_type': 'xDOS_SPECTRAL',
'detector_type': metadata['log_device_info']['DOS'].get('hw-model', 'unknown'),
})
print("Parsed OLD format in", time.time() - start_time, "s")
return [time_column, sums, hist, metadata]


class dosparser():
def __init__(self):
pass

def load_file(self, datafile : str , detector = None):
pass

LOG_PARSERS = [Airdos04CLogParser, OldLogParser]

def get_parser_for_file(file_path):
for parser_cls in LOG_PARSERS:
if parser_cls.detect(file_path):
return parser_cls(file_path)
raise ValueError("Neznámý typ logu nebo žádný vhodný parser.")

def parse_file(file_path):
parser = get_parser_for_file(file_path)
return parser.parse()
from .parsers import (
BaseLogParser,
Airdos04CLogParser,
OldLogParser,
get_parser_for_file,
parse_file,
)


class LoadDataThread(QThread):
Expand Down
Loading
Loading