Skip to content

[KIP-848] Tests for changes in DescribeConsumerGroups #1984

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tests/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def use_group_protocol_consumer():
@staticmethod
def update_conf_group_protocol(conf=None):
if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer():
conf['group.protocol'] = 'consumer'
if 'group.protocol' not in conf:
conf['group.protocol'] = 'consumer'

@staticmethod
def remove_forbidden_conf_group_protocol_consumer(conf):
Expand Down
40 changes: 29 additions & 11 deletions tests/integration/admin/test_describe_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
from confluent_kafka.admin import (AclBinding, AclBindingFilter, ResourceType,
ResourcePatternType, AclOperation, AclPermissionType)
from confluent_kafka.error import ConsumeError
from confluent_kafka import ConsumerGroupState, TopicCollection

from tests.common import TestUtils
from confluent_kafka import ConsumerGroupState, TopicCollection, ConsumerGroupType

topic_prefix = "test-topic"

Expand All @@ -30,12 +28,16 @@ def verify_commit_result(err, _):
assert err is not None


def consume_messages(sasl_cluster, group_id, topic, num_messages=None):
def consume_messages(sasl_cluster, group_id, group_protocol, topic, num_messages=None):
conf = {'group.id': group_id,
'session.timeout.ms': 6000,
'group.protocol': group_protocol,
'enable.auto.commit': False,
'on_commit': verify_commit_result,
'auto.offset.reset': 'earliest'}

if group_protocol == 'classic':
conf['session.timeout.ms'] = 6000

consumer = sasl_cluster.consumer(conf)
consumer.subscribe([topic])
read_messages = 0
Expand Down Expand Up @@ -164,7 +166,7 @@ def verify_describe_groups(cluster, admin_client, topic):

# Consume some messages for the group
group = 'test-group'
consume_messages(cluster, group, topic, 2)
consume_messages(cluster, group, 'classic', topic, 2)

# Verify Describe Consumer Groups
desc = verify_provided_describe_for_authorized_operations(admin_client,
Expand All @@ -177,10 +179,30 @@ def verify_describe_groups(cluster, admin_client, topic):
assert group == desc.group_id
assert desc.is_simple_consumer_group is False
assert desc.state == ConsumerGroupState.EMPTY
assert desc.type == ConsumerGroupType.CLASSIC

# Delete group
perform_admin_operation_sync(admin_client.delete_consumer_groups, [group], request_timeout=10)

consumer_group = 'test-group-consumer'

consume_messages(cluster, consumer_group, 'consumer', topic, 2)

desc = verify_provided_describe_for_authorized_operations(admin_client,
admin_client.describe_consumer_groups,
AclOperation.READ,
AclOperation.DELETE,
ResourceType.GROUP,
consumer_group,
[consumer_group])
assert consumer_group == desc.group_id
assert desc.is_simple_consumer_group is False
assert desc.state == ConsumerGroupState.EMPTY
assert desc.type == ConsumerGroupType.CONSUMER

# Delete group
perform_admin_operation_sync(admin_client.delete_consumer_groups, [consumer_group], request_timeout=10)


def verify_describe_cluster(admin_client):
desc = verify_provided_describe_for_authorized_operations(admin_client,
Expand Down Expand Up @@ -217,11 +239,7 @@ def test_describe_operations(sasl_cluster):
verify_describe_topics(admin_client, our_topic)

# Verify Authorized Operations in Describe Groups
# Skip this test if using group protocol `consumer`
# as there is new RPC for describe_groups() in
# group protocol `consumer` case.
if not TestUtils.use_group_protocol_consumer():
verify_describe_groups(sasl_cluster, admin_client, our_topic)
verify_describe_groups(sasl_cluster, admin_client, our_topic)

# Delete Topic
perform_admin_operation_sync(admin_client.delete_topics, [our_topic], operation_timeout=0, request_timeout=10)