Skip to content

Commit 37314b0

Browse files
Merge pull request #1901 from akarnokd/RedoRequestFix
Fixed redo & groupBy backpressure management
2 parents 053e506 + 60656e5 commit 37314b0

File tree

3 files changed

+103
-16
lines changed

3 files changed

+103
-16
lines changed

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends
190190
@Override
191191
public void call(final Subscriber<? super T> child) {
192192
final AtomicBoolean isLocked = new AtomicBoolean(true);
193-
final AtomicBoolean isStarted = new AtomicBoolean(false);
193+
final AtomicBoolean resumeBoundary = new AtomicBoolean(true);
194194
// incremented when requests are made, decremented when requests are fulfilled
195195
final AtomicLong consumerCapacity = new AtomicLong(0l);
196196
final AtomicReference<Producer> currentProducer = new AtomicReference<Producer>();
@@ -300,6 +300,8 @@ public void onNext(Object t) {
300300
if (!isLocked.get() && !child.isUnsubscribed()) {
301301
if (consumerCapacity.get() > 0) {
302302
worker.schedule(subscribeToSource);
303+
} else {
304+
resumeBoundary.compareAndSet(false, true);
303305
}
304306
}
305307
}
@@ -315,22 +317,17 @@ public void setProducer(Producer producer) {
315317
child.setProducer(new Producer() {
316318

317319
@Override
318-
public void request(long n) {
319-
if (isStarted.compareAndSet(false, true)) {
320-
consumerCapacity.set(n);
320+
public void request(final long n) {
321+
long c = consumerCapacity.getAndAdd(n);
322+
Producer producer = currentProducer.get();
323+
if (producer != null) {
324+
producer.request(c + n);
325+
} else
326+
if (c == 0 && resumeBoundary.compareAndSet(true, false)) {
321327
worker.schedule(subscribeToSource);
322-
} else {
323-
if (consumerCapacity.getAndAdd(n) == 0) {
324-
// restart
325-
worker.schedule(subscribeToSource);
326-
} else {
327-
if (currentProducer.get() != null) {
328-
currentProducer.get().request(n);
329-
}
330-
}
331328
}
332329
}
333330
});
334-
331+
335332
}
336333
}

src/main/java/rx/internal/operators/OperatorGroupBy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ private void pollQueue(GroupState<K, T> groupState) {
306306
}
307307

308308
private void requestMoreIfNecessary() {
309-
if (REQUESTED.get(this) == 0) {
309+
if (REQUESTED.get(this) == 0 && terminated == 0) {
310310
long toRequest = MAX_QUEUE_SIZE - BUFFERED_COUNT.get(this);
311311
if (toRequest > 0 && REQUESTED.compareAndSet(this, 0, toRequest)) {
312312
request(toRequest);

src/test/java/rx/internal/operators/OperatorRetryTest.java

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import rx.functions.Func1;
3939
import rx.functions.Func2;
4040
import rx.internal.util.RxRingBuffer;
41+
import rx.observables.GroupedObservable;
4142
import rx.observers.TestSubscriber;
4243
import rx.schedulers.Schedulers;
4344
import rx.subjects.PublishSubject;
@@ -405,10 +406,14 @@ public static class FuncWithErrors implements Observable.OnSubscribe<String> {
405406
public void call(Subscriber<? super String> o) {
406407
o.onNext("beginningEveryTime");
407408
if (count.getAndIncrement() < numFailures) {
409+
System.out.println("FuncWithErrors @ " + count.get());
408410
o.onError(new RuntimeException("forced failure: " + count.get()));
409411
} else {
412+
System.out.println("FuncWithErrors @ onSuccessOnly");
410413
o.onNext("onSuccessOnly");
414+
System.out.println("FuncWithErrors @ onCompleted");
411415
o.onCompleted();
416+
System.out.println("FuncWithErrors !");
412417
}
413418
}
414419
}
@@ -663,7 +668,7 @@ public void testTimeoutWithRetry() {
663668
assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
664669
}
665670

666-
@Test
671+
@Test(timeout = 3000)
667672
public void testRetryWithBackpressure() {
668673
@SuppressWarnings("unchecked")
669674
Observer<String> observer = mock(Observer.class);
@@ -684,5 +689,90 @@ public void testRetryWithBackpressure() {
684689
inOrder.verify(observer, times(1)).onCompleted();
685690
inOrder.verifyNoMoreInteractions();
686691
}
692+
@Test(timeout = 3000)
693+
public void testIssue1900() throws InterruptedException {
694+
@SuppressWarnings("unchecked")
695+
Observer<String> observer = mock(Observer.class);
696+
final int NUM_MSG = 1034;
697+
final AtomicInteger count = new AtomicInteger();
698+
699+
Observable<String> origin = Observable.range(0, NUM_MSG)
700+
.map(new Func1<Integer, String>() {
701+
@Override
702+
public String call(Integer t1) {
703+
return "msg: " + count.incrementAndGet();
704+
}
705+
});
706+
707+
origin.retry()
708+
.groupBy(new Func1<String, String>() {
709+
@Override
710+
public String call(String t1) {
711+
return t1;
712+
}
713+
})
714+
.flatMap(new Func1<GroupedObservable<String,String>, Observable<String>>() {
715+
@Override
716+
public Observable<String> call(GroupedObservable<String, String> t1) {
717+
return t1.take(1);
718+
}
719+
})
720+
.unsafeSubscribe(new TestSubscriber<String>(observer));
721+
722+
InOrder inOrder = inOrder(observer);
723+
// should show 3 attempts
724+
inOrder.verify(observer, times(NUM_MSG)).onNext(any(java.lang.String.class));
725+
// // should have no errors
726+
inOrder.verify(observer, never()).onError(any(Throwable.class));
727+
// should have a single success
728+
//inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
729+
// should have a single successful onCompleted
730+
inOrder.verify(observer, times(1)).onCompleted();
731+
inOrder.verifyNoMoreInteractions();
732+
}
733+
@Test(timeout = 3000)
734+
public void testIssue1900SourceNotSupportingBackpressure() {
735+
@SuppressWarnings("unchecked")
736+
Observer<String> observer = mock(Observer.class);
737+
final int NUM_MSG = 1034;
738+
final AtomicInteger count = new AtomicInteger();
739+
740+
Observable<String> origin = Observable.create(new Observable.OnSubscribe<String>() {
741+
742+
@Override
743+
public void call(Subscriber<? super String> o) {
744+
for(int i=0; i<NUM_MSG; i++) {
745+
o.onNext("msg:" + count.incrementAndGet());
746+
}
747+
o.onCompleted();
748+
}
749+
});
750+
751+
origin.retry()
752+
.groupBy(new Func1<String, String>() {
753+
@Override
754+
public String call(String t1) {
755+
return t1;
756+
}
757+
})
758+
.flatMap(new Func1<GroupedObservable<String,String>, Observable<String>>() {
759+
@Override
760+
public Observable<String> call(GroupedObservable<String, String> t1) {
761+
return t1.take(1);
762+
}
763+
})
764+
.unsafeSubscribe(new TestSubscriber<String>(observer));
765+
766+
InOrder inOrder = inOrder(observer);
767+
// should show 3 attempts
768+
inOrder.verify(observer, times(NUM_MSG)).onNext(any(java.lang.String.class));
769+
// // should have no errors
770+
inOrder.verify(observer, never()).onError(any(Throwable.class));
771+
// should have a single success
772+
//inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
773+
// should have a single successful onCompleted
774+
inOrder.verify(observer, times(1)).onCompleted();
775+
inOrder.verifyNoMoreInteractions();
776+
}
687777

688778
}

0 commit comments

Comments
 (0)