Skip to content

Commit 357e2df

Browse files
test(node-integration-tests): Wait for kafkajs GROUP_JOIN before producing (#21074)
## Summary - Replace the fixed 4s `setTimeout` in `suites/tracing/kafkajs/scenario.mjs` with a deterministic wait on the kafkajs `GROUP_JOIN` event. - Also `await consumer.run(...)` instead of leaving the promise dangling. ## Why The kafkajs integration test flaked with the consumer transaction never arriving within the test timeout. The scenario was: 1. `consumer.subscribe(...)` (awaited) 2. `consumer.run(...)` — **not** awaited 3. `await new Promise(resolve => setTimeout(resolve, 4000))` 4. `producer.send(...)` On slow CI runners, the consumer group rebalance / join can take longer than the fixed 4 seconds. Although `fromBeginning: true` lets a late-joining consumer still read offset 0, when the join is slow enough the consumer transaction isn't created before the per-test timeout. Listening for the `GROUP_JOIN` event removes the timing assumption entirely: we proceed exactly when the consumer is in its group and actively polling. The listener is registered before `consumer.run()` so it cannot miss the event. This mirrors the prior fix for the same suite (#20189, which addressed transaction ordering) and the same race-on-startup pattern already documented in the amqplib scenario. Fixes #21044 Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e505a94 commit 357e2df

1 file changed

Lines changed: 11 additions & 3 deletions

File tree

  • dev-packages/node-integration-tests/suites/tracing/kafkajs

dev-packages/node-integration-tests/suites/tracing/kafkajs/scenario.mjs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,23 @@ async function run() {
2323
await consumer.connect();
2424
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
2525

26-
consumer.run({
26+
// Resolve once the consumer has actually joined its group. A fixed sleep
27+
// here was racy on slow CI runners: if the producer sent before the
28+
// consumer joined, the consumer transaction sometimes wasn't created
29+
// within the test timeout. Register the listener before `run()` so the
30+
// event can't fire before we're listening.
31+
const groupJoined = new Promise(resolve => {
32+
consumer.on(consumer.events.GROUP_JOIN, () => resolve());
33+
});
34+
35+
await consumer.run({
2736
eachMessage: async ({ message }) => {
2837
// eslint-disable-next-line no-console
2938
console.debug('Received message', message.value.toString());
3039
},
3140
});
3241

33-
// Wait for the consumer to be ready
34-
await new Promise(resolve => setTimeout(resolve, 4000));
42+
await groupJoined;
3543

3644
await producer.send({
3745
topic: 'test-topic',

0 commit comments

Comments
 (0)