Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit e8b6035

Browse files
authoredFeb 22, 2017
1.x: throttleFirst detecting clock-drift backwards to open the gate (#5123)
1 parent bc06175 commit e8b6035

File tree

2 files changed

+29
-13
lines changed

2 files changed

+29
-13
lines changed
 

‎src/main/java/rx/internal/operators/OperatorThrottleFirst.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public void onStart() {
4848
@Override
4949
public void onNext(T v) {
5050
long now = scheduler.now();
51-
if (lastOnNext == -1 || now - lastOnNext >= timeInMilliseconds) {
51+
if (lastOnNext == -1 || now < lastOnNext || now - lastOnNext >= timeInMilliseconds) {
5252
lastOnNext = now;
5353
subscriber.onNext(v);
5454
}

‎src/test/java/rx/internal/operators/OperatorThrottleFirstTest.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,18 @@
1616
package rx.internal.operators;
1717

1818
import static org.mockito.Matchers.any;
19-
import static org.mockito.Mockito.inOrder;
20-
import static org.mockito.Mockito.mock;
21-
import static org.mockito.Mockito.times;
22-
import static org.mockito.Mockito.verify;
23-
import static org.mockito.Mockito.verifyNoMoreInteractions;
19+
import static org.mockito.Mockito.*;
2420

2521
import java.util.concurrent.TimeUnit;
2622

27-
import org.junit.Before;
28-
import org.junit.Test;
23+
import org.junit.*;
2924
import org.mockito.InOrder;
3025

31-
import rx.Observable;
26+
import rx.*;
3227
import rx.Observable.OnSubscribe;
33-
import rx.Observer;
34-
import rx.Scheduler;
35-
import rx.Subscriber;
3628
import rx.exceptions.TestException;
3729
import rx.functions.Action0;
38-
import rx.observers.TestSubscriber;
30+
import rx.observers.*;
3931
import rx.schedulers.TestScheduler;
4032
import rx.subjects.PublishSubject;
4133

@@ -214,4 +206,28 @@ public void throttleWithTestSchedulerTimeOfZero() {
214206
verify(observer).onCompleted();
215207
verifyNoMoreInteractions(observer);
216208
}
209+
210+
@Test
211+
public void nowDrift() {
212+
TestScheduler s = new TestScheduler();
213+
s.advanceTimeBy(2, TimeUnit.SECONDS);
214+
215+
PublishSubject<Integer> o = PublishSubject.create();
216+
217+
AssertableSubscriber<Integer> as = o.throttleFirst(500, TimeUnit.MILLISECONDS, s)
218+
.test();
219+
220+
o.onNext(1);
221+
s.advanceTimeBy(100, TimeUnit.MILLISECONDS);
222+
o.onNext(2);
223+
s.advanceTimeBy(100, TimeUnit.MILLISECONDS);
224+
o.onNext(3);
225+
s.advanceTimeBy(-1000, TimeUnit.MILLISECONDS);
226+
o.onNext(4);
227+
s.advanceTimeBy(100, TimeUnit.MILLISECONDS);
228+
o.onNext(5);
229+
o.onCompleted();
230+
231+
as.assertResult(1, 4);
232+
}
217233
}

0 commit comments

Comments
 (0)
Please sign in to comment.