Skip to content

Commit 79489c2

Browse files
committed
Rework when consumer thread is started
1 parent 730edda commit 79489c2

File tree

2 files changed

+66
-16
lines changed

2 files changed

+66
-16
lines changed

src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpout.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -84,16 +84,8 @@ public void open(
8484
// Create funnel instance.
8585
this.funnel = new MemoryFunnel(config, spoutConfig, topologyContext);
8686

87-
// Create consumer and client
88-
final int taskIndex = topologyContext.getThisTaskIndex();
89-
final Client client = new ClientFactory().createClient(config, taskIndex);
90-
final Consumer consumer = new Consumer(config, client, (ConsumerFunnel) funnel);
91-
92-
// Create background consuming thread.
93-
consumerThread = new Thread(
94-
consumer,
95-
"RedisStreamSpout-ConsumerThread[" + taskIndex + "]"
96-
);
87+
// Create and start consumer thread.
88+
createAndStartConsumerThread();
9789
}
9890

9991
@Override
@@ -104,12 +96,15 @@ public void close() {
10496

10597
@Override
10698
public void activate() {
107-
if (consumerThread.isAlive()) {
99+
// If the thread is already running and alive
100+
if (consumerThread != null && consumerThread.isAlive()) {
108101
// No-op. It's already running, and deactivate() is a no-op for us.
109102
return;
110103
}
111-
// Start thread, this should return immediately, but start a background processing thread.
112-
consumerThread.start();
104+
105+
// If we haven't created the consumer thread yet, or it has previously died.
106+
// Create and start it
107+
createAndStartConsumerThread();
113108
}
114109

115110
@Override
@@ -187,4 +182,21 @@ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
187182
public Map<String, Object> getComponentConfiguration() {
188183
return new HashMap<>();
189184
}
185+
186+
/**
187+
* Create background consumer thread.
188+
*/
189+
private void createAndStartConsumerThread() {
190+
// Create consumer and client
191+
final int taskIndex = topologyContext.getThisTaskIndex();
192+
final Client client = new ClientFactory().createClient(config, taskIndex);
193+
final Consumer consumer = new Consumer(config, client, (ConsumerFunnel) funnel);
194+
195+
// Create background consuming thread.
196+
consumerThread = new Thread(
197+
consumer,
198+
"RedisStreamSpout-ConsumerThread[" + taskIndex + "]"
199+
);
200+
consumerThread.start();
201+
}
190202
}

src/test/java/org/sourcelab/storm/spout/redis/AbstractRedisStreamSpoutIntegrationTest.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ void smokeTest_openAndClose(final ClientType clientType) {
119119

120120
// Open spout
121121
spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector));
122+
spout.activate();
123+
spout.deactivate();
122124

123125
// Close spout via autocloseable
124126
}
@@ -158,14 +160,49 @@ void smokeTest_openActivateDeactivateAndClose(final ClientType clientType) throw
158160
verify(mockTopologyContext, times(1)).getThisTaskIndex();
159161
}
160162

163+
/**
164+
* Basic lifecycle smoke test.
165+
* Cycle calling activate/deactivate a few times.
166+
*/
167+
@ParameterizedTest
168+
@EnumSource(ClientType.class)
169+
void smokeTest_cycleActivateDeactivate(final ClientType clientType) throws InterruptedException {
170+
// Inject client type into config
171+
configBuilder.withClientType(clientType);
172+
173+
// Create spout
174+
try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) {
175+
final StubSpoutCollector collector = new StubSpoutCollector();
176+
177+
// Open spout
178+
spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector));
179+
180+
// Cycle Activate and Deactivate a few times
181+
spout.activate();
182+
Thread.sleep(2000L);
183+
spout.deactivate();
184+
185+
spout.activate();
186+
Thread.sleep(2000L);
187+
spout.deactivate();
188+
189+
spout.activate();
190+
Thread.sleep(2000L);
191+
spout.deactivate();
192+
193+
// Close via Autocloseable.
194+
}
195+
196+
// Verify mocks
197+
verify(mockTopologyContext, times(1)).getThisTaskIndex();
198+
}
199+
161200
/**
162201
* Verifies the behavior when you attempt to connect to a redis instance
163202
* that does not exist. Looks like nothing. You get errors in the logs.
164203
*
165204
* Disabled for now.
166205
*/
167-
// @ParameterizedTest
168-
// @EnumSource(ClientType.class)
169206
void smokeTest_configureInvalidRedisHost(final ClientType clientType) throws InterruptedException {
170207
// Inject client type into config
171208
configBuilder.withClientType(clientType);
@@ -199,7 +236,7 @@ void smokeTest_configureInvalidRedisHost(final ClientType clientType) throws Int
199236
}
200237

201238
// Verify mocks
202-
verify(mockTopologyContext, times(1)).getThisTaskIndex();
239+
verify(mockTopologyContext, times(2)).getThisTaskIndex();
203240
}
204241

205242
/**
@@ -516,6 +553,7 @@ void test_EmitDownSeparateStreams(final ClientType clientType) {
516553

517554
// Open spout
518555
spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector));
556+
spout.activate();
519557

520558
// Ask for stream names
521559
final OutputFieldsGetter getter = new OutputFieldsGetter();

0 commit comments

Comments
 (0)