Skip to content

Commit a6a2440

Browse files
Merge pull request #501 from benjchristensen/parallelMerge
ParallelMerge Operator
2 parents f245fcd + 22885fa commit a6a2440

File tree

3 files changed

+118
-0
lines changed

3 files changed

+118
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import rx.operators.OperationOnErrorReturn;
6565
import rx.operators.OperationOnExceptionResumeNextViaObservable;
6666
import rx.operators.OperationParallel;
67+
import rx.operators.OperationParallelMerge;
6768
import rx.operators.OperationRetry;
6869
import rx.operators.OperationSample;
6970
import rx.operators.OperationScan;
@@ -4052,6 +4053,24 @@ public <R> Observable<R> parallel(final Func1<Observable<T>, Observable<R>> f, f
40524053
return OperationParallel.parallel(this, f, s);
40534054
}
40544055

4056+
4057+
/**
4058+
* Merges an <code>Observable<Observable<T>></code> to <code>Observable<Observable<T>></code>
4059+
* with number of inner Observables as defined by <code>parallelObservables</code>.
4060+
* <p>
4061+
* For example, if the original <code>Observable<Observable<T>></code> has 100 Observables to be emitted and <code>parallelObservables</code>
4062+
* is defined as 8, the 100 will be grouped onto 8 output Observables.
4063+
* <p>
4064+
* This is a mechanism for efficiently processing N number of Observables on a smaller N number of resources (typically CPU cores).
4065+
*
4066+
* @param parallelObservables
4067+
* the number of Observables to merge into.
4068+
* @return an Observable of Observables constrained to number defined by <code>parallelObservables</code>.
4069+
*/
4070+
public static <T> Observable<Observable<T>> parallelMerge(Observable<Observable<T>> source, int parallelObservables) {
4071+
return OperationParallelMerge.parallelMerge(source, parallelObservables);
4072+
}
4073+
40554074
/**
40564075
* Returns a {@link ConnectableObservable}, which waits until its
40574076
* {@link ConnectableObservable#connect connect} method is called before it
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.atomic.AtomicLong;
19+
20+
import rx.Observable;
21+
import rx.observables.GroupedObservable;
22+
import rx.util.functions.Func1;
23+
24+
public class OperationParallelMerge {
25+
26+
public static <T> Observable<Observable<T>> parallelMerge(final Observable<Observable<T>> source, final int num) {
27+
28+
return source.groupBy(new Func1<Observable<T>, Integer>() {
29+
final AtomicLong rollingCount = new AtomicLong();
30+
31+
@Override
32+
public Integer call(Observable<T> o) {
33+
return (int) rollingCount.incrementAndGet() % num;
34+
}
35+
}).map(new Func1<GroupedObservable<Integer, Observable<T>>, Observable<T>>() {
36+
37+
/**
38+
* Safe to cast from GroupedObservable to Observable so suppressing warning
39+
*/
40+
@SuppressWarnings("unchecked")
41+
@Override
42+
public Observable<T> call(GroupedObservable<Integer, Observable<T>> o) {
43+
return (Observable<T>) o;
44+
}
45+
46+
});
47+
48+
}
49+
50+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.junit.Assert.*;
19+
20+
import java.util.List;
21+
22+
import org.junit.Test;
23+
24+
import rx.Observable;
25+
import rx.subjects.PublishSubject;
26+
27+
public class OperationParallelMergeTest {
28+
29+
@Test
30+
public void testParallelMerge() {
31+
PublishSubject<String> p1 = PublishSubject.<String> create();
32+
PublishSubject<String> p2 = PublishSubject.<String> create();
33+
PublishSubject<String> p3 = PublishSubject.<String> create();
34+
PublishSubject<String> p4 = PublishSubject.<String> create();
35+
36+
Observable<Observable<String>> fourStreams = Observable.<Observable<String>> from(p1, p2, p3, p4);
37+
38+
Observable<Observable<String>> twoStreams = OperationParallelMerge.parallelMerge(fourStreams, 2);
39+
Observable<Observable<String>> threeStreams = OperationParallelMerge.parallelMerge(fourStreams, 3);
40+
41+
List<? super Observable<String>> fourList = fourStreams.toList().toBlockingObservable().last();
42+
List<? super Observable<String>> threeList = threeStreams.toList().toBlockingObservable().last();
43+
List<? super Observable<String>> twoList = twoStreams.toList().toBlockingObservable().last();
44+
45+
assertEquals(4, fourList.size());
46+
assertEquals(3, threeList.size());
47+
assertEquals(2, twoList.size());
48+
}
49+
}

0 commit comments

Comments
 (0)