File tree Expand file tree Collapse file tree
dev-packages/node-integration-tests/suites/tracing/kafkajs Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -23,15 +23,23 @@ async function run() {
2323 await consumer . connect ( ) ;
2424 await consumer . subscribe ( { topic : 'test-topic' , fromBeginning : true } ) ;
2525
26- consumer . run ( {
26+ // Resolve once the consumer has actually joined its group. A fixed sleep
27+ // here was racy on slow CI runners: if the producer sent before the
28+ // consumer joined, the consumer transaction sometimes wasn't created
29+ // within the test timeout. Register the listener before `run()` so the
30+ // event can't fire before we're listening.
31+ const groupJoined = new Promise ( resolve => {
32+ consumer . on ( consumer . events . GROUP_JOIN , ( ) => resolve ( ) ) ;
33+ } ) ;
34+
35+ await consumer . run ( {
2736 eachMessage : async ( { message } ) => {
2837 // eslint-disable-next-line no-console
2938 console . debug ( 'Received message' , message . value . toString ( ) ) ;
3039 } ,
3140 } ) ;
3241
33- // Wait for the consumer to be ready
34- await new Promise ( resolve => setTimeout ( resolve , 4000 ) ) ;
42+ await groupJoined ;
3543
3644 await producer . send ( {
3745 topic : 'test-topic' ,
You can’t perform that action at this time.
0 commit comments