Skip to content

Commit f59ce00

Browse files
authored
2.x: coverage, bugfixes, 9/03-1 (#4468)
1 parent b15df98 commit f59ce00

File tree

91 files changed

+3425
-560
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+3425
-560
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,15 +1331,15 @@ public final Completable startWith(CompletableSource other) {
13311331
}
13321332

13331333
/**
1334-
* Returns an NbpObservable which first delivers the events
1335-
* of the other NbpObservable then runs this CompletableConsumable.
1334+
* Returns an Observable which first delivers the events
1335+
* of the other Observable then runs this CompletableConsumable.
13361336
* <dl>
13371337
* <dt><b>Scheduler:</b></dt>
13381338
* <dd>{@code startWith} does not operate by default on a particular {@link Scheduler}.</dd>
13391339
* </dl>
13401340
* @param <T> the value type
1341-
* @param other the other NbpObservable to run first
1342-
* @return the new NbpObservable instance
1341+
* @param other the other Observable to run first
1342+
* @return the new Observable instance
13431343
* @throws NullPointerException if other is null
13441344
*/
13451345
@SchedulerSupport(SchedulerSupport.NONE)
@@ -1641,14 +1641,14 @@ public final <T> Maybe<T> toMaybe() {
16411641
}
16421642

16431643
/**
1644-
* Returns an NbpObservable which when subscribed to subscribes to this Completable and
1644+
* Returns an Observable which when subscribed to subscribes to this Completable and
16451645
* relays the terminal events to the subscriber.
16461646
* <dl>
16471647
* <dt><b>Scheduler:</b></dt>
16481648
* <dd>{@code toObservable} does not operate by default on a particular {@link Scheduler}.</dd>
16491649
* </dl>
16501650
* @param <T> the value type
1651-
* @return the new NbpObservable created
1651+
* @return the new Observable created
16521652
*/
16531653
@SchedulerSupport(SchedulerSupport.NONE)
16541654
public final <T> Observable<T> toObservable() {

src/main/java/io/reactivex/Flowable.java

Lines changed: 164 additions & 56 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Observable.java

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,9 @@ public static <T, R> Observable<R> combineLatest(ObservableSource<? extends T>[]
269269
public static <T, R> Observable<R> combineLatest(ObservableSource<? extends T>[] sources,
270270
Function<? super T[], ? extends R> combiner, int bufferSize) {
271271
ObjectHelper.requireNonNull(sources, "sources is null");
272+
if (sources.length == 0) {
273+
return empty();
274+
}
272275
ObjectHelper.requireNonNull(combiner, "combiner is null");
273276
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
274277

@@ -954,7 +957,7 @@ public static <T> Observable<T> concat(
954957
* Note: named this way because of overload conflict with concat(ObservableSource&lt;ObservableSource&gt;)
955958
* @param sources the array of sources
956959
* @param <T> the common base value type
957-
* @return the new NbpObservable instance
960+
* @return the new Observable instance
958961
* @throws NullPointerException if sources is null
959962
*/
960963
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -1832,9 +1835,16 @@ public static Observable<Long> intervalRange(long start, long count, long initia
18321835
*/
18331836
@SchedulerSupport(SchedulerSupport.CUSTOM)
18341837
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
1835-
1838+
if (count < 0) {
1839+
throw new IllegalArgumentException("count >= 0 required but it was " + count);
1840+
}
1841+
1842+
if (count == 0L) {
1843+
return Observable.<Long>empty().delay(initialDelay, unit, scheduler);
1844+
}
1845+
18361846
long end = start + (count - 1);
1837-
if (end < 0) {
1847+
if (start > 0 && end < 0) {
18381848
throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE");
18391849
}
18401850
ObjectHelper.requireNonNull(unit, "unit is null");
@@ -9818,11 +9828,25 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
98189828

98199829
@Override
98209830
public final void subscribe(Observer<? super T> observer) {
9821-
ObjectHelper.requireNonNull(observer, "observer is null");
9822-
9823-
observer = RxJavaPlugins.onSubscribe(this, observer);
9824-
9825-
subscribeActual(observer);
9831+
ObjectHelper.requireNonNull(observer, "s is null");
9832+
try {
9833+
observer = RxJavaPlugins.onSubscribe(this, observer);
9834+
9835+
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
9836+
9837+
subscribeActual(observer);
9838+
} catch (NullPointerException e) { // NOPMD
9839+
throw e;
9840+
} catch (Throwable e) {
9841+
Exceptions.throwIfFatal(e);
9842+
// can't call onError because no way to know if a Disposable has been set or not
9843+
// can't call onSubscribe because the call might have set a Subscription already
9844+
RxJavaPlugins.onError(e);
9845+
9846+
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
9847+
npe.initCause(e);
9848+
throw npe;
9849+
}
98269850
}
98279851

98289852
/**
@@ -10151,7 +10175,7 @@ public final Observable<T> takeFirst(Predicate<? super T> predicate) {
1015110175
@SchedulerSupport(SchedulerSupport.NONE)
1015210176
public final Observable<T> takeLast(int count) {
1015310177
if (count < 0) {
10154-
throw new IndexOutOfBoundsException("count >= required but it was " + count);
10178+
throw new IndexOutOfBoundsException("count >= 0 required but it was " + count);
1015510179
} else
1015610180
if (count == 0) {
1015710181
return ignoreElements();

src/main/java/io/reactivex/internal/functions/Functions.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ private Functions() {
3131
}
3232

3333
@SuppressWarnings("unchecked")
34-
public static <T1, T2, R> Function<Object[], R> toFunction(final BiFunction<? super T1, ? super T2, ? extends R> biFunction) {
35-
ObjectHelper.requireNonNull(biFunction, "biFunction is null");
34+
public static <T1, T2, R> Function<Object[], R> toFunction(final BiFunction<? super T1, ? super T2, ? extends R> f) {
35+
ObjectHelper.requireNonNull(f, "f is null");
3636
return new Function<Object[], R>() {
3737
@Override
3838
public R apply(Object[] a) throws Exception {
3939
if (a.length != 2) {
4040
throw new IllegalArgumentException("Array of size 2 expected but got " + a.length);
4141
}
42-
return ((BiFunction<Object, Object, R>)biFunction).apply(a[0], a[1]);
42+
return ((BiFunction<Object, Object, R>)f).apply(a[0], a[1]);
4343
}
4444
};
4545
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableBuffer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ static final class PublisherBufferExactSubscriber<T, C extends Collection<? supe
7777
Subscription s;
7878

7979
boolean done;
80+
81+
int index;
8082

8183
public PublisherBufferExactSubscriber(Subscriber<? super C> actual, int size, Callable<C> bufferSupplier) {
8284
this.actual = actual;
@@ -133,10 +135,14 @@ public void onNext(T t) {
133135
}
134136

135137
b.add(t);
136-
137-
if (b.size() == size) {
138+
139+
int i = index + 1;
140+
if (i == size) {
141+
index = 0;
138142
buffer = null;
139143
actual.onNext(b);
144+
} else {
145+
index = i;
140146
}
141147
}
142148

src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ public final class FlowableCombineLatest<T, R>
4040

4141
final Iterable<? extends Publisher<? extends T>> iterable;
4242

43-
final Function<Object[], ? extends R> combiner;
43+
final Function<? super T[], ? extends R> combiner;
4444

4545
final int bufferSize;
4646

4747
final boolean delayErrors;
4848

4949
public FlowableCombineLatest(Publisher<? extends T>[] array,
50-
Function<Object[], ? extends R> combiner,
50+
Function<? super T[], ? extends R> combiner,
5151
int bufferSize, boolean delayErrors) {
5252
if (bufferSize <= 0) {
5353
throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize);
@@ -61,7 +61,7 @@ public FlowableCombineLatest(Publisher<? extends T>[] array,
6161
}
6262

6363
public FlowableCombineLatest(Iterable<? extends Publisher<? extends T>> iterable,
64-
Function<Object[], ? extends R> combiner,
64+
Function<? super T[], ? extends R> combiner,
6565
int bufferSize, boolean delayErrors) {
6666
if (bufferSize <= 0) {
6767
throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize);
@@ -150,7 +150,7 @@ public void subscribeActual(Subscriber<? super R> s) {
150150
new FlowableMap<T, R>((Publisher<T>)a[0], new Function<T, R>() {
151151
@Override
152152
public R apply(T t) throws Exception {
153-
return combiner.apply(new Object[] { t });
153+
return combiner.apply((T[])new Object[] { t });
154154
}
155155
}).subscribe(s);
156156
return;
@@ -173,7 +173,7 @@ static final class CombineLatestCoordinator<T, R>
173173

174174
final Subscriber<? super R> actual;
175175

176-
final Function<Object[], ? extends R> combiner;
176+
final Function<? super T[], ? extends R> combiner;
177177

178178
final CombineLatestInnerSubscriber<T>[] subscribers;
179179

@@ -198,7 +198,7 @@ static final class CombineLatestCoordinator<T, R>
198198
final AtomicReference<Throwable> error;
199199

200200
public CombineLatestCoordinator(Subscriber<? super R> actual,
201-
Function<Object[], ? extends R> combiner, int n,
201+
Function<? super T[], ? extends R> combiner, int n,
202202
int bufferSize, boolean delayErrors) {
203203
this.actual = actual;
204204
this.combiner = combiner;

src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEager.java

Lines changed: 72 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -273,87 +273,88 @@ public void drain() {
273273

274274
if (inner != null) {
275275
SimpleQueue<R> q = inner.queue();
276-
277-
while (e != r) {
278-
if (cancelled) {
279-
cancelAll();
280-
return;
281-
}
282-
283-
if (em == ErrorMode.IMMEDIATE) {
284-
Throwable ex = error.get();
285-
if (ex != null) {
286-
current = null;
287-
inner.cancel();
276+
if (q != null) {
277+
while (e != r) {
278+
if (cancelled) {
288279
cancelAll();
289-
290-
a.onError(ex);
291280
return;
292281
}
293-
}
294-
295-
boolean d = inner.isDone();
296-
297-
R v;
298-
299-
try {
300-
v = q.poll();
301-
} catch (Throwable ex) {
302-
Exceptions.throwIfFatal(ex);
303-
current = null;
304-
inner.cancel();
305-
cancelAll();
306-
a.onError(ex);
307-
return;
308-
}
309-
310-
boolean empty = v == null;
311-
312-
if (d && empty) {
313-
inner = null;
314-
current = null;
315-
s.request(1);
316-
continue outer;
317-
}
318-
319-
if (empty) {
320-
break;
321-
}
322-
323-
a.onNext(v);
324-
325-
e++;
326-
327-
inner.requestOne();
328-
}
329-
330-
if (e == r) {
331-
if (cancelled) {
332-
cancelAll();
333-
return;
334-
}
335-
336-
if (em == ErrorMode.IMMEDIATE) {
337-
Throwable ex = error.get();
338-
if (ex != null) {
282+
283+
if (em == ErrorMode.IMMEDIATE) {
284+
Throwable ex = error.get();
285+
if (ex != null) {
286+
current = null;
287+
inner.cancel();
288+
cancelAll();
289+
290+
a.onError(ex);
291+
return;
292+
}
293+
}
294+
295+
boolean d = inner.isDone();
296+
297+
R v;
298+
299+
try {
300+
v = q.poll();
301+
} catch (Throwable ex) {
302+
Exceptions.throwIfFatal(ex);
339303
current = null;
340304
inner.cancel();
341305
cancelAll();
342-
343306
a.onError(ex);
344307
return;
345308
}
309+
310+
boolean empty = v == null;
311+
312+
if (d && empty) {
313+
inner = null;
314+
current = null;
315+
s.request(1);
316+
continue outer;
317+
}
318+
319+
if (empty) {
320+
break;
321+
}
322+
323+
a.onNext(v);
324+
325+
e++;
326+
327+
inner.requestOne();
346328
}
347-
348-
boolean d = inner.isDone();
349-
350-
boolean empty = inner.queue().isEmpty();
351-
352-
if (d && empty) {
353-
inner = null;
354-
current = null;
355-
s.request(1);
356-
continue;
329+
330+
if (e == r) {
331+
if (cancelled) {
332+
cancelAll();
333+
return;
334+
}
335+
336+
if (em == ErrorMode.IMMEDIATE) {
337+
Throwable ex = error.get();
338+
if (ex != null) {
339+
current = null;
340+
inner.cancel();
341+
cancelAll();
342+
343+
a.onError(ex);
344+
return;
345+
}
346+
}
347+
348+
boolean d = inner.isDone();
349+
350+
boolean empty = q.isEmpty();
351+
352+
if (d && empty) {
353+
inner = null;
354+
current = null;
355+
s.request(1);
356+
continue;
357+
}
357358
}
358359
}
359360
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,15 @@ void drain() {
179179
e++;
180180
}
181181

182+
if (e == r) {
183+
boolean d = done;
184+
boolean empty = q.isEmpty();
185+
186+
if (checkTerminated(d, empty, a)) {
187+
return;
188+
}
189+
}
190+
182191
if (e != 0L) {
183192
if (r != Long.MAX_VALUE) {
184193
requested.addAndGet(-e);

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public static <T> ConnectableFlowable<T> create(Flowable<T> source,
180180
* @param unit the unit of measure of the age amount
181181
* @param scheduler the target scheduler providing the current time
182182
* @param bufferSize the maximum number of elements to hold
183-
* @return the new NbpConnectableObservable instance
183+
* @return the new ConnectableFlowable instance
184184
*/
185185
public static <T> ConnectableFlowable<T> create(Flowable<T> source,
186186
final long maxAge, final TimeUnit unit, final Scheduler scheduler, final int bufferSize) {

0 commit comments

Comments
 (0)