Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 25 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,60 +314,51 @@ Virtual interfaces (such as bridges, VLAN interfaces, and container networks) ar
**Monitoring Behavior:**

- **Periodic Checks**: Runs every 60 seconds to sample network interface state
- **Sliding Window Analysis**: Tracks link down events over a 60-sample window to calculate link flap rates
- **Windowed Analysis**: Uses `window_gt` evaluation to track link flap events over a configurable time window (default: 3 hours for IB `link_downed`)
- **Physical Interfaces Only**: Automatically filters out virtual interfaces by checking sysfs device paths
- **Auto-Clearing**: Automatically clears error reports when interfaces return to healthy operational state
- **Strikes-Based Recovery**: The `strikes` parameter controls how many times a check can recover from error before the node is permanently degraded. With `strikes: 1` (default for `link_downed`), the first error is permanent. Set `strikes: 0` to allow unlimited recovery.

**Alert Conditions:**

The network monitor triggers alerts based on two conditions:
The network monitor evaluates each configured check per interface. Default InfiniBand checks include:

1. **Link Flapping (WARNING)**: When an interface goes down 1 or more times per hour
- Status: `Warning`
- Indicates unstable network connectivity that may impact workloads
- Includes `link_down_rate_per_hour` metric in custom fields

2. **Interface Not Operational (ERROR)**: When an interface is not in the `up` operational state
1. **Link Flapping (ERROR)**: When `link_downed` increases by 3 or more within a 3-hour window
- Status: `Error`
- Indicates a critical network failure or misconfiguration
- Reports the current operational state (down, lowerlayerdown, etc.)
- Default: `eval: window_gt`, `error: 3`, `window: 10800`, `strikes: 1`
- Permanently degrades the node after the first occurrence (strikes=1)

2. **Link State (ERROR)**: When IB link state is not `4: ACTIVE` or physical state is not `5: LinkUp`

3. **Link Error Recovery (WARNING)**: When `link_error_recovery` count exceeds 3

4. **IPoIB State (WARNING)**: When IPoIB `operstate` is not `up`

5. **Carrier Down (WARNING)**: When `carrier_down_count` exceeds threshold (5 for IB, 3 for Ethernet)

6. **Ethernet Operstate (ERROR)**: When Ethernet interface is not in the `up` operational state

**Response to Network Issues:**

When network problems are detected, healthagent:
1. Sets appropriate health status (`WARNING` for flapping, `ERROR` for down interfaces)
1. Sets appropriate health status (`WARNING` or `ERROR`) based on configured thresholds
2. Creates a detailed report with:
- List of affected interfaces
- Specific error conditions for each interface
- Link flap rates and state transition counts
- Current carrier and operational states
3. Includes per-interface custom fields:
- `link_down_rate_per_hour`: Recent link failure rate
- `link_flap_since_uptime`: Total link state transitions since boot
- `error_count`: Number of errors for the interface
- `carrier`: Physical link status
4. Reports the issue to CycleCloud (if enabled)
- Specific error conditions for each interface and port
- Current link and operational states
3. Reports the issue to CycleCloud (if enabled)

Example network interface alert:
```json
{
"network": {
"Network": {
"NetworkInterfaceCheck": {
"status": "Error",
"message": "Network reports errors",
"description": "Network interfaces eth0,ib0 are not operational",
"details": "Network interface eth0 is not operational and in state down.\nNetwork interface ib0 went down 3 times in the last hour",
"description": "Network interfaces report errors",
"details": "ERROR: ib0 - port 1: IB link flapped 3+ times within a 3-hour window",
"last_update": "2025-08-07T19:43:50 UTC",
"eth0": {
"link_down_rate_per_hour": 0,
"link_flap_since_uptime": 2,
"error_count": 1,
"carrier": "0"
},
"ib0": {
"link_down_rate_per_hour": 3,
"link_flap_since_uptime": 15,
"error_count": 0
"errors": ["port 1: IB link flapped 3+ times within a 3-hour window"],
"warnings": []
}
}
}
Expand Down
19 changes: 18 additions & 1 deletion healthagent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Union

import yaml
from pydantic import BaseModel
from pydantic import BaseModel, field_validator


log = logging.getLogger(__name__)
Expand All @@ -26,6 +26,7 @@ class EvalType(StrEnum):
IN = "in"
BITMASK = "bitmask"
DELTA_GT = "delta_gt"
WINDOW_GT = "window_gt"
Comment thread
aditigaur4 marked this conversation as resolved.


class ModuleName(StrEnum):
Expand All @@ -43,6 +44,22 @@ class ThresholdCheck(BaseModel, extra="forbid"):
category: str | None = None
warning: Union[int, float, str, list] | None = None
error: Union[int, float, str, list] | None = None
window: int | None = None
strikes: int = 0

@field_validator('window')
@classmethod
def window_must_be_positive(cls, v):
Comment thread
aditigaur4 marked this conversation as resolved.
if v is not None and v <= 0:
raise ValueError('window must be > 0')
return v

@field_validator('strikes')
@classmethod
def strikes_must_be_non_negative(cls, v):
if v < 0:
raise ValueError('strikes must be >= 0')
Comment thread
aditigaur4 marked this conversation as resolved.
return v


class ModuleConfig(BaseModel):
Expand Down
8 changes: 5 additions & 3 deletions healthagent/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ network:
error: "5: LinkUp"
msg: "IB link not in state LinkUp"
link_downed:
eval: gt
eval: window_gt
window: 10800
error: 3
Comment thread
aditigaur4 marked this conversation as resolved.
msg: "IB link down count exceeded the threshold"
strikes: 1
msg: "IB link flapped 3+ times within a 3-hour window"
Comment thread
aditigaur4 marked this conversation as resolved.
link_error_recovery:
eval: gt
warning: 3
Expand Down Expand Up @@ -58,7 +60,7 @@ gpu:
# By default all XIDS are treated as error.
# XIDS in warning list are downgraded to warning.
# If error list is explicitly specified, then only
# specific XIDS will be treated as ERROR.
# specified XIDS will be treated as ERROR.
xid:
warning: [43, 63, 13, 31, 66, 94, 154]
ignore: []
Expand Down
71 changes: 48 additions & 23 deletions healthagent/network.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import logging
import math
from enum import Enum
from pathlib import Path
from dataclasses import dataclass, fields
from healthagent import healthcheck
from healthagent.util import read_kernel_attrs, evaluate
from healthagent.util import read_kernel_attrs, evaluate, TimeSeries
from healthagent.healthmodule import HealthModule
from healthagent.config import NetworkConfig
from healthagent.config import NetworkConfig, ThresholdCheck
from healthagent.reporter import Reporter, HealthReport, HealthStatus
from healthagent.scheduler import Scheduler

Expand All @@ -20,21 +21,6 @@ class NetDevType(Enum):
INFINIBAND = 32
UNKNOWN = -1

_NET_CONFIG = {
"infiniband": {
"state": {"eval": "ne", "error": "4: ACTIVE", "msg": "IB link is not active"},
"phys_state": {"eval": "ne", "error": "5: LinkUp", "msg": "IB link not in state LinkUp"},
"link_downed": {"eval": "gt", "error": 3, "msg": "IB link down count exceeded the threshold"},
"link_error_recovery": {"eval": "gt", "warning": 3, "msg": "IB link error recovery exceeded the threshold"},
"operstate": {"eval": "ne", "warning": "up", "msg": "IPoIB not in state up"},
"carrier_down_count": {"eval": "gt", "warning": 5, "msg": "IPoIB carrier down count exceeded threshold"},
},
"ethernet": {
"carrier_down_count": {"eval": "gt", "warning": 3, "msg": "carrier_down_count exceeded threshold"},
"operstate": {"eval": "ne", "error": "up", "msg": "interface not in state up"},
},
}


@dataclass
class IBPort:
Expand Down Expand Up @@ -65,6 +51,10 @@ class NetworkHealthChecks(HealthModule):

def __init__(self, reporter: Reporter, config: 'NetworkConfig | None' = None):
super().__init__(reporter, config or NetworkConfig())
self.config: NetworkConfig = self.config
self._time_series = {} # {key: TimeSeries} — windowed sample buffers
self._in_error = {} # {key: bool} — currently in error state?
self._trigger_count = {} # {key: int} — OK→ERROR transition count
Comment thread
aditigaur4 marked this conversation as resolved.

async def create(self):
await self.reporter.clear_all_errors()
Expand Down Expand Up @@ -131,14 +121,41 @@ async def run_network_checks(self):
# Derive port-level field names from the IBPort dataclass
_ib_port_fields = {f.name for f in fields(IBPort)}

def check_field(iface_name, field, check, value, port_num=None):
def check_field(iface_name, field, check: 'ThresholdCheck', value, port_num=None):
key = (iface_name, field, port_num)
max_strikes = check.strikes

# Permanently degraded — re-report error without re-evaluating
if max_strikes > 0 and self._trigger_count.get(key, 0) >= max_strikes:
msg = check.msg or f"{field} threshold exceeded"
if port_num is not None:
msg = f"port {port_num}: {msg}"
custom_fields.setdefault(iface_name, {}).setdefault("errors", []).append(msg)
details.append(f"ERROR: {iface_name} - {msg}")
report.escalate(HealthStatus.ERROR)
return

eval_type = check.eval
eval_kwargs = {}
if eval_type == "window_gt":
window = check.window or 3600
if key not in self._time_series:
self._time_series[key] = TimeSeries(maxlen=math.ceil(window / 60) + 1)
self._time_series[key].record(value)
eval_kwargs["samples"] = self._time_series[key]
eval_kwargs["window"] = window

triggered = False
triggered_level = None
for level in ("error", "warning"):
thresh = check.get(level)
thresh = getattr(check, level, None)
if thresh is None:
continue
triggered, _ = evaluate(check["eval"], value, thresh)
if triggered:
msg = check.get("msg", f"{field}={value} (threshold: {level} {check['eval']} {thresh})")
hit, _ = evaluate(eval_type, value, thresh, **eval_kwargs)
if hit:
triggered = True
triggered_level = level
msg = check.msg or f"{field}={value} (threshold: {level} {eval_type} {thresh})"
if port_num is not None:
msg = f"port {port_num}: {msg}"
if level == "error":
Expand All @@ -151,12 +168,20 @@ def check_field(iface_name, field, check, value, port_num=None):
report.escalate(HealthStatus.WARNING)
break # error takes precedence over warning

# Strike tracking: count OK→ERROR transitions
if triggered and triggered_level == "error" and max_strikes > 0:
if not self._in_error.get(key):
self._in_error[key] = True
self._trigger_count[key] = self._trigger_count.get(key, 0) + 1
elif triggered_level != "error":
self._in_error[key] = False

for ni in interfaces:

custom_fields[ni.name] = {}
# Select the right config section based on interface type
config_key = "infiniband" if ni.type == NetDevType.INFINIBAND else "ethernet"
checks = _NET_CONFIG.get(config_key, {})
checks: dict[str, ThresholdCheck] = getattr(self.config, config_key, {})

# Evaluate each configured check
for field, check in checks.items():
Expand Down
69 changes: 66 additions & 3 deletions healthagent/util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import bisect
import operator
import os
import time
from collections import deque
from pathlib import Path


Expand Down Expand Up @@ -128,22 +131,75 @@ def _read_top_level(root: str) -> dict:
return result


class TimeSeries:
"""Ring buffer of timestamped samples for windowed evaluation.

Modules create and manage their own instances. Pass to evaluate()
via the samples parameter for window_gt evaluation.

Args:
maxlen: Maximum number of samples to retain. When exceeded,
the oldest sample is discarded. Modules should size this
based on window / poll_interval (e.g. 10800/60 + 1 = 181).
"""

def __init__(self, maxlen=None):
self._samples = deque(maxlen=maxlen)

def record(self, value, timestamp=None):
"""Append a sample. Timestamp defaults to time.monotonic()."""
if timestamp is None:
timestamp = time.monotonic()
self._samples.append((value, timestamp))

def delta_in_window(self, window_seconds):
"""Compute value delta within the time window.

Returns (delta: int|float, sufficient: bool).
sufficient is False if the recorded time span is less than
window_seconds (not enough history to evaluate).
"""
if len(self._samples) < 2:
return 0, False

latest_val, latest_ts = self._samples[-1]
oldest_val, oldest_ts = self._samples[0]

if (latest_ts - oldest_ts) < window_seconds:
return 0, False

cutoff = latest_ts - window_seconds
# Binary search for the oldest sample at or after the cutoff
timestamps = [s[1] for s in self._samples]
idx = bisect.bisect_left(timestamps, cutoff)
if idx < len(self._samples):
delta = latest_val - self._samples[idx][0]
return max(0, delta), True

return 0, False

def __len__(self):
return len(self._samples)


def evaluate(eval_type, value, threshold, *, prev_value=None, prev_time=None,
current_time=None, window=60):
current_time=None, window=60, samples: TimeSeries = None):
"""Unified threshold evaluation. Returns (triggered: bool, evaluated_value).

For delta_gt, evaluated_value is the computed rate per window.
For window_gt, evaluated_value is the delta within the time window.
For bitmask, evaluated_value is the matching bits (value & threshold).
For all others, evaluated_value is the input value.

Args:
eval_type: Comparison type (gt, lt, ge, le, eq, ne, in, bitmask, delta_gt)
eval_type: Comparison type (gt, lt, ge, le, eq, ne, in, bitmask, delta_gt, window_gt)
value: Current value to evaluate
threshold: Threshold to compare against (list for 'in' eval type)
prev_value: Previous sample value (delta_gt only)
prev_time: Previous sample timestamp in monotonic seconds (delta_gt only)
current_time: Current sample timestamp in monotonic seconds (delta_gt only)
window: Time window in seconds for rate normalization (delta_gt only, default: 60)
window: Time window in seconds (delta_gt: rate normalization, window_gt: sliding window size)
samples: TimeSeries instance for recording and windowed evaluation (window_gt only)
"""
eval_type = str(eval_type).strip().lower()

Expand Down Expand Up @@ -175,4 +231,11 @@ def evaluate(eval_type, value, threshold, *, prev_value=None, prev_time=None,
return False, 0.0
rate = (delta * window) / elapsed
return rate > threshold, rate
elif eval_type == "window_gt":
if samples is None:
return False, 0
delta, sufficient = samples.delta_in_window(window)
if not sufficient:
return False, delta
return delta >= threshold, delta
raise ValueError(f"Unknown eval_type: {eval_type!r}")
Loading