Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion healthagent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class EvalType(StrEnum):
NE = "ne"
IN = "in"
BITMASK = "bitmask"
DELTA_GT = "delta_gt"
WINDOW_GT = "window_gt"


Expand Down
7 changes: 0 additions & 7 deletions healthagent/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,6 @@ gpu:
warning: 1.0e-6
category: NVLink
msg: "GPU {gpu} NVLink effective BER {value:.2e} exceeds {threshold:.2e}"
DCGM_FI_DEV_PCIE_REPLAY_COUNTER:
eval: delta_gt
warning: 50
error: 200
category: PCIe
msg: "GPU {gpu} PCIe replay rate {value:.0f}/min exceeds {threshold}/min"

# ── Systemd module ──────────────────────────────────────
systemd:
services:
Expand Down
8 changes: 1 addition & 7 deletions healthagent/gpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
log = logging.getLogger('healthagent')

# Maximum number of samples to retain per field per entity.
# At 1s polling, 300 = 5-minute window for delta_gt rate computation.
MAX_KEEP_SAMPLES = 300

# Field-specific enrichments: map field ID -> callable(raw_value, field_values) -> list[str].
Expand Down Expand Up @@ -224,8 +223,7 @@ def track_fieldsv2(self):
if not samples or samples[0].isBlank:
continue

newest, oldest = samples[-1], samples[0]
#log.debug(f"Field {watch['field']} entity {entity_id}: {len(samples)} samples, oldest={oldest.value} ts={oldest.ts}, newest={newest.value} ts={newest.ts}")
newest = samples[-1]
severity = None
threshold_used = None
evaluated = None
Expand All @@ -235,10 +233,6 @@ def track_fieldsv2(self):
continue
triggered, evaluated = evaluate(
watch["eval"], newest.value, thresh,
prev_value=oldest.value,
prev_time=oldest.ts / 1_000_000,
current_time=newest.ts / 1_000_000,
window=watch.get("window", 60),
)
if triggered:
severity = level
Expand Down
24 changes: 4 additions & 20 deletions healthagent/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,24 +182,19 @@ def __len__(self):
return len(self._samples)


def evaluate(eval_type, value, threshold, *, prev_value=None, prev_time=None,
current_time=None, window=60, samples: TimeSeries = None):
def evaluate(eval_type, value, threshold, *, window=60, samples: TimeSeries = None):
"""Unified threshold evaluation. Returns (triggered: bool, evaluated_value).

For delta_gt, evaluated_value is the computed rate per window.
For window_gt, evaluated_value is the delta within the time window.
For bitmask, evaluated_value is the matching bits (value & threshold).
For all others, evaluated_value is the input value.

Args:
eval_type: Comparison type (gt, lt, ge, le, eq, ne, in, bitmask, delta_gt, window_gt)
eval_type: Comparison type (gt, lt, ge, le, eq, ne, in, bitmask, window_gt)
value: Current value to evaluate
threshold: Threshold to compare against (list for 'in' eval type)
prev_value: Previous sample value (delta_gt only)
prev_time: Previous sample timestamp in monotonic seconds (delta_gt only)
current_time: Current sample timestamp in monotonic seconds (delta_gt only)
window: Time window in seconds (delta_gt: rate normalization, window_gt: sliding window size)
samples: TimeSeries instance for recording and windowed evaluation (window_gt only)
window: Time window in seconds for window_gt sliding window size (default: 60)
samples: TimeSeries instance for windowed evaluation (window_gt only)
"""
eval_type = str(eval_type).strip().lower()

Expand All @@ -220,17 +215,6 @@ def evaluate(eval_type, value, threshold, *, prev_value=None, prev_time=None,
elif eval_type == "bitmask":
matched = operator.index(value) & operator.index(threshold)
return matched != 0, matched
elif eval_type == "delta_gt":
if prev_value is None or prev_time is None or current_time is None:
return False, 0.0
delta = value - prev_value
if delta < 0:
return False, 0.0
elapsed = current_time - prev_time
if elapsed <= 0:
return False, 0.0
rate = (delta * window) / elapsed
return rate > threshold, rate
elif eval_type == "window_gt":
if samples is None:
return False, 0
Expand Down
9 changes: 0 additions & 9 deletions integration/test_inject.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ def _report(name, value_str, ok):
FI_RETIRED_PENDING = 392
FI_FABRIC_MANAGER_STATUS = 170
FI_RECOVERY_ACTION = 1523
FI_PCIE_REPLAY_COUNTER = 202
FI_EFFECTIVE_BER_FLOAT = 1218
FI_FABRIC_HEALTH_MASK = 174
FI_XID_ERRORS = 230
Expand All @@ -82,12 +81,6 @@ def test_clocks(gpu_id, duration):
inject_loop(gpu_id, FI_CLOCKS_EVENT_REASONS, 0xE8, duration, "CLOCKS_EVENT_REASONS")


def test_pcie_replay(gpu_id, duration):
"""Inject high PCIe replay counter — triggers delta_gt."""
print(f"\n=== GPU {gpu_id}: PCIe replay counter (delta_gt warning > 50, error > 200) ===")
inject_loop(gpu_id, FI_PCIE_REPLAY_COUNTER, 99999, duration, "PCIE_REPLAY_COUNTER")


def test_persistence_mode(gpu_id, duration):
"""Inject persistence mode = 0 — triggers ne 1 error."""
print(f"\n=== GPU {gpu_id}: Persistence mode (error if != 1) ===")
Expand Down Expand Up @@ -167,7 +160,6 @@ def test_clear(gpu_id, duration):
(FI_FABRIC_HEALTH_MASK, 0x1AA, "FABRIC_HEALTH_MASK (healthy)"),
(FI_RECOVERY_ACTION, 0, "RECOVERY_ACTION"),
(FI_EFFECTIVE_BER_FLOAT, 0.0, "EFFECTIVE_BER_FLOAT"),
(FI_PCIE_REPLAY_COUNTER, 0, "PCIE_REPLAY_COUNTER"),
(FI_XID_ERRORS, 0, "XID_ERRORS"),
]
for field_id, value, name in clears:
Expand All @@ -180,7 +172,6 @@ def test_clear(gpu_id, duration):
TESTS = {
"temp": test_temperature,
"clocks": test_clocks,
"pcie": test_pcie_replay,
"persist": test_persistence_mode,
"dbe": test_dbe,
"remap": test_row_remap,
Expand Down
93 changes: 0 additions & 93 deletions tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,99 +119,6 @@ def test_bitmask_rejects_float_threshold(self):
evaluate("bitmask", 0xFF, 3.14)


class TestEvaluateDeltaGt:

def test_normal_rate_triggers(self):
# 120 events in 60 seconds = 120/min, threshold 100/min
triggered, rate = evaluate("delta_gt", 620, 100,
prev_value=500, prev_time=0.0, current_time=60.0)
assert triggered is True
assert rate == 120.0

def test_normal_rate_below_threshold(self):
# 50 events in 60 seconds = 50/min, threshold 100/min
triggered, rate = evaluate("delta_gt", 550, 100,
prev_value=500, prev_time=0.0, current_time=60.0)
assert triggered is False
assert rate == 50.0

def test_exact_threshold_not_triggered(self):
# 100 events in 60 seconds = 100/min, threshold 100/min (not strictly greater)
triggered, rate = evaluate("delta_gt", 600, 100,
prev_value=500, prev_time=0.0, current_time=60.0)
assert triggered is False
assert rate == 100.0

def test_negative_delta_ignored(self):
# Counter reset: current < prev
triggered, rate = evaluate("delta_gt", 10, 0,
prev_value=500, prev_time=0.0, current_time=60.0)
assert triggered is False
assert rate == 0.0

def test_zero_elapsed_ignored(self):
triggered, rate = evaluate("delta_gt", 600, 100,
prev_value=500, prev_time=10.0, current_time=10.0)
assert triggered is False
assert rate == 0.0

def test_missing_prev_value(self):
# First sample — no previous data
triggered, rate = evaluate("delta_gt", 500, 100,
prev_value=None, prev_time=0.0, current_time=60.0)
assert triggered is False
assert rate == 0.0

def test_missing_prev_time(self):
triggered, rate = evaluate("delta_gt", 500, 100,
prev_value=400, prev_time=None, current_time=60.0)
assert triggered is False
assert rate == 0.0

def test_missing_current_time(self):
triggered, rate = evaluate("delta_gt", 500, 100,
prev_value=400, prev_time=0.0, current_time=None)
assert triggered is False
assert rate == 0.0

def test_custom_window_per_hour(self):
# 10 events in 60 seconds = 600/hour, threshold 500/hour
triggered, rate = evaluate("delta_gt", 510, 500,
prev_value=500, prev_time=0.0, current_time=60.0,
window=3600)
assert triggered is True
assert rate == 600.0

def test_custom_window_per_second(self):
# 120 events in 60 seconds = 2/sec, threshold 1/sec
triggered, rate = evaluate("delta_gt", 620, 1,
prev_value=500, prev_time=0.0, current_time=60.0,
window=1)
assert triggered is True
assert rate == 2.0

def test_threshold_zero_any_increment(self):
# Any new event triggers when threshold is 0
triggered, rate = evaluate("delta_gt", 501, 0,
prev_value=500, prev_time=0.0, current_time=60.0)
assert triggered is True
assert rate > 0

def test_no_change_threshold_zero(self):
# No new events, threshold 0 — should not trigger
triggered, rate = evaluate("delta_gt", 500, 0,
prev_value=500, prev_time=0.0, current_time=60.0)
assert triggered is False
assert rate == 0.0

def test_delayed_cycle_rate_normalized(self):
# 250 events over 5 minutes = 50/min, threshold 100/min — should NOT trigger
triggered, rate = evaluate("delta_gt", 750, 100,
prev_value=500, prev_time=0.0, current_time=300.0)
assert triggered is False
assert rate == 50.0


class TestEvaluateIn:

def test_in_match(self):
Expand Down