
gh-64192: Make imap()/imap_unordered() in multiprocessing.pool actually lazy #136871

Status: Draft. Wants to merge 12 commits into base branch main.
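For context, a minimal usage sketch of the API this PR proposes. The `buffersize` keyword comes from the signatures in the diff below and only works with this patch applied; the `square` worker and `numbers` generator are hypothetical placeholders.

```python
import multiprocessing

def square(x):
    # Hypothetical worker; any picklable function works here.
    return x * x

def numbers():
    # A large input stream. Without back-pressure, imap() consumes it
    # eagerly and enqueues tasks far ahead of the consumer.
    for n in range(10_000_000):
        yield n

if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        # With buffersize=8, at most 8 tasks are pulled from numbers()
        # ahead of the results the caller has already retrieved.
        for result in pool.imap(square, numbers(), buffersize=8):
            if result > 100:
                break
```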
134 changes: 94 additions & 40 deletions Lib/multiprocessing/pool.py
@@ -14,6 +14,7 @@
#

import collections
import functools
import itertools
import os
import queue
@@ -190,6 +191,11 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self._ctx = context or get_context()
self._setup_queues()
self._taskqueue = queue.SimpleQueue()
# The _taskqueue_buffersize_semaphores dict exists so that .release() can be
# called on every active semaphore when the pool is terminating, letting the
# task_handler wake up and stop. It is a dict so that each iterator object
# can efficiently deregister its semaphore when the iterator finishes.
self._taskqueue_buffersize_semaphores = {}
# The _change_notifier queue exist to wake up self._handle_workers()
# when the cache (self._cache) is empty or when there is a change in
# the _state variable of the thread that runs _handle_workers.
@@ -256,7 +262,8 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self, self._terminate_pool,
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
self._change_notifier, self._worker_handler, self._task_handler,
self._result_handler, self._cache),
self._result_handler, self._cache,
self._taskqueue_buffersize_semaphores),
exitpriority=15
)
self._state = RUN
@@ -382,72 +389,99 @@ def starmap_async(self, func, iterable, chunksize=None, callback=None,
return self._map_async(func, iterable, starmapstar, chunksize,
callback, error_callback)

def _guarded_task_generation(self, result_job, func, iterable):
def _guarded_task_generation(self, result_job, func, iterable,
buffersize_sema=None):
'''Provides a generator of tasks for imap and imap_unordered with
appropriate handling for iterables which throw exceptions during
iteration.'''
try:
i = -1
for i, x in enumerate(iterable):
yield (result_job, i, func, (x,), {})

if buffersize_sema is None:
for i, x in enumerate(iterable):
yield (result_job, i, func, (x,), {})

else:
enumerated_iter = iter(enumerate(iterable))
while True:
buffersize_sema.acquire()
try:
i, x = next(enumerated_iter)
except StopIteration:
break
yield (result_job, i, func, (x,), {})

except Exception as e:
yield (result_job, i+1, _helper_reraises_exception, (e,), {})

def imap(self, func, iterable, chunksize=1):
def imap(self, func, iterable, chunksize=1, buffersize=None):
'''
Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
'''
self._check_running()
if chunksize < 1:
raise ValueError("Chunksize must be 1+, not {0:n}".format(chunksize))
if buffersize is not None:
if not isinstance(buffersize, int):
raise TypeError("buffersize must be an integer or None")
if buffersize < 1:
raise ValueError("buffersize must be None or > 0")

result = IMapIterator(self, buffersize)
if chunksize == 1:
result = IMapIterator(self)
self._taskqueue.put(
(
self._guarded_task_generation(result._job, func, iterable),
result._set_length
))
self._guarded_task_generation(result._job, func, iterable,
result._buffersize_sema),
result._set_length,
)
)
return result
else:
if chunksize < 1:
raise ValueError(
"Chunksize must be 1+, not {0:n}".format(
chunksize))
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = IMapIterator(self)
self._taskqueue.put(
(
self._guarded_task_generation(result._job,
mapstar,
task_batches),
result._set_length
))
self._guarded_task_generation(result._job, mapstar, task_batches,
result._buffersize_sema),
result._set_length,
)
)
return (item for chunk in result for item in chunk)

def imap_unordered(self, func, iterable, chunksize=1):
def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
'''
Like `imap()` method but ordering of results is arbitrary.
'''
self._check_running()
if chunksize < 1:
raise ValueError(
"Chunksize must be 1+, not {0!r}".format(chunksize)
)
if buffersize is not None:
if not isinstance(buffersize, int):
raise TypeError("buffersize must be an integer or None")
if buffersize < 1:
raise ValueError("buffersize must be None or > 0")

result = IMapUnorderedIterator(self, buffersize)
if chunksize == 1:
result = IMapUnorderedIterator(self)
self._taskqueue.put(
(
self._guarded_task_generation(result._job, func, iterable),
result._set_length
))
self._guarded_task_generation(result._job, func, iterable,
result._buffersize_sema),
result._set_length,
)
)
return result
else:
if chunksize < 1:
raise ValueError(
"Chunksize must be 1+, not {0!r}".format(chunksize))
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = IMapUnorderedIterator(self)
self._taskqueue.put(
(
self._guarded_task_generation(result._job,
mapstar,
task_batches),
result._set_length
))
self._guarded_task_generation(result._job, mapstar, task_batches,
result._buffersize_sema),
result._set_length,
)
)
return (item for chunk in result for item in chunk)

def apply_async(self, func, args=(), kwds={}, callback=None,
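The core mechanism in the hunk above is a semaphore-gated task generator: `_guarded_task_generation` acquires one permit before pulling each item (or chunk) from the input iterable, and the result iterator releases a permit each time the caller takes a result. A stripped-down, standalone sketch of that back-pressure pattern, using illustrative names rather than the pool's internals:

```python
import threading

def bounded_tasks(iterable, sema):
    """Yield items from `iterable`, but only as fast as permits arrive."""
    it = iter(enumerate(iterable))
    while True:
        sema.acquire()          # block until the consumer frees a slot
        try:
            i, x = next(it)
        except StopIteration:
            break
        yield (i, x)

if __name__ == "__main__":
    buffersize = 2
    sema = threading.Semaphore(buffersize)
    gen = bounded_tasks(range(10), sema)

    # The first `buffersize` items can be produced immediately...
    print(next(gen), next(gen))   # (0, 0) (1, 1)
    # ...further production waits until the consumer releases a permit.
    sema.release()
    print(next(gen))              # (2, 2)
```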
@@ -679,7 +713,8 @@ def _help_stuff_finish(inqueue, task_handler, size):

@classmethod
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
worker_handler, task_handler, result_handler, cache):
worker_handler, task_handler, result_handler, cache,
taskqueue_buffersize_semaphores):
# this is guaranteed to only be called once
util.debug('finalizing pool')

@@ -690,6 +725,11 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
change_notifier.put(None)

task_handler._state = TERMINATE
# Release all buffersize semaphores so that a task_handler blocked in acquire() wakes up and can stop.
for job_id in tuple(taskqueue_buffersize_semaphores.keys()):
sema = taskqueue_buffersize_semaphores.pop(job_id, None)
if sema is not None:
sema.release()

util.debug('helping task handler/workers to finish')
cls._help_stuff_finish(inqueue, task_handler, len(pool))
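Because the gated generator can block in `acquire()` while the pool is shutting down, `_terminate_pool` releases every registered semaphore once so the task handler gets a chance to observe the TERMINATE state. A minimal, hypothetical sketch of that unblock-on-shutdown pattern (not the pool code itself):

```python
import threading
import time

state = {"terminate": False}
sema = threading.Semaphore(0)   # no permits yet, so the producer blocks

def producer():
    while True:
        sema.acquire()          # normally woken by a consumer taking a result
        if state["terminate"]:
            print("producer: woken up only to stop")
            return
        print("producer: would enqueue one more task")

t = threading.Thread(target=producer)
t.start()
time.sleep(0.1)

# Shutdown path: flip the state flag first, then release the semaphore so
# the blocked producer wakes up and sees it.
state["terminate"] = True
sema.release()
t.join()
```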
@@ -835,8 +875,7 @@ def _set(self, i, success_result):
#

class IMapIterator(object):

def __init__(self, pool):
def __init__(self, pool, buffersize):
self._pool = pool
self._cond = threading.Condition(threading.Lock())
self._job = next(job_counter)
@@ -846,6 +885,13 @@ def __init__(self, pool):
self._length = None
self._unsorted = {}
self._cache[self._job] = self
if buffersize is None:
self._buffersize_sema = None
else:
self._buffersize_sema = threading.Semaphore(buffersize)
self._pool._taskqueue_buffersize_semaphores[self._job] = (
self._buffersize_sema
)

def __iter__(self):
return self
@@ -856,22 +902,30 @@ def next(self, timeout=None):
item = self._items.popleft()
except IndexError:
if self._index == self._length:
self._pool = None
raise StopIteration from None
self._stop_iterator()
self._cond.wait(timeout)
try:
item = self._items.popleft()
except IndexError:
if self._index == self._length:
self._pool = None
raise StopIteration from None
self._stop_iterator()
raise TimeoutError from None

if self._buffersize_sema is not None:
self._buffersize_sema.release()

success, value = item
if success:
return value
raise value

def _stop_iterator(self):
if self._pool is not None:
# `self._pool` could be set to `None` in previous `.next()` calls
self._pool._taskqueue_buffersize_semaphores.pop(self._job, None)
self._pool = None
raise StopIteration from None

__next__ = next # XXX

def _set(self, i, obj):
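On the consumer side, each successful `next()` on the iterator releases one permit, and `_stop_iterator()` deregisters the semaphore when iteration ends. Note that with `chunksize > 1` each permit gates a whole chunk, so roughly `buffersize * chunksize` input items may be read ahead; this is my reading of the diff, not a documented guarantee. A hedged usage sketch combining both arguments (requires this patch; `work` is a placeholder):

```python
import multiprocessing

def work(x):
    return x + 1

if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        # Each permit covers one chunk of 100 tasks, so up to about
        # 4 * 100 input items may be consumed ahead of the caller.
        results = pool.imap(work, iter(range(100_000)), chunksize=100, buffersize=4)
        for value in results:
            if value > 500:
                break
```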
131 changes: 117 additions & 14 deletions Lib/test/_test_multiprocessing.py
@@ -2916,18 +2916,100 @@ def test_async_timeout(self):
p.join()

def test_imap(self):
it = self.pool.imap(sqr, list(range(10)))
self.assertEqual(list(it), list(map(sqr, list(range(10)))))
optimal_buffersize = 4 # `self.pool` size
buffersize_variants = [
{"buffersize": None},
{"buffersize": 1},
{"buffersize": optimal_buffersize},
{"buffersize": optimal_buffersize * 2},
]

it = self.pool.imap(sqr, list(range(10)))
for i in range(10):
self.assertEqual(next(it), i*i)
self.assertRaises(StopIteration, it.__next__)
for kwargs in ({}, *buffersize_variants):
with self.subTest(**kwargs):
iterable = range(10)
if self.TYPE != "threads":
iterable = list(iterable)
it = self.pool.imap(sqr, iterable, **kwargs)
self.assertEqual(list(it), list(map(sqr, list(range(10)))))

iterable = range(10)
if self.TYPE != "threads":
iterable = list(iterable)
it = self.pool.imap(sqr, iterable, **kwargs)
for i in range(10):
self.assertEqual(next(it), i * i)
self.assertRaises(StopIteration, it.__next__)

for kwargs in (
{"chunksize": 100},
{"chunksize": 100, "buffersize": optimal_buffersize},
):
with self.subTest(**kwargs):
iterable = range(1000)
if self.TYPE != "threads":
iterable = list(iterable)
it = self.pool.imap(sqr, iterable, **kwargs)
for i in range(1000):
self.assertEqual(next(it), i * i)
self.assertRaises(StopIteration, it.__next__)

def test_imap_fast_iterable_with_slow_task(self):
if self.TYPE != "threads":
self.skipTest("test not appropriate for {}".format(self.TYPE))

processes = 4
p = self.Pool(processes)

tasks_started_later = 2
last_produced_task_arg = Value("i")

def produce_args():
for arg in range(1, processes + tasks_started_later + 1):
last_produced_task_arg.value = arg
yield arg

it = p.imap(functools.partial(sqr, wait=0.2), produce_args())

next(it)
time.sleep(0.2)
# Without a buffersize, `iterable` should ideally have been advanced only
# `processes` times by now, but it is in fact consumed eagerly and advances
# further (here, all the way to its last value).
self.assertGreater(last_produced_task_arg.value, processes + 1)

it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
for i in range(1000):
self.assertEqual(next(it), i*i)
self.assertRaises(StopIteration, it.__next__)
p.terminate()
p.join()

def test_imap_fast_iterable_with_slow_task_and_buffersize(self):
if self.TYPE != "threads":
self.skipTest("test not appropriate for {}".format(self.TYPE))

processes = 4
p = self.Pool(processes)

tasks_started_later = 2
last_produced_task_arg = Value("i")

def produce_args():
for arg in range(1, processes + tasks_started_later + 1):
last_produced_task_arg.value = arg
yield arg

it = p.imap(
functools.partial(sqr, wait=0.2),
produce_args(),
buffersize=processes,
)

time.sleep(0.2)
self.assertEqual(last_produced_task_arg.value, processes)

next(it)
time.sleep(0.2)
self.assertEqual(last_produced_task_arg.value, processes + 1)

p.terminate()
p.join()

def test_imap_handle_iterable_exception(self):
if self.TYPE == 'manager':
@@ -2956,11 +3038,32 @@ def test_imap_handle_iterable_exception(self):
self.assertRaises(SayWhenError, it.__next__)

def test_imap_unordered(self):
it = self.pool.imap_unordered(sqr, list(range(10)))
self.assertEqual(sorted(it), list(map(sqr, list(range(10)))))
optimal_buffersize = 4 # `self.pool` size
buffersize_variants = [
{"buffersize": None},
{"buffersize": 1},
{"buffersize": optimal_buffersize},
{"buffersize": optimal_buffersize * 2},
]

it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100)
self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
for kwargs in ({}, *buffersize_variants):
with self.subTest(**kwargs):
iterable = range(10)
if self.TYPE != "threads":
iterable = list(iterable)
it = self.pool.imap_unordered(sqr, iterable, **kwargs)
self.assertEqual(sorted(it), list(map(sqr, list(range(10)))))

for kwargs in (
{"chunksize": 100},
{"chunksize": 100, "buffersize": optimal_buffersize},
):
with self.subTest(**kwargs):
iterable = range(1000)
if self.TYPE != "threads":
iterable = list(iterable)
it = self.pool.imap_unordered(sqr, iterable, **kwargs)
self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

def test_imap_unordered_handle_iterable_exception(self):
if self.TYPE == 'manager':
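The new tests check laziness by recording how far the input generator has advanced while the workers are still busy. The same effect can be observed from user code with a small counting wrapper. A hypothetical demo script (requires this patch; it uses a thread pool so the module-level counter is shared):

```python
import time
from multiprocessing.pool import ThreadPool

produced = 0

def counting_input(n):
    # Record how far the input has been consumed as we yield items.
    global produced
    for i in range(n):
        produced = i + 1
        yield i

def slow(x):
    time.sleep(0.2)
    return x

if __name__ == "__main__":
    with ThreadPool(4) as pool:
        it = pool.imap(slow, counting_input(100), buffersize=4)
        time.sleep(0.1)
        # Without buffersize the generator would already be near 100 here;
        # with buffersize=4 only a handful of items have been pulled.
        print("produced so far:", produced)
        print("first result:", next(it))
```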