Skip to content

Conversation

@apalan60
Copy link
Contributor

@apalan60 apalan60 commented Oct 25, 2025

Related discussion:
#20594 (review)

Problem

The test OffsetValidationTest.test_fencing_static_consumer failed when
executed with
fencing_stage=stable and group_protocol=consumer.
It timed out while waiting for the group to become empty because the
conflicting static consumers re-joined after the original members
stopped, keeping the group non-empty and causing the timeout.

Fix

For the consumer-protocol path, the test now waits for all conflicting
consumer processes to terminate before stopping the original static
members. This ensures that each conflicting consumers is fully fenced
and cannot re-join the group after the original members stop.

Reviewers: Chia-Ping Tsai [email protected]

@github-actions github-actions bot added triage PRs from the community tests Test fixes (including flaky tests) small Small PRs labels Oct 25, 2025
@apalan60
Copy link
Contributor Author

Test command:

TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_fencing_static_consumer" _DUCKTAPE_OPTIONS='--parameters '\''{"fencing_stage":"stable","group_protocol":"consumer","num_conflict_consumers":1,"metadata_quorum":"ISOLATED_KRAFT"}'\' bash tests/docker/run_tests.sh

Before:

====================================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.12.0
session_id:       2025-10-25--006
run time:         2 minutes 2.210 seconds
tests run:        1
passed:           0
flaky:            0
failed:           1
ignored:          0
====================================================================================================
test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.fencing_stage=stable.group_protocol=consumer.num_conflict_consumers=1.metadata_quorum=ISOLATED_KRAFT
status:     FAIL
run time:   2 minutes 2.092 seconds


    TimeoutError('Timed out waiting for the consumers to be removed from the group.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 351, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 411, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 438, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 341, in test_fencing_static_consumer
    wait_until(lambda: self.group_id in self.kafka.list_consumer_groups(state="empty"),
  File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 58, in wait_until
    raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception
ducktape.errors.TimeoutError: Timed out waiting for the consumers to be removed from the group.

----------------------------------------------------------------------------------------------------

After:

====================================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.12.0
session_id:       2025-10-25--008
run time:         1 minute 35.257 seconds
tests run:        1
passed:           1
flaky:            0
failed:           0
ignored:          0
====================================================================================================
test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.fencing_stage=stable.group_protocol=consumer.num_conflict_consumers=1.metadata_quorum=ISOLATED_KRAFT
status:     PASS
run time:   1 minute 35.141 seconds
----------------------------------------------------------------------------------------------------

return consumer

def _node_failed_with_unreleased_instance_id(self, node):
cmd = "grep -q 'UnreleasedInstanceIdException' %s" % VerifiableConsumer.LOG_FILE
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the error is subject to change, would it be more reliable to check the process ID (PID) directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712

Thanks for the suggestion!
Given backward compatibility, checking the PID makes more sense.
I’ve made the corresponding adjustment.

@apalan60
Copy link
Contributor Author

Reran the test to reflect the recent changes:

TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_fencing_static_consumer" _DUCKTAPE_OPTIONS='--parameters '\''{"fencing_stage":"stable","group_protocol":"consumer","num_conflict_consumers":1,"metadata_quorum":"ISOLATED_KRAFT"}'\' bash tests/docker/run_tests.sh
====================================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.12.0
session_id:       2025-10-25--017
run time:         1 minute 26.607 seconds
tests run:        1
passed:           1
flaky:            0
failed:           0
ignored:          0
====================================================================================================
test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.fencing_stage=stable.group_protocol=consumer.num_conflict_consumers=1.metadata_quorum=ISOLATED_KRAFT
status:     PASS
run time:   1 minute 26.490 seconds
----------------------------------------------------------------------------------------------------

Also tested other parameter combinations, and they were not affected.

TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_fencing_static_consumer" bash tests/docker/run_tests.sh
====================================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.12.0
session_id:       2025-10-25--016
run time:         6 minutes 23.384 seconds
tests run:        8
passed:           8
flaky:            0
failed:           0
ignored:          0
====================================================================================================

@github-actions github-actions bot removed the triage PRs from the community label Oct 26, 2025

def await_conflict_consumers_fenced(self, conflict_consumer):
# Ensure every conflicting consumer actually starts once before we wait for fencing.
started_nodes = set()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this started_nodes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed, no longer in use.

started_nodes.add(node)
return len(conflict_consumer.alive_nodes()) == len(conflict_consumer.nodes)

wait_until(all_conflict_consumers_started,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the conflicting consumer dies before this check occurs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reminder.

conflict_consumer.start() runs asynchronously. The detection of a duplicate groupId and the subsequent UnreleasedInstanceIdException that interrupts the consumer process both occur during this phase.

If the newly added validation method await_conflict_consumers_fenced(conflict_consumer) is executed before the conflict consumer is interrupted, wait_until(all_conflict_consumers_started) will time out since it never sees all consumers started, leading to the test failure.

In my environment, adding a 2-second sleep before the validation step can reliably reproduce the issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ve modified the test to only focus on verifying that the conflict consumer node has properly terminated.

This should make the test more stable.

timeout_sec=60,
err_msg="Timed out waiting for conflict consumers to terminate after fencing")

# Guard against stray processes that could rejoin once the original static members stop.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you remind readers that UnreleasedInstanceIdException is a fatal error, resulting in the consumer's immediate failure

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.position = position if position is not None else {}
self.committed = committed if committed is not None else {}
self.total_consumed = total_consumed
self.shutdown_complete = shutdown_complete
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConsumerEventHandler instantiates every consumer in ConsumerState.Dead. During the test, we sometimes read that state before the background thread emits startup_complete, so
the test concluded “this node started and then shut down” even though it never left the default state.

We now keep an explicit shutdown_complete flag so the fencing test only proceeds once each conflicting consumer has actually gone through the start→shutdown sequence.

stateDiagram-v2
    [*] --> Dead : Init (default)
    Dead --> Started : startup_complete
    Started --> Dead : shutdown_complete(flag = true)
Loading

@apalan60
Copy link
Contributor Author

apalan60 commented Oct 30, 2025

The following are the updated test results.

TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_fencing_static_consumer" bash tests/docker/run_tests.sh
================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.12.0
session_id:       2025-10-30--005
run time:         6 minutes 34.481 seconds
tests run:        8
passed:           8
flaky:            0
failed:           0
ignored:          0
================================================================================

Also ran this test 20 times using the following command to verify its stability, and all runs passed successfully.

for i in {1..20}; do
  echo "Run $i"
  TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_fencing_static_consumer" \
  _DUCKTAPE_OPTIONS="--parameters '{\"fencing_stage\":\"stable\",\"group_protocol\":\"consumer\",\"num_conflict_consumers\":1,\"metadata_quorum\":\"ISOLATED_KRAFT\"}'" \
  bash tests/docker/run_tests.sh || break
done

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@chia7712 chia7712 merged commit 75768dd into apache:trunk Oct 31, 2025
28 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved small Small PRs tests Test fixes (including flaky tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants