Skip to content

Commit 69b6102

Browse files
Merge pull request #1409 from dpsm/master
Avoiding OperatorObserveOn from calling subscriber.onNext(..) after unsu...
2 parents 7773ddf + cd4f112 commit 69b6102

File tree

2 files changed

+22
-2
lines changed

2 files changed

+22
-2
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,13 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
7272
= AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter");
7373

7474
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> subscriber) {
75-
super(subscriber);
7675
this.observer = subscriber;
7776
this.recursiveScheduler = scheduler.createWorker();
7877
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
79-
subscriber.add(scheduledUnsubscribe);
78+
add(scheduledUnsubscribe);
79+
80+
subscriber.add(recursiveScheduler);
81+
subscriber.add(this);
8082
}
8183

8284
@Override

rxjava-core/src/test/java/rx/internal/operators/OperatorObserveOnTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import rx.Observable;
4141
import rx.Observer;
4242
import rx.Scheduler;
43+
import rx.Subscription;
4344
import rx.exceptions.TestException;
4445
import rx.functions.Action0;
4546
import rx.functions.Action1;
@@ -389,4 +390,21 @@ public void testDelayedErrorDeliveryWhenSafeSubscriberUnsubscribes() {
389390
inOrder.verify(o, never()).onNext(anyInt());
390391
inOrder.verify(o, never()).onCompleted();
391392
}
393+
394+
@Test
395+
public void testAfterUnsubscribeCalledThenObserverOnNextNeverCalled() {
396+
final TestScheduler testScheduler = new TestScheduler();
397+
final Observer<Integer> observer = mock(Observer.class);
398+
final Subscription subscription = Observable.from(1, 2, 3)
399+
.observeOn(testScheduler)
400+
.subscribe(observer);
401+
subscription.unsubscribe();
402+
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
403+
404+
final InOrder inOrder = inOrder(observer);
405+
406+
inOrder.verify(observer, never()).onNext(anyInt());
407+
inOrder.verify(observer, never()).onError(any(Exception.class));
408+
inOrder.verify(observer, never()).onCompleted();
409+
}
392410
}

0 commit comments

Comments
 (0)