Skip to content

Commit 023555c

Browse files
Merge pull request #1712 from mattrjacobs/demonstrate-mergeDelayError-regression
Fixing regression in mergeDelayError
2 parents e63a4cb + 2d72e99 commit 023555c

File tree

2 files changed

+39
-16
lines changed

2 files changed

+39
-16
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ public void onError(Throwable e) {
409409
boolean sendOnComplete = false;
410410
synchronized (this) {
411411
wip--;
412-
if (wip == 0 && completed) {
412+
if ((wip == 0 && completed) || (wip < 0)) {
413413
sendOnComplete = true;
414414
}
415415
}

rxjava-core/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,27 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.assertEquals;
19-
import static org.junit.Assert.assertNotNull;
20-
import static org.junit.Assert.fail;
21-
import static org.mockito.Matchers.any;
22-
import static org.mockito.Matchers.anyInt;
23-
import static org.mockito.Mockito.inOrder;
24-
import static org.mockito.Mockito.mock;
25-
import static org.mockito.Mockito.never;
26-
import static org.mockito.Mockito.times;
27-
import static org.mockito.Mockito.verify;
28-
29-
import java.util.ArrayList;
30-
import java.util.List;
31-
3218
import org.junit.Before;
3319
import org.junit.Test;
3420
import org.mockito.InOrder;
3521
import org.mockito.Mock;
3622
import org.mockito.MockitoAnnotations;
37-
3823
import rx.Observable;
3924
import rx.Observable.OnSubscribe;
4025
import rx.Observer;
4126
import rx.Subscriber;
4227
import rx.exceptions.CompositeException;
4328
import rx.exceptions.TestException;
4429

30+
import java.util.ArrayList;
31+
import java.util.List;
32+
import java.util.concurrent.CountDownLatch;
33+
34+
import static org.junit.Assert.*;
35+
import static org.mockito.Matchers.any;
36+
import static org.mockito.Matchers.anyInt;
37+
import static org.mockito.Mockito.*;
38+
4539
public class OperatorMergeDelayErrorTest {
4640

4741
@Mock
@@ -289,6 +283,35 @@ public void testMergeArrayWithThreading() {
289283
verify(stringObserver, times(1)).onCompleted();
290284
}
291285

286+
@Test(timeout=1000L)
287+
public void testSynchronousError() {
288+
final Observable<Observable<String>> o1 = Observable.error(new RuntimeException("unit test"));
289+
290+
final CountDownLatch latch = new CountDownLatch(1);
291+
Observable.mergeDelayError(o1).subscribe(new Subscriber<String>() {
292+
@Override
293+
public void onCompleted() {
294+
fail("Expected onError path");
295+
}
296+
297+
@Override
298+
public void onError(Throwable e) {
299+
latch.countDown();
300+
}
301+
302+
@Override
303+
public void onNext(String s) {
304+
fail("Expected onError path");
305+
}
306+
});
307+
308+
try {
309+
latch.await();
310+
} catch (InterruptedException ex) {
311+
fail("interrupted");
312+
}
313+
}
314+
292315
private static class TestSynchronousObservable implements Observable.OnSubscribe<String> {
293316

294317
@Override

0 commit comments

Comments
 (0)