Skip to content

Commit c40a06f

Browse files
pkolaczkakarnokd
authored andcommitted
Fix a race condition that may make OperatorMaterialize emit too many terminal notifications (#5850)
* Fix a race condition that may make OperatorMaterialize emit the terminal notification more than once The guards in `OperatorMaterialize.ParentSubscriber#drain` were never working, because `busy` was actually never set to true. Therefore it was possible that the `drain` loop was executed by more than one thread concurrently, which could led to undefined behavior. This fix sets `busy` to true at the entry of `drain`. * Add unit test for race in OperatorMaterialize * Set sudo required in travis config
1 parent 2ba8bb2 commit c40a06f

File tree

3 files changed

+32
-1
lines changed

3 files changed

+32
-1
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
language: java
22
jdk:
33
- oraclejdk8
4-
sudo: false
4+
sudo: required
55
# as per http://blog.travis-ci.com/2014-12-17-faster-builds-with-container-based-infrastructure/
66

77
git:

src/main/java/rx/internal/operators/OperatorMaterialize.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ private void drain() {
134134
missed = true;
135135
return;
136136
}
137+
busy = true;
137138
}
138139
// drain loop
139140
final AtomicLong localRequested = this.requested;

src/test/java/rx/internal/operators/OperatorMaterializeTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,12 @@
2929
import rx.Notification;
3030
import rx.Observable;
3131
import rx.Subscriber;
32+
import rx.TestUtil;
33+
import rx.functions.Action0;
3234
import rx.functions.Action1;
3335
import rx.observers.TestSubscriber;
3436
import rx.schedulers.Schedulers;
37+
import rx.subjects.PublishSubject;
3538

3639
public class OperatorMaterializeTest {
3740

@@ -201,6 +204,33 @@ public void testUnsubscribeJustBeforeCompletionNotificationShouldPreventThatNoti
201204
ts.assertUnsubscribed();
202205
}
203206

207+
@Test
208+
public void testConcurrency() {
209+
for (int i = 0; i < 1000; i++) {
210+
final TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
211+
final PublishSubject<Integer> ps = PublishSubject.create();
212+
Action0 publishAction = new Action0() {
213+
@Override
214+
public void call() {
215+
ps.onCompleted();
216+
}
217+
};
218+
219+
Action0 requestAction = new Action0() {
220+
@Override
221+
public void call() {
222+
ts.requestMore(1);
223+
}
224+
};
225+
226+
ps.materialize().subscribe(ts);
227+
TestUtil.race(publishAction, requestAction);
228+
ts.assertValueCount(1);
229+
ts.assertTerminalEvent();
230+
ts.assertNoErrors();
231+
}
232+
}
233+
204234
private static class TestObserver extends Subscriber<Notification<String>> {
205235

206236
boolean onCompleted;

0 commit comments

Comments
 (0)