Skip to content

Commit dc7a3f8

Browse files
Scheduler overload with recursive support
1 parent 0b3a6f5 commit dc7a3f8

File tree

2 files changed

+82
-22
lines changed

2 files changed

+82
-22
lines changed

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 60 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@
2727
import org.mockito.Mockito;
2828

2929
import rx.concurrency.TestScheduler;
30+
import rx.subscriptions.CompositeSubscription;
31+
import rx.subscriptions.MultipleAssignmentSubscription;
3032
import rx.subscriptions.Subscriptions;
3133
import rx.util.functions.Action0;
34+
import rx.util.functions.Action1;
3235
import rx.util.functions.Func1;
3336
import rx.util.functions.Func2;
3437

@@ -83,23 +86,23 @@ public abstract class Scheduler {
8386
* Schedules a cancelable action to be executed periodically.
8487
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
8588
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
86-
*
87-
* @param state
89+
*
90+
* @param state
8891
* State to pass into the action.
89-
* @param action
92+
* @param action
9093
* The action to execute periodically.
91-
* @param initialDelay
94+
* @param initialDelay
9295
* Time to wait before executing the action for the first time.
93-
* @param period
96+
* @param period
9497
* The time interval to wait each time in between executing the action.
95-
* @param unit
98+
* @param unit
9699
* The time unit the interval above is given in.
97100
* @return A subscription to be able to unsubscribe from action.
98101
*/
99102
public <T> Subscription schedulePeriodically(T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
100103
final long periodInNanos = unit.toNanos(period);
101104
final AtomicBoolean complete = new AtomicBoolean();
102-
105+
103106
final Func2<Scheduler, T, Subscription> recursiveAction = new Func2<Scheduler, T, Subscription>() {
104107
@Override
105108
public Subscription call(Scheduler scheduler, T state0) {
@@ -128,7 +131,7 @@ public void call() {
128131
}
129132
});
130133
}
131-
134+
132135
/**
133136
* Schedules a cancelable action to be executed at dueTime.
134137
*
@@ -150,6 +153,40 @@ public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ?
150153
}
151154
}
152155

156+
/**
157+
* Schedules an action and receives back an action for recursive execution.
158+
*
159+
* @param action
160+
* action
161+
* @return a subscription to be able to unsubscribe from action.
162+
*/
163+
public Subscription schedule(final Action1<Action0> action) {
164+
final CompositeSubscription parentSubscription = new CompositeSubscription();
165+
final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription();
166+
parentSubscription.add(childSubscription);
167+
168+
final Func2<Scheduler, Func2, Subscription> parentAction = new Func2<Scheduler, Func2, Subscription>() {
169+
170+
@Override
171+
public Subscription call(final Scheduler scheduler, final Func2 parentAction) {
172+
action.call(new Action0() {
173+
174+
@Override
175+
public void call() {
176+
if (!parentSubscription.isUnsubscribed()) {
177+
childSubscription.setSubscription(scheduler.schedule(parentAction, parentAction));
178+
}
179+
}
180+
181+
});
182+
return childSubscription;
183+
}
184+
};
185+
186+
parentSubscription.add(schedule(parentAction, parentAction));
187+
188+
return parentSubscription;
189+
}
153190

154191
/**
155192
* Schedules an action to be executed.
@@ -187,17 +224,16 @@ public Subscription call(Scheduler scheduler, Void state) {
187224
}, delayTime, unit);
188225
}
189226

190-
191227
/**
192228
* Schedules an action to be executed periodically.
193229
*
194-
* @param action
230+
* @param action
195231
* The action to execute periodically.
196-
* @param initialDelay
232+
* @param initialDelay
197233
* Time to wait before executing the action for the first time.
198-
* @param period
234+
* @param period
199235
* The time interval to wait each time in between executing the action.
200-
* @param unit
236+
* @param unit
201237
* The time unit the interval above is given in.
202238
* @return A subscription to be able to unsubscribe from action.
203239
*/
@@ -230,39 +266,41 @@ public int degreeOfParallelism() {
230266
}
231267

232268
public static class UnitTest {
233-
@SuppressWarnings("unchecked") // mocking is unchecked, unfortunately
269+
@SuppressWarnings("unchecked")
270+
// mocking is unchecked, unfortunately
234271
@Test
235272
public void testPeriodicScheduling() {
236273
final Func1<Long, Void> calledOp = mock(Func1.class);
237-
274+
238275
final TestScheduler scheduler = new TestScheduler();
239276
Subscription subscription = scheduler.schedulePeriodically(new Action0() {
240-
@Override public void call() {
277+
@Override
278+
public void call() {
241279
System.out.println(scheduler.now());
242280
calledOp.call(scheduler.now());
243281
}
244282
}, 1, 2, TimeUnit.SECONDS);
245-
283+
246284
verify(calledOp, never()).call(anyLong());
247285

248286
InOrder inOrder = Mockito.inOrder(calledOp);
249-
287+
250288
scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS);
251289
inOrder.verify(calledOp, never()).call(anyLong());
252290

253291
scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
254292
inOrder.verify(calledOp, times(1)).call(1000L);
255-
293+
256294
scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS);
257295
inOrder.verify(calledOp, never()).call(3000L);
258-
296+
259297
scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
260298
inOrder.verify(calledOp, times(1)).call(3000L);
261-
299+
262300
scheduler.advanceTimeBy(5L, TimeUnit.SECONDS);
263301
inOrder.verify(calledOp, times(1)).call(5000L);
264302
inOrder.verify(calledOp, times(1)).call(7000L);
265-
303+
266304
subscription.unsubscribe();
267305
scheduler.advanceTimeBy(11L, TimeUnit.SECONDS);
268306
inOrder.verify(calledOp, never()).call(anyLong());

rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import rx.Subscription;
3434
import rx.subscriptions.BooleanSubscription;
3535
import rx.subscriptions.Subscriptions;
36+
import rx.util.functions.Action0;
3637
import rx.util.functions.Action1;
3738
import rx.util.functions.Func1;
3839
import rx.util.functions.Func2;
@@ -473,6 +474,27 @@ public Subscription onSubscribe(final Observer<? super String> observer) {
473474
fail("Error: " + observer.error.get().getMessage());
474475
}
475476
}
477+
478+
@Test
479+
public void testRecursion() {
480+
TestScheduler s = new TestScheduler();
481+
482+
final AtomicInteger counter = new AtomicInteger(0);
483+
484+
Subscription subscription = s.schedule(new Action1<Action0>() {
485+
486+
@Override
487+
public void call(Action0 self) {
488+
counter.incrementAndGet();
489+
System.out.println("counter: " + counter.get());
490+
self.call();
491+
}
492+
493+
});
494+
subscription.unsubscribe();
495+
assertEquals(0, counter.get());
496+
}
497+
476498

477499
/**
478500
* Used to determine if onNext is being invoked concurrently.

0 commit comments

Comments
 (0)