Skip to content

Commit 290602e

Browse files
author
Steven Wu
committed
issue-2764: add new operator onBackpressureDrop(Action1 onDrop)
1 parent c41b37d commit 290602e

File tree

3 files changed

+118
-6
lines changed

3 files changed

+118
-6
lines changed

src/main/java/rx/Observable.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5258,6 +5258,27 @@ public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflo
52585258
return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow));
52595259
}
52605260

5261+
/**
5262+
* Instructs an Observable that is emitting items faster than its observer can consume them to discard,
5263+
* rather than emit, those items that its observer is not prepared to observe.
5264+
* <p>
5265+
* <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.drop.png" alt="">
5266+
* <p>
5267+
* If the downstream request count hits 0 then the Observable will refrain from calling {@code onNext} until
5268+
* the observer invokes {@code request(n)} again to increase the request count.
5269+
* <dl>
5270+
* <dt><b>Scheduler:</b></dt>
5271+
* <dd>{@code onBackpressureDrop} does not operate by default on a particular {@link Scheduler}.</dd>
5272+
* </dl>
5273+
*
5274+
* @param onDrop the action to invoke for each item dropped. onDrop action should be fast and should never block.
5275+
* @return the source Observable modified to drop {@code onNext} notifications on overflow
5276+
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
5277+
*/
5278+
public final Observable<T> onBackpressureDrop(Action1<? super T> onDrop) {
5279+
return lift(new OperatorOnBackpressureDrop<T>(onDrop));
5280+
}
5281+
52615282
/**
52625283
* Instructs an Observable that is emitting items faster than its observer can consume them to discard,
52635284
* rather than emit, those items that its observer is not prepared to observe.

src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,34 @@
2020
import rx.Observable.Operator;
2121
import rx.Producer;
2222
import rx.Subscriber;
23+
import rx.functions.Action1;
2324

2425
public class OperatorOnBackpressureDrop<T> implements Operator<T, T> {
26+
2527
/** Lazy initialization via inner-class holder. */
2628
private static final class Holder {
2729
/** A singleton instance. */
2830
static final OperatorOnBackpressureDrop<Object> INSTANCE = new OperatorOnBackpressureDrop<Object>();
2931
}
32+
3033
/**
3134
* @return a singleton instance of this stateless operator.
3235
*/
3336
@SuppressWarnings({ "unchecked" })
3437
public static <T> OperatorOnBackpressureDrop<T> instance() {
3538
return (OperatorOnBackpressureDrop<T>)Holder.INSTANCE;
3639
}
37-
private OperatorOnBackpressureDrop() { }
40+
41+
private final Action1<? super T> onDrop;
42+
43+
private OperatorOnBackpressureDrop() {
44+
this(null);
45+
}
46+
47+
public OperatorOnBackpressureDrop(Action1<? super T> onDrop) {
48+
this.onDrop = onDrop;
49+
}
50+
3851
@Override
3952
public Subscriber<? super T> call(final Subscriber<? super T> child) {
4053
final AtomicLong requested = new AtomicLong();
@@ -68,6 +81,11 @@ public void onNext(T t) {
6881
if (requested.get() > 0) {
6982
child.onNext(t);
7083
requested.decrementAndGet();
84+
} else {
85+
// item dropped
86+
if(onDrop != null) {
87+
onDrop.call(t);
88+
}
7189
}
7290
}
7391

src/test/java/rx/BackpressureTests.java

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.junit.*;
2525

26+
import org.junit.rules.TestName;
2627
import rx.Observable.OnSubscribe;
2728
import rx.exceptions.MissingBackpressureException;
2829
import rx.functions.*;
@@ -33,6 +34,9 @@
3334

3435
public class BackpressureTests {
3536

37+
@Rule
38+
public TestName testName = new TestName();
39+
3640
@After
3741
public void doAfterTest() {
3842
TestObstructionDetection.checkObstruction();
@@ -424,18 +428,56 @@ public void testOnBackpressureDrop() {
424428
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
425429
ts.awaitTerminalEvent();
426430
ts.assertNoErrors();
427-
428-
431+
429432
List<Integer> onNextEvents = ts.getOnNextEvents();
430433
assertEquals(NUM, onNextEvents.size());
431434

432435
Integer lastEvent = onNextEvents.get(NUM - 1);
433-
436+
434437
System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Emitted: " + c.get() + " Last value: " + lastEvent);
435438
// it drop, so we should get some number far higher than what would have sequentially incremented
436439
assertTrue(NUM - 1 <= lastEvent.intValue());
437440
}
438441
}
442+
443+
@Test(timeout = 10000)
444+
public void testOnBackpressureDropWithAction() {
445+
for (int i = 0; i < 100; i++) {
446+
final AtomicInteger emitCount = new AtomicInteger();
447+
final AtomicInteger dropCount = new AtomicInteger();
448+
final AtomicInteger passCount = new AtomicInteger();
449+
final int NUM = (int) (RxRingBuffer.SIZE * 1.5); // > 1 so that take doesn't prevent buffer overflow
450+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
451+
firehose(emitCount).onBackpressureDrop(new Action1<Integer>() {
452+
@Override
453+
public void call(Integer i) {
454+
dropCount.incrementAndGet();
455+
}
456+
})
457+
.doOnNext(new Action1<Integer>() {
458+
@Override
459+
public void call(Integer integer) {
460+
passCount.incrementAndGet();
461+
}
462+
})
463+
.observeOn(Schedulers.computation())
464+
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
465+
ts.awaitTerminalEvent();
466+
ts.assertNoErrors();
467+
468+
List<Integer> onNextEvents = ts.getOnNextEvents();
469+
Integer lastEvent = onNextEvents.get(NUM - 1);
470+
System.out.println(testName.getMethodName() + " => Received: " + onNextEvents.size() + " Passed: " + passCount.get() + " Dropped: " + dropCount.get() + " Emitted: " + emitCount.get() + " Last value: " + lastEvent);
471+
assertEquals(NUM, onNextEvents.size());
472+
// in reality, NUM < passCount
473+
assertTrue(NUM <= passCount.get());
474+
// it drop, so we should get some number far higher than what would have sequentially incremented
475+
assertTrue(NUM - 1 <= lastEvent.intValue());
476+
assertTrue(0 < dropCount.get());
477+
assertEquals(emitCount.get(), passCount.get() + dropCount.get());
478+
}
479+
}
480+
439481
@Test(timeout = 10000)
440482
public void testOnBackpressureDropSynchronous() {
441483
for (int i = 0; i < 100; i++) {
@@ -446,18 +488,49 @@ public void testOnBackpressureDropSynchronous() {
446488
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
447489
ts.awaitTerminalEvent();
448490
ts.assertNoErrors();
449-
491+
450492
List<Integer> onNextEvents = ts.getOnNextEvents();
451493
assertEquals(NUM, onNextEvents.size());
452494

453495
Integer lastEvent = onNextEvents.get(NUM - 1);
454-
496+
455497
System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Emitted: " + c.get() + " Last value: " + lastEvent);
456498
// it drop, so we should get some number far higher than what would have sequentially incremented
457499
assertTrue(NUM - 1 <= lastEvent.intValue());
458500
}
459501
}
460502

503+
@Test(timeout = 10000)
504+
public void testOnBackpressureDropSynchronousWithAction() {
505+
for (int i = 0; i < 100; i++) {
506+
final AtomicInteger dropCount = new AtomicInteger();
507+
int NUM = (int) (RxRingBuffer.SIZE * 1.1); // > 1 so that take doesn't prevent buffer overflow
508+
AtomicInteger c = new AtomicInteger();
509+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
510+
firehose(c).onBackpressureDrop(new Action1<Integer>() {
511+
@Override
512+
public void call(Integer i) {
513+
dropCount.incrementAndGet();
514+
}
515+
})
516+
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
517+
ts.awaitTerminalEvent();
518+
ts.assertNoErrors();
519+
520+
List<Integer> onNextEvents = ts.getOnNextEvents();
521+
assertEquals(NUM, onNextEvents.size());
522+
523+
Integer lastEvent = onNextEvents.get(NUM - 1);
524+
525+
System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Dropped: " + dropCount.get() + " Emitted: " + c.get() + " Last value: " + lastEvent);
526+
// it drop, so we should get some number far higher than what would have sequentially incremented
527+
assertTrue(NUM - 1 <= lastEvent.intValue());
528+
// no drop in synchronous mode
529+
assertEquals(0, dropCount.get());
530+
assertEquals(c.get(), onNextEvents.size());
531+
}
532+
}
533+
461534
@Test(timeout = 2000)
462535
public void testOnBackpressureBuffer() {
463536
int NUM = (int) (RxRingBuffer.SIZE * 1.1); // > 1 so that take doesn't prevent buffer overflow

0 commit comments

Comments
 (0)