Skip to content

Commit accf9b0

Browse files
committed
Update consumer session timeout config
1 parent 6209526 commit accf9b0

File tree

1 file changed

+49
-36
lines changed

1 file changed

+49
-36
lines changed

tests/integration/backup/test_session_timeout.py

+49-36
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,22 @@
66
from pathlib import Path
77

88
import pytest
9-
from aiokafka.errors import NoBrokersAvailable
9+
from aiokafka.errors import InvalidSessionTimeoutError
10+
from confluent_kafka import TopicPartition
1011
from confluent_kafka.admin import NewTopic
1112

12-
from karapace.backup.api import BackupVersion, create_backup
1313
from karapace.core.config import Config
1414
from karapace.core.kafka.admin import KafkaAdminClient
15-
from karapace.core.kafka_utils import kafka_producer_from_config
15+
16+
from src.karapace.backup.api import _consume_records
17+
from src.karapace.backup.poll_timeout import PollTimeout
18+
from src.karapace.core.kafka_utils import kafka_producer_from_config, kafka_consumer_from_config
1619
from tests.integration.conftest import create_kafka_server
1720
from tests.integration.utils.config import KafkaDescription
1821
from tests.integration.utils.kafka_server import KafkaServers
1922

2023
SESSION_TIMEOUT_MS = 65000
24+
INVALID_SESSION_TIMEOUT_MS = 5000
2125
GROUP_MIN_SESSION_TIMEOUT_MS = 60000
2226
GROUP_MAX_SESSION_TIMEOUT_MS = 70000
2327

@@ -44,13 +48,13 @@ def fixture_kafka_server(
4448
)
4549

4650

47-
def test_producer_with_custom_kafka_properties_does_not_fail(
51+
def test_consumer_with_custom_kafka_properties_does_not_fail(
4852
kafka_server_session_timeout: KafkaServers,
4953
new_topic: NewTopic,
5054
tmp_path: Path,
5155
) -> None:
5256
"""
53-
This test checks wether the custom properties are accepted by kafka.
57+
This test checks weather the custom properties are accepted by kafka.
5458
We know by the implementation of the consumer startup code that if
5559
`group.session.min.timeout.ms` > `session.timeout.ms` the consumer
5660
will raise an exception during the startup.
@@ -64,51 +68,60 @@ def test_producer_with_custom_kafka_properties_does_not_fail(
6468
admin_client = KafkaAdminClient(bootstrap_servers=kafka_server_session_timeout.bootstrap_servers)
6569
admin_client.new_topic(new_topic.topic, num_partitions=1, replication_factor=1)
6670

67-
with kafka_producer_from_config(config) as producer:
68-
producer.send(
69-
new_topic.topic,
70-
key=b"foo",
71-
value=b"bar",
72-
partition=0,
73-
headers=[
74-
("some-header", b"some header value"),
75-
("other-header", b"some other header value"),
76-
],
77-
timestamp=1683474657,
78-
)
79-
producer.flush()
80-
81-
# without performing the backup the exception isn't raised.
82-
create_backup(
83-
config=config,
84-
backup_location=tmp_path / "backup",
85-
topic_name=new_topic.topic,
86-
version=BackupVersion.V3,
87-
replication_factor=1,
88-
)
71+
produce_consume_messages(config, new_topic.topic, False)
8972

9073

91-
def test_producer_with_custom_kafka_properties_fail(
74+
def test_consumer_with_custom_kafka_properties_fail(
9275
kafka_server_session_timeout: KafkaServers,
9376
new_topic: NewTopic,
77+
tmp_path: Path,
9478
) -> None:
9579
"""
96-
This test checks wether the custom properties are accepted by kafka.
80+
This test checks weather the custom properties are accepted by kafka.
9781
We know by the implementation of the consumer startup code that if
9882
`group.session.min.timeout.ms` > `session.timeout.ms` the consumer
9983
will raise an exception during the startup.
10084
This test ensures that the `session.timeout.ms` can be injected in
101-
the kafka config so that the exception isn't raised
85+
the kafka config so that the exception is raised
10286
"""
10387
admin_client = KafkaAdminClient(bootstrap_servers=kafka_server_session_timeout.bootstrap_servers)
10488
admin_client.new_topic(new_topic.topic, num_partitions=1, replication_factor=1)
10589

10690
config = Config()
107-
# TODO: This test is broken. Test has used localhost:9092 when this should use
10891
# the configured broker from kafka_server_session.
109-
# config.bootstrap_uri = kafka_server_session_timeout.bootstrap_servers[0]
110-
config.bootstrap_uri = "localhost:9092"
92+
config.bootstrap_uri = kafka_server_session_timeout.bootstrap_servers[0]
93+
# configure session timeout less than min session time
94+
config.session_timeout_ms = INVALID_SESSION_TIMEOUT_MS
95+
96+
produce_consume_messages(config, new_topic.topic, True)
97+
98+
99+
def produce_consume_messages(config: Config, new_topic: str, invalid_config: bool):
100+
with kafka_producer_from_config(config) as producer:
101+
producer.send(
102+
new_topic,
103+
key=b"foo",
104+
value=b"bar",
105+
partition=0,
106+
headers=[
107+
("some-header", b"some header value"),
108+
("other-header", b"some other header value"),
109+
],
110+
timestamp=1683474657,
111+
)
112+
if invalid_config:
113+
with pytest.raises(InvalidSessionTimeoutError):
114+
consume_messages(config, new_topic)
115+
else:
116+
consume_messages(config, new_topic)
117+
111118

112-
with pytest.raises(NoBrokersAvailable):
113-
with kafka_producer_from_config(config) as producer:
114-
_ = producer
119+
def consume_messages(config, new_topic):
120+
with kafka_consumer_from_config(config, new_topic) as consumer:
121+
(partition,) = consumer.partitions_for_topic(new_topic)
122+
for _ in _consume_records(
123+
consumer=consumer,
124+
topic_partition=TopicPartition(new_topic, partition),
125+
poll_timeout=PollTimeout.default(),
126+
):
127+
pass

0 commit comments

Comments
 (0)