Skip to content

Commit 460459f

Browse files
Merge pull request #1357 from benjchristensen/mergeWith-and-friends
MergeWith, ConcatWith, AmbWith
2 parents 1aa6fc2 + bef3659 commit 460459f

File tree

2 files changed

+80
-1
lines changed

2 files changed

+80
-1
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3070,6 +3070,24 @@ public final static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(Ob
30703070
public final Observable<Boolean> all(Func1<? super T, Boolean> predicate) {
30713071
return lift(new OperatorAll<T>(predicate));
30723072
}
3073+
3074+
/**
3075+
* Mirrors the first Observable (current or provided) that emits an item.
3076+
* <p>
3077+
* <img width="640" height="385" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/amb.png">
3078+
* <p>
3079+
* {@code amb} does not operate by default on a particular {@link Scheduler}.
3080+
*
3081+
* @param o1
3082+
* an Observable competing to react first
3083+
* @return an Observable that emits the same sequence of items as whichever of the source Observables first
3084+
* emitted an item
3085+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Conditional-and-Boolean-Operators#wiki-amb">RxJava Wiki: amb()</a>
3086+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733.aspx">MSDN: Observable.Amb</a>
3087+
*/
3088+
public final Observable<T> ambWith(Observable<? extends T> t1) {
3089+
return amb(this, t1);
3090+
}
30733091

30743092
/**
30753093
* Disguises a object of an Observable subclass as a simple Observable object. Useful for instance when you
@@ -3475,6 +3493,25 @@ public final R call(R state, T value) {
34753493
public final <R> Observable<R> concatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
34763494
return concat(map(func));
34773495
}
3496+
3497+
/**
3498+
* Returns an Observable that emits the items emitted from the current Observable, then the next, one after the other, without
3499+
* interleaving them.
3500+
* <p>
3501+
* <img width="640" height="380" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/concat.png">
3502+
* <p>
3503+
* {@code concat} does not operate by default on a particular {@link Scheduler}.
3504+
*
3505+
* @param t1
3506+
* an Observable to be concatenated after the current
3507+
* @return an Observable that emits items emitted by the two source Observables, one after the other,
3508+
* without interleaving them
3509+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-concat">RxJava Wiki: concat()</a>
3510+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.concat.aspx">MSDN: Observable.Concat</a>
3511+
*/
3512+
public final Observable<T> concatWith(Observable<? extends T> t1) {
3513+
return concat(this, t1);
3514+
}
34783515

34793516
/**
34803517
* Returns an Observable that emits a Boolean that indicates whether the source Observable emitted a
@@ -4767,6 +4804,26 @@ public final <U, R> Observable<R> mergeMapIterable(Func1<? super T, ? extends It
47674804
return mergeMap(OperatorMergeMapPair.convertSelector(collectionSelector), resultSelector);
47684805
}
47694806

4807+
/**
4808+
* Flattens this and another Observable into a single Observable, without any transformation.
4809+
* <p>
4810+
* <img width="640" height="380" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
4811+
* <p>
4812+
* You can combine items emitted by multiple Observables so that they appear as a single Observable, by
4813+
* using the {@code merge} method.
4814+
* <p>
4815+
* {@code merge} does not operate by default on a particular {@link Scheduler}.
4816+
*
4817+
* @param t1
4818+
* an Observable to be merged
4819+
* @return an Observable that emits all of the items emitted by the source Observables
4820+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-merge">RxJava Wiki: merge()</a>
4821+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
4822+
*/
4823+
public final Observable<T> mergeWith(Observable<? extends T> t1) {
4824+
return merge(this, t1);
4825+
}
4826+
47704827
/**
47714828
* Returns an Observable that emits items produced by multicasting the source Observable within a selector
47724829
* function.

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,14 @@
4444
import org.mockito.InOrder;
4545
import org.mockito.Mock;
4646
import org.mockito.MockitoAnnotations;
47-
import rx.Observable.OnSubscribe;
4847

48+
import rx.Observable.OnSubscribe;
4949
import rx.functions.Action1;
5050
import rx.functions.Action2;
5151
import rx.functions.Func1;
5252
import rx.functions.Func2;
5353
import rx.observables.ConnectableObservable;
54+
import rx.observers.TestSubscriber;
5455
import rx.schedulers.TestScheduler;
5556
import rx.subscriptions.BooleanSubscription;
5657

@@ -993,5 +994,26 @@ public void call(StringBuilder sb, Integer v) {
993994

994995
assertEquals("1-2-3", value);
995996
}
997+
998+
@Test
999+
public void testMergeWith() {
1000+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1001+
Observable.just(1).mergeWith(Observable.just(2)).subscribe(ts);
1002+
ts.assertReceivedOnNext(Arrays.asList(1, 2));
1003+
}
1004+
1005+
@Test
1006+
public void testConcatWith() {
1007+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1008+
Observable.just(1).concatWith(Observable.just(2)).subscribe(ts);
1009+
ts.assertReceivedOnNext(Arrays.asList(1, 2));
1010+
}
1011+
1012+
@Test
1013+
public void testAmbWith() {
1014+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1015+
Observable.just(1).ambWith(Observable.just(2)).subscribe(ts);
1016+
ts.assertReceivedOnNext(Arrays.asList(1));
1017+
}
9961018

9971019
}

0 commit comments

Comments
 (0)