Skip to content

Commit 7561c49

Browse files
author
Aaron Tull
committed
Merge pull request #3550 from stealthcode/release-1.1.0
Public API changes for 1.1.0 release
2 parents a65ca70 + 5f2467f commit 7561c49

28 files changed

+508
-2474
lines changed

src/main/java/rx/Observable.java

Lines changed: 9 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ public interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> {
227227
* @see <a href="http://reactivex.io/documentation/single.html">ReactiveX documentation: Single</a>
228228
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
229229
*/
230-
@Experimental
230+
@Beta
231231
public Single<T> toSingle() {
232232
return new Single<T>(OnSubscribeSingle.create(this));
233233
}
@@ -1789,9 +1789,8 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
17891789
* @throws IllegalArgumentException
17901790
* if {@code maxConcurrent} is less than or equal to 0
17911791
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
1792-
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
1792+
* @since 1.1.0
17931793
*/
1794-
@Experimental
17951794
@SuppressWarnings({"unchecked", "rawtypes"})
17961795
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
17971796
if (source.getClass() == ScalarSynchronousObservable.class) {
@@ -2088,9 +2087,8 @@ public final static <T> Observable<T> merge(Observable<? extends T>[] sequences)
20882087
* the maximum number of Observables that may be subscribed to concurrently
20892088
* @return an Observable that emits all of the items emitted by the Observables in the Array
20902089
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
2091-
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
2090+
* @since 1.1.0
20922091
*/
2093-
@Experimental
20942092
public final static <T> Observable<T> merge(Observable<? extends T>[] sequences, int maxConcurrent) {
20952093
return merge(from(sequences), maxConcurrent);
20962094
}
@@ -4014,9 +4012,8 @@ public void call(Subscriber<? super T> subscriber) {
40144012
* the alternate Observable to subscribe to if the source does not emit any items
40154013
* @return an Observable that emits the items emitted by the source Observable or the items of an
40164014
* alternate Observable if the source Observable is empty.
4017-
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
4015+
* @since 1.1.0
40184016
*/
4019-
@Experimental
40204017
public final Observable<T> switchIfEmpty(Observable<? extends T> alternate) {
40214018
return lift(new OperatorSwitchIfEmpty<T>(alternate));
40224019
}
@@ -5896,9 +5893,8 @@ public final Observable<T> onBackpressureBuffer() {
58965893
*
58975894
* @return the source Observable modified to buffer items up to the given capacity
58985895
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
5899-
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
5896+
* @since 1.1.0
59005897
*/
5901-
@Beta
59025898
public final Observable<T> onBackpressureBuffer(long capacity) {
59035899
return lift(new OperatorOnBackpressureBuffer<T>(capacity));
59045900
}
@@ -5917,9 +5913,8 @@ public final Observable<T> onBackpressureBuffer(long capacity) {
59175913
*
59185914
* @return the source Observable modified to buffer items up to the given capacity
59195915
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
5920-
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
5916+
* @since 1.1.0
59215917
*/
5922-
@Beta
59235918
public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflow) {
59245919
return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow));
59255920
}
@@ -5941,9 +5936,8 @@ public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflo
59415936
* @return the source Observable modified to drop {@code onNext} notifications on overflow
59425937
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
59435938
* @Experimental The behavior of this can change at any time.
5944-
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
5939+
* @since 1.1.0
59455940
*/
5946-
@Experimental
59475941
public final Observable<T> onBackpressureDrop(Action1<? super T> onDrop) {
59485942
return lift(new OperatorOnBackpressureDrop<T>(onDrop));
59495943
}
@@ -5968,72 +5962,6 @@ public final Observable<T> onBackpressureDrop() {
59685962
return lift(OperatorOnBackpressureDrop.<T>instance());
59695963
}
59705964

5971-
/**
5972-
* Instructs an Observable that is emitting items faster than its observer can consume them to
5973-
* block the producer thread.
5974-
* <p>
5975-
* <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.block.png" alt="">
5976-
* <p>
5977-
* The producer side can emit up to {@code maxQueueLength} onNext elements without blocking, but the
5978-
* consumer side considers the amount its downstream requested through {@code Producer.request(n)}
5979-
* and doesn't emit more than requested even if more is available. For example, using
5980-
* {@code onBackpressureBlock(384).observeOn(Schedulers.io())} will not throw a MissingBackpressureException.
5981-
* <p>
5982-
* Note that if the upstream Observable does support backpressure, this operator ignores that capability
5983-
* and doesn't propagate any backpressure requests from downstream.
5984-
* <p>
5985-
* Warning! Using a chain like {@code source.onBackpressureBlock().subscribeOn(scheduler)} is prone to
5986-
* deadlocks because the consumption of the internal queue is scheduled behind a blocked emission by
5987-
* the subscribeOn. In order to avoid this, the operators have to be swapped in the chain:
5988-
* {@code source.subscribeOn(scheduler).onBackpressureBlock()} and in general, no subscribeOn operator should follow
5989-
* this operator.
5990-
*
5991-
* @param maxQueueLength the maximum number of items the producer can emit without blocking
5992-
* @return the source Observable modified to block {@code onNext} notifications on overflow
5993-
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
5994-
* @Experimental The behavior of this can change at any time.
5995-
* @deprecated The operator doesn't work properly with {@link #subscribeOn(Scheduler)} and is prone to
5996-
* deadlocks. It will be removed/unavailable starting from 1.1.
5997-
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
5998-
*/
5999-
@Experimental
6000-
@Deprecated
6001-
public final Observable<T> onBackpressureBlock(int maxQueueLength) {
6002-
return lift(new OperatorOnBackpressureBlock<T>(maxQueueLength));
6003-
}
6004-
6005-
/**
6006-
* Instructs an Observable that is emitting items faster than its observer can consume them to block the
6007-
* producer thread if the number of undelivered onNext events reaches the system-wide ring buffer size.
6008-
* <p>
6009-
* <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.block.png" alt="">
6010-
* <p>
6011-
* The producer side can emit up to the system-wide ring buffer size onNext elements without blocking, but
6012-
* the consumer side considers the amount its downstream requested through {@code Producer.request(n)}
6013-
* and doesn't emit more than requested even if available.
6014-
* <p>
6015-
* Note that if the upstream Observable does support backpressure, this operator ignores that capability
6016-
* and doesn't propagate any backpressure requests from downstream.
6017-
* <p>
6018-
* Warning! Using a chain like {@code source.onBackpressureBlock().subscribeOn(scheduler)} is prone to
6019-
* deadlocks because the consumption of the internal queue is scheduled behind a blocked emission by
6020-
* the subscribeOn. In order to avoid this, the operators have to be swapped in the chain:
6021-
* {@code source.subscribeOn(scheduler).onBackpressureBlock()} and in general, no subscribeOn operator should follow
6022-
* this operator.
6023-
*
6024-
* @return the source Observable modified to block {@code onNext} notifications on overflow
6025-
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
6026-
* @Experimental The behavior of this can change at any time.
6027-
* @deprecated The operator doesn't work properly with {@link #subscribeOn(Scheduler)} and is prone to
6028-
* deadlocks. It will be removed/unavailable starting from 1.1.
6029-
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
6030-
*/
6031-
@Experimental
6032-
@Deprecated
6033-
public final Observable<T> onBackpressureBlock() {
6034-
return onBackpressureBlock(rx.internal.util.RxRingBuffer.SIZE);
6035-
}
6036-
60375965
/**
60385966
* Instructs an Observable that is emitting items faster than its observer can consume them to
60395967
* hold onto the latest value and emit that on request.
@@ -6050,10 +5978,8 @@ public final Observable<T> onBackpressureBlock() {
60505978
* requesting more than 1 from downstream doesn't guarantee a continuous delivery of onNext events.
60515979
*
60525980
* @return the source Observable modified so that it emits the most recently-received item upon request
6053-
* @Experimental The behavior of this can change at any time.
6054-
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
5981+
* @since 1.1.0
60555982
*/
6056-
@Experimental
60575983
public final Observable<T> onBackpressureLatest() {
60585984
return lift(OperatorOnBackpressureLatest.<T>instance());
60595985
}
@@ -8728,9 +8654,8 @@ public final Observable<T> takeWhile(final Func1<? super T, Boolean> predicate)
87288654
* condition after each item, and then completes if the condition is satisfied.
87298655
* @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
87308656
* @see Observable#takeWhile(Func1)
8731-
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
8657+
* @since 1.1.0
87328658
*/
8733-
@Experimental
87348659
public final Observable<T> takeUntil(final Func1<? super T, Boolean> stopPredicate) {
87358660
return lift(new OperatorTakeUntilPredicate<T>(stopPredicate));
87368661
}

src/main/java/rx/Single.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,12 @@
4040
import rx.internal.operators.OperatorSubscribeOn;
4141
import rx.internal.operators.OperatorTimeout;
4242
import rx.internal.operators.OperatorZip;
43+
44+
import rx.annotations.Beta;
4345
import rx.internal.producers.SingleDelayedProducer;
4446
import rx.singles.BlockingSingle;
4547
import rx.observers.SafeSubscriber;
46-
import rx.plugins.RxJavaObservableExecutionHook;
47-
import rx.plugins.RxJavaPlugins;
48+
import rx.plugins.*;
4849
import rx.schedulers.Schedulers;
4950
import rx.subscriptions.Subscriptions;
5051

@@ -69,7 +70,7 @@
6970
* the type of the item emitted by the Single
7071
* @since (If this class graduates from "Experimental" replace this parenthetical with the release number)
7172
*/
72-
@Experimental
73+
@Beta
7374
public class Single<T> {
7475

7576
final Observable.OnSubscribe<T> onSubscribe;

src/main/java/rx/SingleSubscriber.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package rx;
1717

18-
import rx.annotations.Experimental;
18+
import rx.annotations.Beta;
1919
import rx.internal.util.SubscriptionList;
2020

2121
/**
@@ -29,8 +29,9 @@
2929
* @see <a href="http://reactivex.io/documentation/observable.html">ReactiveX documentation: Observable</a>
3030
* @param <T>
3131
* the type of item the SingleSubscriber expects to observe
32+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
3233
*/
33-
@Experimental
34+
@Beta
3435
public abstract class SingleSubscriber<T> implements Subscription {
3536

3637
private final SubscriptionList cs = new SubscriptionList();

src/main/java/rx/exceptions/Exceptions.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,8 @@ public static Throwable getFinalCause(Throwable e) {
152152
* @param exceptions the collection of exceptions. If null or empty, no exception is thrown.
153153
* If the collection contains a single exception, that exception is either thrown as-is or wrapped into a
154154
* CompositeException. Multiple exceptions are wrapped into a CompositeException.
155-
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
155+
* @since 1.1.0
156156
*/
157-
@Experimental
158157
public static void throwIfAny(List<? extends Throwable> exceptions) {
159158
if (exceptions != null && !exceptions.isEmpty()) {
160159
if (exceptions.size() == 1) {

src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java

Lines changed: 0 additions & 95 deletions
This file was deleted.

src/main/java/rx/internal/operators/OperatorTakeUntilPredicate.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import rx.*;
1919
import rx.Observable.Operator;
20-
import rx.annotations.Experimental;
2120
import rx.exceptions.Exceptions;
2221
import rx.functions.Func1;
2322

@@ -26,7 +25,6 @@
2625
* the provided predicate returns false
2726
* <p>
2827
*/
29-
@Experimental
3028
public final class OperatorTakeUntilPredicate<T> implements Operator<T, T> {
3129
/** Subscriber returned to the upstream. */
3230
private final class ParentSubscriber extends Subscriber<T> {

src/main/java/rx/internal/util/BackpressureDrainManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
/**
2424
* Manages the producer-backpressure-consumer interplay by
2525
* matching up available elements with requested elements and/or
26-
* terminal events.
26+
* terminal events.
27+
*
28+
* @since 1.1.0
2729
*/
2830
@Experimental
2931
public final class BackpressureDrainManager extends AtomicLong implements Producer {

0 commit comments

Comments
 (0)