22# SPDX-License-Identifier: Apache-2.0
33
44import asyncio
5- import inspect
65import time
76from collections .abc import Awaitable , Callable
87
@@ -36,21 +35,20 @@ class TelemetryDataCollector(AIPerfLifecycleMixin):
3635 Args:
3736 dcgm_url: URL of the DCGM metrics endpoint (e.g., "http://localhost:9400/metrics")
3837 collection_interval: Interval in seconds between metric collections (default: 1.0)
39- record_callback: Optional async/sync callback to receive collected records.
40- Signature: (records: list[TelemetryRecord], collector_id: str) -> None
41- error_callback: Optional async/sync callback to receive collection errors.
42- Signature: (error: ErrorDetails, collector_id: str) -> None
38+ record_callback: Optional async callback to receive collected records.
39+ Signature: async (records: list[TelemetryRecord], collector_id: str) -> None
40+ error_callback: Optional async callback to receive collection errors.
41+ Signature: async (error: ErrorDetails, collector_id: str) -> None
4342 collector_id: Unique identifier for this collector instance
4443 """
4544
4645 def __init__ (
4746 self ,
4847 dcgm_url : str ,
4948 collection_interval : float | None = None ,
50- record_callback : Callable [[list [TelemetryRecord ], str ], Awaitable [None ] | None ]
51- | None = None ,
52- error_callback : Callable [[ErrorDetails , str ], Awaitable [None ] | None ]
49+ record_callback : Callable [[list [TelemetryRecord ], str ], Awaitable [None ]]
5350 | None = None ,
51+ error_callback : Callable [[ErrorDetails , str ], Awaitable [None ]] | None = None ,
5452 collector_id : str = "telemetry_collector" ,
5553 ) -> None :
5654 self ._dcgm_url = dcgm_url
@@ -155,9 +153,7 @@ async def _collect_telemetry_task(self) -> None:
155153 except Exception as e :
156154 if self ._error_callback :
157155 try :
158- res = self ._error_callback (ErrorDetails .from_exception (e ), self .id )
159- if inspect .isawaitable (res ):
160- await res
156+ await self ._error_callback (ErrorDetails .from_exception (e ), self .id )
161157 except Exception as callback_error :
162158 self .error (f"Failed to send error via callback: { callback_error } " )
163159 else :
@@ -182,9 +178,7 @@ async def _collect_and_process_metrics(self) -> None:
182178
183179 if records and self ._record_callback :
184180 try :
185- res = self ._record_callback (records , self .id )
186- if inspect .isawaitable (res ):
187- await res
181+ await self ._record_callback (records , self .id )
188182 except Exception as e :
189183 self .warning (f"Failed to send telemetry records via callback: { e } " )
190184
0 commit comments