7
7
import time
8
8
import weakref
9
9
from concurrent .futures import CancelledError , TimeoutError
10
+ from itertools import islice
10
11
from unittest import mock
11
12
12
13
import pytest
@@ -153,6 +154,9 @@ def test_map(executor):
153
154
results = list (executor .map (lambda x : x + 1 , range (10 )))
154
155
assert results == [1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 ]
155
156
157
+ results = list (executor .map (lambda x , y : x + y , range (10 ), range (9 )))
158
+ assert results == [0 , 2 , 4 , 6 , 8 , 10 , 12 , 14 , 16 ]
159
+
156
160
157
161
def test_map_timeout (executor ):
158
162
"""Test that map with timeout raises TimeoutError and cancels futures"""
@@ -178,6 +182,56 @@ def func(x):
178
182
assert set (results ) != {0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 }
179
183
180
184
185
+ def test_map_error (executor ):
186
+ """Test that map with an exception will raise, and remaining tasks are cancelled"""
187
+ results = []
188
+
189
+ def func (x ):
190
+ nonlocal results
191
+ time .sleep (0.05 )
192
+ if len (results ) == 5 :
193
+ raise ValueError ("Test error" )
194
+ results .append (x )
195
+ return x
196
+
197
+ with pytest .raises (ValueError ):
198
+ list (executor .map (func , range (15 )))
199
+
200
+ executor .shutdown (wait = True , cancel_futures = False )
201
+ assert len (results ) <= 10 , "Final 5 at least should have been cancelled"
202
+
203
+
204
+ @pytest .mark .parametrize ("cancel" , [True , False ])
205
+ def test_map_shutdown (executor , cancel ):
206
+ results = []
207
+
208
+ def func (x ):
209
+ nonlocal results
210
+ time .sleep (0.05 )
211
+ results .append (x )
212
+ return x
213
+
214
+ # Get the first few results.
215
+ # Keep the iterator alive so that it isn't closed when its reference is dropped.
216
+ m = executor .map (func , range (15 ))
217
+ values = list (islice (m , 5 ))
218
+ assert values == [0 , 1 , 2 , 3 , 4 ]
219
+
220
+ executor .shutdown (wait = True , cancel_futures = cancel )
221
+ if cancel :
222
+ assert len (results ) < 15 , "Some tasks should have been cancelled"
223
+ else :
224
+ assert len (results ) == 15 , "All tasks should have been completed"
225
+
226
+
227
+ def test_map_start (executor ):
228
+ """Test that map starts tasks immediately, before iterating"""
229
+ e = threading .Event ()
230
+ m = executor .map (lambda x : (e .set (), x ), range (1 ))
231
+ e .wait (timeout = 0.1 )
232
+ assert list (m ) == [(None , 0 )]
233
+
234
+
181
235
def test_closing (executor ):
182
236
"""Test that closing context manager works as expected"""
183
237
# mock the shutdown method of the executor
0 commit comments