1
1
import asyncio
2
2
import math
3
- import multiprocessing
4
- import multiprocessing .queues
5
3
import time
6
4
from collections .abc import AsyncGenerator , Iterable , Iterator
7
5
from concurrent .futures import ProcessPoolExecutor
6
+ from multiprocessing import Manager , Queue
7
+ from queue import Empty as QueueEmpty
8
8
from typing import (
9
9
Any ,
10
10
Generic ,
15
15
from loguru import logger
16
16
17
17
from guidellm .config import settings
18
+ from guidellm .request .session import RequestSession
18
19
from guidellm .scheduler .result import (
19
20
SchedulerRequestResult ,
20
21
SchedulerResult ,
21
22
SchedulerRunInfo ,
22
23
)
23
24
from guidellm .scheduler .strategy import SchedulingStrategy
24
- from guidellm .scheduler .types import RequestT , ResponseT
25
+ from guidellm .scheduler .types import (
26
+ MPQueues ,
27
+ RequestT ,
28
+ ResponseT ,
29
+ WorkerProcessRequestTime ,
30
+ WorkerProcessResult ,
31
+ )
25
32
from guidellm .scheduler .worker import (
26
33
RequestsWorker ,
27
- WorkerProcessRequest ,
28
- WorkerProcessResult ,
29
34
)
30
35
31
36
__all__ = ["Scheduler" ]
@@ -114,13 +119,13 @@ async def run(
114
119
raise ValueError (f"Invalid max_duration: { max_duration } " )
115
120
116
121
with (
117
- multiprocessing . Manager () as manager ,
122
+ Manager () as manager ,
118
123
ProcessPoolExecutor (
119
124
max_workers = scheduling_strategy .processes_limit
120
125
) as executor ,
121
126
):
122
127
requests_iter : Optional [Iterator [Any ]] = None
123
- futures , requests_queue , responses_queue = await self ._start_processes (
128
+ futures , queues = await self ._start_processes (
124
129
manager , executor , scheduling_strategy
125
130
)
126
131
run_info , requests_iter , times_iter = self ._run_setup (
@@ -149,13 +154,14 @@ async def run(
149
154
requests_iter = self ._add_requests (
150
155
requests_iter ,
151
156
times_iter ,
152
- requests_queue ,
157
+ queues .requests ,
158
+ queues .times ,
153
159
run_info ,
154
160
)
155
161
await asyncio .sleep (0 ) # enable requests to start
156
162
157
163
iter_result = self ._check_result_ready (
158
- responses_queue ,
164
+ queues . responses ,
159
165
run_info ,
160
166
)
161
167
if iter_result is not None :
@@ -171,7 +177,7 @@ async def run(
171
177
run_info = run_info ,
172
178
)
173
179
174
- await self ._stop_processes (futures , requests_queue )
180
+ await self ._stop_processes (futures , queues . requests )
175
181
176
182
async def _start_processes (
177
183
self ,
@@ -180,14 +186,16 @@ async def _start_processes(
180
186
scheduling_strategy : SchedulingStrategy ,
181
187
) -> tuple [
182
188
list [asyncio .Future ],
183
- multiprocessing .Queue ,
184
- multiprocessing .Queue ,
189
+ MPQueues [RequestT , ResponseT ],
185
190
]:
186
191
await self .worker .prepare_multiprocessing ()
187
- requests_queue = manager .Queue (
188
- maxsize = scheduling_strategy .queued_requests_limit
192
+ queues : MPQueues [RequestT , ResponseT ] = MPQueues (
193
+ requests = manager .Queue (
194
+ maxsize = scheduling_strategy .processing_requests_limit
195
+ ),
196
+ times = manager .Queue (maxsize = scheduling_strategy .processing_requests_limit ),
197
+ responses = manager .Queue (),
189
198
)
190
- responses_queue = manager .Queue ()
191
199
192
200
num_processes = min (
193
201
scheduling_strategy .processes_limit ,
@@ -212,36 +220,20 @@ async def _start_processes(
212
220
futures = []
213
221
loop = asyncio .get_event_loop ()
214
222
for id_ , requests_limit in zip (process_ids , process_requests_limits ):
215
- if scheduling_strategy .processing_mode == "sync" :
216
- futures .append (
217
- loop .run_in_executor (
218
- executor ,
219
- self .worker .process_loop_synchronous ,
220
- requests_queue ,
221
- responses_queue ,
222
- id_ ,
223
- )
224
- )
225
- elif scheduling_strategy .processing_mode == "async" :
226
- futures .append (
227
- loop .run_in_executor (
228
- executor ,
229
- self .worker .process_loop_asynchronous ,
230
- requests_queue ,
231
- responses_queue ,
232
- requests_limit ,
233
- id_ ,
234
- )
235
- )
236
- else :
237
- raise ValueError (
238
- f"Invalid processing mode: { scheduling_strategy .processing_mode } "
239
- f"for strategy: { scheduling_strategy } "
223
+ futures .append (
224
+ loop .run_in_executor (
225
+ executor ,
226
+ self .worker .process_loop_asynchronous ,
227
+ queues ,
228
+ False , # TODO: Make configurable
229
+ requests_limit ,
230
+ id_ ,
240
231
)
232
+ )
241
233
242
234
await asyncio .sleep (0.1 ) # give time for processes to start
243
235
244
- return futures , requests_queue , responses_queue
236
+ return futures , queues
245
237
246
238
def _run_setup (
247
239
self ,
@@ -284,7 +276,8 @@ def _add_requests(
284
276
self ,
285
277
requests_iter : Optional [Iterator [Any ]],
286
278
times_iter : Iterator [float ],
287
- requests_queue : multiprocessing .Queue ,
279
+ requests_queue : Queue [RequestSession [RequestT , ResponseT ]],
280
+ times_queue : Queue [WorkerProcessRequestTime ],
288
281
run_info : SchedulerRunInfo ,
289
282
) -> Optional [Iterator [Any ]]:
290
283
if requests_iter is not None :
@@ -298,23 +291,24 @@ def _add_requests(
298
291
if run_info .created_requests >= run_info .end_number :
299
292
raise StopIteration
300
293
301
- if (
302
- request_time := next (times_iter )
303
- ) >= run_info .end_time or time .time () >= run_info .end_time :
304
- raise StopIteration
305
-
306
- request = next (requests_iter )
307
- work_req : WorkerProcessRequest [RequestT ] = WorkerProcessRequest (
308
- request = request ,
309
- start_time = request_time ,
310
- timeout_time = run_info .end_time ,
311
- queued_time = time .time (),
312
- )
313
- requests_queue .put (work_req )
314
-
315
- run_info .created_requests += 1
316
- run_info .queued_requests += 1
317
- added_count += 1
294
+ session = next (requests_iter )
295
+ requests_queue .put (session )
296
+ for _ in range (len (session )):
297
+ if (
298
+ request_time := next (times_iter )
299
+ ) >= run_info .end_time or time .time () >= run_info .end_time :
300
+ raise StopIteration
301
+
302
+ work_req = WorkerProcessRequestTime (
303
+ start_time = request_time ,
304
+ timeout_time = run_info .end_time ,
305
+ queued_time = time .time (),
306
+ )
307
+ times_queue .put (work_req )
308
+
309
+ run_info .created_requests += 1
310
+ run_info .queued_requests += 1
311
+ added_count += 1
318
312
except StopIteration :
319
313
# we've reached the limit number, limit time, or exhausted the requests
320
314
# set to None to stop adding more and tell the loop no more requests
@@ -324,14 +318,14 @@ def _add_requests(
324
318
325
319
def _check_result_ready (
326
320
self ,
327
- responses_queue : multiprocessing . Queue ,
321
+ responses_queue : Queue [ WorkerProcessResult [ RequestT , ResponseT ]] ,
328
322
run_info : SchedulerRunInfo ,
329
323
) -> Optional [SchedulerRequestResult [RequestT , ResponseT ]]:
330
324
try :
331
325
process_response : WorkerProcessResult [RequestT , ResponseT ] = (
332
326
responses_queue .get_nowait ()
333
327
)
334
- except multiprocessing . queues . Empty : # type: ignore[attr-defined]
328
+ except QueueEmpty :
335
329
return None
336
330
337
331
if process_response .type_ == "request_scheduled" :
@@ -374,8 +368,9 @@ def _check_result_ready(
374
368
async def _stop_processes (
375
369
self ,
376
370
futures : list [asyncio .Future ],
377
- requests_queue : multiprocessing . Queue ,
371
+ requests_queue : Queue [ RequestSession [ RequestT , ResponseT ]] ,
378
372
):
373
+ # FIXME: Need new method for stopping workers
379
374
for _ in futures :
380
375
requests_queue .put (None )
381
376
0 commit comments