Skip to content

Commit b15df98

Browse files
authored
2.x: add more Maybe operators, fix a few javadoc mistakes (#4467)
1 parent 88fafd8 commit b15df98

24 files changed

+3093
-344
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 37 additions & 67 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Maybe.java

Lines changed: 572 additions & 17 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Observable.java

Lines changed: 43 additions & 73 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Single.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,29 @@ public static <T> Flowable<T> concat(Iterable<? extends SingleSource<? extends T
124124
* @return the new Flowable instance
125125
* @since 2.0
126126
*/
127-
@SuppressWarnings({ "unchecked", "rawtypes" })
128127
public static <T> Flowable<T> concat(Publisher<? extends SingleSource<? extends T>> sources) {
129-
return RxJavaPlugins.onAssembly(new FlowableConcatMap(sources, SingleInternalHelper.toFlowable(), 2, ErrorMode.IMMEDIATE));
128+
return concat(sources, 2);
130129
}
131-
130+
131+
/**
132+
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
133+
* a Publisher sequence and prefetched by the specified amount.
134+
* <dl>
135+
* <dt><b>Scheduler:</b></dt>
136+
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
137+
* </dl>
138+
* @param <T> the value type
139+
* @param sources the Publisher of SingleSource instances
140+
* @param prefetch the number of SingleSources to prefetch from the Publisher
141+
* @return the new Flowable instance
142+
* @since 2.0
143+
*/
144+
@SuppressWarnings({ "unchecked", "rawtypes" })
145+
public static <T> Flowable<T> concat(Publisher<? extends SingleSource<? extends T>> sources, int prefetch) {
146+
ObjectHelper.verifyPositive(prefetch, "prefetch");
147+
return RxJavaPlugins.onAssembly(new FlowableConcatMap(sources, SingleInternalHelper.toFlowable(), prefetch, ErrorMode.IMMEDIATE));
148+
}
149+
132150
/**
133151
* Returns a Flowable that emits the items emitted by two Singles, one after the other.
134152
* <p>
@@ -218,6 +236,23 @@ public static <T> Flowable<T> concat(
218236
return concat(Flowable.fromArray(source1, source2, source3, source4));
219237
}
220238

239+
/**
240+
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided in
241+
* an array.
242+
* <dl>
243+
* <dt><b>Scheduler:</b></dt>
244+
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
245+
* </dl>
246+
* @param <T> the value type
247+
* @param sources the array of SingleSource instances
248+
* @return the new Flowable instance
249+
* @since 2.0
250+
*/
251+
@SuppressWarnings({ "unchecked", "rawtypes" })
252+
public static <T> Flowable<T> concatArray(SingleSource<? extends T>... sources) {
253+
return RxJavaPlugins.onAssembly(new FlowableConcatMap(Flowable.fromArray(sources), SingleInternalHelper.toFlowable(), 2, ErrorMode.BOUNDARY));
254+
}
255+
221256
/**
222257
* Provides an API (via a cold Completable) that bridges the reactive world with the callback-style world.
223258
* <p>

src/main/java/io/reactivex/disposables/Disposables.java

Lines changed: 0 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,6 @@ private Disposables() {
3232
throw new IllegalStateException("No instances!");
3333
}
3434

35-
/**
36-
* Construct a Disposable by wrapping a Runnable that is
37-
* executed exactly once when the Disposable is disposed.
38-
* @param run the Runnable to wrap
39-
* @return the new Disposable instance
40-
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
41-
*/
42-
@Deprecated
43-
public static Disposable from(Runnable run) {
44-
return fromRunnable(run);
45-
}
46-
4735
/**
4836
* Construct a Disposable by wrapping a Runnable that is
4937
* executed exactly once when the Disposable is disposed.
@@ -55,18 +43,6 @@ public static Disposable fromRunnable(Runnable run) {
5543
return new RunnableDisposable(run);
5644
}
5745

58-
/**
59-
* Construct a Disposable by wrapping a Action that is
60-
* executed exactly once when the Disposable is disposed.
61-
* @param run the Action to wrap
62-
* @return the new Disposable instance
63-
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
64-
*/
65-
@Deprecated
66-
public static Disposable from(Action run) {
67-
return fromAction(run);
68-
}
69-
7046
/**
7147
* Construct a Disposable by wrapping a Action that is
7248
* executed exactly once when the Disposable is disposed.
@@ -78,31 +54,6 @@ public static Disposable fromAction(Action run) {
7854
return new ActionDisposable(run);
7955
}
8056

81-
/**
82-
* Construct a Disposable by wrapping a Future that is
83-
* cancelled exactly once when the Disposable is disposed.
84-
* @param future the Future to wrap
85-
* @return the new Disposable instance
86-
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
87-
*/
88-
@Deprecated
89-
public static Disposable from(Future<?> future) {
90-
return fromFuture(future, true);
91-
}
92-
93-
/**
94-
* Construct a Disposable by wrapping a Runnable that is
95-
* executed exactly once when the Disposable is disposed.
96-
* @param future the Runnable to wrap
97-
* @param allowInterrupt if true, the future cancel happens via Future.cancel(true)
98-
* @return the new Disposable instance
99-
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
100-
*/
101-
@Deprecated
102-
public static Disposable from(Future<?> future, boolean allowInterrupt) {
103-
return fromFuture(future, allowInterrupt);
104-
}
105-
10657
/**
10758
* Construct a Disposable by wrapping a Future that is
10859
* cancelled exactly once when the Disposable is disposed.
@@ -126,18 +77,6 @@ public static Disposable fromFuture(Future<?> future, boolean allowInterrupt) {
12677
return new FutureDisposable(future, allowInterrupt);
12778
}
12879

129-
/**
130-
* Construct a Disposable by wrapping a Subscription that is
131-
* cancelled exactly once when the Disposable is disposed.
132-
* @param subscription the Runnable to wrap
133-
* @return the new Disposable instance
134-
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
135-
*/
136-
@Deprecated
137-
public static Disposable from(Subscription subscription) {
138-
return fromSubscription(subscription);
139-
}
140-
14180
/**
14281
* Construct a Disposable by wrapping a Subscription that is
14382
* cancelled exactly once when the Disposable is disposed.

src/main/java/io/reactivex/internal/functions/ObjectHelper.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,35 @@ public boolean test(Object o1, Object o2) {
9696
public static <T> BiPredicate<T, T> equalsPredicate() {
9797
return (BiPredicate<T, T>)EQUALS;
9898
}
99+
100+
/**
101+
* Validate that the given value is positive or report an IllegalArgumentException with
102+
* the parameter name.
103+
* @param value the value to validate
104+
* @param paramName the parameter name of the value
105+
* @return value
106+
* @throws IllegalArgumentException if bufferSize &lt;= 0
107+
*/
108+
public static int verifyPositive(int value, String paramName) {
109+
if (value <= 0) {
110+
throw new IllegalArgumentException(paramName + " > 0 required but it was " + value);
111+
}
112+
return value;
113+
}
114+
115+
/**
116+
* Validate that the given value is positive or report an IllegalArgumentException with
117+
* the parameter name.
118+
* @param value the value to validate
119+
* @param paramName the parameter name of the value
120+
* @return value
121+
* @throws IllegalArgumentException if bufferSize &lt;= 0
122+
*/
123+
public static long verifyPositive(long value, String paramName) {
124+
if (value <= 0L) {
125+
throw new IllegalArgumentException(paramName + " > 0 required but it was " + value);
126+
}
127+
return value;
128+
}
129+
99130
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import java.util.concurrent.atomic.AtomicBoolean;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.disposables.*;
20+
import io.reactivex.plugins.RxJavaPlugins;
21+
22+
/**
23+
* Signals the event of the first MaybeSource that signals.
24+
*
25+
* @param <T> the value type emitted
26+
*/
27+
public final class MaybeAmbArray<T> extends Maybe<T> {
28+
29+
final MaybeSource<? extends T>[] sources;
30+
31+
public MaybeAmbArray(MaybeSource<? extends T>[] sources) {
32+
this.sources = sources;
33+
}
34+
35+
@Override
36+
protected void subscribeActual(MaybeObserver<? super T> observer) {
37+
38+
AmbMaybeObserver<T> parent = new AmbMaybeObserver<T>(observer);
39+
observer.onSubscribe(parent);
40+
41+
for (MaybeSource<? extends T> s : sources) {
42+
if (parent.isDisposed()) {
43+
return;
44+
}
45+
46+
if (s == null) {
47+
parent.onError(new NullPointerException("One of the MaybeSources is null"));
48+
return;
49+
}
50+
51+
s.subscribe(parent);
52+
}
53+
}
54+
55+
static final class AmbMaybeObserver<T>
56+
extends AtomicBoolean
57+
implements MaybeObserver<T>, Disposable {
58+
59+
/** */
60+
private static final long serialVersionUID = -7044685185359438206L;
61+
62+
final MaybeObserver<? super T> actual;
63+
64+
final CompositeDisposable set;
65+
66+
public AmbMaybeObserver(MaybeObserver<? super T> actual) {
67+
this.actual = actual;
68+
this.set = new CompositeDisposable();
69+
}
70+
71+
@Override
72+
public void dispose() {
73+
if (compareAndSet(false, true)) {
74+
set.dispose();
75+
}
76+
}
77+
78+
@Override
79+
public boolean isDisposed() {
80+
return get();
81+
}
82+
83+
@Override
84+
public void onSubscribe(Disposable d) {
85+
set.add(d);
86+
}
87+
88+
@Override
89+
public void onSuccess(T value) {
90+
if (compareAndSet(false, true)) {
91+
set.dispose();
92+
93+
actual.onSuccess(value);
94+
}
95+
}
96+
97+
@Override
98+
public void onError(Throwable e) {
99+
if (compareAndSet(false, true)) {
100+
set.dispose();
101+
102+
actual.onError(e);
103+
} else {
104+
RxJavaPlugins.onError(e);
105+
}
106+
}
107+
108+
@Override
109+
public void onComplete() {
110+
if (compareAndSet(false, true)) {
111+
set.dispose();
112+
113+
actual.onComplete();
114+
}
115+
}
116+
117+
}
118+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import io.reactivex.*;
17+
import io.reactivex.internal.operators.maybe.MaybeAmbArray.AmbMaybeObserver;
18+
19+
/**
20+
* Signals the event of the first MaybeSource that signals.
21+
*
22+
* @param <T> the value type emitted
23+
*/
24+
public final class MaybeAmbIterable<T> extends Maybe<T> {
25+
26+
final Iterable<? extends MaybeSource<? extends T>> sources;
27+
28+
public MaybeAmbIterable(Iterable<? extends MaybeSource<? extends T>> sources) {
29+
this.sources = sources;
30+
}
31+
32+
@Override
33+
protected void subscribeActual(MaybeObserver<? super T> observer) {
34+
AmbMaybeObserver<T> parent = new AmbMaybeObserver<T>(observer);
35+
observer.onSubscribe(parent);
36+
37+
int i = 0;
38+
for (MaybeSource<? extends T> s : sources) {
39+
if (parent.isDisposed()) {
40+
return;
41+
}
42+
43+
if (s == null) {
44+
parent.onError(new NullPointerException("One of the MaybeSources is null"));
45+
return;
46+
}
47+
48+
s.subscribe(parent);
49+
i++;
50+
}
51+
52+
if (i == 0) {
53+
observer.onComplete();
54+
}
55+
}
56+
}

0 commit comments

Comments
 (0)