21
21
22
22
23
23
import java .io .IOException ;
24
+ import java .time .LocalDateTime ;
25
+ import java .time .temporal .ChronoUnit ;
26
+ import java .time .temporal .TemporalUnit ;
24
27
import java .util .ArrayList ;
25
28
import java .util .List ;
26
29
import java .util .concurrent .BlockingQueue ;
@@ -43,7 +46,7 @@ public class TweetsStreamExecutor {
43
46
44
47
private static final Logger logger = LoggerFactory .getLogger (TweetsStreamExecutor .class );
45
48
46
- private static final long EMPTY_STREAM_TIMEOUT = 20000 ;
49
+ private static final long EMPTY_STREAM_TIMEOUT = 20 ;
47
50
private static final int POLL_WAIT = 5 ;
48
51
49
52
private volatile BlockingQueue <String > rawTweets ;
@@ -132,17 +135,15 @@ public void queueTweets() {
132
135
String line = null ;
133
136
try {
134
137
boolean emptyResponse = false ;
135
- long firstEmptyResponseMillis = 0 ;
136
- long lastEmptyReponseMillis ;
138
+ LocalDateTime firstEmpty = LocalDateTime .now ();
137
139
while (isRunning ) {
138
140
line = stream .readUtf8Line ();
139
141
if (line == null || line .isEmpty ()) {
140
142
if (!emptyResponse ) {
141
- firstEmptyResponseMillis = System . currentTimeMillis ();
143
+ firstEmpty = LocalDateTime . now ();
142
144
emptyResponse = true ;
143
145
} else {
144
- lastEmptyReponseMillis = System .currentTimeMillis ();
145
- if (lastEmptyReponseMillis - firstEmptyResponseMillis > EMPTY_STREAM_TIMEOUT ) {
146
+ if (LocalDateTime .now ().minus (EMPTY_STREAM_TIMEOUT , ChronoUnit .SECONDS ).isAfter (firstEmpty )) {
146
147
throw new EmptyStreamTimeoutException (String .format ("Stream was empty for %d seconds consecutively" , EMPTY_STREAM_TIMEOUT ));
147
148
}
148
149
}
0 commit comments