Skip to content

Commit c41b37d

Browse files
Merge pull request #2760 from akarnokd/WithLatestFrom
Operator: WithLatestFrom
2 parents 48e4db0 + 844cc95 commit c41b37d

File tree

4 files changed

+437
-1
lines changed

4 files changed

+437
-1
lines changed

src/main/java/rx/Observable.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8804,6 +8804,31 @@ public final Observable<T> unsubscribeOn(Scheduler scheduler) {
88048804
return lift(new OperatorUnsubscribeOn<T>(scheduler));
88058805
}
88068806

8807+
/**
8808+
* Merges the specified observable sequence into this Observable sequence by using the resultSelector
8809+
* function only when the source observable sequence (this instance) produces an element.
8810+
* <code><pre>
8811+
* ----A-------B------C-----> o1
8812+
*
8813+
* --0----1-2----3-4--------> o2
8814+
*
8815+
* | | |
8816+
* V V V
8817+
*
8818+
* (A,0) (B,2) (C,4)
8819+
* </pre></code>
8820+
* @param other the other observable sequence
8821+
* @param resultSelector the function to call when this Observable emits an element and the other
8822+
* observable sequence has already emitted a value.
8823+
* @return an Observable that merges the specified observable sequence into this Observable sequence
8824+
* by using the resultSelector function only when the source observable sequence
8825+
* (this instance) produces an element
8826+
*/
8827+
@Experimental
8828+
public final <U, R> Observable<R> withLatestFrom(Observable<? extends U> other, Func2<? super T, ? super U, ? extends R> resultSelector) {
8829+
return lift(new OperatorWithLatestFrom<T, U, R>(other, resultSelector));
8830+
}
8831+
88078832
/**
88088833
* Returns an Observable that emits windows of items it collects from the source Observable. The resulting
88098834
* Observable emits connected, non-overlapping windows. It emits the current window and opens a new one
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import java.util.concurrent.atomic.AtomicReference;
19+
20+
import rx.*;
21+
import rx.Observable.Operator;
22+
import rx.functions.Func2;
23+
import rx.observers.SerializedSubscriber;
24+
25+
/**
26+
* Combines values from two sources only when the main source emits.
27+
* @param <T> the element type of the main observable
28+
* @param <U> the element type of the other observable that is merged into the main
29+
* @param <R> the result element type
30+
*/
31+
public final class OperatorWithLatestFrom<T, U, R> implements Operator<R, T> {
32+
final Func2<? super T, ? super U, ? extends R> resultSelector;
33+
final Observable<? extends U> other;
34+
/** Indicates the other has not yet emitted a value. */
35+
static final Object EMPTY = new Object();
36+
37+
public OperatorWithLatestFrom(Observable<? extends U> other, Func2<? super T, ? super U, ? extends R> resultSelector) {
38+
this.other = other;
39+
this.resultSelector = resultSelector;
40+
}
41+
@Override
42+
public Subscriber<? super T> call(Subscriber<? super R> child) {
43+
// onError and onCompleted may happen either from the main or from other.
44+
final SerializedSubscriber<R> s = new SerializedSubscriber<R>(child, false);
45+
child.add(s);
46+
47+
final AtomicReference<Object> current = new AtomicReference<Object>(EMPTY);
48+
49+
final Subscriber<T> subscriber = new Subscriber<T>(s, true) {
50+
@Override
51+
public void onNext(T t) {
52+
Object o = current.get();
53+
if (o != EMPTY) {
54+
try {
55+
@SuppressWarnings("unchecked")
56+
U u = (U)o;
57+
R result = resultSelector.call(t, u);
58+
59+
s.onNext(result);
60+
} catch (Throwable e) {
61+
onError(e);
62+
return;
63+
}
64+
}
65+
}
66+
@Override
67+
public void onError(Throwable e) {
68+
s.onError(e);
69+
s.unsubscribe();
70+
}
71+
@Override
72+
public void onCompleted() {
73+
s.onCompleted();
74+
s.unsubscribe();
75+
}
76+
};
77+
78+
Subscriber<U> otherSubscriber = new Subscriber<U>() {
79+
@Override
80+
public void onNext(U t) {
81+
current.set(t);
82+
}
83+
@Override
84+
public void onError(Throwable e) {
85+
s.onError(e);
86+
s.unsubscribe();
87+
}
88+
@Override
89+
public void onCompleted() {
90+
if (current.get() == EMPTY) {
91+
s.onCompleted();
92+
s.unsubscribe();
93+
}
94+
}
95+
};
96+
s.add(subscriber);
97+
s.add(otherSubscriber);
98+
99+
other.unsafeSubscribe(otherSubscriber);
100+
101+
return subscriber;
102+
}
103+
}

src/main/java/rx/observers/SerializedSubscriber.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,17 @@ public class SerializedSubscriber<T> extends Subscriber<T> {
3737
private final Observer<T> s;
3838

3939
public SerializedSubscriber(Subscriber<? super T> s) {
40-
super(s);
40+
this(s, true);
41+
}
42+
/**
43+
* Constructor for wrapping and serializing a subscriber optionally sharing the same underlying subscription
44+
* list.
45+
* @param s the subscriber to wrap and serialize
46+
* @param shareSubscriptions if {@code true}, the same subscription list is shared between this
47+
* subscriber and {@code s}.
48+
*/
49+
public SerializedSubscriber(Subscriber<? super T> s, boolean shareSubscriptions) {
50+
super(s, shareSubscriptions);
4151
this.s = new SerializedObserver<T>(s);
4252
}
4353

0 commit comments

Comments
 (0)