Skip to content

Commit 054ba58

Browse files
committed
Merge pull request #3138 from akarnokd/RangePerf
Range overhead reduction.
2 parents 6362dfe + 9643d94 commit 054ba58

File tree

3 files changed

+92
-56
lines changed

3 files changed

+92
-56
lines changed

src/main/java/rx/internal/operators/OnSubscribeRange.java

Lines changed: 68 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,10 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
18+
import java.util.concurrent.atomic.AtomicLong;
1919

20+
import rx.*;
2021
import rx.Observable.OnSubscribe;
21-
import rx.Producer;
22-
import rx.Subscriber;
2322

2423
/**
2524
* Emit ints from start to end inclusive.
@@ -39,13 +38,13 @@ public void call(final Subscriber<? super Integer> o) {
3938
o.setProducer(new RangeProducer(o, start, end));
4039
}
4140

42-
private static final class RangeProducer implements Producer {
41+
private static final class RangeProducer extends AtomicLong implements Producer {
42+
/** */
43+
private static final long serialVersionUID = 4114392207069098388L;
44+
4345
private final Subscriber<? super Integer> o;
44-
// accessed by REQUESTED_UPDATER
45-
private volatile long requested;
46-
private static final AtomicLongFieldUpdater<RangeProducer> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(RangeProducer.class, "requested");
47-
private long index;
4846
private final int end;
47+
private long index;
4948

5049
private RangeProducer(Subscriber<? super Integer> o, int start, int end) {
5150
this.o = o;
@@ -55,54 +54,79 @@ private RangeProducer(Subscriber<? super Integer> o, int start, int end) {
5554

5655
@Override
5756
public void request(long n) {
58-
if (requested == Long.MAX_VALUE) {
57+
if (get() == Long.MAX_VALUE) {
5958
// already started with fast-path
6059
return;
6160
}
62-
if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) {
61+
if (n == Long.MAX_VALUE && compareAndSet(0L, Long.MAX_VALUE)) {
6362
// fast-path without backpressure
64-
for (long i = index; i <= end; i++) {
63+
fastpath();
64+
} else if (n > 0L) {
65+
long c = BackpressureUtils.getAndAddRequest(this, n);
66+
if (c == 0L) {
67+
// backpressure is requested
68+
slowpath(n);
69+
}
70+
}
71+
}
72+
73+
/**
74+
*
75+
*/
76+
void slowpath(long r) {
77+
long idx = index;
78+
while (true) {
79+
/*
80+
* This complicated logic is done to avoid touching the volatile `index` and `requested` values
81+
* during the loop itself. If they are touched during the loop the performance is impacted significantly.
82+
*/
83+
long fs = end - idx + 1;
84+
long e = Math.min(fs, r);
85+
final boolean complete = fs <= r;
86+
87+
fs = e + idx;
88+
final Subscriber<? super Integer> o = this.o;
89+
90+
for (long i = idx; i != fs; i++) {
6591
if (o.isUnsubscribed()) {
6692
return;
6793
}
6894
o.onNext((int) i);
6995
}
70-
if (!o.isUnsubscribed()) {
96+
97+
if (complete) {
98+
if (o.isUnsubscribed()) {
99+
return;
100+
}
71101
o.onCompleted();
102+
return;
72103
}
73-
} else if (n > 0) {
74-
// backpressure is requested
75-
long _c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER,this, n);
76-
if (_c == 0) {
77-
while (true) {
78-
/*
79-
* This complicated logic is done to avoid touching the volatile `index` and `requested` values
80-
* during the loop itself. If they are touched during the loop the performance is impacted significantly.
81-
*/
82-
long r = requested;
83-
long idx = index;
84-
long numLeft = end - idx + 1;
85-
long e = Math.min(numLeft, r);
86-
boolean completeOnFinish = numLeft <= r;
87-
long stopAt = e + idx;
88-
for (long i = idx; i < stopAt; i++) {
89-
if (o.isUnsubscribed()) {
90-
return;
91-
}
92-
o.onNext((int) i);
93-
}
94-
index = stopAt;
95-
96-
if (completeOnFinish) {
97-
o.onCompleted();
98-
return;
99-
}
100-
if (REQUESTED_UPDATER.addAndGet(this, -e) == 0) {
101-
// we're done emitting the number requested so return
102-
return;
103-
}
104-
}
104+
105+
idx = fs;
106+
index = fs;
107+
108+
r = addAndGet(-e);
109+
if (r == 0L) {
110+
// we're done emitting the number requested so return
111+
return;
112+
}
113+
}
114+
}
115+
116+
/**
117+
*
118+
*/
119+
void fastpath() {
120+
final long end = this.end + 1L;
121+
final Subscriber<? super Integer> o = this.o;
122+
for (long i = index; i != end; i++) {
123+
if (o.isUnsubscribed()) {
124+
return;
105125
}
126+
o.onNext((int) i);
127+
}
128+
if (!o.isUnsubscribed()) {
129+
o.onCompleted();
106130
}
107131
}
108132
}

src/perf/java/rx/operators/OperatorRangePerf.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,11 @@
1717

1818
import java.util.concurrent.TimeUnit;
1919

20-
import org.openjdk.jmh.annotations.Benchmark;
21-
import org.openjdk.jmh.annotations.BenchmarkMode;
22-
import org.openjdk.jmh.annotations.Mode;
23-
import org.openjdk.jmh.annotations.OutputTimeUnit;
24-
import org.openjdk.jmh.annotations.Param;
25-
import org.openjdk.jmh.annotations.Scope;
26-
import org.openjdk.jmh.annotations.Setup;
27-
import org.openjdk.jmh.annotations.State;
20+
import org.openjdk.jmh.annotations.*;
2821
import org.openjdk.jmh.infra.Blackhole;
2922

30-
import rx.Observable;
31-
import rx.Subscriber;
23+
import rx.*;
24+
import rx.internal.operators.OnSubscribeRange;
3225

3326
@BenchmarkMode(Mode.Throughput)
3427
@OutputTimeUnit(TimeUnit.SECONDS)
@@ -50,7 +43,7 @@ public static class InputUsingRequest {
5043

5144
@Setup
5245
public void setup(final Blackhole bh) {
53-
observable = Observable.range(0, size);
46+
observable = Observable.create(new OnSubscribeRange(0, size));
5447
this.bh = bh;
5548
}
5649

@@ -98,7 +91,7 @@ public static class InputWithoutRequest {
9891

9992
@Setup
10093
public void setup(final Blackhole bh) {
101-
observable = Observable.range(0, size);
94+
observable = Observable.create(new OnSubscribeRange(0, size));
10295
this.bh = bh;
10396

10497
}

src/test/java/rx/internal/operators/OnSubscribeRangeTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,4 +249,23 @@ public void onNext(Integer t) {
249249
}});
250250
assertTrue(completed.get());
251251
}
252+
253+
@Test(timeout = 1000)
254+
public void testNearMaxValueWithoutBackpressure() {
255+
TestSubscriber<Integer> ts = TestSubscriber.create();
256+
Observable.range(Integer.MAX_VALUE - 1, 2).subscribe(ts);
257+
258+
ts.assertCompleted();
259+
ts.assertNoErrors();
260+
ts.assertValues(Integer.MAX_VALUE - 1, Integer.MAX_VALUE);
261+
}
262+
@Test(timeout = 1000)
263+
public void testNearMaxValueWithBackpressure() {
264+
TestSubscriber<Integer> ts = TestSubscriber.create(3);
265+
Observable.range(Integer.MAX_VALUE - 1, 2).subscribe(ts);
266+
267+
ts.assertCompleted();
268+
ts.assertNoErrors();
269+
ts.assertValues(Integer.MAX_VALUE - 1, Integer.MAX_VALUE);
270+
}
252271
}

0 commit comments

Comments
 (0)