Skip to content

Commit 4f29be4

Browse files
Merge pull request #2809 from akarnokd/TakeUntilTerminationFix
Fixed takeUntil not unsubscribing from either of the observables in case
2 parents ecbd27d + d6eb4dd commit 4f29be4

File tree

2 files changed

+144
-12
lines changed

2 files changed

+144
-12
lines changed

src/main/java/rx/internal/operators/OperatorTakeUntil.java

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,28 +36,62 @@ public OperatorTakeUntil(final Observable<? extends E> other) {
3636

3737
@Override
3838
public Subscriber<? super T> call(final Subscriber<? super T> child) {
39-
final Subscriber<T> parent = new SerializedSubscriber<T>(child);
40-
41-
other.unsafeSubscribe(new Subscriber<E>(child) {
42-
39+
final Subscriber<T> serial = new SerializedSubscriber<T>(child, false);
40+
41+
final Subscriber<T> main = new Subscriber<T>(serial, false) {
42+
@Override
43+
public void onNext(T t) {
44+
serial.onNext(t);
45+
}
46+
@Override
47+
public void onError(Throwable e) {
48+
try {
49+
serial.onError(e);
50+
} finally {
51+
serial.unsubscribe();
52+
}
53+
}
4354
@Override
4455
public void onCompleted() {
45-
parent.onCompleted();
56+
try {
57+
serial.onCompleted();
58+
} finally {
59+
serial.unsubscribe();
60+
}
61+
}
62+
};
63+
64+
final Subscriber<E> so = new Subscriber<E>() {
65+
@Override
66+
public void onStart() {
67+
request(Long.MAX_VALUE);
68+
}
69+
70+
@Override
71+
public void onCompleted() {
72+
main.onCompleted();
4673
}
4774

4875
@Override
4976
public void onError(Throwable e) {
50-
parent.onError(e);
77+
main.onError(e);
5178
}
5279

5380
@Override
5481
public void onNext(E t) {
55-
parent.onCompleted();
82+
onCompleted();
5683
}
5784

58-
});
85+
};
86+
87+
serial.add(main);
88+
serial.add(so);
89+
90+
child.add(serial);
91+
92+
other.unsafeSubscribe(so);
5993

60-
return parent;
94+
return main;
6195
}
6296

6397
}

src/test/java/rx/internal/operators/OperatorTakeUntilTest.java

Lines changed: 101 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,20 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.mockito.Mockito.mock;
19-
import static org.mockito.Mockito.times;
20-
import static org.mockito.Mockito.verify;
18+
import static org.junit.Assert.assertFalse;
19+
import static org.junit.Assert.assertTrue;
20+
import static org.mockito.Mockito.*;
21+
22+
import java.util.Arrays;
2123

2224
import org.junit.Test;
2325

2426
import rx.Observable;
2527
import rx.Observer;
2628
import rx.Subscriber;
2729
import rx.Subscription;
30+
import rx.observers.TestSubscriber;
31+
import rx.subjects.PublishSubject;
2832

2933
public class OperatorTakeUntilTest {
3034

@@ -188,4 +192,98 @@ public void call(Subscriber<? super String> observer) {
188192
observer.add(s);
189193
}
190194
}
195+
196+
@Test
197+
public void testUntilFires() {
198+
PublishSubject<Integer> source = PublishSubject.create();
199+
PublishSubject<Integer> until = PublishSubject.create();
200+
201+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
202+
203+
source.takeUntil(until).unsafeSubscribe(ts);
204+
205+
assertTrue(source.hasObservers());
206+
assertTrue(until.hasObservers());
207+
208+
source.onNext(1);
209+
210+
ts.assertReceivedOnNext(Arrays.asList(1));
211+
until.onNext(1);
212+
213+
ts.assertReceivedOnNext(Arrays.asList(1));
214+
ts.assertNoErrors();
215+
ts.assertTerminalEvent();
216+
217+
assertFalse("Source still has observers", source.hasObservers());
218+
assertFalse("Until still has observers", until.hasObservers());
219+
assertFalse("TestSubscriber is unsubscribed", ts.isUnsubscribed());
220+
}
221+
@Test
222+
public void testMainCompletes() {
223+
PublishSubject<Integer> source = PublishSubject.create();
224+
PublishSubject<Integer> until = PublishSubject.create();
225+
226+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
227+
228+
source.takeUntil(until).unsafeSubscribe(ts);
229+
230+
assertTrue(source.hasObservers());
231+
assertTrue(until.hasObservers());
232+
233+
source.onNext(1);
234+
source.onCompleted();
235+
236+
ts.assertReceivedOnNext(Arrays.asList(1));
237+
ts.assertNoErrors();
238+
ts.assertTerminalEvent();
239+
240+
assertFalse("Source still has observers", source.hasObservers());
241+
assertFalse("Until still has observers", until.hasObservers());
242+
assertFalse("TestSubscriber is unsubscribed", ts.isUnsubscribed());
243+
}
244+
@Test
245+
public void testDownstreamUnsubscribes() {
246+
PublishSubject<Integer> source = PublishSubject.create();
247+
PublishSubject<Integer> until = PublishSubject.create();
248+
249+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
250+
251+
source.takeUntil(until).take(1).unsafeSubscribe(ts);
252+
253+
assertTrue(source.hasObservers());
254+
assertTrue(until.hasObservers());
255+
256+
source.onNext(1);
257+
258+
ts.assertReceivedOnNext(Arrays.asList(1));
259+
ts.assertNoErrors();
260+
ts.assertTerminalEvent();
261+
262+
assertFalse("Source still has observers", source.hasObservers());
263+
assertFalse("Until still has observers", until.hasObservers());
264+
assertFalse("TestSubscriber is unsubscribed", ts.isUnsubscribed());
265+
}
266+
public void testBackpressure() {
267+
PublishSubject<Integer> until = PublishSubject.create();
268+
269+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
270+
@Override
271+
public void onStart() {
272+
requestMore(0);
273+
}
274+
};
275+
276+
Observable.range(1, 10).takeUntil(until).unsafeSubscribe(ts);
277+
278+
assertTrue(until.hasObservers());
279+
280+
ts.requestMore(1);
281+
282+
ts.assertReceivedOnNext(Arrays.asList(1));
283+
ts.assertNoErrors();
284+
assertTrue("TestSubscriber completed", ts.getOnCompletedEvents().isEmpty());
285+
286+
assertFalse("Until still has observers", until.hasObservers());
287+
assertFalse("TestSubscriber is unsubscribed", ts.isUnsubscribed());
288+
}
191289
}

0 commit comments

Comments
 (0)