Commit a39ea71

fix tests
Signed-off-by: Superjomn <[email protected]>
1 parent e27f813 commit a39ea71

File tree

2 files changed (+13 -13 lines changed):

  tensorrt_llm/executor/rpc/rpc_server.py
  tensorrt_llm/executor/rpc_worker.py

2 files changed

+13
-13
lines changed

tensorrt_llm/executor/rpc/rpc_server.py

Lines changed: 6 additions & 4 deletions

@@ -529,8 +529,10 @@ async def _process_streaming_request(self, req: RPCRequest) -> None:
         async def stream_with_timeout():
             nonlocal chunk_index
             async for result in func(*req.args, **req.kwargs):
-                if not result:
-                    continue  # WAR
+                if result is None or result == []:
+                    # Skip None values or empty list to save bandwidth
+                    # TODO[Superjomn]: add a flag to control this behavior
+                    continue
                 # Check if shutdown was triggered
                 if self._shutdown_event.is_set():
                     raise RPCCancelled(
@@ -559,8 +561,8 @@ async def stream_with_timeout():
             else:
                 # No timeout specified, stream normally
                 async for result in func(*req.args, **req.kwargs):
-                    if not result:
-                        continue  # WAR
+                    if result is None or result == []:
+                        continue  # Skip None values or empty list
                     # Check if shutdown was triggered
                     if self._shutdown_event.is_set():
                         raise RPCCancelled(
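
The old check, if not result:, relied on truthiness, so besides None and [] it also dropped falsy-but-meaningful results such as 0, False, or an empty string. The new check skips only None and the empty list. A minimal, self-contained sketch of the difference (the stream() generator and its values are hypothetical, not taken from the repository):

    import asyncio

    async def stream():
        # Falsy-but-meaningful values mixed in with None and empty lists.
        for value in (None, [], 0, False, "", ["token"]):
            yield value

    async def main():
        old_kept, new_kept = [], []
        async for result in stream():
            if result:                            # old WAR: plain truthiness test
                old_kept.append(result)
            if result is None or result == []:    # new check: only None / []
                continue
            new_kept.append(result)
        print("old:", old_kept)   # old: [['token']]
        print("new:", new_kept)   # new: [0, False, '', ['token']]

    asyncio.run(main())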

tensorrt_llm/executor/rpc_worker.py

Lines changed: 7 additions & 9 deletions

@@ -1,5 +1,4 @@
 import asyncio
-from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
 from queue import Queue
 from threading import Event
@@ -88,9 +87,9 @@ def __init__(
         self._response_queue = Queue()
         self.set_result_queue(self._response_queue)
 
-        # Create a thread pool for the fetch_responses_loop_async task to avoid
-        # being interfered by other tasks such as submit().
-        self._fetch_responses_loop_executor = ThreadPoolExecutor(max_workers=1)
+        # Note: We don't create a persistent ThreadPoolExecutor anymore
+        # to avoid thread leaks. Instead, we use asyncio.to_thread() which
+        # manages threads internally.
 
     def submit(self, request: GenerationRequest):
         """ Submits a request to the worker. """
@@ -128,11 +127,10 @@ def fetch_responses(self, timeout: Optional[float] = None) -> list:
 
     async def fetch_responses_async(self,
                                     timeout: Optional[float] = None) -> list:
-        # First, await any pending responses without blocking the event loop
-        loop = asyncio.get_event_loop()
-        responses = await loop.run_in_executor(
-            self._fetch_responses_loop_executor,
-            lambda: self.fetch_responses(timeout=timeout))
+        # Use asyncio.to_thread to avoid blocking the event loop
+        # This is similar to fetch_stats_async and fetch_kv_cache_events_async
+        responses = await asyncio.to_thread(self.fetch_responses,
+                                            timeout=timeout)
         return responses
 
     async def fetch_stats_async(self, timeout: Optional[float] = None) -> list:
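
asyncio.to_thread() (Python 3.9+) runs the blocking call on the event loop's default thread pool, which asyncio manages itself, so there is no dedicated executor to shut down or leak; it also forwards keyword arguments directly, which is why the lambda wrapper is no longer needed. A minimal sketch of the pattern under those assumptions, where fetch_responses is a hypothetical stand-in for the worker's blocking queue read, not the repository's implementation:

    import asyncio
    import time
    from typing import Optional

    def fetch_responses(timeout: Optional[float] = None) -> list:
        # Hypothetical stand-in for the worker's blocking call.
        time.sleep(0.1)
        return ["response"]

    async def fetch_responses_async(timeout: Optional[float] = None) -> list:
        # asyncio.to_thread forwards args/kwargs to the callable and runs it
        # in the loop's default thread pool without blocking the event loop.
        return await asyncio.to_thread(fetch_responses, timeout=timeout)

    print(asyncio.run(fetch_responses_async(timeout=1.0)))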
