@@ -1,12 +1,12 @@
 /**
  * Copyright 2014 Netflix, Inc.
- * 
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -45,8 +45,8 @@ public abstract class Scheduler {
  * maintenance.
  */
 
-    /** 
-     * The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase. 
+    /**
+     * The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
      * <p>
      * The associated system parameter, {@code rx.scheduler.drift-tolerance}, expects its value in minutes.
      */
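Review note: the tolerance above is read once through `Long.getLong(...)` in the static initializer shown in the next hunk, so the property must be set before `Scheduler` is first loaded. A minimal sketch follows, assuming a plain JVM launch; the 5-minute value is purely illustrative (the default read below is 15):

```java
// Sketch: configuring rx.scheduler.drift-tolerance (the value is interpreted as minutes).
//
// Option 1 - on the command line, before the JVM starts:
//   java -Drx.scheduler.drift-tolerance=5 ...
//
// Option 2 - programmatically, before any Scheduler is first touched:
public final class DriftToleranceSetup {
    public static void main(String[] args) {
        System.setProperty("rx.scheduler.drift-tolerance", "5"); // illustrative: 5 minutes
        // ... only afterwards use Schedulers, so the static initializer below
        // picks the value up (it falls back to 15 minutes otherwise).
    }
}
```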
@@ -55,14 +55,14 @@ public abstract class Scheduler {
         CLOCK_DRIFT_TOLERANCE_NANOS = TimeUnit.MINUTES.toNanos(
                 Long.getLong("rx.scheduler.drift-tolerance", 15));
     }
-    
+
     /**
      * Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
      * <p>
      * When work is completed it should be unsubscribed using {@link Scheduler.Worker#unsubscribe()}.
      * <p>
      * Work on a {@link Scheduler.Worker} is guaranteed to be sequential.
-     * 
+     *
      * @return a Worker representing a serial queue of actions to be executed
      */
     public abstract Worker createWorker();
@@ -76,7 +76,7 @@ public abstract static class Worker implements Subscription {
 
         /**
          * Schedules an Action for execution.
-         * 
+         *
          * @param action
          *            Action to schedule
          * @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
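Review aid: a minimal usage sketch tying together `createWorker()` from the previous hunk and the `schedule(Action0)` member documented here. It assumes the RxJava 1.x `Schedulers` factory and the `computation()` scheduler; the class and variable names are illustrative only:

```java
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.schedulers.Schedulers;

public final class WorkerUsageSketch {
    public static void main(String[] args) throws InterruptedException {
        // Obtain a Worker; all actions scheduled on it execute sequentially.
        Scheduler.Worker worker = Schedulers.computation().createWorker();

        Subscription task = worker.schedule(new Action0() {
            @Override
            public void call() {
                System.out.println("ran on " + Thread.currentThread().getName());
            }
        });

        Thread.sleep(100);    // give the asynchronous action a moment to run
        task.unsubscribe();   // would unschedule the action had it not run yet
        worker.unsubscribe(); // release the Worker once the work is done
    }
}
```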
@@ -107,7 +107,7 @@ public abstract static class Worker implements Subscription {
          * <p>
          * Note to implementors: non-positive {@code initialTime} and {@code period} should be regarded as
          * undelayed scheduling of the first and any subsequent executions.
-         * 
+         *
          * @param action
          *            the Action to execute periodically
          * @param initialDelay
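The `initialDelay` and `period` parameters described in this hunk belong to `Worker.schedulePeriodically`; here is a small, self-contained sketch of how they are used (timings and names are illustrative, RxJava 1.x API assumed):

```java
import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.schedulers.Schedulers;

public final class PeriodicSketch {
    public static void main(String[] args) throws InterruptedException {
        Scheduler.Worker worker = Schedulers.io().createWorker();

        // First execution after 100 ms, then every 500 ms until unsubscribed.
        Subscription ticker = worker.schedulePeriodically(new Action0() {
            @Override
            public void call() {
                System.out.println("tick at " + System.currentTimeMillis());
            }
        }, 100, 500, TimeUnit.MILLISECONDS);

        Thread.sleep(2000);   // let a few ticks happen
        ticker.unsubscribe(); // stops any further periodic executions
        worker.unsubscribe();
    }
}
```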
@@ -127,7 +127,7 @@ public Subscription schedulePeriodically(final Action0 action, long initialDelay
 
             final SequentialSubscription first = new SequentialSubscription();
             final SequentialSubscription mas = new SequentialSubscription(first);
-            
+
             final Action0 recursiveAction = new Action0() {
                 long count;
                 long lastNowNanos = firstNowNanos;
@@ -137,9 +137,9 @@ public void call() {
                     action.call();
 
                     if (!mas.isUnsubscribed()) {
-                        
+
                         long nextTick;
-                        
+
                         long nowNanos = TimeUnit.MILLISECONDS.toNanos(now());
                         // If the clock moved in a direction quite a bit, rebase the repetition period
                         if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos
@@ -154,7 +154,7 @@ public void call() {
                             nextTick = startInNanos + (++count * periodInNanos);
                         }
                         lastNowNanos = nowNanos;
-                        
+
                         long delay = nextTick - nowNanos;
                         mas.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
                     }
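To make the two hunks above easier to follow: the loop keeps the next execution on the original grid (`startInNanos + (++count * periodInNanos)`) unless `now()` has drifted from the previous reading by more than `CLOCK_DRIFT_TOLERANCE_NANOS`, in which case the timeline is rebased. Below is a standalone sketch of that decision; the lines between these two hunks (the rest of the `if` condition and the rebase branch itself) are not part of this diff, so their form here is an assumption, not the library code:

```java
import java.util.concurrent.TimeUnit;

// Standalone sketch of the periodic "drift rebase" decision from the hunks above.
// Only the pieces visible in this diff are certain; the continuation of the condition
// and the body of the rebase branch are assumptions.
final class DriftRebaseSketch {
    static final long CLOCK_DRIFT_TOLERANCE_NANOS = TimeUnit.MINUTES.toNanos(15);

    long count;          // completed periods so far
    long startInNanos;   // start of the periodic timeline
    long lastNowNanos;   // clock reading taken at the previous tick
    long periodInNanos;  // requested period

    long nextTickNanos(long nowNanos) {
        long nextTick;
        // Visible in the diff: rebase when the clock jumped backwards beyond the tolerance.
        // Assumption: the elided part of the condition also rebases when the clock ran far ahead.
        if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos
                || nowNanos >= lastNowNanos + periodInNanos + CLOCK_DRIFT_TOLERANCE_NANOS) {
            // Assumed rebase behavior: restart the grid so the next tick is one period from now.
            nextTick = nowNanos + periodInNanos;
            startInNanos = nextTick - (periodInNanos * (++count));
        } else {
            // Visible in the diff: stay on the original grid.
            nextTick = startInNanos + (++count * periodInNanos);
        }
        lastNowNanos = nowNanos;
        return nextTick;
    }
}
```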
@@ -183,82 +183,82 @@ public long now() {
         return System.currentTimeMillis();
     }
 
-    /**
-     * Allows the use of operators for controlling the timing around when
-     * actions scheduled on workers are actually done. This makes it possible to
-     * layer additional behavior on this {@link Scheduler}. The only parameter
-     * is a function that flattens an {@link Observable} of {@link Observable}
-     * of {@link Completable}s into just one {@link Completable}. There must be
-     * a chain of operators connecting the returned value to the source
-     * {@link Observable} otherwise any work scheduled on the returned
-     * {@link Scheduler} will not be executed.
-     * <p>
-     * When {@link Scheduler#createWorker()} is invoked a {@link Observable} of
-     * {@link Completable}s is onNext'd to the combinator to be flattened. If
-     * the inner {@link Observable} is not immediately subscribed to an calls to
-     * {@link Worker#schedule} are buffered. Once the {@link Observable} is
-     * subscribed to actions are then onNext'd as {@link Completable}s.
-     * <p>
-     * Finally the actions scheduled on the parent {@link Scheduler} when the
-     * inner most {@link Completable}s are subscribed to.
-     * <p>
-     * When the {@link Worker} is unsubscribed the {@link Completable} emits an
-     * onComplete and triggers any behavior in the flattening operator. The
-     * {@link Observable} and all {@link Completable}s give to the flattening
-     * function never onError.
-     * <p>
-     * Limit the amount concurrency two at a time without creating a new fix
-     * size thread pool:
-     * 
-     * <pre>
-     * Scheduler limitSched = Schedulers.computation().when(workers -> {
-     *  // use merge max concurrent to limit the number of concurrent
-     *  // callbacks two at a time
-     *  return Completable.merge(Observable.merge(workers), 2);
-     * });
-     * </pre>
-     * <p>
-     * This is a slightly different way to limit the concurrency but it has some
-     * interesting benefits and drawbacks to the method above. It works by
-     * limited the number of concurrent {@link Worker}s rather than individual
-     * actions. Generally each {@link Observable} uses its own {@link Worker}.
-     * This means that this will essentially limit the number of concurrent
-     * subscribes. The danger comes from using operators like
-     * {@link Observable#zip(Observable, Observable, rx.functions.Func2)} where
-     * subscribing to the first {@link Observable} could deadlock the
-     * subscription to the second.
-     * 
-     * <pre>
-     * Scheduler limitSched = Schedulers.computation().when(workers -> {
-     *  // use merge max concurrent to limit the number of concurrent
-     *  // Observables two at a time
-     *  return Completable.merge(Observable.merge(workers, 2));
-     * });
-     * </pre>
-     * 
-     * Slowing down the rate to no more than than 1 a second. This suffers from
-     * the same problem as the one above I could find an {@link Observable}
-     * operator that limits the rate without dropping the values (aka leaky
-     * bucket algorithm).
-     * 
-     * <pre>
-     * Scheduler slowSched = Schedulers.computation().when(workers -> {
-     *  // use concatenate to make each worker happen one at a time.
-     *  return Completable.concat(workers.map(actions -> {
-     *      // delay the starting of the next worker by 1 second.
-     *      return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
-     *  }));
-     * });
-     * </pre>
-     * 
-     * @param <S> a Scheduler and a Subscription
-     * @param combine the function that takes a two-level nested Observable sequence of a Completable and returns
-     * the Completable that will be subscribed to and should trigger the execution of the scheduled Actions.
-     * @return the Scheduler with the customized execution behavior
-     */
+    /**
+     * Allows the use of operators for controlling the timing around when
+     * actions scheduled on workers are actually done. This makes it possible to
+     * layer additional behavior on this {@link Scheduler}. The only parameter
+     * is a function that flattens an {@link Observable} of {@link Observable}
+     * of {@link Completable}s into just one {@link Completable}. There must be
+     * a chain of operators connecting the returned value to the source
+     * {@link Observable}, otherwise any work scheduled on the returned
+     * {@link Scheduler} will not be executed.
+     * <p>
+     * When {@link Scheduler#createWorker()} is invoked, an {@link Observable} of
+     * {@link Completable}s is onNext'd to the combinator to be flattened. If
+     * the inner {@link Observable} is not immediately subscribed to, any calls to
+     * {@link Worker#schedule} are buffered. Once the {@link Observable} is
+     * subscribed to, actions are then onNext'd as {@link Completable}s.
+     * <p>
+     * Finally, the actions are scheduled on the parent {@link Scheduler} when the
+     * innermost {@link Completable}s are subscribed to.
+     * <p>
+     * When the {@link Worker} is unsubscribed, the {@link Completable} emits an
+     * onComplete and triggers any behavior in the flattening operator. The
+     * {@link Observable} and all {@link Completable}s given to the flattening
+     * function never onError.
+     * <p>
+     * Limit the amount of concurrency to two at a time without creating a new
+     * fixed-size thread pool:
+     *
+     * <pre>
+     * Scheduler limitSched = Schedulers.computation().when(workers -> {
+     *     // use merge max concurrent to limit the number of concurrent
+     *     // callbacks two at a time
+     *     return Completable.merge(Observable.merge(workers), 2);
+     * });
+     * </pre>
+     * <p>
+     * This is a slightly different way to limit the concurrency, but it has some
+     * interesting benefits and drawbacks compared to the method above. It works by
+     * limiting the number of concurrent {@link Worker}s rather than individual
+     * actions. Generally each {@link Observable} uses its own {@link Worker}.
+     * This means that this will essentially limit the number of concurrent
+     * subscribes. The danger comes from using operators like
+     * {@link Observable#zip(Observable, Observable, rx.functions.Func2)} where
+     * subscribing to the first {@link Observable} could deadlock the
+     * subscription to the second.
+     *
+     * <pre>
+     * Scheduler limitSched = Schedulers.computation().when(workers -> {
+     *     // use merge max concurrent to limit the number of concurrent
+     *     // Observables two at a time
+     *     return Completable.merge(Observable.merge(workers, 2));
+     * });
+     * </pre>
+     *
+     * Slow down the rate to no more than one action per second. This suffers from
+     * the same problem as the one above; ideally there would be an {@link Observable}
+     * operator that limits the rate without dropping values (aka a leaky
+     * bucket algorithm).
+     *
+     * <pre>
+     * Scheduler slowSched = Schedulers.computation().when(workers -> {
+     *     // use concatenation to make each worker happen one at a time.
+     *     return Completable.concat(workers.map(actions -> {
+     *         // delay the starting of the next worker by 1 second.
+     *         return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
+     *     }));
+     * });
+     * </pre>
+     *
+     * @param <S> a Scheduler and a Subscription
+     * @param combine the function that takes a two-level nested Observable sequence of Completables and returns
+     *            the Completable that will be subscribed to and should trigger the execution of the scheduled Actions
+     * @return the Scheduler with the customized execution behavior
+     */
     @SuppressWarnings("unchecked")
     @Experimental
-    public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>, Completable> combine) {
-        return (S) new SchedulerWhen(combine, this);
-    }
+    public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>, Completable> combine) {
+        return (S) new SchedulerWhen(combine, this);
+    }
 }
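The javadoc examples above are written with Java 8 lambdas for brevity. For completeness, a hedged, compilable variant of the first "limit concurrency" example; RxJava 1.x types are assumed, `when()` is `@Experimental` so its signature may still change, and the class and variable names are illustrative:

```java
import rx.Completable;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public final class WhenSketch {
    public static void main(String[] args) throws InterruptedException {
        // Allow at most two scheduled callbacks to run concurrently,
        // mirroring the first <pre> block in the javadoc above.
        Scheduler limitSched = Schedulers.computation().when(workers ->
                Completable.merge(Observable.merge(workers), 2));

        // The returned Scheduler is used like any other, e.g. with subscribeOn.
        Observable.just(1, 2, 3)
                .subscribeOn(limitSched)
                .subscribe(v -> System.out.println("got " + v));

        Thread.sleep(500); // give the asynchronous subscription time to emit
    }
}
```

Since the type parameter is declared as `<S extends Scheduler & Subscription>`, the returned instance is also a `Subscription`, so callers could additionally hold it as such to unsubscribe the customized scheduler when it is no longer needed.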