Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 74 additions & 1 deletion healthagent/gpu.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio
import json
import sys
import logging
import os
import shutil
import tempfile
import time
from datetime import datetime, timezone
from healthagent import epilog,status,healthcheck,prolog
from healthagent.scheduler import Scheduler
Expand All @@ -19,6 +22,10 @@
# At 1s polling, 300 = 5-minute window for delta_gt rate computation.
MAX_KEEP_SAMPLES = 300

# Persistent XID history file — survives healthagent restarts.
_XID_HISTORY_DIR = os.path.join(os.getenv("HEALTHAGENT_DIR", "/opt/healthagent"), "run")
_XID_HISTORY_FILE = os.path.join(_XID_HISTORY_DIR, "xid_history.json")

# Field-specific enrichments: map field ID -> callable(raw_value, field_values) -> list[str].
# Applied *after* generic message formatting to decode field values into
# human-readable details. Independent of watch config (defaults or user overrides).
Expand Down Expand Up @@ -67,7 +74,7 @@ def setup(self):
self.dcgmHandle = None
self.watch_fields = []
self.gpu_config = []
self.xid_history = {}
self.xid_history = self._load_xid_history()
if self.test_mode:
log.info("Running GPU tests in DCGM_TEST_MODE")
self.dcgmGroup, self.dcgmHandle = Wrap.connect(grp_name="healthagent_group", test_mode=self.test_mode)
Expand Down Expand Up @@ -107,6 +114,67 @@ def setup_background_watches(self):
# Seed the persistent sample collection (first call with None returns latest values)
self._field_collection = self.dcgmGroup.samples.GetAllSinceLastCall_v2(None, self.field_group)

@staticmethod
def _boot_time() -> float:
"""Return wall-clock timestamp of last boot (seconds since epoch)."""
with open('/proc/uptime') as f:
uptime_seconds = float(f.read().split()[0])
return time.time() - uptime_seconds

@staticmethod
def _load_xid_history() -> dict:
"""Load XID history from JSON. Discard the file entirely if it predates last boot."""
try:
boot_ts = GpuHealthChecks._boot_time()
if os.path.getmtime(_XID_HISTORY_FILE) < boot_ts:
log.debug("XID history file predates last boot, discarding")
os.remove(_XID_HISTORY_FILE)
return {}
with open(_XID_HISTORY_FILE, 'r') as f:
history = json.load(f)
except (FileNotFoundError, json.JSONDecodeError, OSError) as e:
log.debug(f"No previous XID history loaded: {e}")
return {}
Comment thread
aditigaur4 marked this conversation as resolved.

# Convert JSON string keys back to int
try:
restored = {
gpu_id: {int(k): v for k, v in xids.items()}
for gpu_id, xids in history.items()
}
except (ValueError, AttributeError, TypeError) as e:
log.warning(f"Corrupt XID history file, discarding: {e}")
return {}
count = sum(len(v) for v in restored.values())
if count:
log.info(f"Restored {count} XIDs from previous session")
return restored

async def _save_xid_history(self):
"""Persist XID history to JSON. Scheduled via Scheduler.add_task (fire-and-forget)."""
data = self.xid_history
# Convert int keys to strings for JSON compatibility
serializable = {
gpu_id: {str(k): v for k, v in xids.items()}
for gpu_id, xids in data.items()
}
try:
await asyncio.to_thread(self._write_xid_file, serializable)
except Exception as e:
Comment thread
Copilot marked this conversation as resolved.
log.warning(f"Failed to persist XID history: {e}")

@staticmethod
def _write_xid_file(data: dict):
"""Write XID data atomically via unique temp file + rename."""
fd, tmp = tempfile.mkstemp(dir=_XID_HISTORY_DIR, suffix='.tmp')
try:
with os.fdopen(fd, 'w', encoding='utf-8') as f:
json.dump(data, f)
os.replace(tmp, _XID_HISTORY_FILE)
except BaseException:
os.unlink(tmp)
raise

def __display_gpu_config(self):

## Invoke method to get gpu IDs of the members of the newly-created group
Expand Down Expand Up @@ -179,6 +247,7 @@ async def handle_policy_violation(self, callbackresp):
self.xid_history[gpu_key] = {}
if xid_received not in self.xid_history[gpu_key] or timestamp < self.xid_history[gpu_key][xid_received]["timestamp"]:
self.xid_history[gpu_key][xid_received] = {"xid": xid_received, "timestamp": timestamp}
Scheduler.add_task(self._save_xid_history)
except ValueError as e:
log.exception(e)
return
Expand All @@ -195,6 +264,7 @@ def track_fieldsv2(self):
XIDs are handled separately — GPU-only, not part of field watches.
"""
custom_fields = {'error_count': 0, 'warning_count': 0, 'category': set()}
xid_changed = False
try:
# Accumulate new samples since last call
self._field_collection = self.dcgmGroup.samples.GetAllSinceLastCall_v2(
Expand Down Expand Up @@ -278,6 +348,7 @@ def track_fieldsv2(self):
ts_utc = datetime.fromtimestamp(sample.ts / 1_000_000, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S UTC")
if xid_num not in self.xid_history[gpu_id] or ts_utc < self.xid_history[gpu_id][xid_num]["timestamp"]:
self.xid_history[gpu_id][xid_num] = {"xid": xid_num, "timestamp": ts_utc}
xid_changed = True

# Populate report with full XID history
for gpu_id, xids in self.xid_history.items():
Expand All @@ -302,6 +373,8 @@ def track_fieldsv2(self):
if xids:
custom_fields['category'].add("XID")

if xid_changed:
Scheduler.add_task(self._save_xid_history)
return custom_fields
except Exception as e:
log.exception(e)
Expand Down