Skip to content

Commit a5d99b7

Browse files
authored
2.x: Optimize ObservableConcatMapCompletable (#5915)
1 parent 4bc516c commit a5d99b7

File tree

3 files changed

+244
-24
lines changed

3 files changed

+244
-24
lines changed

src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletable.java

Lines changed: 77 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,17 @@
1313

1414
package io.reactivex.internal.operators.mixed;
1515

16+
import java.util.concurrent.Callable;
1617
import java.util.concurrent.atomic.*;
1718

1819
import io.reactivex.*;
1920
import io.reactivex.annotations.Experimental;
2021
import io.reactivex.disposables.Disposable;
21-
import io.reactivex.exceptions.*;
22+
import io.reactivex.exceptions.Exceptions;
2223
import io.reactivex.functions.Function;
23-
import io.reactivex.internal.disposables.DisposableHelper;
24+
import io.reactivex.internal.disposables.*;
2425
import io.reactivex.internal.functions.ObjectHelper;
25-
import io.reactivex.internal.fuseable.SimplePlainQueue;
26+
import io.reactivex.internal.fuseable.*;
2627
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
2728
import io.reactivex.internal.util.*;
2829
import io.reactivex.plugins.RxJavaPlugins;
@@ -56,7 +57,9 @@ public ObservableConcatMapCompletable(Observable<T> source,
5657

5758
@Override
5859
protected void subscribeActual(CompletableObserver s) {
59-
source.subscribe(new ConcatMapCompletableObserver<T>(s, mapper, errorMode, prefetch));
60+
if (!tryScalarSource(source, mapper, s)) {
61+
source.subscribe(new ConcatMapCompletableObserver<T>(s, mapper, errorMode, prefetch));
62+
}
6063
}
6164

6265
static final class ConcatMapCompletableObserver<T>
@@ -77,7 +80,7 @@ static final class ConcatMapCompletableObserver<T>
7780

7881
final int prefetch;
7982

80-
final SimplePlainQueue<T> queue;
83+
SimpleQueue<T> queue;
8184

8285
Disposable upstream;
8386

@@ -96,20 +99,40 @@ static final class ConcatMapCompletableObserver<T>
9699
this.prefetch = prefetch;
97100
this.errors = new AtomicThrowable();
98101
this.inner = new ConcatMapInnerObserver(this);
99-
this.queue = new SpscLinkedArrayQueue<T>(prefetch);
100102
}
101103

102104
@Override
103105
public void onSubscribe(Disposable s) {
104106
if (DisposableHelper.validate(upstream, s)) {
105107
this.upstream = s;
108+
if (s instanceof QueueDisposable) {
109+
@SuppressWarnings("unchecked")
110+
QueueDisposable<T> qd = (QueueDisposable<T>) s;
111+
112+
int m = qd.requestFusion(QueueDisposable.ANY);
113+
if (m == QueueDisposable.SYNC) {
114+
queue = qd;
115+
done = true;
116+
downstream.onSubscribe(this);
117+
drain();
118+
return;
119+
}
120+
if (m == QueueDisposable.ASYNC) {
121+
queue = qd;
122+
downstream.onSubscribe(this);
123+
return;
124+
}
125+
}
126+
queue = new SpscLinkedArrayQueue<T>(prefetch);
106127
downstream.onSubscribe(this);
107128
}
108129
}
109130

110131
@Override
111132
public void onNext(T t) {
112-
queue.offer(t);
133+
if (t != null) {
134+
queue.offer(t);
135+
}
113136
drain();
114137
}
115138

@@ -187,6 +210,9 @@ void drain() {
187210
return;
188211
}
189212

213+
AtomicThrowable errors = this.errors;
214+
ErrorMode errorMode = this.errorMode;
215+
190216
do {
191217
if (disposed) {
192218
queue.clear();
@@ -206,8 +232,24 @@ void drain() {
206232
}
207233

208234
boolean d = done;
209-
T v = queue.poll();
210-
boolean empty = v == null;
235+
boolean empty = true;
236+
CompletableSource cs = null;
237+
try {
238+
T v = queue.poll();
239+
if (v != null) {
240+
cs = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null CompletableSource");
241+
empty = false;
242+
}
243+
} catch (Throwable ex) {
244+
Exceptions.throwIfFatal(ex);
245+
disposed = true;
246+
queue.clear();
247+
upstream.dispose();
248+
errors.addThrowable(ex);
249+
ex = errors.terminate();
250+
downstream.onError(ex);
251+
return;
252+
}
211253

212254
if (d && empty) {
213255
disposed = true;
@@ -221,21 +263,6 @@ void drain() {
221263
}
222264

223265
if (!empty) {
224-
225-
CompletableSource cs;
226-
227-
try {
228-
cs = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null CompletableSource");
229-
} catch (Throwable ex) {
230-
Exceptions.throwIfFatal(ex);
231-
disposed = true;
232-
queue.clear();
233-
upstream.dispose();
234-
errors.addThrowable(ex);
235-
ex = errors.terminate();
236-
downstream.onError(ex);
237-
return;
238-
}
239266
active = true;
240267
cs.subscribe(inner);
241268
}
@@ -274,4 +301,30 @@ void dispose() {
274301
}
275302
}
276303
}
304+
305+
static <T> boolean tryScalarSource(Observable<T> source, Function<? super T, ? extends CompletableSource> mapper, CompletableObserver observer) {
306+
if (source instanceof Callable) {
307+
@SuppressWarnings("unchecked")
308+
Callable<T> call = (Callable<T>) source;
309+
CompletableSource cs = null;
310+
try {
311+
T item = call.call();
312+
if (item != null) {
313+
cs = ObjectHelper.requireNonNull(mapper.apply(item), "The mapper returned a null CompletableSource");
314+
}
315+
} catch (Throwable ex) {
316+
Exceptions.throwIfFatal(ex);
317+
EmptyDisposable.error(ex, observer);
318+
return true;
319+
}
320+
321+
if (cs == null) {
322+
EmptyDisposable.complete(observer);
323+
} else {
324+
cs.subscribe(observer);
325+
}
326+
return true;
327+
}
328+
return false;
329+
}
277330
}

src/test/java/io/reactivex/TestHelper.java

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2827,4 +2827,99 @@ public static <T> void checkInvalidParallelSubscribers(ParallelFlowable<T> sourc
28272827
tss[i].assertFailure(IllegalArgumentException.class);
28282828
}
28292829
}
2830+
2831+
public static <T> Observable<T> rejectObservableFusion() {
2832+
return new Observable<T>() {
2833+
@Override
2834+
protected void subscribeActual(Observer<? super T> observer) {
2835+
observer.onSubscribe(new QueueDisposable<T>() {
2836+
2837+
@Override
2838+
public int requestFusion(int mode) {
2839+
return 0;
2840+
}
2841+
2842+
@Override
2843+
public boolean offer(T value) {
2844+
throw new IllegalStateException();
2845+
}
2846+
2847+
@Override
2848+
public boolean offer(T v1, T v2) {
2849+
throw new IllegalStateException();
2850+
}
2851+
2852+
@Override
2853+
public T poll() throws Exception {
2854+
return null;
2855+
}
2856+
2857+
@Override
2858+
public boolean isEmpty() {
2859+
return true;
2860+
}
2861+
2862+
@Override
2863+
public void clear() {
2864+
}
2865+
2866+
@Override
2867+
public void dispose() {
2868+
}
2869+
2870+
@Override
2871+
public boolean isDisposed() {
2872+
return false;
2873+
}
2874+
});
2875+
}
2876+
};
2877+
}
2878+
2879+
public static <T> Flowable<T> rejectFlowableFusion() {
2880+
return new Flowable<T>() {
2881+
@Override
2882+
protected void subscribeActual(Subscriber<? super T> observer) {
2883+
observer.onSubscribe(new QueueSubscription<T>() {
2884+
2885+
@Override
2886+
public int requestFusion(int mode) {
2887+
return 0;
2888+
}
2889+
2890+
@Override
2891+
public boolean offer(T value) {
2892+
throw new IllegalStateException();
2893+
}
2894+
2895+
@Override
2896+
public boolean offer(T v1, T v2) {
2897+
throw new IllegalStateException();
2898+
}
2899+
2900+
@Override
2901+
public T poll() throws Exception {
2902+
return null;
2903+
}
2904+
2905+
@Override
2906+
public boolean isEmpty() {
2907+
return true;
2908+
}
2909+
2910+
@Override
2911+
public void clear() {
2912+
}
2913+
2914+
@Override
2915+
public void cancel() {
2916+
}
2917+
2918+
@Override
2919+
public void request(long n) {
2920+
}
2921+
});
2922+
}
2923+
};
2924+
}
28302925
}

src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapCompletableTest.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.reactivex.exceptions.*;
2525
import io.reactivex.functions.*;
2626
import io.reactivex.internal.functions.Functions;
27+
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
2728
import io.reactivex.observers.TestObserver;
2829
import io.reactivex.plugins.RxJavaPlugins;
2930
import io.reactivex.subjects.*;
@@ -112,6 +113,19 @@ public CompletableSource apply(Integer v) throws Exception {
112113
.assertFailure(TestException.class);
113114
}
114115

116+
@Test
117+
public void mapperCrashHidden() {
118+
Observable.just(1).hide()
119+
.concatMapCompletable(new Function<Integer, CompletableSource>() {
120+
@Override
121+
public CompletableSource apply(Integer v) throws Exception {
122+
throw new TestException();
123+
}
124+
})
125+
.test()
126+
.assertFailure(TestException.class);
127+
}
128+
115129
@Test
116130
public void immediateError() {
117131
PublishSubject<Integer> ps = PublishSubject.create();
@@ -359,4 +373,62 @@ public void doneButNotEmpty() {
359373

360374
to.assertResult();
361375
}
376+
377+
@Test
378+
public void asyncFused() {
379+
final PublishSubject<Integer> ps = PublishSubject.create();
380+
final CompletableSubject cs = CompletableSubject.create();
381+
382+
final TestObserver<Void> to = ps.observeOn(ImmediateThinScheduler.INSTANCE)
383+
.concatMapCompletable(
384+
Functions.justFunction(cs)
385+
)
386+
.test();
387+
388+
ps.onNext(1);
389+
ps.onComplete();
390+
391+
cs.onComplete();
392+
393+
to.assertResult();
394+
}
395+
396+
@Test
397+
public void fusionRejected() {
398+
final CompletableSubject cs = CompletableSubject.create();
399+
400+
TestHelper.rejectObservableFusion()
401+
.concatMapCompletable(
402+
Functions.justFunction(cs)
403+
)
404+
.test()
405+
.assertEmpty();
406+
}
407+
408+
@Test
409+
public void emptyScalarSource() {
410+
final CompletableSubject cs = CompletableSubject.create();
411+
412+
Observable.empty()
413+
.concatMapCompletable(Functions.justFunction(cs))
414+
.test()
415+
.assertResult();
416+
}
417+
418+
@Test
419+
public void justScalarSource() {
420+
final CompletableSubject cs = CompletableSubject.create();
421+
422+
TestObserver<Void> to = Observable.just(1)
423+
.concatMapCompletable(Functions.justFunction(cs))
424+
.test();
425+
426+
to.assertEmpty();
427+
428+
assertTrue(cs.hasObservers());
429+
430+
cs.onComplete();
431+
432+
to.assertResult();
433+
}
362434
}

0 commit comments

Comments
 (0)