Skip to content

Commit ecbd27d

Browse files
Merge pull request #2804 from akarnokd/Perf0225
ObserveOn throughput enhancements
2 parents 8758fdd + 94b53d6 commit ecbd27d

File tree

7 files changed

+217
-80
lines changed

7 files changed

+217
-80
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 57 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,16 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
18+
import java.util.Queue;
19+
import java.util.concurrent.atomic.*;
2020

2121
import rx.Observable.Operator;
22-
import rx.Producer;
23-
import rx.Scheduler;
24-
import rx.Subscriber;
25-
import rx.Subscription;
22+
import rx.*;
2623
import rx.exceptions.MissingBackpressureException;
2724
import rx.functions.Action0;
28-
import rx.internal.util.RxRingBuffer;
29-
import rx.schedulers.ImmediateScheduler;
30-
import rx.schedulers.TrampolineScheduler;
25+
import rx.internal.util.*;
26+
import rx.internal.util.unsafe.*;
27+
import rx.schedulers.*;
3128

3229
/**
3330
* Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer.
@@ -64,16 +61,15 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
6461
/** Observe through individual queue per observer. */
6562
private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
6663
final Subscriber<? super T> child;
67-
private final Scheduler.Worker recursiveScheduler;
68-
private final ScheduledUnsubscribe scheduledUnsubscribe;
64+
final Scheduler.Worker recursiveScheduler;
65+
final ScheduledUnsubscribe scheduledUnsubscribe;
6966
final NotificationLite<T> on = NotificationLite.instance();
7067

71-
private final RxRingBuffer queue = RxRingBuffer.getSpscInstance();
72-
private boolean completed = false;
73-
private boolean failure = false;
68+
final Queue<Object> queue;
69+
volatile boolean completed = false;
70+
volatile boolean failure = false;
7471

75-
@SuppressWarnings("unused")
76-
private volatile long requested = 0;
72+
volatile long requested = 0;
7773
@SuppressWarnings("rawtypes")
7874
static final AtomicLongFieldUpdater<ObserveOnSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "requested");
7975

@@ -82,12 +78,19 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
8278
@SuppressWarnings("rawtypes")
8379
static final AtomicLongFieldUpdater<ObserveOnSubscriber> COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter");
8480

81+
volatile Throwable error;
82+
8583
// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
8684
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
8785
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
8886
this.child = child;
8987
this.recursiveScheduler = scheduler.createWorker();
90-
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler, queue);
88+
if (UnsafeAccess.isUnsafeAvailable()) {
89+
queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
90+
} else {
91+
queue = new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
92+
}
93+
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
9194
child.add(scheduledUnsubscribe);
9295
child.setProducer(new Producer() {
9396

@@ -113,10 +116,8 @@ public void onNext(final T t) {
113116
if (isUnsubscribed() || completed) {
114117
return;
115118
}
116-
try {
117-
queue.onNext(t);
118-
} catch (MissingBackpressureException e) {
119-
onError(e);
119+
if (!queue.offer(on.next(t))) {
120+
onError(new MissingBackpressureException());
120121
return;
121122
}
122123
schedule();
@@ -127,8 +128,10 @@ public void onCompleted() {
127128
if (isUnsubscribed() || completed) {
128129
return;
129130
}
131+
if (error != null) {
132+
return;
133+
}
130134
completed = true;
131-
queue.onCompleted();
132135
schedule();
133136
}
134137

@@ -137,53 +140,64 @@ public void onError(final Throwable e) {
137140
if (isUnsubscribed() || completed) {
138141
return;
139142
}
143+
if (error != null) {
144+
return;
145+
}
146+
error = e;
140147
// unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
141148
unsubscribe();
142-
completed = true;
143149
// mark failure so the polling thread will skip onNext still in the queue
150+
completed = true;
144151
failure = true;
145-
queue.onError(e);
146152
schedule();
147153
}
148154

149-
protected void schedule() {
150-
if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
151-
recursiveScheduler.schedule(new Action0() {
155+
final Action0 action = new Action0() {
152156

153-
@Override
154-
public void call() {
155-
pollQueue();
156-
}
157+
@Override
158+
public void call() {
159+
pollQueue();
160+
}
157161

158-
});
162+
};
163+
164+
protected void schedule() {
165+
if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
166+
recursiveScheduler.schedule(action);
159167
}
160168
}
161169

162170
// only execute this from schedule()
163-
private void pollQueue() {
171+
void pollQueue() {
164172
int emitted = 0;
165173
do {
166174
/*
167175
* Set to 1 otherwise it could have grown very large while in the last poll loop
168176
* and then we can end up looping all those times again here before exiting even once we've drained
169177
*/
170-
COUNTER_UPDATER.set(this, 1);
178+
counter = 1;
171179

180+
// middle:
172181
while (!scheduledUnsubscribe.isUnsubscribed()) {
173182
if (failure) {
174-
// special handling to short-circuit an error propagation
175-
Object o = queue.poll();
176-
// completed so we will skip onNext if they exist and only emit terminal events
177-
if (on.isError(o)) {
178-
// only emit error
179-
on.accept(child, o);
180-
// we have emitted a terminal event so return (exit the loop we're in)
183+
child.onError(error);
184+
return;
185+
} else {
186+
if (requested == 0 && completed && queue.isEmpty()) {
187+
child.onCompleted();
181188
return;
182189
}
183-
} else {
184190
if (REQUESTED.getAndDecrement(this) != 0) {
185191
Object o = queue.poll();
186192
if (o == null) {
193+
if (completed) {
194+
if (failure) {
195+
child.onError(error);
196+
} else {
197+
child.onCompleted();
198+
}
199+
return;
200+
}
187201
// nothing in queue
188202
REQUESTED.incrementAndGet(this);
189203
break;
@@ -213,12 +227,10 @@ static final class ScheduledUnsubscribe implements Subscription {
213227
final Scheduler.Worker worker;
214228
volatile int once;
215229
static final AtomicIntegerFieldUpdater<ScheduledUnsubscribe> ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ScheduledUnsubscribe.class, "once");
216-
final RxRingBuffer queue;
217230
volatile boolean unsubscribed = false;
218231

219-
public ScheduledUnsubscribe(Scheduler.Worker worker, RxRingBuffer queue) {
232+
public ScheduledUnsubscribe(Scheduler.Worker worker) {
220233
this.worker = worker;
221-
this.queue = queue;
222234
}
223235

224236
@Override

src/main/java/rx/internal/schedulers/EventLoopsScheduler.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,12 @@
1515
*/
1616
package rx.internal.schedulers;
1717

18-
import rx.Scheduler;
19-
import rx.Subscription;
20-
import rx.functions.Action0;
21-
import rx.internal.util.RxThreadFactory;
22-
import rx.subscriptions.CompositeSubscription;
23-
import rx.subscriptions.Subscriptions;
18+
import java.util.concurrent.*;
2419

25-
import java.util.concurrent.ThreadFactory;
26-
import java.util.concurrent.TimeUnit;
20+
import rx.*;
21+
import rx.functions.Action0;
22+
import rx.internal.util.*;
23+
import rx.subscriptions.*;
2724

2825
public class EventLoopsScheduler extends Scheduler {
2926
/** Manages a fixed number of workers. */
@@ -95,7 +92,9 @@ public Subscription scheduleDirect(Action0 action) {
9592
}
9693

9794
private static class EventLoopWorker extends Scheduler.Worker {
98-
private final CompositeSubscription innerSubscription = new CompositeSubscription();
95+
private final SubscriptionList serial = new SubscriptionList();
96+
private final CompositeSubscription timed = new CompositeSubscription();
97+
private final SubscriptionList both = new SubscriptionList(serial, timed);
9998
private final PoolWorker poolWorker;
10099

101100
EventLoopWorker(PoolWorker poolWorker) {
@@ -105,28 +104,33 @@ private static class EventLoopWorker extends Scheduler.Worker {
105104

106105
@Override
107106
public void unsubscribe() {
108-
innerSubscription.unsubscribe();
107+
both.unsubscribe();
109108
}
110109

111110
@Override
112111
public boolean isUnsubscribed() {
113-
return innerSubscription.isUnsubscribed();
112+
return both.isUnsubscribed();
114113
}
115114

116115
@Override
117116
public Subscription schedule(Action0 action) {
118-
return schedule(action, 0, null);
117+
if (isUnsubscribed()) {
118+
return Subscriptions.unsubscribed();
119+
}
120+
ScheduledAction s = poolWorker.scheduleActual(action, 0, null);
121+
122+
serial.add(s);
123+
s.addParent(serial);
124+
125+
return s;
119126
}
120127
@Override
121128
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
122-
if (innerSubscription.isUnsubscribed()) {
123-
// don't schedule, we are unsubscribed
129+
if (isUnsubscribed()) {
124130
return Subscriptions.unsubscribed();
125131
}
132+
ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit, timed);
126133

127-
ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit);
128-
innerSubscription.add(s);
129-
s.addParent(innerSubscription);
130134
return s;
131135
}
132136
}

src/main/java/rx/internal/schedulers/NewThreadWorker.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
import rx.*;
2424
import rx.exceptions.Exceptions;
2525
import rx.functions.Action0;
26-
import rx.internal.util.RxThreadFactory;
26+
import rx.internal.util.*;
2727
import rx.plugins.*;
28-
import rx.subscriptions.Subscriptions;
28+
import rx.subscriptions.*;
2929

3030
/**
3131
* @warn class description missing
@@ -174,6 +174,37 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
174174

175175
return run;
176176
}
177+
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
178+
Action0 decoratedAction = schedulersHook.onSchedule(action);
179+
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
180+
parent.add(run);
181+
182+
Future<?> f;
183+
if (delayTime <= 0) {
184+
f = executor.submit(run);
185+
} else {
186+
f = executor.schedule(run, delayTime, unit);
187+
}
188+
run.add(f);
189+
190+
return run;
191+
}
192+
193+
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
194+
Action0 decoratedAction = schedulersHook.onSchedule(action);
195+
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
196+
parent.add(run);
197+
198+
Future<?> f;
199+
if (delayTime <= 0) {
200+
f = executor.submit(run);
201+
} else {
202+
f = executor.schedule(run, delayTime, unit);
203+
}
204+
run.add(f);
205+
206+
return run;
207+
}
177208

178209
@Override
179210
public void unsubscribe() {

0 commit comments

Comments
 (0)