2020import org .sourcelab .storm .spout .redis .util .test .RedisTestHelper ;
2121import org .sourcelab .storm .spout .redis .util .test .StreamConsumerInfo ;
2222
23+ import java .time .Duration ;
2324import java .util .ArrayList ;
2425import java .util .Arrays ;
2526import java .util .Collections ;
2627import java .util .List ;
2728import java .util .Map ;
2829import java .util .concurrent .TimeUnit ;
2930
31+ import static org .awaitility .Awaitility .await ;
3032import static org .junit .jupiter .api .Assertions .assertEquals ;
3133import static org .junit .jupiter .api .Assertions .assertNotNull ;
3234import static org .junit .jupiter .api .Assertions .assertTrue ;
@@ -127,6 +129,7 @@ void smokeTest_openAndClose(final ClientType clientType) {
127129
128130 /**
129131 * Basic lifecycle smoke test.
132+ * Basically validating that nothing explodes.
130133 */
131134 @ ParameterizedTest
132135 @ EnumSource (ClientType .class )
@@ -147,7 +150,7 @@ void smokeTest_openActivateDeactivateAndClose(final ClientType clientType) throw
147150 // Small sleep
148151 Thread .sleep (3000L );
149152
150- // Deactivate and close via Autoclosable
153+ // Deactivate and close via Autocloseable
151154 spout .deactivate ();
152155 }
153156
@@ -161,8 +164,8 @@ void smokeTest_openActivateDeactivateAndClose(final ClientType clientType) throw
161164 *
162165 * Disabled for now.
163166 */
164- @ ParameterizedTest
165- @ EnumSource (ClientType .class )
167+ // @ParameterizedTest
168+ // @EnumSource(ClientType.class)
166169 void smokeTest_configureInvalidRedisHost (final ClientType clientType ) throws InterruptedException {
167170 // Inject client type into config
168171 configBuilder .withClientType (clientType );
@@ -221,21 +224,23 @@ void smokeTest_consumeAndAckMessages(final ClientType clientType) throws Interru
221224 // Lets publish 10 messages to the stream
222225 final List <String > producedMsgIds = redisTestHelper .produceMessages (streamKey , 10 );
223226
224- // Now lets try to get those from the spout
225- do {
226- spout .nextTuple ();
227- Thread .sleep (100L );
228- } while (collector .getEmittedTuples ().size () < 10 );
227+ // Now lets try to get those from the spout.
228+ // Call spout.nextTuple() until the spout has emitted 10 tuples.
229+ await ()
230+ .atMost (Duration .ofSeconds (10 ))
231+ .until (() -> {
232+ spout .nextTuple ();
233+ return collector .getEmittedTuples ().size () == 10 ;
234+ });
229235
230- // Call next tuple a few more times, should be a no-op
236+ // Call next tuple a few more times, should be a no-op nothing further should be emitted.
231237 for (int counter = 0 ; counter < 10 ; counter ++) {
232238 Thread .sleep (100L );
233239 spout .nextTuple ();
234240 }
235241
236242 // Verify what got emitted.
237243 assertEquals (10 , collector .getEmittedTuples ().size (), "Should have found 10 emitted tuples." );
238-
239244 final String expectedStreamId = Utils .DEFAULT_STREAM_ID ;
240245 for (int index = 0 ; index < producedMsgIds .size (); index ++) {
241246 final EmittedTuple emittedTuple = collector .getEmittedTuples ().get (index );
@@ -261,27 +266,33 @@ void smokeTest_consumeAndAckMessages(final ClientType clientType) throws Interru
261266 assertTrue (foundValue , "Failed to find msgId tuple value" );
262267 }
263268
264- // See that we have 10 items pending
265- StreamConsumerInfo consumerInfo = redisTestHelper .getConsumerInfo (streamKey , GROUP_NAME , CONSUMER_ID );
266- assertNotNull (consumerInfo , "Failed to find consumer info!" );
269+ // See that we have 10 items pending in the stream's consumer list.
270+ await ()
271+ .atMost (Duration .ofSeconds (10 ))
272+ .until (() -> {
273+ final StreamConsumerInfo consumerInfo = redisTestHelper .getConsumerInfo (streamKey , GROUP_NAME , CONSUMER_ID );
274+ assertNotNull (consumerInfo , "Failed to find consumer info!" );
267275
268- // Verify we have 10 items pending
269- assertEquals (10L , consumerInfo .getPending (), "Found entries pending" );
276+ // Verify we have 10 items pending
277+ return consumerInfo .getPending () == 10 ;
278+ });
270279
271280 // Now Ack the messages
272281 collector .getEmittedTuples ().stream ()
273282 .map (EmittedTuple ::getMessageId )
274283 .forEach (spout ::ack );
275284
276- // Small delay waiting for processing.
277- Thread .sleep (1000L );
285+ // We should see the number of pending messages for our consumer drop to 0
286+ await ()
287+ .atMost (Duration .ofSeconds (10 ))
288+ .until (() -> {
289+ // Verify that our message were acked in redis.
290+ final StreamConsumerInfo consumerInfo = redisTestHelper .getConsumerInfo (streamKey , GROUP_NAME , CONSUMER_ID );
291+ assertNotNull (consumerInfo , "Failed to find consumer info!" );
278292
279- // Verify that our message were acked in redis.
280- consumerInfo = redisTestHelper .getConsumerInfo (streamKey , GROUP_NAME , CONSUMER_ID );
281- assertNotNull (consumerInfo , "Failed to find consumer info!" );
282-
283- // Verify we have nothing pending
284- assertEquals (0L , consumerInfo .getPending (), "Found entries pending?" );
293+ // Verify we have nothing pending
294+ return consumerInfo .getPending () == 0 ;
295+ });
285296
286297 // Deactivate and close via Autocloseable
287298 spout .deactivate ();
@@ -300,7 +311,7 @@ void smokeTest_consumeFailAndAckMessages(final ClientType clientType) throws Int
300311 // Inject client type into config
301312 configBuilder .withClientType (clientType );
302313
303- // Swap out failure handler
314+ // Swap out failure handler, each tuple should be retried a maximum of twice.
304315 configBuilder .withFailureHandler (new RetryFailedTuples (2 ));
305316
306317 // Create spout
@@ -317,15 +328,16 @@ void smokeTest_consumeFailAndAckMessages(final ClientType clientType) throws Int
317328 List <String > producedMsgIds = redisTestHelper .produceMessages (streamKey , 10 );
318329
319330 // Now lets try to get 5 of those those from the spout...
320- do {
321- spout .nextTuple ();
322- Thread .sleep (100L );
323- } while (collector .getEmittedTuples ().size () < 5 );
324-
331+ // Call spout.nextTuple() until we have at least 5 tuples emitted to the output collector.
332+ await ()
333+ .atMost (Duration .ofSeconds (10 ))
334+ .until (() -> {
335+ spout .nextTuple ();
336+ return collector .getEmittedTuples ().size () == 5 ;
337+ });
325338
326339 // Verify what got emitted.
327- assertEquals (5 , collector .getEmittedTuples ().size (), "Should have found 10 emitted tuples." );
328-
340+ assertEquals (5 , collector .getEmittedTuples ().size (), "Should have found 5 emitted tuples." );
329341 final String expectedStreamId = Utils .DEFAULT_STREAM_ID ;
330342 for (int index = 0 ; index < 5 ; index ++) {
331343 final EmittedTuple emittedTuple = collector .getEmittedTuples ().get (index );
@@ -351,13 +363,17 @@ void smokeTest_consumeFailAndAckMessages(final ClientType clientType) throws Int
351363 assertTrue (foundValue , "Failed to find msgId tuple value" );
352364 }
353365
354- // See that we have 10 items pending
366+ // Since we have NOT acked any tuples, we should have at least 5 tuples pending with a max of 10
367+ // depending on how many have been consumed by the spout's consuming thread.
355368 StreamConsumerInfo consumerInfo = redisTestHelper .getConsumerInfo (streamKey , GROUP_NAME , CONSUMER_ID );
356369 assertNotNull (consumerInfo , "Failed to find consumer info!" );
357- assertEquals (10L , consumerInfo .getPending (), "Found entries pending" );
370+ assertTrue (consumerInfo .getPending () >= 5 , "At least 5 entries pending" );
371+ assertTrue (consumerInfo .getPending () <= 10 , "No more than 10 entries pending" );
358372
373+ // We want to setup the following scenario using the 5 tuples the spout has emitted so far.
374+ // ack the first 3 messages
375+ // fail the last 2 messages.
359376 final List <String > messageIdsToFail = new ArrayList <>();
360-
361377 for (int index = 0 ; index < 5 ; index ++) {
362378 // Now ack the first 3 messages
363379 if (index < 3 ) {
@@ -377,38 +393,48 @@ void smokeTest_consumeFailAndAckMessages(final ClientType clientType) throws Int
377393 collector .reset ();
378394
379395 // Small delay waiting for processing.
380- Thread .sleep (1000L );
381-
382- // Verify that our message were acked in redis.
383- consumerInfo = redisTestHelper .getConsumerInfo (streamKey , GROUP_NAME , CONSUMER_ID );
384- assertNotNull (consumerInfo , "Failed to find consumer info!" );
385-
386- // Verify we have 7 pending
387- assertEquals (7L , consumerInfo .getPending (), "Found entries pending" );
388-
389- // Ask for the next two tuples, we should get our failed tuples back out.
390- do {
391- spout .nextTuple ();
392- } while (collector .getEmittedTuples ().size () < 2 );
393-
394- // We should have emitted two tuples.
396+ // Wait until we have at least 7 pending, which is our 10 total messages, minus the 3 acked ones.
397+ await ()
398+ .atMost (Duration .ofSeconds (10 ))
399+ .until (() -> {
400+ final StreamConsumerInfo consumerState = redisTestHelper .getConsumerInfo (streamKey , GROUP_NAME , CONSUMER_ID );
401+
402+ // Verify that our message were acked in redis.
403+ assertNotNull (consumerState , "Failed to find consumer info!" );
404+
405+ return consumerState .getPending () == 7 ;
406+ });
407+
408+ // Ask the spout for the next two tuples,
409+ // The expectation here is that we should get our failed tuples back out.
410+ await ()
411+ .atMost (Duration .ofSeconds (10 ))
412+ .until (() -> {
413+ spout .nextTuple ();
414+ return collector .getEmittedTuples ().size () == 2 ;
415+ });
416+
417+ // We should have emitted two tuples, and they should have been our failed tuples.
395418 assertEquals (2 , collector .getEmittedTuples ().size ());
396419 assertEquals (messageIdsToFail .get (0 ), collector .getEmittedTuples ().get (0 ).getMessageId ());
397420 assertEquals (messageIdsToFail .get (1 ), collector .getEmittedTuples ().get (1 ).getMessageId ());
398421
399- // Ack them
422+ // Ack the previously failed tuples.
400423 spout .ack (messageIdsToFail .get (0 ));
401424 spout .ack (messageIdsToFail .get (1 ));
402425
403426 // Small delay waiting for processing.
404- Thread .sleep (1000L );
405-
406- // Verify that our message were acked in redis.
407- consumerInfo = redisTestHelper .getConsumerInfo (streamKey , GROUP_NAME , CONSUMER_ID );
408- assertNotNull (consumerInfo , "Failed to find consumer info!" );
409-
410- // Verify we have 5 pending
411- assertEquals (5L , consumerInfo .getPending (), "Found entries pending" );
427+ // We're looking for the number of pending to drop to 5, since we've now acked 5 of the 10 original tuples.
428+ await ()
429+ .atMost (Duration .ofSeconds (10 ))
430+ .until (() -> {
431+ // Verify that our message were acked in redis.
432+ final StreamConsumerInfo consumerState = redisTestHelper .getConsumerInfo (streamKey , GROUP_NAME , CONSUMER_ID );
433+ assertNotNull (consumerState , "Failed to find consumer info!" );
434+
435+ // Verify we have 5 pending
436+ return consumerState .getPending () == 5 ;
437+ });
412438
413439 // Deactivate and close via Autocloseable
414440 spout .deactivate ();
0 commit comments