Skip to content

Commit c27392c

Browse files
committed
cleanup
1 parent 5d80328 commit c27392c

File tree

10 files changed

+484
-291
lines changed

10 files changed

+484
-291
lines changed

src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpout.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
/**
2222
* Redis Stream based Spout for Apache Storm 2.2.x.
2323
*/
24-
public class RedisStreamSpout implements IRichSpout {
24+
public class RedisStreamSpout implements IRichSpout, AutoCloseable {
2525
private static final Logger logger = LoggerFactory.getLogger(RedisStreamSpout.class);
2626

2727
/**

src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClient.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class LettuceClient implements Client {
3737
/**
3838
* The underlying Redis Client.
3939
*/
40-
private final LettuceAdapter client;
40+
private final LettuceAdapter adapter;
4141

4242
/**
4343
* Re-usable instance to prevent unnecessary garbage creation.
@@ -58,20 +58,20 @@ public LettuceClient(final RedisStreamSpoutConfig config, final int instanceId)
5858
// Determine which adapter to use based on what type of redis instance we are
5959
// communicating with.
6060
config.isConnectingToCluster()
61-
? new LettuceClusterClient(RedisClusterClient.create(config.getConnectString()))
62-
: new LettuceRedisClient(RedisClient.create(config.getConnectString()))
61+
? new LettuceClusterAdapter(RedisClusterClient.create(config.getConnectString()))
62+
: new LettuceRedisAdapter(RedisClient.create(config.getConnectString()))
6363
);
6464
}
6565

6666
/**
6767
* Protected constructor for injecting a RedisClient instance, typically for tests.
6868
* @param config Configuration.
6969
* @param instanceId Which instance number is this running under.
70-
* @param client RedisClient instance.
70+
* @param adapter RedisClient instance.
7171
*/
72-
LettuceClient(final RedisStreamSpoutConfig config, final int instanceId, final LettuceAdapter client) {
72+
LettuceClient(final RedisStreamSpoutConfig config, final int instanceId, final LettuceAdapter adapter) {
7373
this.config = Objects.requireNonNull(config);
74-
this.client = Objects.requireNonNull(client);
74+
this.adapter = Objects.requireNonNull(adapter);
7575

7676
// Calculate consumerId
7777
this.consumerId = config.getConsumerIdPrefix() + instanceId;
@@ -92,15 +92,15 @@ public LettuceClient(final RedisStreamSpoutConfig config, final int instanceId)
9292

9393
@Override
9494
public void connect() {
95-
if (client.isConnected()) {
95+
if (adapter.isConnected()) {
9696
throw new IllegalStateException("Cannot call connect more than once!");
9797
}
9898

99-
client.connect();
99+
adapter.connect();
100100

101101
try {
102102
// Attempt to create consumer group
103-
client.getSyncCommands().xgroupCreate(
103+
adapter.getSyncCommands().xgroupCreate(
104104
// Start the group at first offset for our key.
105105
XReadArgs.StreamOffset.from(config.getStreamKey(), "0-0"),
106106
// Define the group name
@@ -128,7 +128,7 @@ public void connect() {
128128
@Override
129129
public List<Message> nextMessages() {
130130
// Get next batch of messages.
131-
final List<StreamMessage<String, String>> messages = client.getSyncCommands().xreadgroup(
131+
final List<StreamMessage<String, String>> messages = adapter.getSyncCommands().xreadgroup(
132132
consumerFrom,
133133
xreadArgs,
134134
lastConsumed
@@ -144,7 +144,7 @@ public List<Message> nextMessages() {
144144
@Override
145145
public void commitMessage(final String msgId) {
146146
// Confirm that the message has been processed using XACK
147-
client.getSyncCommands().xack(
147+
adapter.getSyncCommands().xack(
148148
config.getStreamKey(),
149149
config.getGroupName(),
150150
msgId
@@ -153,6 +153,6 @@ public void commitMessage(final String msgId) {
153153

154154
@Override
155155
public void disconnect() {
156-
client.shutdown();
156+
adapter.shutdown();
157157
}
158158
}

src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClusterClient.java renamed to src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClusterAdapter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88

99
/**
1010
* Adapter for talking to a RedisCluster.
11-
* If you need to talk to a single Redis instance {@link LettuceRedisClient}.
11+
* If you need to talk to a single Redis instance {@link LettuceRedisAdapter}.
1212
*/
13-
public class LettuceClusterClient implements LettuceAdapter {
13+
public class LettuceClusterAdapter implements LettuceAdapter {
1414
/**
1515
* The underlying Redis Client.
1616
*/
@@ -22,7 +22,7 @@ public class LettuceClusterClient implements LettuceAdapter {
2222
private StatefulRedisClusterConnection<String, String> connection;
2323
private RedisStreamCommands<String, String> syncCommands;
2424

25-
public LettuceClusterClient(final RedisClusterClient redisClient) {
25+
public LettuceClusterAdapter(final RedisClusterClient redisClient) {
2626
this.redisClient = Objects.requireNonNull(redisClient);
2727
}
2828

@@ -51,6 +51,7 @@ public RedisStreamCommands<String, String> getSyncCommands() {
5151
public void shutdown() {
5252
// Close our connection and shutdown.
5353
if (connection != null) {
54+
syncCommands = null;
5455
connection.close();
5556
connection = null;
5657
}

src/main/java/org/sourcelab/storm/spout/redis/client/LettuceRedisClient.java renamed to src/main/java/org/sourcelab/storm/spout/redis/client/LettuceRedisAdapter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88

99
/**
1010
* Adapter for talking to a single Redis instance.
11-
* If you need to talk to a RedisCluster {@link LettuceClusterClient}.
11+
* If you need to talk to a RedisCluster {@link LettuceClusterAdapter}.
1212
*/
13-
public class LettuceRedisClient implements LettuceAdapter {
13+
public class LettuceRedisAdapter implements LettuceAdapter {
1414

1515
/**
1616
* The underlying Redis Client.
@@ -23,7 +23,7 @@ public class LettuceRedisClient implements LettuceAdapter {
2323
private StatefulRedisConnection<String, String> connection;
2424
private RedisStreamCommands<String, String> syncCommands;
2525

26-
public LettuceRedisClient(final RedisClient redisClient) {
26+
public LettuceRedisAdapter(final RedisClient redisClient) {
2727
this.redisClient = Objects.requireNonNull(redisClient);
2828
}
2929

@@ -52,6 +52,7 @@ public RedisStreamCommands<String, String> getSyncCommands() {
5252
public void shutdown() {
5353
// Close our connection and shutdown.
5454
if (connection != null) {
55+
syncCommands = null;
5556
connection.close();
5657
connection = null;
5758
}

0 commit comments

Comments
 (0)