Skip to content

Commit b17b44e

Browse files
zacmostmoskovitch
andauthored
Streaming example - handle disconnects (#41)
* Check streaming timeout. Added `StreamTimeoutChecker` to check if there is a timeout with the streaming. To avoid bottleneck due to slow data flow, `TweetsQueuer` sets the tweets Json strings into the `tweetsQueue` and `TweetsListenersExecutor` creates the tweets objects. * Check for streaming tweets for reconnect errors. In case of reconnect restart the `tweetsQueuer` thread. * Check for streaming tweets for reconnect errors. In case of reconnect restart the `tweetsQueuer` thread. * Another layer * Another layer * Use abstract StreamingHandler * Use abstract StreamingHandler Co-authored-by: tmoskovitch <[email protected]>
1 parent 963141b commit b17b44e

File tree

6 files changed

+322
-101
lines changed

6 files changed

+322
-101
lines changed

examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java

Lines changed: 16 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,6 @@
2323
package com.twitter.clientlib;
2424

2525

26-
import java.util.HashSet;
27-
import java.util.Set;
28-
import java.io.InputStream;
29-
30-
import com.twitter.clientlib.ApiException;
31-
import com.twitter.clientlib.TwitterCredentialsBearer;
32-
import com.twitter.clientlib.TweetsStreamListenersExecutor;
3326
import com.twitter.clientlib.api.TwitterApi;
3427
import com.twitter.clientlib.model.*;
3528

@@ -44,22 +37,23 @@ public static void main(String[] args) {
4437
* to use the right credential object.
4538
*/
4639
TwitterApi apiInstance = new TwitterApi(new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN")));
47-
48-
Set<String> tweetFields = new HashSet<>();
49-
tweetFields.add("author_id");
50-
tweetFields.add("id");
51-
tweetFields.add("created_at");
52-
5340
try {
54-
InputStream streamResult = apiInstance.tweets().sampleStream()
55-
.backfillMinutes(0)
56-
.tweetFields(tweetFields)
57-
.execute();
58-
// sampleStream with TweetsStreamListenersExecutor
59-
Responder responder = new Responder();
60-
TweetsStreamListenersExecutor tsle = new TweetsStreamListenersExecutor(streamResult);
61-
tsle.addListener(responder);
62-
tsle.executeListeners();
41+
TweetsStreamListenersExecutor tsle = new TweetsStreamListenersExecutor();
42+
tsle.stream()
43+
.streamingHandler(new StreamingTweetHandlerImpl(apiInstance))
44+
.executeListeners();
45+
while(tsle.getError() == null) {
46+
try {
47+
System.out.println("==> sleeping 5 ");
48+
Thread.sleep(5000);
49+
} catch (InterruptedException e) {
50+
e.printStackTrace();
51+
}
52+
}
53+
54+
if(tsle.getError() != null) {
55+
System.err.println("==> Ended with error: " + tsle.getError());
56+
}
6357

6458
// // Shutdown TweetsStreamListenersExecutor
6559
// try {
@@ -98,18 +92,3 @@ public static void main(String[] args) {
9892
}
9993
}
10094

101-
class Responder implements com.twitter.clientlib.TweetsStreamListener {
102-
@Override
103-
public void actionOnTweetsStream(StreamingTweetResponse streamingTweet) {
104-
if(streamingTweet == null) {
105-
System.err.println("Error: actionOnTweetsStream - streamingTweet is null ");
106-
return;
107-
}
108-
109-
if(streamingTweet.getErrors() != null) {
110-
streamingTweet.getErrors().forEach(System.out::println);
111-
} else if (streamingTweet.getData() != null) {
112-
System.out.println("New streaming tweet: " + streamingTweet.getData().getText());
113-
}
114-
}
115-
}

examples/src/main/java/com/twitter/clientlib/TweetsStreamListener.java renamed to examples/src/main/java/com/twitter/clientlib/ITweetsStreamListener.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222

2323
package com.twitter.clientlib;
2424

25-
import com.twitter.clientlib.model.StreamingTweetResponse;
26-
27-
public interface TweetsStreamListener {
28-
void actionOnTweetsStream(StreamingTweetResponse streamingTweet);
25+
public interface ITweetsStreamListener<T> {
26+
void actionOnStreamingObject(T streamingTweet) throws ApiException;
2927
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
Copyright 2020 Twitter, Inc.
3+
SPDX-License-Identifier: Apache-2.0
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
17+
NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
18+
https://openapi-generator.tech
19+
Do not edit the class manually.
20+
*/
21+
22+
23+
package com.twitter.clientlib;
24+
25+
import java.io.InputStream;
26+
27+
import com.twitter.clientlib.api.TwitterApi;
28+
29+
public abstract class StreamingHandler<T> {
30+
protected final TwitterApi apiInstance;
31+
32+
public StreamingHandler(TwitterApi apiInstance) {
33+
this.apiInstance = apiInstance;
34+
}
35+
36+
public abstract InputStream connectStream() throws ApiException;
37+
public abstract void actionOnStreamingObject(T streamingTweet) throws ApiException;
38+
public abstract T getStreamingObject(String tweetString) throws Exception;
39+
public abstract boolean hasReconnectErrors(T streamingTweet);
40+
41+
public boolean processAndVerifyStreamingObject(String tweetString) throws Exception {
42+
T tweet = getStreamingObject(tweetString);
43+
actionOnStreamingObject(tweet);
44+
return !hasReconnectErrors(tweet);
45+
}
46+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
Copyright 2020 Twitter, Inc.
3+
SPDX-License-Identifier: Apache-2.0
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
17+
NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
18+
https://openapi-generator.tech
19+
Do not edit the class manually.
20+
*/
21+
22+
23+
package com.twitter.clientlib;
24+
25+
26+
import com.twitter.clientlib.api.TwitterApi;
27+
import com.twitter.clientlib.model.*;
28+
29+
public abstract class StreamingTweetHandler extends StreamingHandler<StreamingTweetResponse> {
30+
public StreamingTweetHandler(TwitterApi apiInstance) {
31+
super(apiInstance);
32+
}
33+
34+
@Override
35+
public StreamingTweetResponse getStreamingObject(String tweetString) throws Exception {
36+
return StreamingTweetResponse.fromJson(tweetString);
37+
}
38+
39+
@Override
40+
public boolean hasReconnectErrors(StreamingTweetResponse streamingTweet) {
41+
boolean needToReconnect = false;
42+
if (streamingTweet.getErrors() != null) {
43+
for (Problem problem : streamingTweet.getErrors()) {
44+
if (problem instanceof OperationalDisconnectProblem || problem instanceof ConnectionExceptionProblem) {
45+
System.err.println("Re-connecting to the stream due to: " + problem);
46+
needToReconnect = true;
47+
break;
48+
}
49+
}
50+
}
51+
return needToReconnect;
52+
}
53+
}
54+
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
Copyright 2020 Twitter, Inc.
3+
SPDX-License-Identifier: Apache-2.0
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
17+
NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
18+
https://openapi-generator.tech
19+
Do not edit the class manually.
20+
*/
21+
22+
23+
package com.twitter.clientlib;
24+
25+
import java.io.InputStream;
26+
import java.util.HashSet;
27+
import java.util.Set;
28+
29+
import com.twitter.clientlib.api.TwitterApi;
30+
import com.twitter.clientlib.model.StreamingTweetResponse;
31+
32+
public class StreamingTweetHandlerImpl extends StreamingTweetHandler {
33+
public StreamingTweetHandlerImpl(TwitterApi apiInstance) {
34+
super(apiInstance);
35+
}
36+
37+
@Override
38+
public InputStream connectStream() throws ApiException {
39+
Set<String> tweetFields = new HashSet<>();
40+
tweetFields.add("author_id");
41+
tweetFields.add("id");
42+
tweetFields.add("created_at");
43+
tweetFields.add("geo");
44+
Set<String> expansions = new HashSet<>();
45+
expansions.add("geo.place_id");
46+
Set<String> placeFields = new HashSet<>();
47+
placeFields.add("geo");
48+
placeFields.add("id");
49+
placeFields.add("name");
50+
placeFields.add("place_type");
51+
52+
return this.apiInstance.tweets().sampleStream()
53+
.backfillMinutes(0)
54+
.tweetFields(tweetFields).expansions(expansions).placeFields(placeFields)
55+
.execute();
56+
}
57+
58+
@Override
59+
public void actionOnStreamingObject(StreamingTweetResponse streamingTweet) throws ApiException {
60+
if(streamingTweet == null) {
61+
System.err.println("Error: actionOnTweetsStream - streamingTweet is null ");
62+
return;
63+
}
64+
65+
if(streamingTweet.getErrors() != null) {
66+
streamingTweet.getErrors().forEach(System.out::println);
67+
} else if (streamingTweet.getData() != null) {
68+
System.out.println("New streaming tweet: " + streamingTweet.getData().getText());
69+
}
70+
}
71+
}

0 commit comments

Comments
 (0)