Skip to content

Commit 2d72e99

Browse files
author
Matt Jacobs
committed
Fix the failing Observable.mergeDelayError synchronous error unit test
1 parent 2ba610e commit 2d72e99

File tree

2 files changed

+30
-31
lines changed

2 files changed

+30
-31
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ public void onError(Throwable e) {
409409
boolean sendOnComplete = false;
410410
synchronized (this) {
411411
wip--;
412-
if (wip == 0 && completed) {
412+
if ((wip == 0 && completed) || (wip < 0)) {
413413
sendOnComplete = true;
414414
}
415415
}

rxjava-core/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -46,36 +46,6 @@ public void before() {
4646
MockitoAnnotations.initMocks(this);
4747
}
4848

49-
@Test(timeout=1000L)
50-
public void testSynchronousError() {
51-
final Observable<Observable<String>> o1 = Observable.error(new RuntimeException("unit test"));
52-
53-
final CountDownLatch latch = new CountDownLatch(1);
54-
Observable.mergeDelayError(o1).subscribe(new Subscriber<String>() {
55-
@Override
56-
public void onCompleted() {
57-
fail("Expected onError path");
58-
}
59-
60-
@Override
61-
public void onError(Throwable e) {
62-
latch.countDown();
63-
}
64-
65-
@Override
66-
public void onNext(String s) {
67-
fail("Expected onError path");
68-
}
69-
});
70-
71-
try {
72-
latch.await();
73-
} catch (InterruptedException ex) {
74-
fail("interrupted");
75-
}
76-
}
77-
78-
7949
@Test
8050
public void testErrorDelayed1() {
8151
final Observable<String> o1 = Observable.create(new TestErrorObservable("four", null, "six")); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called
@@ -313,6 +283,35 @@ public void testMergeArrayWithThreading() {
313283
verify(stringObserver, times(1)).onCompleted();
314284
}
315285

286+
@Test(timeout=1000L)
287+
public void testSynchronousError() {
288+
final Observable<Observable<String>> o1 = Observable.error(new RuntimeException("unit test"));
289+
290+
final CountDownLatch latch = new CountDownLatch(1);
291+
Observable.mergeDelayError(o1).subscribe(new Subscriber<String>() {
292+
@Override
293+
public void onCompleted() {
294+
fail("Expected onError path");
295+
}
296+
297+
@Override
298+
public void onError(Throwable e) {
299+
latch.countDown();
300+
}
301+
302+
@Override
303+
public void onNext(String s) {
304+
fail("Expected onError path");
305+
}
306+
});
307+
308+
try {
309+
latch.await();
310+
} catch (InterruptedException ex) {
311+
fail("interrupted");
312+
}
313+
}
314+
316315
private static class TestSynchronousObservable implements Observable.OnSubscribe<String> {
317316

318317
@Override

0 commit comments

Comments
 (0)