Skip to content

Commit e25ab24

Browse files
authored
2.x: Improve the scalar source perf of Obs.(concat|switch)MapX (#5918)
1 parent a5d99b7 commit e25ab24

15 files changed

+420
-43
lines changed

src/main/java/io/reactivex/internal/operators/maybe/MaybeToObservable.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.internal.operators.maybe;
1515

1616
import io.reactivex.*;
17+
import io.reactivex.annotations.Experimental;
1718
import io.reactivex.disposables.Disposable;
1819
import io.reactivex.internal.disposables.DisposableHelper;
1920
import io.reactivex.internal.fuseable.HasUpstreamMaybeSource;
@@ -40,17 +41,29 @@ public MaybeSource<T> source() {
4041

4142
@Override
4243
protected void subscribeActual(Observer<? super T> s) {
43-
source.subscribe(new MaybeToFlowableSubscriber<T>(s));
44+
source.subscribe(create(s));
4445
}
4546

46-
static final class MaybeToFlowableSubscriber<T> extends DeferredScalarDisposable<T>
47+
/**
48+
* Creates a {@link MaybeObserver} wrapper around a {@link Observer}.
49+
* @param <T> the value type
50+
* @param downstream the downstream {@code Observer} to talk to
51+
* @return the new MaybeObserver instance
52+
* @since 2.1.11 - experimental
53+
*/
54+
@Experimental
55+
public static <T> MaybeObserver<T> create(Observer<? super T> downstream) {
56+
return new MaybeToObservableObserver<T>(downstream);
57+
}
58+
59+
static final class MaybeToObservableObserver<T> extends DeferredScalarDisposable<T>
4760
implements MaybeObserver<T> {
4861

4962
private static final long serialVersionUID = 7603343402964826922L;
5063

5164
Disposable d;
5265

53-
MaybeToFlowableSubscriber(Observer<? super T> actual) {
66+
MaybeToObservableObserver(Observer<? super T> actual) {
5467
super(actual);
5568
}
5669

src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletable.java

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,14 @@
1313

1414
package io.reactivex.internal.operators.mixed;
1515

16-
import java.util.concurrent.Callable;
1716
import java.util.concurrent.atomic.*;
1817

1918
import io.reactivex.*;
2019
import io.reactivex.annotations.Experimental;
2120
import io.reactivex.disposables.Disposable;
2221
import io.reactivex.exceptions.Exceptions;
2322
import io.reactivex.functions.Function;
24-
import io.reactivex.internal.disposables.*;
23+
import io.reactivex.internal.disposables.DisposableHelper;
2524
import io.reactivex.internal.functions.ObjectHelper;
2625
import io.reactivex.internal.fuseable.*;
2726
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
@@ -57,7 +56,7 @@ public ObservableConcatMapCompletable(Observable<T> source,
5756

5857
@Override
5958
protected void subscribeActual(CompletableObserver s) {
60-
if (!tryScalarSource(source, mapper, s)) {
59+
if (!ScalarXMapZHelper.tryAsCompletable(source, mapper, s)) {
6160
source.subscribe(new ConcatMapCompletableObserver<T>(s, mapper, errorMode, prefetch));
6261
}
6362
}
@@ -301,30 +300,4 @@ void dispose() {
301300
}
302301
}
303302
}
304-
305-
static <T> boolean tryScalarSource(Observable<T> source, Function<? super T, ? extends CompletableSource> mapper, CompletableObserver observer) {
306-
if (source instanceof Callable) {
307-
@SuppressWarnings("unchecked")
308-
Callable<T> call = (Callable<T>) source;
309-
CompletableSource cs = null;
310-
try {
311-
T item = call.call();
312-
if (item != null) {
313-
cs = ObjectHelper.requireNonNull(mapper.apply(item), "The mapper returned a null CompletableSource");
314-
}
315-
} catch (Throwable ex) {
316-
Exceptions.throwIfFatal(ex);
317-
EmptyDisposable.error(ex, observer);
318-
return true;
319-
}
320-
321-
if (cs == null) {
322-
EmptyDisposable.complete(observer);
323-
} else {
324-
cs.subscribe(observer);
325-
}
326-
return true;
327-
}
328-
return false;
329-
}
330303
}

src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybe.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ public ObservableConcatMapMaybe(Observable<T> source,
5959

6060
@Override
6161
protected void subscribeActual(Observer<? super R> s) {
62-
source.subscribe(new ConcatMapMaybeMainObserver<T, R>(s, mapper, prefetch, errorMode));
62+
if (!ScalarXMapZHelper.tryAsMaybe(source, mapper, s)) {
63+
source.subscribe(new ConcatMapMaybeMainObserver<T, R>(s, mapper, prefetch, errorMode));
64+
}
6365
}
6466

6567
static final class ConcatMapMaybeMainObserver<T, R>

src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ public ObservableConcatMapSingle(Observable<T> source,
5959

6060
@Override
6161
protected void subscribeActual(Observer<? super R> s) {
62-
source.subscribe(new ConcatMapSingleMainObserver<T, R>(s, mapper, prefetch, errorMode));
62+
if (!ScalarXMapZHelper.tryAsSingle(source, mapper, s)) {
63+
source.subscribe(new ConcatMapSingleMainObserver<T, R>(s, mapper, prefetch, errorMode));
64+
}
6365
}
6466

6567
static final class ConcatMapSingleMainObserver<T, R>

src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ public ObservableSwitchMapCompletable(Observable<T> source,
5151

5252
@Override
5353
protected void subscribeActual(CompletableObserver s) {
54-
source.subscribe(new SwitchMapCompletableObserver<T>(s, mapper, delayErrors));
54+
if (!ScalarXMapZHelper.tryAsCompletable(source, mapper, s)) {
55+
source.subscribe(new SwitchMapCompletableObserver<T>(s, mapper, delayErrors));
56+
}
5557
}
5658

5759
static final class SwitchMapCompletableObserver<T> implements Observer<T>, Disposable {

src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybe.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ public ObservableSwitchMapMaybe(Observable<T> source,
5353

5454
@Override
5555
protected void subscribeActual(Observer<? super R> s) {
56-
source.subscribe(new SwitchMapMaybeMainObserver<T, R>(s, mapper, delayErrors));
56+
if (!ScalarXMapZHelper.tryAsMaybe(source, mapper, s)) {
57+
source.subscribe(new SwitchMapMaybeMainObserver<T, R>(s, mapper, delayErrors));
58+
}
5759
}
5860

5961
static final class SwitchMapMaybeMainObserver<T, R> extends AtomicInteger

src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingle.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ public ObservableSwitchMapSingle(Observable<T> source,
5353

5454
@Override
5555
protected void subscribeActual(Observer<? super R> s) {
56-
source.subscribe(new SwitchMapSingleMainObserver<T, R>(s, mapper, delayErrors));
56+
if (!ScalarXMapZHelper.tryAsSingle(source, mapper, s)) {
57+
source.subscribe(new SwitchMapSingleMainObserver<T, R>(s, mapper, delayErrors));
58+
}
5759
}
5860

5961
static final class SwitchMapSingleMainObserver<T, R> extends AtomicInteger
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.mixed;
15+
16+
import java.util.concurrent.Callable;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.annotations.Experimental;
20+
import io.reactivex.exceptions.Exceptions;
21+
import io.reactivex.functions.Function;
22+
import io.reactivex.internal.disposables.EmptyDisposable;
23+
import io.reactivex.internal.functions.ObjectHelper;
24+
import io.reactivex.internal.operators.maybe.MaybeToObservable;
25+
import io.reactivex.internal.operators.single.SingleToObservable;
26+
27+
/**
28+
* Utility class to extract a value from a scalar source reactive type,
29+
* map it to a 0-1 type then subscribe the output type's consumer to it,
30+
* saving on the overhead of the regular subscription channel.
31+
* @since 2.1.11 - experimental
32+
*/
33+
@Experimental
34+
final class ScalarXMapZHelper {
35+
36+
private ScalarXMapZHelper() {
37+
throw new IllegalStateException("No instances!");
38+
}
39+
40+
/**
41+
* Try subscribing to a {@link CompletableSource} mapped from
42+
* a scalar source (which implements {@link Callable}).
43+
* @param <T> the upstream value type
44+
* @param source the source reactive type ({@code Flowable} or {@code Observable})
45+
* possibly implementing {@link Callable}.
46+
* @param mapper the function that turns the scalar upstream value into a
47+
* {@link CompletableSource}
48+
* @param observer the consumer to subscribe to the mapped {@link CompletableSource}
49+
* @return true if a subscription did happen and the regular path should be skipped
50+
*/
51+
static <T> boolean tryAsCompletable(Object source,
52+
Function<? super T, ? extends CompletableSource> mapper,
53+
CompletableObserver observer) {
54+
if (source instanceof Callable) {
55+
@SuppressWarnings("unchecked")
56+
Callable<T> call = (Callable<T>) source;
57+
CompletableSource cs = null;
58+
try {
59+
T item = call.call();
60+
if (item != null) {
61+
cs = ObjectHelper.requireNonNull(mapper.apply(item), "The mapper returned a null CompletableSource");
62+
}
63+
} catch (Throwable ex) {
64+
Exceptions.throwIfFatal(ex);
65+
EmptyDisposable.error(ex, observer);
66+
return true;
67+
}
68+
69+
if (cs == null) {
70+
EmptyDisposable.complete(observer);
71+
} else {
72+
cs.subscribe(observer);
73+
}
74+
return true;
75+
}
76+
return false;
77+
}
78+
79+
/**
80+
* Try subscribing to a {@link MaybeSource} mapped from
81+
* a scalar source (which implements {@link Callable}).
82+
* @param <T> the upstream value type
83+
* @param source the source reactive type ({@code Flowable} or {@code Observable})
84+
* possibly implementing {@link Callable}.
85+
* @param mapper the function that turns the scalar upstream value into a
86+
* {@link MaybeSource}
87+
* @param observer the consumer to subscribe to the mapped {@link MaybeSource}
88+
* @return true if a subscription did happen and the regular path should be skipped
89+
*/
90+
static <T, R> boolean tryAsMaybe(Object source,
91+
Function<? super T, ? extends MaybeSource<? extends R>> mapper,
92+
Observer<? super R> observer) {
93+
if (source instanceof Callable) {
94+
@SuppressWarnings("unchecked")
95+
Callable<T> call = (Callable<T>) source;
96+
MaybeSource<? extends R> cs = null;
97+
try {
98+
T item = call.call();
99+
if (item != null) {
100+
cs = ObjectHelper.requireNonNull(mapper.apply(item), "The mapper returned a null MaybeSource");
101+
}
102+
} catch (Throwable ex) {
103+
Exceptions.throwIfFatal(ex);
104+
EmptyDisposable.error(ex, observer);
105+
return true;
106+
}
107+
108+
if (cs == null) {
109+
EmptyDisposable.complete(observer);
110+
} else {
111+
cs.subscribe(MaybeToObservable.create(observer));
112+
}
113+
return true;
114+
}
115+
return false;
116+
}
117+
118+
/**
119+
* Try subscribing to a {@link SingleSource} mapped from
120+
* a scalar source (which implements {@link Callable}).
121+
* @param <T> the upstream value type
122+
* @param source the source reactive type ({@code Flowable} or {@code Observable})
123+
* possibly implementing {@link Callable}.
124+
* @param mapper the function that turns the scalar upstream value into a
125+
* {@link SingleSource}
126+
* @param observer the consumer to subscribe to the mapped {@link SingleSource}
127+
* @return true if a subscription did happen and the regular path should be skipped
128+
*/
129+
static <T, R> boolean tryAsSingle(Object source,
130+
Function<? super T, ? extends SingleSource<? extends R>> mapper,
131+
Observer<? super R> observer) {
132+
if (source instanceof Callable) {
133+
@SuppressWarnings("unchecked")
134+
Callable<T> call = (Callable<T>) source;
135+
SingleSource<? extends R> cs = null;
136+
try {
137+
T item = call.call();
138+
if (item != null) {
139+
cs = ObjectHelper.requireNonNull(mapper.apply(item), "The mapper returned a null SingleSource");
140+
}
141+
} catch (Throwable ex) {
142+
Exceptions.throwIfFatal(ex);
143+
EmptyDisposable.error(ex, observer);
144+
return true;
145+
}
146+
147+
if (cs == null) {
148+
EmptyDisposable.complete(observer);
149+
} else {
150+
cs.subscribe(SingleToObservable.create(observer));
151+
}
152+
return true;
153+
}
154+
return false;
155+
}
156+
}

src/main/java/io/reactivex/internal/operators/single/SingleToObservable.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
package io.reactivex.internal.operators.single;
1414

1515
import io.reactivex.*;
16+
import io.reactivex.annotations.Experimental;
1617
import io.reactivex.disposables.Disposable;
1718
import io.reactivex.internal.disposables.DisposableHelper;
1819

@@ -31,7 +32,19 @@ public SingleToObservable(SingleSource<? extends T> source) {
3132

3233
@Override
3334
public void subscribeActual(final Observer<? super T> s) {
34-
source.subscribe(new SingleToObservableObserver<T>(s));
35+
source.subscribe(create(s));
36+
}
37+
38+
/**
39+
* Creates a {@link SingleObserver} wrapper around a {@link Observer}.
40+
* @param <T> the value type
41+
* @param downstream the downstream {@code Observer} to talk to
42+
* @return the new SingleObserver instance
43+
* @since 2.1.11 - experimental
44+
*/
45+
@Experimental
46+
public static <T> SingleObserver<T> create(Observer<? super T> downstream) {
47+
return new SingleToObservableObserver<T>(downstream);
3548
}
3649

3750
static final class SingleToObservableObserver<T>

src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybeTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,10 +340,37 @@ public MaybeSource<? extends Object> apply(Integer v)
340340
assertFalse(ps.hasObservers());
341341
}
342342

343+
@Test
344+
public void scalarMapperCrash() {
345+
TestObserver<Object> to = Observable.just(1)
346+
.concatMapMaybe(new Function<Integer, MaybeSource<? extends Object>>() {
347+
@Override
348+
public MaybeSource<? extends Object> apply(Integer v)
349+
throws Exception {
350+
throw new TestException();
351+
}
352+
})
353+
.test();
354+
355+
to.assertFailure(TestException.class);
356+
}
357+
343358
@Test
344359
public void disposed() {
345-
TestHelper.checkDisposed(Observable.just(1)
360+
TestHelper.checkDisposed(Observable.just(1).hide()
346361
.concatMapMaybe(Functions.justFunction(Maybe.never()))
347362
);
348363
}
364+
365+
@Test
366+
public void scalarEmptySource() {
367+
MaybeSubject<Integer> ms = MaybeSubject.create();
368+
369+
Observable.empty()
370+
.concatMapMaybe(Functions.justFunction(ms))
371+
.test()
372+
.assertResult();
373+
374+
assertFalse(ms.hasObservers());
375+
}
349376
}

0 commit comments

Comments
 (0)