Skip to content

Commit 5a10454

Browse files
Merge pull request #727 from benjchristensen/iterable-to-observable-immediate-scheduler
ImmediateScheduler optimization for toObservableIterable
2 parents 22b6b3d + a18b8c1 commit 5a10454

File tree

6 files changed

+179
-152
lines changed

6 files changed

+179
-152
lines changed

rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import rx.Observer;
2020
import rx.Scheduler;
2121
import rx.Subscription;
22+
import rx.schedulers.ImmediateScheduler;
2223
import rx.schedulers.Schedulers;
2324
import rx.subscriptions.Subscriptions;
2425
import rx.util.functions.Action0;
@@ -37,16 +38,20 @@
3738
public final class OperationToObservableIterable<T> {
3839

3940
public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
40-
return new ToObservableIterable<T>(list, scheduler);
41+
if (scheduler instanceof ImmediateScheduler) {
42+
return new ToObservableIterable<T>(list);
43+
} else {
44+
return new ToObservableIterableScheduled<T>(list, scheduler);
45+
}
4146
}
4247

4348
public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list) {
44-
return toObservableIterable(list, Schedulers.immediate());
49+
return new ToObservableIterable<T>(list);
4550
}
4651

47-
private static class ToObservableIterable<T> implements OnSubscribeFunc<T> {
52+
private static class ToObservableIterableScheduled<T> implements OnSubscribeFunc<T> {
4853

49-
public ToObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
54+
public ToObservableIterableScheduled(Iterable<? extends T> list, Scheduler scheduler) {
5055
this.iterable = list;
5156
this.scheduler = scheduler;
5257
}
@@ -74,4 +79,25 @@ public void call(Action0 self) {
7479
});
7580
}
7681
}
82+
83+
private static class ToObservableIterable<T> implements OnSubscribeFunc<T> {
84+
85+
public ToObservableIterable(Iterable<? extends T> list) {
86+
this.iterable = list;
87+
}
88+
89+
final Iterable<? extends T> iterable;
90+
91+
public Subscription onSubscribe(final Observer<? super T> observer) {
92+
try {
93+
for (T t : iterable) {
94+
observer.onNext(t);
95+
}
96+
observer.onCompleted();
97+
} catch (Exception e) {
98+
observer.onError(e);
99+
}
100+
return Subscriptions.empty();
101+
}
102+
}
77103
}

rxjava-core/src/test/java/rx/ObserveOnTests.java

Lines changed: 0 additions & 133 deletions
This file was deleted.

rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.util.concurrent.CountDownLatch;
2424
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicInteger;
2526

2627
import org.junit.Test;
2728
import org.mockito.InOrder;
@@ -34,6 +35,7 @@
3435
import rx.schedulers.TestScheduler;
3536
import rx.util.functions.Action0;
3637
import rx.util.functions.Action1;
38+
import rx.util.functions.Func1;
3739

3840
public class OperationObserveOnTest {
3941

@@ -210,6 +212,108 @@ public void observeSameOnMultipleSchedulers() {
210212
inOrder2.verify(observer2, times(1)).onCompleted();
211213
verify(observer2, never()).onError(any(Throwable.class));
212214
inOrder2.verifyNoMoreInteractions();
215+
}
216+
217+
/**
218+
* Confirm that running on a NewThreadScheduler uses the same thread for the entire stream
219+
*/
220+
@Test
221+
public void testObserveOnWithNewThreadScheduler() {
222+
final AtomicInteger count = new AtomicInteger();
223+
final int _multiple = 99;
224+
225+
Observable.range(1, 100000).map(new Func1<Integer, Integer>() {
226+
227+
@Override
228+
public Integer call(Integer t1) {
229+
return t1 * _multiple;
230+
}
231+
232+
}).observeOn(Schedulers.newThread())
233+
.toBlockingObservable().forEach(new Action1<Integer>() {
234+
235+
@Override
236+
public void call(Integer t1) {
237+
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
238+
assertTrue(Thread.currentThread().getName().startsWith("RxNewThreadScheduler"));
239+
}
240+
241+
});
242+
}
243+
244+
/**
245+
* Confirm that running on a ThreadPoolScheduler allows multiple threads but is still ordered.
246+
*/
247+
@Test
248+
public void testObserveOnWithThreadPoolScheduler() {
249+
final AtomicInteger count = new AtomicInteger();
250+
final int _multiple = 99;
251+
252+
Observable.range(1, 100000).map(new Func1<Integer, Integer>() {
253+
254+
@Override
255+
public Integer call(Integer t1) {
256+
return t1 * _multiple;
257+
}
258+
259+
}).observeOn(Schedulers.computation())
260+
.toBlockingObservable().forEach(new Action1<Integer>() {
261+
262+
@Override
263+
public void call(Integer t1) {
264+
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
265+
assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool"));
266+
}
267+
268+
});
269+
}
270+
271+
/**
272+
* Attempts to confirm that when pauses exist between events, the ScheduledObserver
273+
* does not lose or reorder any events since the scheduler will not block, but will
274+
* be re-scheduled when it receives new events after each pause.
275+
*
276+
*
277+
* This is non-deterministic in proving success, but if it ever fails (non-deterministically)
278+
* it is a sign of potential issues as thread-races and scheduling should not affect output.
279+
*/
280+
@Test
281+
public void testObserveOnOrderingConcurrency() {
282+
final AtomicInteger count = new AtomicInteger();
283+
final int _multiple = 99;
284+
285+
Observable.range(1, 10000).map(new Func1<Integer, Integer>() {
286+
287+
@Override
288+
public Integer call(Integer t1) {
289+
if (randomIntFrom0to100() > 98) {
290+
try {
291+
Thread.sleep(2);
292+
} catch (InterruptedException e) {
293+
e.printStackTrace();
294+
}
295+
}
296+
return t1 * _multiple;
297+
}
298+
299+
}).observeOn(Schedulers.computation())
300+
.toBlockingObservable().forEach(new Action1<Integer>() {
301+
302+
@Override
303+
public void call(Integer t1) {
304+
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
305+
assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool"));
306+
}
307+
308+
});
309+
}
213310

311+
private static int randomIntFrom0to100() {
312+
// XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml
313+
long x = System.nanoTime();
314+
x ^= (x << 21);
315+
x ^= (x >>> 35);
316+
x ^= (x << 4);
317+
return Math.abs((int) x % 100);
214318
}
215319
}

rxjava-core/src/test/java/rx/operators/OperationToObservableIterableTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import rx.Observable;
2828
import rx.Observer;
29+
import rx.schedulers.Schedulers;
2930

3031
public class OperationToObservableIterableTest {
3132

@@ -42,4 +43,18 @@ public void testIterable() {
4243
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
4344
verify(aObserver, times(1)).onCompleted();
4445
}
46+
47+
@Test
48+
public void testIterableScheduled() {
49+
Observable<String> observable = Observable.create(toObservableIterable(Arrays.<String> asList("one", "two", "three"), Schedulers.currentThread()));
50+
51+
@SuppressWarnings("unchecked")
52+
Observer<String> aObserver = mock(Observer.class);
53+
observable.subscribe(aObserver);
54+
verify(aObserver, times(1)).onNext("one");
55+
verify(aObserver, times(1)).onNext("two");
56+
verify(aObserver, times(1)).onNext("three");
57+
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
58+
verify(aObserver, times(1)).onCompleted();
59+
}
4560
}

rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,16 +205,31 @@ public String call(String s) {
205205
assertTrue(strings.contains("names=>b-2"));
206206
}
207207

208+
/**
209+
* The order of execution is nondeterministic.
210+
* @throws InterruptedException
211+
*/
208212
@SuppressWarnings("rawtypes")
209213
@Test
210214
public final void testSequenceOfActions() throws InterruptedException {
211215
final Scheduler scheduler = getScheduler();
212216

213-
final CountDownLatch latch = new CountDownLatch(1);
217+
final CountDownLatch latch = new CountDownLatch(2);
214218
final Action0 first = mock(Action0.class);
215219
final Action0 second = mock(Action0.class);
216220

217-
// make it wait until after the second is called
221+
// make it wait until both the first and second are called
222+
doAnswer(new Answer() {
223+
224+
@Override
225+
public Object answer(InvocationOnMock invocation) throws Throwable {
226+
try {
227+
return invocation.getMock();
228+
} finally {
229+
latch.countDown();
230+
}
231+
}
232+
}).when(first).call();
218233
doAnswer(new Answer() {
219234

220235
@Override

0 commit comments

Comments
 (0)