Skip to content

Commit 2ba8bb2

Browse files
pkolaczkakarnokd
authored andcommitted
Fix a race condition in OperatorMerge.InnerSubscriber#onError (#5851)
* Fix a race condition in OperatorMerge.InnerSubscriber#onError Inner subscriber must queue the error first before setting done, so that after emitLoop() removes the subscriber, emitLoop is guaranteed to notice the error. Otherwise it would be possible that inner subscribers count was 0, and at the same time the error queue was empty. * Add unit test for OperatorMerge.InnerSubscriber#onError race
1 parent a49c49f commit 2ba8bb2

File tree

2 files changed

+32
-1
lines changed

2 files changed

+32
-1
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -847,8 +847,11 @@ public void onNext(T t) {
847847
}
848848
@Override
849849
public void onError(Throwable e) {
850-
done = true;
850+
// Need to queue the error first before setting done, so that after emitLoop() removes the subscriber,
851+
// it is guaranteed to notice the error. Otherwise it would be possible that inner subscribers count was 0,
852+
// and at the same time the error queue was empty.
851853
parent.getOrCreateErrorQueue().offer(e);
854+
done = true;
852855
parent.emit();
853856
}
854857
@Override

src/test/java/rx/internal/operators/OperatorMergeTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1205,6 +1205,34 @@ public void onNext(Integer t) {
12051205
assertTrue(latch.await(10, TimeUnit.SECONDS));
12061206
}
12071207

1208+
@Test
1209+
public void testConcurrentMergeInnerError() {
1210+
for (int i = 0; i < 1000; i++) {
1211+
final TestSubscriber<Integer> ts = TestSubscriber.create();
1212+
final PublishSubject<Integer> ps1 = PublishSubject.create();
1213+
final PublishSubject<Integer> ps2 = PublishSubject.create();
1214+
final Exception error = new NullPointerException();
1215+
Action0 action1 = new Action0() {
1216+
@Override
1217+
public void call() {
1218+
ps1.onNext(1);
1219+
ps1.onCompleted();
1220+
}
1221+
};
1222+
Action0 action2 = new Action0() {
1223+
@Override
1224+
public void call() {
1225+
ps2.onError(error);
1226+
}
1227+
};
1228+
1229+
Observable.mergeDelayError(ps1, ps2).subscribe(ts);
1230+
TestUtil.race(action1, action2);
1231+
ts.assertTerminalEvent();
1232+
ts.assertError(error);
1233+
}
1234+
}
1235+
12081236
private static Action1<Integer> printCount() {
12091237
return new Action1<Integer>() {
12101238
long count;

0 commit comments

Comments
 (0)