From 8e04440a3fb316a6bf78cea55390f9631500a69d Mon Sep 17 00:00:00 2001 From: Aditi Gaur Date: Tue, 12 May 2026 15:47:09 -0700 Subject: [PATCH] Port network healthchecks to use NetworkConfig. Add window_gt evaluation type for tracking link flapping criteria. Default link_flapping criteria is 3 flaps over 3 hrs. Add a "strikes" param to the config that allows us to configure recovery. "strikes" defines how many times a particular threshold is allowed to fail, after which all recovery attempts are ignored and the node is permanently degraded. --- README.md | 59 ++--- healthagent/config.py | 19 +- healthagent/defaults.yaml | 8 +- healthagent/network.py | 71 ++++-- healthagent/util.py | 69 +++++- tests/test_config.py | 30 +++ tests/test_network.py | 496 ++++++++++++++++++++++++++++++++++++++ tests/test_util.py | 196 ++++++++++++++- 8 files changed, 883 insertions(+), 65 deletions(-) create mode 100644 tests/test_network.py diff --git a/README.md b/README.md index ac601bc..1dfbe60 100644 --- a/README.md +++ b/README.md @@ -314,60 +314,51 @@ Virtual interfaces (such as bridges, VLAN interfaces, and container networks) ar **Monitoring Behavior:** - **Periodic Checks**: Runs every 60 seconds to sample network interface state -- **Sliding Window Analysis**: Tracks link down events over a 60-sample window to calculate link flap rates +- **Windowed Analysis**: Uses `window_gt` evaluation to track link flap events over a configurable time window (default: 3 hours for IB `link_downed`) - **Physical Interfaces Only**: Automatically filters out virtual interfaces by checking sysfs device paths -- **Auto-Clearing**: Automatically clears error reports when interfaces return to healthy operational state +- **Strikes-Based Recovery**: The `strikes` parameter sets how many error episodes (OK→ERROR transitions) a check may have before the node is permanently degraded. With `strikes: 1` (default for `link_downed`), the first error is permanent. Set `strikes: 0` to allow unlimited recovery. 
**Alert Conditions:** -The network monitor triggers alerts based on two conditions: +The network monitor evaluates each configured check per interface. Default InfiniBand checks include: -1. **Link Flapping (WARNING)**: When an interface goes down 1 or more times per hour - - Status: `Warning` - - Indicates unstable network connectivity that may impact workloads - - Includes `link_down_rate_per_hour` metric in custom fields - -2. **Interface Not Operational (ERROR)**: When an interface is not in the `up` operational state +1. **Link Flapping (ERROR)**: When `link_downed` increases by 3 or more within a 3-hour window - Status: `Error` - - Indicates a critical network failure or misconfiguration - - Reports the current operational state (down, lowerlayerdown, etc.) + - Default: `eval: window_gt`, `error: 3`, `window: 10800`, `strikes: 1` + - Permanently degrades the node after the first occurrence (strikes=1) + +2. **Link State (ERROR)**: When IB link state is not `4: ACTIVE` or physical state is not `5: LinkUp` + +3. **Link Error Recovery (WARNING)**: When `link_error_recovery` count exceeds 3 + +4. **IPoIB State (WARNING)**: When IPoIB `operstate` is not `up` + +5. **Carrier Down (WARNING)**: When `carrier_down_count` exceeds threshold (5 for IB, 3 for Ethernet) + +6. **Ethernet Operstate (ERROR)**: When Ethernet interface is not in the `up` operational state **Response to Network Issues:** When network problems are detected, healthagent: -1. Sets appropriate health status (`WARNING` for flapping, `ERROR` for down interfaces) +1. Sets appropriate health status (`WARNING` or `ERROR`) based on configured thresholds 2. Creates a detailed report with: - List of affected interfaces - - Specific error conditions for each interface - - Link flap rates and state transition counts - - Current carrier and operational states -3. 
Includes per-interface custom fields: - - `link_down_rate_per_hour`: Recent link failure rate - - `link_flap_since_uptime`: Total link state transitions since boot - - `error_count`: Number of errors for the interface - - `carrier`: Physical link status -4. Reports the issue to CycleCloud (if enabled) + - Specific error conditions for each interface and port + - Current link and operational states +3. Reports the issue to CycleCloud (if enabled) Example network interface alert: ```json { "network": { - "Network": { + "NetworkInterfaceCheck": { "status": "Error", - "message": "Network reports errors", - "description": "Network interfaces eth0,ib0 are not operational", - "details": "Network interface eth0 is not operational and in state down.\nNetwork interface ib0 went down 3 times in the last hour", + "description": "Network interfaces report errors", + "details": "ERROR: ib0 - port 1: IB link flapped 3+ times within a 3-hour window", "last_update": "2025-08-07T19:43:50 UTC", - "eth0": { - "link_down_rate_per_hour": 0, - "link_flap_since_uptime": 2, - "error_count": 1, - "carrier": "0" - }, "ib0": { - "link_down_rate_per_hour": 3, - "link_flap_since_uptime": 15, - "error_count": 0 + "errors": ["port 1: IB link flapped 3+ times within a 3-hour window"], + "warnings": [] } } } diff --git a/healthagent/config.py b/healthagent/config.py index 6310910..ad9357d 100644 --- a/healthagent/config.py +++ b/healthagent/config.py @@ -5,7 +5,7 @@ from typing import Union import yaml -from pydantic import BaseModel +from pydantic import BaseModel, field_validator log = logging.getLogger(__name__) @@ -26,6 +26,7 @@ class EvalType(StrEnum): IN = "in" BITMASK = "bitmask" DELTA_GT = "delta_gt" + WINDOW_GT = "window_gt" class ModuleName(StrEnum): @@ -43,6 +44,22 @@ class ThresholdCheck(BaseModel, extra="forbid"): category: str | None = None warning: Union[int, float, str, list] | None = None error: Union[int, float, str, list] | None = None + window: int | None = None + strikes: int = 
0 + + @field_validator('window') + @classmethod + def window_must_be_positive(cls, v): + if v is not None and v <= 0: + raise ValueError('window must be > 0') + return v + + @field_validator('strikes') + @classmethod + def strikes_must_be_non_negative(cls, v): + if v < 0: + raise ValueError('strikes must be >= 0') + return v class ModuleConfig(BaseModel): diff --git a/healthagent/defaults.yaml b/healthagent/defaults.yaml index 71fc467..707c05a 100644 --- a/healthagent/defaults.yaml +++ b/healthagent/defaults.yaml @@ -24,9 +24,11 @@ network: error: "5: LinkUp" msg: "IB link not in state LinkUp" link_downed: - eval: gt + eval: window_gt + window: 10800 error: 3 - msg: "IB link down count exceeded the threshold" + strikes: 1 + msg: "IB link flapped 3+ times within a 3-hour window" link_error_recovery: eval: gt warning: 3 @@ -58,7 +60,7 @@ gpu: # By default all XIDS are treated as error. # XIDS in warning list are downgraded to warning. # If error list is explicitly specified, then only - # specific XIDS will be treated as ERROR. + # specified XIDS will be treated as ERROR. 
xid: warning: [43, 63, 13, 31, 66, 94, 154] ignore: [] diff --git a/healthagent/network.py b/healthagent/network.py index 586617e..b56b137 100644 --- a/healthagent/network.py +++ b/healthagent/network.py @@ -1,11 +1,12 @@ import logging +import math from enum import Enum from pathlib import Path from dataclasses import dataclass, fields from healthagent import healthcheck -from healthagent.util import read_kernel_attrs, evaluate +from healthagent.util import read_kernel_attrs, evaluate, TimeSeries from healthagent.healthmodule import HealthModule -from healthagent.config import NetworkConfig +from healthagent.config import NetworkConfig, ThresholdCheck from healthagent.reporter import Reporter, HealthReport, HealthStatus from healthagent.scheduler import Scheduler @@ -20,21 +21,6 @@ class NetDevType(Enum): INFINIBAND = 32 UNKNOWN = -1 -_NET_CONFIG = { - "infiniband": { - "state": {"eval": "ne", "error": "4: ACTIVE", "msg": "IB link is not active"}, - "phys_state": {"eval": "ne", "error": "5: LinkUp", "msg": "IB link not in state LinkUp"}, - "link_downed": {"eval": "gt", "error": 3, "msg": "IB link down count exceeded the threshold"}, - "link_error_recovery": {"eval": "gt", "warning": 3, "msg": "IB link error recovery exceeded the threshold"}, - "operstate": {"eval": "ne", "warning": "up", "msg": "IPoIB not in state up"}, - "carrier_down_count": {"eval": "gt", "warning": 5, "msg": "IPoIB carrier down count exceeded threshold"}, - }, - "ethernet": { - "carrier_down_count": {"eval": "gt", "warning": 3, "msg": "carrier_down_count exceeded threshold"}, - "operstate": {"eval": "ne", "error": "up", "msg": "interface not in state up"}, - }, -} - @dataclass class IBPort: @@ -65,6 +51,10 @@ class NetworkHealthChecks(HealthModule): def __init__(self, reporter: Reporter, config: 'NetworkConfig | None' = None): super().__init__(reporter, config or NetworkConfig()) + self.config: NetworkConfig = self.config + self._time_series = {} # {key: TimeSeries} — windowed sample buffers + 
self._in_error = {} # {key: bool} — currently in error state? + self._trigger_count = {} # {key: int} — OK→ERROR transition count async def create(self): await self.reporter.clear_all_errors() @@ -131,14 +121,41 @@ async def run_network_checks(self): # Derive port-level field names from the IBPort dataclass _ib_port_fields = {f.name for f in fields(IBPort)} - def check_field(iface_name, field, check, value, port_num=None): + def check_field(iface_name, field, check: 'ThresholdCheck', value, port_num=None): + key = (iface_name, field, port_num) + max_strikes = check.strikes + + # Permanently degraded — re-report error without re-evaluating + if max_strikes > 0 and self._trigger_count.get(key, 0) >= max_strikes: + msg = check.msg or f"{field} threshold exceeded" + if port_num is not None: + msg = f"port {port_num}: {msg}" + custom_fields.setdefault(iface_name, {}).setdefault("errors", []).append(msg) + details.append(f"ERROR: {iface_name} - {msg}") + report.escalate(HealthStatus.ERROR) + return + + eval_type = check.eval + eval_kwargs = {} + if eval_type == "window_gt": + window = check.window or 3600 + if key not in self._time_series: + self._time_series[key] = TimeSeries(maxlen=math.ceil(window / 60) + 1) + self._time_series[key].record(value) + eval_kwargs["samples"] = self._time_series[key] + eval_kwargs["window"] = window + + triggered = False + triggered_level = None for level in ("error", "warning"): - thresh = check.get(level) + thresh = getattr(check, level, None) if thresh is None: continue - triggered, _ = evaluate(check["eval"], value, thresh) - if triggered: - msg = check.get("msg", f"{field}={value} (threshold: {level} {check['eval']} {thresh})") + hit, _ = evaluate(eval_type, value, thresh, **eval_kwargs) + if hit: + triggered = True + triggered_level = level + msg = check.msg or f"{field}={value} (threshold: {level} {eval_type} {thresh})" if port_num is not None: msg = f"port {port_num}: {msg}" if level == "error": @@ -151,12 +168,20 @@ def 
check_field(iface_name, field, check, value, port_num=None): report.escalate(HealthStatus.WARNING) break # error takes precedence over warning + # Strike tracking: count OK→ERROR transitions + if triggered and triggered_level == "error" and max_strikes > 0: + if not self._in_error.get(key): + self._in_error[key] = True + self._trigger_count[key] = self._trigger_count.get(key, 0) + 1 + elif triggered_level != "error": + self._in_error[key] = False + for ni in interfaces: custom_fields[ni.name] = {} # Select the right config section based on interface type config_key = "infiniband" if ni.type == NetDevType.INFINIBAND else "ethernet" - checks = _NET_CONFIG.get(config_key, {}) + checks: dict[str, ThresholdCheck] = getattr(self.config, config_key, {}) # Evaluate each configured check for field, check in checks.items(): diff --git a/healthagent/util.py b/healthagent/util.py index a3ce7b9..6cebf04 100644 --- a/healthagent/util.py +++ b/healthagent/util.py @@ -1,5 +1,8 @@ +import bisect import operator import os +import time +from collections import deque from pathlib import Path @@ -128,22 +131,75 @@ def _read_top_level(root: str) -> dict: return result +class TimeSeries: + """Ring buffer of timestamped samples for windowed evaluation. + + Modules create and manage their own instances. Pass to evaluate() + via the samples parameter for window_gt evaluation. + + Args: + maxlen: Maximum number of samples to retain. When exceeded, + the oldest sample is discarded. Modules should size this + based on window / poll_interval (e.g. 10800/60 + 1 = 181). + """ + + def __init__(self, maxlen=None): + self._samples = deque(maxlen=maxlen) + + def record(self, value, timestamp=None): + """Append a sample. Timestamp defaults to time.monotonic().""" + if timestamp is None: + timestamp = time.monotonic() + self._samples.append((value, timestamp)) + + def delta_in_window(self, window_seconds): + """Compute value delta within the time window. 
+ + Returns (delta: int|float, sufficient: bool). + sufficient is False if the recorded time span is less than + window_seconds (not enough history to evaluate). + """ + if len(self._samples) < 2: + return 0, False + + latest_val, latest_ts = self._samples[-1] + oldest_val, oldest_ts = self._samples[0] + + if (latest_ts - oldest_ts) < window_seconds: + return 0, False + + cutoff = latest_ts - window_seconds + # Binary search for the oldest sample at or after the cutoff + timestamps = [s[1] for s in self._samples] + idx = bisect.bisect_left(timestamps, cutoff) + if idx < len(self._samples): + delta = latest_val - self._samples[idx][0] + return max(0, delta), True + + return 0, False + + def __len__(self): + return len(self._samples) + + def evaluate(eval_type, value, threshold, *, prev_value=None, prev_time=None, - current_time=None, window=60): + current_time=None, window=60, samples: TimeSeries = None): """Unified threshold evaluation. Returns (triggered: bool, evaluated_value). For delta_gt, evaluated_value is the computed rate per window. + For window_gt, evaluated_value is the delta within the time window. For bitmask, evaluated_value is the matching bits (value & threshold). For all others, evaluated_value is the input value. 
Args: - eval_type: Comparison type (gt, lt, ge, le, eq, ne, in, bitmask, delta_gt) + eval_type: Comparison type (gt, lt, ge, le, eq, ne, in, bitmask, delta_gt, window_gt) value: Current value to evaluate threshold: Threshold to compare against (list for 'in' eval type) prev_value: Previous sample value (delta_gt only) prev_time: Previous sample timestamp in monotonic seconds (delta_gt only) current_time: Current sample timestamp in monotonic seconds (delta_gt only) - window: Time window in seconds for rate normalization (delta_gt only, default: 60) + window: Time window in seconds (delta_gt: rate normalization, window_gt: sliding window size) + samples: TimeSeries instance for recording and windowed evaluation (window_gt only) """ eval_type = str(eval_type).strip().lower() @@ -175,4 +231,11 @@ def evaluate(eval_type, value, threshold, *, prev_value=None, prev_time=None, return False, 0.0 rate = (delta * window) / elapsed return rate > threshold, rate + elif eval_type == "window_gt": + if samples is None: + return False, 0 + delta, sufficient = samples.delta_in_window(window) + if not sufficient: + return False, delta + return delta >= threshold, delta raise ValueError(f"Unknown eval_type: {eval_type!r}") \ No newline at end of file diff --git a/tests/test_config.py b/tests/test_config.py index 88d43f0..2dd80ec 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -226,6 +226,36 @@ def test_valid_partial_config(self): assert config.systemd.services == [] assert config.proc.zombie_per_core == 50 + def test_window_zero_rejected(self): + """window=0 is rejected by ThresholdCheck validator.""" + with pytest.raises(ValidationError): + ThresholdCheck(eval=EvalType.WINDOW_GT, error=3, window=0) + + def test_window_negative_rejected(self): + """Negative window is rejected by ThresholdCheck validator.""" + with pytest.raises(ValidationError): + ThresholdCheck(eval=EvalType.WINDOW_GT, error=3, window=-100) + + def test_window_positive_accepted(self): + """Positive 
window is accepted.""" + check = ThresholdCheck(eval=EvalType.WINDOW_GT, error=3, window=3600) + assert check.window == 3600 + + def test_strikes_negative_rejected(self): + """Negative strikes is rejected by ThresholdCheck validator.""" + with pytest.raises(ValidationError): + ThresholdCheck(eval=EvalType.GT, error=3, strikes=-1) + + def test_strikes_zero_accepted(self): + """strikes=0 (unlimited recovery) is accepted.""" + check = ThresholdCheck(eval=EvalType.GT, error=3, strikes=0) + assert check.strikes == 0 + + def test_strikes_positive_accepted(self): + """Positive strikes is accepted.""" + check = ThresholdCheck(eval=EvalType.GT, error=3, strikes=2) + assert check.strikes == 2 + def test_model_dump_roundtrip(self): """model_dump produces a dict that can be re-validated.""" config = HealthagentConfig.model_validate({ diff --git a/tests/test_network.py b/tests/test_network.py new file mode 100644 index 0000000..1c1c47b --- /dev/null +++ b/tests/test_network.py @@ -0,0 +1,496 @@ +import pytest +from unittest.mock import patch +from healthagent.network import ( + NetworkHealthChecks, NetworkInterface, NetDevType, + IBDevice, IBPort, +) +from healthagent.config import NetworkConfig, ThresholdCheck, EvalType +from healthagent.reporter import Reporter, HealthStatus + + +def _make_ib_interface(name="ib0", state="4: ACTIVE", phys_state="5: LinkUp", + link_downed=0, link_error_recovery=0, operstate="up", + carrier_down_count=0): + """Helper: build a healthy IB NetworkInterface with one port.""" + ni = NetworkInterface() + ni.name = name + ni.type = NetDevType.INFINIBAND + ni.operstate = operstate + ni.carrier_down_count = carrier_down_count + ni.ib_device = IBDevice(name="mlx5_ib0", ports={ + "1": IBPort(state=state, phys_state=phys_state, rate="400 Gb/sec (4X NDR)", + link_downed=link_downed, link_error_recovery=link_error_recovery), + }) + return ni + + +def _make_eth_interface(name="eth0", operstate="up", carrier_down_count=0): + """Helper: build a healthy Ethernet 
NetworkInterface.""" + ni = NetworkInterface() + ni.name = name + ni.type = NetDevType.ETHERNET + ni.operstate = operstate + ni.carrier_down_count = carrier_down_count + return ni + + +def _default_config(): + """Helper: build a NetworkConfig matching the default checks from defaults.yaml.""" + return NetworkConfig( + infiniband={ + "state": ThresholdCheck(eval=EvalType.NE, error="4: ACTIVE", msg="IB link is not active"), + "phys_state": ThresholdCheck(eval=EvalType.NE, error="5: LinkUp", msg="IB link not in state LinkUp"), + "link_downed": ThresholdCheck(eval=EvalType.WINDOW_GT, error=3, window=10800, strikes=1, + msg="IB link flapped 3+ times within a 3-hour window"), + "link_error_recovery": ThresholdCheck(eval=EvalType.GT, warning=3, + msg="IB link error recovery exceeded the threshold"), + "operstate": ThresholdCheck(eval=EvalType.NE, warning="up", msg="IPoIB not in state up"), + "carrier_down_count": ThresholdCheck(eval=EvalType.GT, warning=5, + msg="IPoIB carrier down count exceeded threshold"), + }, + ethernet={ + "carrier_down_count": ThresholdCheck(eval=EvalType.GT, warning=3, + msg="carrier_down_count exceeded threshold"), + "operstate": ThresholdCheck(eval=EvalType.NE, error="up", msg="interface not in state up"), + }, + ) + + +def _make_checker(config: NetworkConfig = None): + """Helper: build a NetworkHealthChecks with a mocked reporter.""" + reporter = Reporter() + reporter.publish_cc = False + return NetworkHealthChecks(reporter=reporter, config=config or _default_config()) + + +# ── Healthy baselines ────────────────────────────────────── + +class TestHealthyBaseline: + + @pytest.mark.asyncio + async def test_healthy_ib_interface(self): + checker = _make_checker() + checker.get_network_state = lambda: [_make_ib_interface()] + await checker.run_network_checks() + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.OK + + @pytest.mark.asyncio + async def test_healthy_eth_interface(self): + checker = 
_make_checker() + checker.get_network_state = lambda: [_make_eth_interface()] + await checker.run_network_checks() + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.OK + + @pytest.mark.asyncio + async def test_no_interfaces(self): + checker = _make_checker() + checker.get_network_state = lambda: [] + await checker.run_network_checks() + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.OK + + +# ── Simple threshold checks (ne, gt) ────────────────────── + +class TestSimpleThresholds: + + @pytest.mark.asyncio + async def test_ib_link_not_active_error(self): + checker = _make_checker() + checker.get_network_state = lambda: [_make_ib_interface(state="1: DOWN")] + await checker.run_network_checks() + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.ERROR + assert any("IB link is not active" in e for e in report.custom_fields["ib0"]["errors"]) + + @pytest.mark.asyncio + async def test_ib_phys_state_not_linkup(self): + checker = _make_checker() + checker.get_network_state = lambda: [_make_ib_interface(phys_state="2: Polling")] + await checker.run_network_checks() + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.ERROR + + @pytest.mark.asyncio + async def test_ib_operstate_down_warning(self): + checker = _make_checker() + checker.get_network_state = lambda: [_make_ib_interface(operstate="down")] + await checker.run_network_checks() + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.WARNING + assert "warnings" in report.custom_fields["ib0"] + + @pytest.mark.asyncio + async def test_link_error_recovery_warning(self): + checker = _make_checker() + checker.get_network_state = lambda: [_make_ib_interface(link_error_recovery=5)] + await checker.run_network_checks() + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status 
== HealthStatus.WARNING + + @pytest.mark.asyncio + async def test_eth_operstate_down_error(self): + checker = _make_checker() + checker.get_network_state = lambda: [_make_eth_interface(operstate="down")] + await checker.run_network_checks() + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.ERROR + + @pytest.mark.asyncio + async def test_eth_carrier_down_warning(self): + checker = _make_checker() + checker.get_network_state = lambda: [_make_eth_interface(carrier_down_count=5)] + await checker.run_network_checks() + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.WARNING + + @pytest.mark.asyncio + async def test_error_takes_precedence_over_warning(self): + """Multiple issues: error should dominate.""" + checker = _make_checker() + checker.get_network_state = lambda: [ + _make_ib_interface(state="1: DOWN", operstate="down") + ] + await checker.run_network_checks() + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.ERROR + + @pytest.mark.asyncio + async def test_multiple_interfaces_independent(self): + """Error on one interface doesn't suppress OK on another.""" + checker = _make_checker() + checker.get_network_state = lambda: [ + _make_ib_interface(name="ib0", state="1: DOWN"), + _make_ib_interface(name="ib1"), + ] + await checker.run_network_checks() + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.ERROR + assert "errors" in report.custom_fields["ib0"] + assert "errors" not in report.custom_fields["ib1"] + + +# ── window_gt (link flapping) ───────────────────────────── + +class TestWindowGt: + + def _make_flap_config(self, error=3, window=180, strikes=0): + """Config with only link_downed as window_gt. 
Window=180s, maxlen=4.""" + return NetworkConfig(infiniband={ + "link_downed": ThresholdCheck( + eval=EvalType.WINDOW_GT, error=error, window=window, + strikes=strikes, msg="IB link flapped too many times", + ), + }) + + @pytest.mark.asyncio + async def test_no_trigger_insufficient_history(self): + """First few polls — not enough data to fill the window.""" + config = self._make_flap_config(window=180) + checker = _make_checker(config) + + # Simulate 2 polls at 60s intervals — timespan < window + for link_downed, t in [(0, 0.0), (1, 60.0)]: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.OK + + @pytest.mark.asyncio + async def test_timeseries_records_values(self): + """Verify link_downed values are recorded into the TimeSeries correctly.""" + config = self._make_flap_config(window=180, error=3) + checker = _make_checker(config) + + samples = [(0, 0.0), (1, 60.0), (2, 120.0)] + for link_downed, t in samples: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + key = ("ib0", "link_downed", "1") + assert key in checker._time_series + ts = checker._time_series[key] + assert len(ts) == 3 + # Verify recorded values and timestamps match what was polled + assert ts._samples[0] == (0, 0.0) + assert ts._samples[1] == (1, 60.0) + assert ts._samples[2] == (2, 120.0) + + @pytest.mark.asyncio + async def test_triggers_after_full_window(self): + """4 flaps within window → error.""" + config = self._make_flap_config(window=180, error=3) + checker = _make_checker(config) + + # 4 samples at 60s intervals, maxlen=4, all fit + samples = [(0, 0.0), (1, 
60.0), (2, 120.0), (4, 180.0)] + for link_downed, t in samples: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.ERROR + assert any("flapped" in e for e in report.custom_fields["ib0"]["errors"]) + + @pytest.mark.asyncio + async def test_no_trigger_below_threshold(self): + """2 flaps in window — below threshold of 3.""" + config = self._make_flap_config(window=180, error=3) + checker = _make_checker(config) + + samples = [(0, 0.0), (1, 60.0), (2, 120.0), (2, 180.0)] + for link_downed, t in samples: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.OK + + @pytest.mark.asyncio + async def test_recovers_when_window_clears(self): + """Error clears once old events fall outside the window (strikes=0).""" + config = self._make_flap_config(window=180, error=3, strikes=0) + checker = _make_checker(config) + + # Trigger error: 4 flaps in 180s + for link_downed, t in [(0, 0.0), (1, 60.0), (2, 120.0), (4, 180.0)]: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.ERROR + + # Time passes, no new flaps — window clears + for link_downed, t in [(4, 240.0), (4, 300.0), (4, 360.0), (4, 420.0)]: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + 
checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.OK + + +# ── Strikes behavior ────────────────────────────────────── + +class TestStrikes: + + def _make_flap_config(self, strikes=1, window=180, error=3): + return NetworkConfig(infiniband={ + "link_downed": ThresholdCheck( + eval=EvalType.WINDOW_GT, error=error, window=window, + strikes=strikes, msg="IB link flapped too many times", + ), + }) + + @pytest.mark.asyncio + async def test_strikes_1_permanent_after_first_error(self): + """strikes=1: once error triggers, it persists even after window clears.""" + config = self._make_flap_config(strikes=1, window=180, error=3) + checker = _make_checker(config) + + # Trigger error + for link_downed, t in [(0, 0.0), (1, 60.0), (2, 120.0), (4, 180.0)]: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + assert checker.reporter.store["NetworkInterfaceCheck"].status == HealthStatus.ERROR + + # Window clears — should still be ERROR (strikes exhausted) + for link_downed, t in [(4, 240.0), (4, 300.0), (4, 360.0), (4, 420.0)]: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + assert checker.reporter.store["NetworkInterfaceCheck"].status == HealthStatus.ERROR + + @pytest.mark.asyncio + async def test_strikes_2_recovers_once_then_permanent(self): + """strikes=2: first error recoverable, second is permanent.""" + config = self._make_flap_config(strikes=2, window=180, error=3) + checker = _make_checker(config) + + # First error episode + for link_downed, t in [(0, 0.0), (1, 
60.0), (2, 120.0), (4, 180.0)]: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + assert checker.reporter.store["NetworkInterfaceCheck"].status == HealthStatus.ERROR + assert checker._trigger_count.get(("ib0", "link_downed", "1"), 0) == 1 + + # Window clears — should recover (only 1 strike, needs 2) + for link_downed, t in [(4, 240.0), (4, 300.0), (4, 360.0), (4, 420.0)]: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + assert checker.reporter.store["NetworkInterfaceCheck"].status == HealthStatus.OK + + # Second error episode + for link_downed, t in [(4, 420.0), (5, 480.0), (6, 540.0), (8, 600.0)]: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + assert checker.reporter.store["NetworkInterfaceCheck"].status == HealthStatus.ERROR + assert checker._trigger_count.get(("ib0", "link_downed", "1"), 0) == 2 + + # Window clears again — should stay ERROR (2 strikes exhausted) + for link_downed, t in [(8, 660.0), (8, 720.0), (8, 780.0), (8, 840.0)]: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + assert checker.reporter.store["NetworkInterfaceCheck"].status == HealthStatus.ERROR + + @pytest.mark.asyncio + async def test_strikes_counts_transitions_not_polls(self): + """Sustained error across multiple polls should count as 1 transition.""" + config = self._make_flap_config(strikes=2, window=180, 
error=3) + checker = _make_checker(config) + + # Trigger error and stay in error for several consecutive polls + for link_downed, t in [(0, 0.0), (1, 60.0), (2, 120.0), (4, 180.0), + (5, 240.0), (6, 300.0), (7, 360.0)]: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + # Should still be only 1 transition despite many error polls + assert checker._trigger_count.get(("ib0", "link_downed", "1"), 0) == 1 + + @pytest.mark.asyncio + async def test_warning_to_error_counts_as_new_strike(self): + """Error recovering to warning (not OK) then back to error counts as 2 strikes.""" + config = NetworkConfig(infiniband={ + "link_downed": ThresholdCheck( + eval=EvalType.WINDOW_GT, error=3, warning=1, window=180, + strikes=2, msg="IB link flapped too many times", + ), + }) + checker = _make_checker(config) + + # First error episode: 4 flaps in window + for link_downed, t in [(0, 0.0), (1, 60.0), (2, 120.0), (4, 180.0)]: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + assert checker.reporter.store["NetworkInterfaceCheck"].status == HealthStatus.ERROR + assert checker._trigger_count.get(("ib0", "link_downed", "1"), 0) == 1 + + # Recover to warning (delta drops below error but still >= warning) + for link_downed, t in [(4, 240.0), (4, 300.0), (5, 360.0), (5, 420.0)]: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + # _in_error should be cleared since we're at warning, not error + assert checker._in_error.get(("ib0", "link_downed", "1")) is False + + # Second error episode + 
for link_downed, t in [(5, 420.0), (6, 480.0), (7, 540.0), (9, 600.0)]: + with patch("healthagent.util.time") as mock_time: + mock_time.monotonic.return_value = t + checker.get_network_state = lambda ld=link_downed: [_make_ib_interface(link_downed=ld)] + await checker.run_network_checks() + + # Should count as 2 strikes (warning→error is a new transition) + assert checker._trigger_count.get(("ib0", "link_downed", "1"), 0) == 2 + + +# ── Custom config overrides ─────────────────────────────── + +class TestConfigOverrides: + + @pytest.mark.asyncio + async def test_disabled_check_via_empty_config(self): + """Empty infiniband config means no checks run.""" + config = NetworkConfig(infiniband={}) + checker = _make_checker(config) + checker.get_network_state = lambda: [_make_ib_interface(state="1: DOWN")] + await checker.run_network_checks() + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.OK + + @pytest.mark.asyncio + async def test_warning_instead_of_error(self): + """User downgrades link_downed to warning.""" + config = NetworkConfig(infiniband={ + "link_downed": ThresholdCheck( + eval=EvalType.GT, warning=3, + msg="link_downed exceeded threshold", + ), + }) + checker = _make_checker(config) + checker.get_network_state = lambda: [_make_ib_interface(link_downed=5)] + await checker.run_network_checks() + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.WARNING + + @pytest.mark.asyncio + async def test_custom_threshold_value(self): + """User raises the error threshold.""" + config = NetworkConfig(infiniband={ + "link_downed": ThresholdCheck( + eval=EvalType.GT, error=10, + msg="link_downed exceeded threshold", + ), + }) + checker = _make_checker(config) + # 5 is below custom threshold of 10 + checker.get_network_state = lambda: [_make_ib_interface(link_downed=5)] + await checker.run_network_checks() + report = checker.reporter.store["NetworkInterfaceCheck"] + assert 
report.status == HealthStatus.OK + + +# ── Multi-port ──────────────────────────────────────────── + +class TestMultiPort: + + @pytest.mark.asyncio + async def test_per_port_evaluation(self): + """Each port is evaluated independently.""" + ni = _make_ib_interface() + ni.ib_device.ports["2"] = IBPort( + state="4: ACTIVE", phys_state="5: LinkUp", + rate="400 Gb/sec (4X NDR)", link_downed=5, link_error_recovery=0, + ) + config = NetworkConfig(infiniband={ + "link_downed": ThresholdCheck(eval=EvalType.GT, error=3, msg="link_downed exceeded"), + }) + checker = _make_checker(config) + checker.get_network_state = lambda: [ni] + await checker.run_network_checks() + + report = checker.reporter.store["NetworkInterfaceCheck"] + assert report.status == HealthStatus.ERROR + errors = report.custom_fields["ib0"]["errors"] + # Only port 2 should have the error + assert any("port 2" in e for e in errors) + assert not any("port 1" in e for e in errors) diff --git a/tests/test_util.py b/tests/test_util.py index 7c67665..b18ab8f 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -1,4 +1,4 @@ -from healthagent.util import evaluate, read_kernel_attrs +from healthagent.util import evaluate, read_kernel_attrs, TimeSeries from pathlib import Path import pytest @@ -269,6 +269,200 @@ def test_none_eval_type_raises(self): evaluate(None, 85, 80) +class TestTimeSeries: + + def test_record_and_len(self): + ts = TimeSeries(maxlen=10) + assert len(ts) == 0 + ts.record(5, timestamp=0.0) + ts.record(10, timestamp=1.0) + assert len(ts) == 2 + + def test_maxlen_evicts_oldest(self): + ts = TimeSeries(maxlen=3) + ts.record(1, timestamp=0.0) + ts.record(2, timestamp=1.0) + ts.record(3, timestamp=2.0) + ts.record(4, timestamp=3.0) + assert len(ts) == 3 + # Oldest sample (1, 0.0) should be evicted + assert ts._samples[0] == (2, 1.0) + + def test_delta_in_window_insufficient_samples(self): + ts = TimeSeries() + ts.record(5, timestamp=0.0) + delta, sufficient = ts.delta_in_window(100) + assert 
sufficient is False + assert delta == 0 + + def test_delta_in_window_insufficient_timespan(self): + ts = TimeSeries() + ts.record(0, timestamp=0.0) + ts.record(5, timestamp=50.0) + # Window is 100s but only 50s of data + delta, sufficient = ts.delta_in_window(100) + assert sufficient is False + assert delta == 0 + + def test_delta_in_window_exact_window(self): + ts = TimeSeries() + ts.record(10, timestamp=0.0) + ts.record(15, timestamp=100.0) + delta, sufficient = ts.delta_in_window(100) + assert sufficient is True + assert delta == 5 + + def test_delta_in_window_exceeds_window(self): + ts = TimeSeries() + ts.record(0, timestamp=0.0) + ts.record(3, timestamp=50.0) + ts.record(7, timestamp=150.0) + # Window=100, cutoff=50.0 — oldest in window is (3, 50.0) + delta, sufficient = ts.delta_in_window(100) + assert sufficient is True + assert delta == 4 # 7 - 3 + + def test_delta_in_window_all_samples_outside(self): + """If all samples are older than the cutoff except the latest, + the latest is both the cutoff match and the endpoint — delta=0.""" + ts = TimeSeries() + ts.record(0, timestamp=0.0) + ts.record(5, timestamp=1.0) + ts.record(100, timestamp=500.0) + # Window=10, cutoff=490 — only (100, 500) is in window + delta, sufficient = ts.delta_in_window(10) + assert sufficient is True + assert delta == 0 # 100 - 100 + + def test_delta_in_window_no_samples(self): + ts = TimeSeries() + delta, sufficient = ts.delta_in_window(100) + assert sufficient is False + assert delta == 0 + + def test_default_timestamp_uses_monotonic(self): + ts = TimeSeries() + ts.record(1) + ts.record(2) + assert len(ts) == 2 + # Timestamps should be monotonically increasing + assert ts._samples[1][1] >= ts._samples[0][1] + + def test_counter_reset_clamps_to_zero(self): + """Counter reset (value drops) should return delta=0, not negative.""" + ts = TimeSeries() + ts.record(50, timestamp=0.0) + ts.record(55, timestamp=50.0) + ts.record(0, timestamp=100.0) # counter reset + delta, sufficient = 
ts.delta_in_window(100) + assert sufficient is True + assert delta == 0 + + +class TestEvaluateWindowGt: + + def _build_ts(self, samples): + """Helper: build a TimeSeries from a list of (value, timestamp) pairs.""" + ts = TimeSeries() + for val, t in samples: + ts.record(val, timestamp=t) + return ts + + def test_no_samples_object(self): + triggered, val = evaluate("window_gt", 5, 3, window=100, samples=None) + assert triggered is False + assert val == 0 + + def test_insufficient_history(self): + ts = TimeSeries() + ts.record(5, timestamp=0.0) + # Only 1 sample, insufficient + triggered, delta = evaluate("window_gt", 5, 3, window=10800, samples=ts) + assert triggered is False + + def test_insufficient_timespan(self): + ts = self._build_ts([(0, 0.0), (2, 60.0)]) + # Window is 10800 but only 60s of data + triggered, delta = evaluate("window_gt", 2, 3, window=10800, samples=ts) + assert triggered is False + + def test_triggers_when_delta_exceeds_threshold(self): + """4 flaps in 3-hour window should trigger with threshold=3.""" + ts = self._build_ts([ + (0, 0.0), + (1, 1800.0), + (2, 5400.0), + (4, 10800.0), + ]) + # delta = 4 - 0 = 4 > 3 + triggered, delta = evaluate("window_gt", 4, 3, window=10800, samples=ts) + assert triggered is True + assert delta == 4 + + def test_does_not_trigger_below_threshold(self): + """2 flaps in 3-hour window should not trigger with threshold=3.""" + ts = self._build_ts([ + (0, 0.0), + (1, 3600.0), + (2, 10800.0), + ]) + # delta = 2 - 0 = 2, not > 3 + triggered, delta = evaluate("window_gt", 2, 3, window=10800, samples=ts) + assert triggered is False + assert delta == 2 + + def test_exact_threshold_triggered(self): + """Delta exactly equal to threshold should trigger (gte).""" + ts = self._build_ts([ + (0, 0.0), + (1, 3600.0), + (3, 10800.0), + ]) + triggered, delta = evaluate("window_gt", 3, 3, window=10800, samples=ts) + assert triggered is True + assert delta == 3 + + def test_windowed_delta_uses_cutoff(self): + """Events outside the 
window should not count toward the delta.""" + ts = self._build_ts([ + (0, 0.0), # outside window at t=10801 + (3, 5000.0), + (4, 8000.0), + (5, 10801.0), + ]) + # cutoff = 10801 - 10800 = 1.0 + # Oldest in window is (3, 5000.0), delta = 5 - 3 = 2 + triggered, delta = evaluate("window_gt", 5, 3, window=10800, samples=ts) + assert triggered is False + assert delta == 2 + + def test_evaluate_does_not_modify_timeseries(self): + """evaluate() should not record into the TimeSeries.""" + ts = self._build_ts([(0, 0.0), (1, 10800.0)]) + assert len(ts) == 2 + evaluate("window_gt", 42, 100, window=10800, samples=ts) + assert len(ts) == 2 + + def test_spread_over_6_hours(self): + """4 flaps spread over 4 hours — first window has 2, second has 3.""" + ts = self._build_ts([ + (0, 0.0), + (1, 3600.0), # 1hr + (2, 7200.0), # 2hr + (3, 10860.0), # 3hr+1min + ]) + # cutoff = 10860 - 10800 = 60 → oldest in window is (1, 3600), delta = 3-1=2 + triggered, delta = evaluate("window_gt", 3, 3, window=10800, samples=ts) + assert triggered is False + + # Add sample at t=14400 (4hr) + ts.record(4, timestamp=14400.0) + # cutoff = 14400 - 10800 = 3600 → oldest in window is (1, 3600), delta = 4-1=3 + triggered, delta = evaluate("window_gt", 4, 3, window=10800, samples=ts) + assert triggered is True + assert delta == 3 + + class TestReadKernelAttrs: """Tests for read_kernel_attrs using a fake sysfs-like directory tree.