Skip to content

Commit 9564121

Browse files
runningcodeakarnokd
authored andcommitted
Check isDisposed before emitting in SingleFromCallable (#5743)
Previously SingleFromCallable did not check if the subscriber was unsubscribed before emitting onSuccess or onError. This fixes that behavior and adds tests to SingleFromCallable, CompletableFromCallable, and MaybeFromCallable. Fixes #5742
1 parent e25be7c commit 9564121

File tree

4 files changed

+373
-11
lines changed

4 files changed

+373
-11
lines changed

src/main/java/io/reactivex/internal/operators/single/SingleFromCallable.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616
import java.util.concurrent.Callable;
1717

1818
import io.reactivex.*;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.disposables.Disposables;
1921
import io.reactivex.exceptions.Exceptions;
2022
import io.reactivex.internal.disposables.EmptyDisposable;
23+
import io.reactivex.internal.functions.ObjectHelper;
24+
import io.reactivex.plugins.RxJavaPlugins;
2125

2226
public final class SingleFromCallable<T> extends Single<T> {
2327

@@ -28,20 +32,29 @@ public SingleFromCallable(Callable<? extends T> callable) {
2832
}
2933

3034
@Override
31-
protected void subscribeActual(SingleObserver<? super T> s) {
35+
protected void subscribeActual(SingleObserver<? super T> observer) {
36+
Disposable d = Disposables.empty();
37+
observer.onSubscribe(d);
38+
39+
if (d.isDisposed()) {
40+
return;
41+
}
42+
T value;
3243

33-
s.onSubscribe(EmptyDisposable.INSTANCE);
3444
try {
35-
T v = callable.call();
36-
if (v != null) {
37-
s.onSuccess(v);
45+
value = ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value");
46+
} catch (Throwable ex) {
47+
Exceptions.throwIfFatal(ex);
48+
if (!d.isDisposed()) {
49+
observer.onError(ex);
3850
} else {
39-
s.onError(new NullPointerException("The callable returned a null value"));
51+
RxJavaPlugins.onError(ex);
4052
}
41-
} catch (Throwable e) {
42-
Exceptions.throwIfFatal(e);
43-
s.onError(e);
53+
return;
4454
}
45-
}
4655

56+
if (!d.isDisposed()) {
57+
observer.onSuccess(value);
58+
}
59+
}
4760
}

src/test/java/io/reactivex/internal/operators/completable/CompletableFromCallableTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,22 @@
1515

1616
import io.reactivex.Completable;
1717
import java.util.concurrent.Callable;
18+
import java.util.concurrent.CountDownLatch;
1819
import java.util.concurrent.atomic.AtomicInteger;
20+
21+
import io.reactivex.Observable;
22+
import io.reactivex.Observer;
23+
import io.reactivex.TestHelper;
24+
import io.reactivex.disposables.Disposable;
25+
import io.reactivex.observers.TestObserver;
26+
import io.reactivex.schedulers.Schedulers;
1927
import org.junit.Test;
28+
import org.mockito.invocation.InvocationOnMock;
29+
import org.mockito.stubbing.Answer;
2030

2131
import static org.junit.Assert.assertEquals;
32+
import static org.mockito.ArgumentMatchers.any;
33+
import static org.mockito.Mockito.*;
2234

2335
public class CompletableFromCallableTest {
2436
@Test(expected = NullPointerException.class)
@@ -100,4 +112,57 @@ public Object call() throws Exception {
100112
.test()
101113
.assertFailure(UnsupportedOperationException.class);
102114
}
115+
116+
@SuppressWarnings("unchecked")
117+
@Test
118+
public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Exception {
119+
Callable<String> func = mock(Callable.class);
120+
121+
final CountDownLatch funcLatch = new CountDownLatch(1);
122+
final CountDownLatch observerLatch = new CountDownLatch(1);
123+
124+
when(func.call()).thenAnswer(new Answer<String>() {
125+
@Override
126+
public String answer(InvocationOnMock invocation) throws Throwable {
127+
observerLatch.countDown();
128+
129+
try {
130+
funcLatch.await();
131+
} catch (InterruptedException e) {
132+
// It's okay, unsubscription causes Thread interruption
133+
134+
// Restoring interruption status of the Thread
135+
Thread.currentThread().interrupt();
136+
}
137+
138+
return "should_not_be_delivered";
139+
}
140+
});
141+
142+
Completable fromCallableObservable = Completable.fromCallable(func);
143+
144+
Observer<Object> observer = TestHelper.mockObserver();
145+
146+
TestObserver<String> outer = new TestObserver<String>(observer);
147+
148+
fromCallableObservable
149+
.subscribeOn(Schedulers.computation())
150+
.subscribe(outer);
151+
152+
// Wait until func will be invoked
153+
observerLatch.await();
154+
155+
// Unsubscribing before emission
156+
outer.cancel();
157+
158+
// Emitting result
159+
funcLatch.countDown();
160+
161+
// func must be invoked
162+
verify(func).call();
163+
164+
// Observer must not be notified at all
165+
verify(observer).onSubscribe(any(Disposable.class));
166+
verifyNoMoreInteractions(observer);
167+
}
103168
}

src/test/java/io/reactivex/internal/operators/maybe/MaybeFromCallableTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,22 @@
1414
package io.reactivex.internal.operators.maybe;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.any;
18+
import static org.mockito.Mockito.*;
1719

1820
import java.util.List;
1921
import java.util.concurrent.*;
2022
import java.util.concurrent.atomic.AtomicInteger;
2123

24+
import io.reactivex.disposables.Disposable;
2225
import org.junit.Test;
2326

2427
import io.reactivex.*;
2528
import io.reactivex.observers.TestObserver;
2629
import io.reactivex.plugins.RxJavaPlugins;
2730
import io.reactivex.schedulers.Schedulers;
31+
import org.mockito.invocation.InvocationOnMock;
32+
import org.mockito.stubbing.Answer;
2833

2934
public class MaybeFromCallableTest {
3035
@Test(expected = NullPointerException.class)
@@ -158,4 +163,57 @@ public Integer call() throws Exception {
158163
RxJavaPlugins.reset();
159164
}
160165
}
166+
167+
@SuppressWarnings("unchecked")
168+
@Test
169+
public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Exception {
170+
Callable<String> func = mock(Callable.class);
171+
172+
final CountDownLatch funcLatch = new CountDownLatch(1);
173+
final CountDownLatch observerLatch = new CountDownLatch(1);
174+
175+
when(func.call()).thenAnswer(new Answer<String>() {
176+
@Override
177+
public String answer(InvocationOnMock invocation) throws Throwable {
178+
observerLatch.countDown();
179+
180+
try {
181+
funcLatch.await();
182+
} catch (InterruptedException e) {
183+
// It's okay, unsubscription causes Thread interruption
184+
185+
// Restoring interruption status of the Thread
186+
Thread.currentThread().interrupt();
187+
}
188+
189+
return "should_not_be_delivered";
190+
}
191+
});
192+
193+
Maybe<String> fromCallableObservable = Maybe.fromCallable(func);
194+
195+
Observer<Object> observer = TestHelper.mockObserver();
196+
197+
TestObserver<String> outer = new TestObserver<String>(observer);
198+
199+
fromCallableObservable
200+
.subscribeOn(Schedulers.computation())
201+
.subscribe(outer);
202+
203+
// Wait until func will be invoked
204+
observerLatch.await();
205+
206+
// Unsubscribing before emission
207+
outer.cancel();
208+
209+
// Emitting result
210+
funcLatch.countDown();
211+
212+
// func must be invoked
213+
verify(func).call();
214+
215+
// Observer must not be notified at all
216+
verify(observer).onSubscribe(any(Disposable.class));
217+
verifyNoMoreInteractions(observer);
218+
}
161219
}

0 commit comments

Comments
 (0)