Skip to content

Commit fab26fb

Browse files
committed
cleanup
1 parent 1f157e2 commit fab26fb

File tree

5 files changed

+32
-17
lines changed

5 files changed

+32
-17
lines changed

src/main/java/org/sourcelab/storm/spout/redis/failhandler/ExponentialBackoffConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ public static Builder newBuilder() {
6363
return new Builder();
6464
}
6565

66+
/**
67+
* Create a new default configuration.
68+
* @return Default configuration.
69+
*/
70+
public static ExponentialBackoffConfig defaultConfig() {
71+
return new Builder().build();
72+
}
73+
6674
public int getRetryLimit() {
6775
return retryLimit;
6876
}

src/main/java/org/sourcelab/storm/spout/redis/failhandler/ExponentialBackoffFailureHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class ExponentialBackoffFailureHandler implements FailureHandler {
2828
* Used to control timing around retries.
2929
* Also allows for injecting a mock clock for testing.
3030
*/
31-
private transient Clock clock = Clock.systemUTC();
31+
private transient Clock clock;
3232

3333
/**
3434
* This map hows how many times each messageId has failed.
@@ -253,10 +253,10 @@ public MetricHandler() {
253253
* @param retryQueue Our internal retry queue to collect size data.
254254
*/
255255
public MetricHandler(final TopologyContext topologyContext, final TreeMap<Long, Queue<Message>> retryQueue) {
256-
metricExceededRetryLimitCount = topologyContext.registerCounter("failureHandler.exceededRetryLimit");
257-
metricRetriedMessagesCount = topologyContext.registerCounter("failureHandler.retriedMessages");
258-
metricSuccessfulRetriedMessagesCounter = topologyContext.registerCounter("failureHandler.successfulRetriedMessages");
259-
topologyContext.registerGauge("failureHandler.queuedForRetry", () ->
256+
metricExceededRetryLimitCount = topologyContext.registerCounter("failureHandler_exceededRetryLimit");
257+
metricRetriedMessagesCount = topologyContext.registerCounter("failureHandler_retriedMessages");
258+
metricSuccessfulRetriedMessagesCounter = topologyContext.registerCounter("failureHandler_successfulRetriedMessages");
259+
topologyContext.registerGauge("failureHandler_queuedForRetry", () ->
260260
retryQueue.values().stream()
261261
.mapToLong(Collection::size)
262262
.sum()

src/main/java/org/sourcelab/storm/spout/redis/failhandler/RetryFailedTuples.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,17 @@ public class RetryFailedTuples implements FailureHandler, Serializable {
2727
*/
2828
private final Map<String, Long> messageCounter = new HashMap<>();
2929

30-
/**
31-
* Contains a FIFO queue for failed messages.
32-
*/
33-
private LinkedBlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();
34-
3530
/**
3631
* How many times a failed message should be replayed.
3732
* A value of 0 means never give up on a message and always replay it.
3833
*/
3934
private final int maxRetries;
4035

36+
/**
37+
* Contains a FIFO queue for failed messages.
38+
*/
39+
private transient LinkedBlockingQueue<Message> messageQueue;
40+
4141
/**
4242
* Constructor.
4343
* @param maxRetries Maximum number of times to retry a failed tuple.
@@ -48,7 +48,8 @@ public RetryFailedTuples(final int maxRetries) {
4848

4949
@Override
5050
public void open(final Map<String, Object> stormConfig, final TopologyContext topologyContext) {
51-
// no-op
51+
// Create queue
52+
messageQueue = new LinkedBlockingQueue<>();
5253
}
5354

5455
@Override

src/test/java/org/sourcelab/storm/spout/redis/example/ExampleLocalTopology.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
import org.apache.storm.Config;
44
import org.apache.storm.LocalCluster;
55
import org.apache.storm.generated.StormTopology;
6+
import org.apache.storm.metric.LoggingMetricsConsumer;
67
import org.apache.storm.topology.TopologyBuilder;
78
import org.sourcelab.storm.spout.redis.RedisStreamSpout;
89
import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig;
9-
import org.sourcelab.storm.spout.redis.failhandler.RetryFailedTuples;
10+
import org.sourcelab.storm.spout.redis.failhandler.ExponentialBackoffConfig;
11+
import org.sourcelab.storm.spout.redis.failhandler.ExponentialBackoffFailureHandler;
1012
import org.sourcelab.storm.spout.redis.util.test.RedisTestHelper;
1113
import org.testcontainers.containers.GenericContainer;
1214

@@ -70,7 +72,7 @@ public void runExample(final boolean enableDebug) throws Exception {
7072
.withConsumerIdPrefix(consumerPrefix)
7173
.withStreamKey(streamKey)
7274
// Failure Handler
73-
.withFailureHandler(new RetryFailedTuples(1))
75+
.withFailureHandler(new ExponentialBackoffFailureHandler(ExponentialBackoffConfig.defaultConfig()))
7476
// Tuple Handler Class
7577
.withTupleConverter(new TestTupleConverter("value"));
7678

@@ -83,6 +85,10 @@ public void runExample(final boolean enableDebug) throws Exception {
8385
// Create configuration
8486
final Config config = new Config();
8587
config.setDebug(enableDebug);
88+
// Never fall back to java serialization
89+
config.setFallBackOnJavaSerialization(false);
90+
// Logging metrics consumer for metrics validation.
91+
config.registerMetricsConsumer(LoggingMetricsConsumer.class);
8692

8793
// Submits the topology to the local cluster
8894
localCluster.submitTopology(

src/test/java/org/sourcelab/storm/spout/redis/failhandler/ExponentialBackoffFailureHandlerTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -626,12 +626,12 @@ private void validateTupleIsNotBeingTracked(final ExponentialBackoffFailureHandl
626626

627627
private void verifyMetricInteractions() {
628628
verify(mockTopologyContext, times(1))
629-
.registerCounter(eq("failureHandler.exceededRetryLimit"));
629+
.registerCounter(eq("failureHandler_exceededRetryLimit"));
630630
verify(mockTopologyContext, times(1))
631-
.registerCounter(eq("failureHandler.retriedMessages"));
631+
.registerCounter(eq("failureHandler_retriedMessages"));
632632
verify(mockTopologyContext, times(1))
633-
.registerCounter(eq("failureHandler.successfulRetriedMessages"));
633+
.registerCounter(eq("failureHandler_successfulRetriedMessages"));
634634
verify(mockTopologyContext, times(1))
635-
.registerGauge(eq("failureHandler.queuedForRetry"), any(Gauge.class));
635+
.registerGauge(eq("failureHandler_queuedForRetry"), any(Gauge.class));
636636
}
637637
}

0 commit comments

Comments
 (0)