Skip to content

Commit a128c5e

Browse files
committed
TakeWhile: don't unsubscribe downstream.
1 parent 0abfb74 commit a128c5e

File tree

3 files changed

+68
-16
lines changed

3 files changed

+68
-16
lines changed

src/main/java/rx/Subscriber.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,24 @@ public abstract class Subscriber<T> implements Observer<T>, Subscription {
4040
private long requested = Long.MIN_VALUE; // default to not set
4141

4242
protected Subscriber() {
43-
this.op = null;
44-
this.cs = new SubscriptionList();
43+
this(null, false);
4544
}
4645

4746
protected Subscriber(Subscriber<?> op) {
47+
this(op, true);
48+
}
49+
/**
50+
* Construct a subscriber by using the other subscriber for backpressure
51+
* and optionally sharing the underlying subscriptions list.
52+
* <p>To retain the chaining of subscribers, the caller should add the
53+
* created instance to the op via {@code add()}.
54+
*
55+
* @param op the other subscriber
56+
* @param shareSubscriptions should the subscription list in op shared with this instance?
57+
*/
58+
protected Subscriber(Subscriber<?> op, boolean shareSubscriptions) {
4859
this.op = op;
49-
this.cs = op.cs;
60+
this.cs = shareSubscriptions && op != null ? op.cs : new SubscriptionList();
5061
}
5162

5263
/**

src/main/java/rx/internal/operators/OperatorTakeWhile.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public OperatorTakeWhile(Func2<? super T, ? super Integer, Boolean> predicate) {
4545

4646
@Override
4747
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
48-
return new Subscriber<T>(subscriber) {
48+
Subscriber<T> s = new Subscriber<T>(subscriber, false) {
4949

5050
private int counter = 0;
5151

@@ -86,6 +86,8 @@ public void onError(Throwable e) {
8686
}
8787

8888
};
89+
subscriber.add(s);
90+
return s;
8991
}
9092

9193
}

src/test/java/rx/internal/operators/OperatorTakeWhileTest.java

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,17 @@
1717

1818
import static org.junit.Assert.fail;
1919
import static org.mockito.Matchers.any;
20-
import static org.mockito.Mockito.mock;
21-
import static org.mockito.Mockito.never;
22-
import static org.mockito.Mockito.times;
23-
import static org.mockito.Mockito.verify;
20+
import static org.mockito.Mockito.*;
2421

25-
import org.junit.Test;
22+
import java.util.Arrays;
2623

27-
import rx.Observable;
24+
import org.junit.*;
25+
26+
import rx.*;
2827
import rx.Observable.OnSubscribe;
29-
import rx.Observer;
30-
import rx.Subscriber;
31-
import rx.Subscription;
3228
import rx.functions.Func1;
33-
import rx.functions.Func2;
34-
import rx.subjects.PublishSubject;
35-
import rx.subjects.Subject;
29+
import rx.observers.TestSubscriber;
30+
import rx.subjects.*;
3631

3732
public class OperatorTakeWhileTest {
3833

@@ -222,4 +217,48 @@ public void run() {
222217
System.out.println("done starting TestObservable thread");
223218
}
224219
}
220+
221+
@Test
222+
public void testBackpressure() {
223+
Observable<Integer> source = Observable.range(1, 1000).takeWhile(new Func1<Integer, Boolean>() {
224+
@Override
225+
public Boolean call(Integer t1) {
226+
return t1 < 100;
227+
}
228+
});
229+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
230+
@Override
231+
public void onStart() {
232+
request(5);
233+
}
234+
};
235+
236+
source.subscribe(ts);
237+
238+
ts.assertNoErrors();
239+
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5));
240+
241+
ts.requestMore(5);
242+
243+
ts.assertNoErrors();
244+
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
245+
}
246+
247+
@Test
248+
public void testNoUnsubscribeDownstream() {
249+
Observable<Integer> source = Observable.range(1, 1000).takeWhile(new Func1<Integer, Boolean>() {
250+
@Override
251+
public Boolean call(Integer t1) {
252+
return t1 < 2;
253+
}
254+
});
255+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
256+
257+
source.unsafeSubscribe(ts);
258+
259+
ts.assertNoErrors();
260+
ts.assertReceivedOnNext(Arrays.asList(1));
261+
262+
Assert.assertFalse("Unsubscribed!", ts.isUnsubscribed());
263+
}
225264
}

0 commit comments

Comments
 (0)