Skip to content

Commit ec37be8

Browse files
committed
Use semaphore to synchronize threads
Using `threading.Semaphore` makes it easier to cap the number of concurrently running tasks. It also makes it possible to remove the busy wait in the child thread by blocking on the semaphore instead. I've also updated the code to use the backpressure pattern: new tasks are scheduled as soon as the user consumes the results of the old ones.
1 parent 2115c24 commit ec37be8

File tree

1 file changed

+49
-63
lines changed

1 file changed

+49
-63
lines changed

Lib/multiprocessing/pool.py

Lines changed: 49 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#
1515

1616
import collections
17+
import functools
1718
import itertools
1819
import os
1920
import queue
@@ -395,32 +396,20 @@ def _guarded_task_generation(self, result_job, func, iterable):
395396
yield (result_job, i+1, _helper_reraises_exception, (e,), {})
396397

397398
def _guarded_task_generation_lazy(self, result_job, func, iterable,
398-
lazy_task_gen_helper):
399-
'''Provides a generator of tasks for imap and imap_unordered with
399+
backpressure_sema):
400+
"""Provides a generator of tasks for imap and imap_unordered with
400401
appropriate handling for iterables which throw exceptions during
401-
iteration.'''
402-
if not lazy_task_gen_helper.feature_enabled:
403-
yield from self._guarded_task_generation(result_job, func, iterable)
404-
return
405-
402+
iteration."""
406403
try:
407404
i = -1
408405
enumerated_iter = iter(enumerate(iterable))
409-
thread = threading.current_thread()
410-
max_generated_tasks = self._processes + lazy_task_gen_helper.buffersize
411-
412-
while thread._state == RUN:
413-
with lazy_task_gen_helper.iterator_cond:
414-
if lazy_task_gen_helper.not_finished_tasks >= max_generated_tasks:
415-
continue # wait for some task to be (picked up and) finished
416-
406+
while True:
407+
backpressure_sema.acquire()
417408
try:
418-
i, x = enumerated_iter.__next__()
409+
i, x = next(enumerated_iter)
419410
except StopIteration:
420411
break
421-
422412
yield (result_job, i, func, (x,), {})
423-
lazy_task_gen_helper.tasks_generated += 1
424413

425414
except Exception as e:
426415
yield (result_job, i+1, _helper_reraises_exception, (e,), {})
@@ -430,31 +419,32 @@ def imap(self, func, iterable, chunksize=1, buffersize=None):
430419
Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
431420
'''
432421
self._check_running()
422+
if chunksize < 1:
423+
raise ValueError("Chunksize must be 1+, not {0:n}".format(chunksize))
424+
425+
result = IMapIterator(self, buffersize)
426+
427+
if result._backpressure_sema is None:
428+
task_generation = self._guarded_task_generation
429+
else:
430+
task_generation = functools.partial(
431+
self._guarded_task_generation_lazy,
432+
backpressure_sema=result._backpressure_sema,
433+
)
434+
433435
if chunksize == 1:
434-
result = IMapIterator(self, buffersize)
435436
self._taskqueue.put(
436437
(
437-
self._guarded_task_generation_lazy(result._job,
438-
func,
439-
iterable,
440-
result._lazy_task_gen_helper),
438+
task_generation(result._job, func, iterable),
441439
result._set_length,
442440
)
443441
)
444442
return result
445443
else:
446-
if chunksize < 1:
447-
raise ValueError(
448-
"Chunksize must be 1+, not {0:n}".format(
449-
chunksize))
450444
task_batches = Pool._get_tasks(func, iterable, chunksize)
451-
result = IMapIterator(self, buffersize)
452445
self._taskqueue.put(
453446
(
454-
self._guarded_task_generation_lazy(result._job,
455-
mapstar,
456-
task_batches,
457-
result._lazy_task_gen_helper),
447+
task_generation(result._job, mapstar, task_batches),
458448
result._set_length,
459449
)
460450
)
@@ -465,30 +455,34 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
465455
Like `imap()` method but ordering of results is arbitrary.
466456
'''
467457
self._check_running()
458+
if chunksize < 1:
459+
raise ValueError(
460+
"Chunksize must be 1+, not {0!r}".format(chunksize)
461+
)
462+
463+
result = IMapUnorderedIterator(self, buffersize)
464+
465+
if result._backpressure_sema is None:
466+
task_generation = self._guarded_task_generation
467+
else:
468+
task_generation = functools.partial(
469+
self._guarded_task_generation_lazy,
470+
backpressure_sema=result._backpressure_sema,
471+
)
472+
468473
if chunksize == 1:
469-
result = IMapUnorderedIterator(self, buffersize)
470474
self._taskqueue.put(
471475
(
472-
self._guarded_task_generation_lazy(result._job,
473-
func,
474-
iterable,
475-
result._lazy_task_gen_helper),
476+
task_generation(result._job, func, iterable),
476477
result._set_length,
477478
)
478479
)
479480
return result
480481
else:
481-
if chunksize < 1:
482-
raise ValueError(
483-
"Chunksize must be 1+, not {0!r}".format(chunksize))
484482
task_batches = Pool._get_tasks(func, iterable, chunksize)
485-
result = IMapUnorderedIterator(self, buffersize)
486483
self._taskqueue.put(
487484
(
488-
self._guarded_task_generation_lazy(result._job,
489-
mapstar,
490-
task_batches,
491-
result._lazy_task_gen_helper),
485+
task_generation(result._job, mapstar, task_batches),
492486
result._set_length,
493487
)
494488
)
@@ -889,7 +883,13 @@ def __init__(self, pool, buffersize):
889883
self._length = None
890884
self._unsorted = {}
891885
self._cache[self._job] = self
892-
self._lazy_task_gen_helper = _LazyTaskGenHelper(buffersize, self._cond)
886+
887+
if buffersize is None:
888+
self._backpressure_sema = None
889+
else:
890+
self._backpressure_sema = threading.Semaphore(
891+
value=self._pool._processes + buffersize
892+
)
893893

894894
def __iter__(self):
895895
return self
@@ -910,7 +910,9 @@ def next(self, timeout=None):
910910
self._pool = None
911911
raise StopIteration from None
912912
raise TimeoutError from None
913-
self._lazy_task_gen_helper.tasks_finished += 1
913+
914+
if self._backpressure_sema:
915+
self._backpressure_sema.release()
914916

915917
success, value = item
916918
if success:
@@ -959,22 +961,6 @@ def _set(self, i, obj):
959961
del self._cache[self._job]
960962
self._pool = None
961963

962-
#
963-
# Class to store stats for lazy task generation and share them
964-
# between the main thread and `_guarded_task_generation()` thread.
965-
#
966-
class _LazyTaskGenHelper(object):
967-
def __init__(self, buffersize, iterator_cond):
968-
self.feature_enabled = buffersize is not None
969-
self.buffersize = buffersize
970-
self.tasks_generated = 0
971-
self.tasks_finished = 0
972-
self.iterator_cond = iterator_cond
973-
974-
@property
975-
def not_finished_tasks(self):
976-
return self.tasks_generated - self.tasks_finished
977-
978964
#
979965
#
980966
#

0 commit comments

Comments
 (0)