Skip to content

Commit f245fcd

Browse files
Merge pull request #499 from benjchristensen/observeOn-refactor
ObserveOn Refactor
2 parents 310d530 + 9ff3624 commit f245fcd

File tree

5 files changed

+130
-148
lines changed

5 files changed

+130
-148
lines changed

rxjava-core/src/main/java/rx/Notification.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,16 @@ public boolean isOnNext() {
116116
return getKind() == Kind.OnNext;
117117
}
118118

119+
public void accept(Observer<? super T> observer) {
120+
if (isOnNext()) {
121+
observer.onNext(getValue());
122+
} else if (isOnCompleted()) {
123+
observer.onCompleted();
124+
} else if (isOnError()) {
125+
observer.onError(getThrowable());
126+
}
127+
}
128+
119129
public static enum Kind {
120130
OnNext, OnError, OnCompleted
121131
}

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,20 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
21+
import rx.Notification;
1822
import rx.Observable;
1923
import rx.Observable.OnSubscribeFunc;
2024
import rx.Observer;
2125
import rx.Scheduler;
2226
import rx.Subscription;
27+
import rx.concurrency.CurrentThreadScheduler;
2328
import rx.concurrency.ImmediateScheduler;
2429
import rx.subscriptions.CompositeSubscription;
30+
import rx.util.functions.Action0;
31+
import rx.util.functions.Action1;
2532

2633
/**
2734
* Asynchronously notify Observers on the specified Scheduler.
@@ -38,6 +45,9 @@ private static class ObserveOn<T> implements OnSubscribeFunc<T> {
3845
private final Observable<? extends T> source;
3946
private final Scheduler scheduler;
4047

48+
final ConcurrentLinkedQueue<Notification<? extends T>> queue = new ConcurrentLinkedQueue<Notification<? extends T>>();
49+
final AtomicInteger counter = new AtomicInteger(0);
50+
4151
public ObserveOn(Observable<? extends T> source, Scheduler scheduler) {
4252
this.source = source;
4353
this.scheduler = scheduler;
@@ -48,11 +58,55 @@ public Subscription onSubscribe(final Observer<? super T> observer) {
4858
if (scheduler instanceof ImmediateScheduler) {
4959
// do nothing if we request ImmediateScheduler so we don't invoke overhead
5060
return source.subscribe(observer);
61+
} else if (scheduler instanceof CurrentThreadScheduler) {
62+
// do nothing if we request CurrentThreadScheduler so we don't invoke overhead
63+
return source.subscribe(observer);
5164
} else {
52-
CompositeSubscription s = new CompositeSubscription();
53-
s.add(source.subscribe(new ScheduledObserver<T>(s, observer, scheduler)));
54-
return s;
65+
return observeOn(observer, scheduler);
5566
}
5667
}
68+
69+
public Subscription observeOn(final Observer<? super T> observer, Scheduler scheduler) {
70+
final CompositeSubscription s = new CompositeSubscription();
71+
72+
s.add(source.materialize().subscribe(new Action1<Notification<? extends T>>() {
73+
74+
@Override
75+
public void call(Notification<? extends T> e) {
76+
// this must happen before 'counter' is used to provide synchronization between threads
77+
queue.offer(e);
78+
79+
// we now use counter to atomically determine if we need to start processing or not
80+
// it will be 0 if it's the first notification or the scheduler has finished processing work
81+
// and we need to start doing it again
82+
if (counter.getAndIncrement() == 0) {
83+
processQueue(s, observer);
84+
}
85+
86+
}
87+
}));
88+
89+
return s;
90+
}
91+
92+
private void processQueue(CompositeSubscription s, final Observer<? super T> observer) {
93+
s.add(scheduler.schedule(new Action1<Action0>() {
94+
@Override
95+
public void call(Action0 self) {
96+
Notification<? extends T> not = queue.poll();
97+
if (not != null) {
98+
not.accept(observer);
99+
}
100+
101+
// decrement count and if we still have work to do
102+
// recursively schedule ourselves to process again
103+
if (counter.decrementAndGet() > 0) {
104+
self.call();
105+
}
106+
107+
}
108+
}));
109+
}
57110
}
111+
58112
}

rxjava-core/src/main/java/rx/operators/ScheduledObserver.java

Lines changed: 0 additions & 138 deletions
This file was deleted.

rxjava-core/src/test/java/rx/concurrency/SchedulerUnsubscribeTest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,17 @@ public void testUnsubscribeOfNewThread() throws InterruptedException {
2828
public void testUnsubscribeOfThreadPoolForIO() throws InterruptedException {
2929
testUnSubscribeForScheduler(Schedulers.threadPoolForIO());
3030
}
31-
31+
3232
@Test
3333
public void testUnsubscribeOfThreadPoolForComputation() throws InterruptedException {
3434
testUnSubscribeForScheduler(Schedulers.threadPoolForComputation());
3535
}
36-
36+
37+
@Test
38+
public void testUnsubscribeOfImmediateThread() throws InterruptedException {
39+
testUnSubscribeForScheduler(Schedulers.immediate());
40+
}
41+
3742
@Test
3843
public void testUnsubscribeOfCurrentThread() throws InterruptedException {
3944
testUnSubscribeForScheduler(Schedulers.currentThread());
@@ -56,7 +61,7 @@ public Long call(Long aLong) {
5661
}
5762
})
5863
.subscribeOn(scheduler)
59-
.observeOn(Schedulers.currentThread())
64+
.observeOn(scheduler)
6065
.subscribe(new Observer<Long>() {
6166
@Override
6267
public void onCompleted() {

rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,6 +16,7 @@
1616
package rx.operators;
1717

1818
import static org.junit.Assert.*;
19+
import static org.mockito.Matchers.*;
1920
import static org.mockito.Mockito.*;
2021
import static rx.operators.OperationObserveOn.*;
2122

@@ -30,6 +31,7 @@
3031
import rx.Observable;
3132
import rx.Observer;
3233
import rx.concurrency.Schedulers;
34+
import rx.util.functions.Action1;
3335

3436
public class OperationObserveOnTest {
3537

@@ -81,4 +83,53 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
8183
inOrder.verify(observer, times(1)).onCompleted();
8284
inOrder.verifyNoMoreInteractions();
8385
}
86+
87+
@Test
88+
@SuppressWarnings("unchecked")
89+
public void testThreadName() throws InterruptedException {
90+
Observable<String> obs = Observable.from("one", null, "two", "three", "four");
91+
92+
Observer<String> observer = mock(Observer.class);
93+
94+
InOrder inOrder = inOrder(observer);
95+
96+
final CountDownLatch completedLatch = new CountDownLatch(1);
97+
doAnswer(new Answer<Void>() {
98+
99+
@Override
100+
public Void answer(InvocationOnMock invocation) throws Throwable {
101+
completedLatch.countDown();
102+
103+
return null;
104+
}
105+
}).when(observer).onCompleted();
106+
107+
doAnswer(new Answer<Void>() {
108+
109+
@Override
110+
public Void answer(InvocationOnMock invocation) throws Throwable {
111+
completedLatch.countDown();
112+
113+
return null;
114+
}
115+
}).when(observer).onError(any(Exception.class));
116+
117+
obs.observeOn(Schedulers.newThread()).doOnEach(new Action1<String>() {
118+
119+
@Override
120+
public void call(String t1) {
121+
String threadName = Thread.currentThread().getName();
122+
boolean correctThreadName = threadName.startsWith("RxNewThreadScheduler");
123+
System.out.println("ThreadName: " + threadName + " Correct => " + correctThreadName);
124+
assertTrue(correctThreadName);
125+
}
126+
127+
}).subscribe(observer);
128+
129+
if (!completedLatch.await(1000, TimeUnit.MILLISECONDS)) {
130+
fail("timed out waiting");
131+
}
132+
133+
inOrder.verify(observer, times(1)).onCompleted();
134+
}
84135
}

0 commit comments

Comments
 (0)