Skip to content

Commit a18b8c1

Browse files
ToObservableIterable Recursion/Loop
- the ImmediateScheduler no longer schedules itself but uses a loop - 10-20x faster to use a loop rather than schedule itself recursively
1 parent 2e53b67 commit a18b8c1

File tree

5 files changed

+162
-150
lines changed

5 files changed

+162
-150
lines changed

rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import rx.Observer;
2020
import rx.Scheduler;
2121
import rx.Subscription;
22+
import rx.schedulers.ImmediateScheduler;
2223
import rx.schedulers.Schedulers;
2324
import rx.subscriptions.Subscriptions;
2425
import rx.util.functions.Action0;
@@ -37,16 +38,20 @@
3738
public final class OperationToObservableIterable<T> {
3839

3940
public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
40-
return new ToObservableIterable<T>(list, scheduler);
41+
if (scheduler instanceof ImmediateScheduler) {
42+
return new ToObservableIterable<T>(list);
43+
} else {
44+
return new ToObservableIterableScheduled<T>(list, scheduler);
45+
}
4146
}
4247

4348
public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list) {
44-
return toObservableIterable(list, Schedulers.immediate());
49+
return new ToObservableIterable<T>(list);
4550
}
4651

47-
private static class ToObservableIterable<T> implements OnSubscribeFunc<T> {
52+
private static class ToObservableIterableScheduled<T> implements OnSubscribeFunc<T> {
4853

49-
public ToObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
54+
public ToObservableIterableScheduled(Iterable<? extends T> list, Scheduler scheduler) {
5055
this.iterable = list;
5156
this.scheduler = scheduler;
5257
}
@@ -74,4 +79,25 @@ public void call(Action0 self) {
7479
});
7580
}
7681
}
82+
83+
private static class ToObservableIterable<T> implements OnSubscribeFunc<T> {
84+
85+
public ToObservableIterable(Iterable<? extends T> list) {
86+
this.iterable = list;
87+
}
88+
89+
final Iterable<? extends T> iterable;
90+
91+
public Subscription onSubscribe(final Observer<? super T> observer) {
92+
try {
93+
for (T t : iterable) {
94+
observer.onNext(t);
95+
}
96+
observer.onCompleted();
97+
} catch (Exception e) {
98+
observer.onError(e);
99+
}
100+
return Subscriptions.empty();
101+
}
102+
}
77103
}

rxjava-core/src/test/java/rx/ObserveOnTests.java

Lines changed: 0 additions & 133 deletions
This file was deleted.

rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.util.concurrent.CountDownLatch;
2424
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicInteger;
2526

2627
import org.junit.Test;
2728
import org.mockito.InOrder;
@@ -34,6 +35,7 @@
3435
import rx.schedulers.TestScheduler;
3536
import rx.util.functions.Action0;
3637
import rx.util.functions.Action1;
38+
import rx.util.functions.Func1;
3739

3840
public class OperationObserveOnTest {
3941

@@ -210,6 +212,108 @@ public void observeSameOnMultipleSchedulers() {
210212
inOrder2.verify(observer2, times(1)).onCompleted();
211213
verify(observer2, never()).onError(any(Throwable.class));
212214
inOrder2.verifyNoMoreInteractions();
215+
}
216+
217+
/**
218+
* Confirm that running on a NewThreadScheduler uses the same thread for the entire stream
219+
*/
220+
@Test
221+
public void testObserveOnWithNewThreadScheduler() {
222+
final AtomicInteger count = new AtomicInteger();
223+
final int _multiple = 99;
224+
225+
Observable.range(1, 100000).map(new Func1<Integer, Integer>() {
226+
227+
@Override
228+
public Integer call(Integer t1) {
229+
return t1 * _multiple;
230+
}
231+
232+
}).observeOn(Schedulers.newThread())
233+
.toBlockingObservable().forEach(new Action1<Integer>() {
234+
235+
@Override
236+
public void call(Integer t1) {
237+
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
238+
assertTrue(Thread.currentThread().getName().startsWith("RxNewThreadScheduler"));
239+
}
240+
241+
});
242+
}
243+
244+
/**
245+
* Confirm that running on a ThreadPoolScheduler allows multiple threads but is still ordered.
246+
*/
247+
@Test
248+
public void testObserveOnWithThreadPoolScheduler() {
249+
final AtomicInteger count = new AtomicInteger();
250+
final int _multiple = 99;
251+
252+
Observable.range(1, 100000).map(new Func1<Integer, Integer>() {
253+
254+
@Override
255+
public Integer call(Integer t1) {
256+
return t1 * _multiple;
257+
}
258+
259+
}).observeOn(Schedulers.computation())
260+
.toBlockingObservable().forEach(new Action1<Integer>() {
261+
262+
@Override
263+
public void call(Integer t1) {
264+
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
265+
assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool"));
266+
}
267+
268+
});
269+
}
270+
271+
/**
272+
* Attempts to confirm that when pauses exist between events, the ScheduledObserver
273+
* does not lose or reorder any events since the scheduler will not block, but will
274+
* be re-scheduled when it receives new events after each pause.
275+
*
276+
*
277+
* This is non-deterministic in proving success, but if it ever fails (non-deterministically)
278+
* it is a sign of potential issues as thread-races and scheduling should not affect output.
279+
*/
280+
@Test
281+
public void testObserveOnOrderingConcurrency() {
282+
final AtomicInteger count = new AtomicInteger();
283+
final int _multiple = 99;
284+
285+
Observable.range(1, 10000).map(new Func1<Integer, Integer>() {
286+
287+
@Override
288+
public Integer call(Integer t1) {
289+
if (randomIntFrom0to100() > 98) {
290+
try {
291+
Thread.sleep(2);
292+
} catch (InterruptedException e) {
293+
e.printStackTrace();
294+
}
295+
}
296+
return t1 * _multiple;
297+
}
298+
299+
}).observeOn(Schedulers.computation())
300+
.toBlockingObservable().forEach(new Action1<Integer>() {
301+
302+
@Override
303+
public void call(Integer t1) {
304+
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
305+
assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool"));
306+
}
307+
308+
});
309+
}
213310

311+
private static int randomIntFrom0to100() {
312+
// XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml
313+
long x = System.nanoTime();
314+
x ^= (x << 21);
315+
x ^= (x >>> 35);
316+
x ^= (x << 4);
317+
return Math.abs((int) x % 100);
214318
}
215319
}

rxjava-core/src/test/java/rx/operators/OperationToObservableIterableTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import rx.Observable;
2828
import rx.Observer;
29+
import rx.schedulers.Schedulers;
2930

3031
public class OperationToObservableIterableTest {
3132

@@ -42,4 +43,18 @@ public void testIterable() {
4243
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
4344
verify(aObserver, times(1)).onCompleted();
4445
}
46+
47+
@Test
48+
public void testIterableScheduled() {
49+
Observable<String> observable = Observable.create(toObservableIterable(Arrays.<String> asList("one", "two", "three"), Schedulers.currentThread()));
50+
51+
@SuppressWarnings("unchecked")
52+
Observer<String> aObserver = mock(Observer.class);
53+
observable.subscribe(aObserver);
54+
verify(aObserver, times(1)).onNext("one");
55+
verify(aObserver, times(1)).onNext("two");
56+
verify(aObserver, times(1)).onNext("three");
57+
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
58+
verify(aObserver, times(1)).onCompleted();
59+
}
4560
}

rxjava-core/src/test/java/rx/schedulers/SchedulerPerformanceTests.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@ public static void main(String args[]) {
3535

3636
@Override
3737
public void call() {
38-
// spt.singleResponse(Schedulers.immediate());
38+
spt.singleResponse(Schedulers.immediate());
3939
// spt.singleResponse(Schedulers.currentThread());
4040
// spt.singleResponse(Schedulers.threadPoolForComputation());
4141

42-
spt.arrayResponse(Schedulers.immediate());
43-
// spt.arrayResponse(Schedulers.currentThread());
42+
// spt.arrayResponse(Schedulers.immediate());
43+
// spt.arrayResponse(Schedulers.currentThread());
4444
// spt.arrayResponse(Schedulers.threadPoolForComputation());
4545
}
4646
});
@@ -92,11 +92,11 @@ public long baseline() {
9292
*
9393
* --- Schedulers.immediate() ---
9494
*
95-
* Run: 10 - 4,113,672 ops/sec
96-
* Run: 11 - 4,068,351 ops/sec
97-
* Run: 12 - 4,070,318 ops/sec
98-
* Run: 13 - 4,161,793 ops/sec
99-
* Run: 14 - 4,156,725 ops/sec
95+
* Run: 10 - 14,973,870 ops/sec
96+
* Run: 11 - 15,345,142 ops/sec
97+
* Run: 12 - 14,962,533 ops/sec
98+
* Run: 13 - 14,793,030 ops/sec
99+
* Run: 14 - 15,177,685 ops/sec
100100
*
101101
* --- Schedulers.currentThread() ---
102102
*
@@ -127,11 +127,11 @@ public long singleResponse(Scheduler scheduler) {
127127
*
128128
* --- Schedulers.immediate() ---
129129
*
130-
* Run: 0 - 1,849,947 ops/sec
131-
* Run: 1 - 2,076,067 ops/sec
132-
* Run: 2 - 2,114,688 ops/sec
133-
* Run: 3 - 2,114,301 ops/sec
134-
* Run: 4 - 2,102,543 ops/sec
130+
* Run: 10 - 9,805,017 ops/sec
131+
* Run: 11 - 9,880,427 ops/sec
132+
* Run: 12 - 9,615,809 ops/sec
133+
* Run: 13 - 10,920,297 ops/sec
134+
* Run: 14 - 10,822,721 ops/sec
135135
*
136136
* --- Schedulers.currentThread() ---
137137
*

0 commit comments

Comments
 (0)