10
10
from confluent_kafka import TopicPartition
11
11
from confluent_kafka .admin import NewTopic
12
12
13
- from karapace .backup .api import BackupVersion , create_backup
14
13
from karapace .core .config import Config
15
14
from karapace .core .kafka .admin import KafkaAdminClient
16
- from karapace .core .kafka_utils import kafka_consumer_from_config
17
15
18
16
from src .karapace .backup .api import _consumer , _consume_records
19
17
from src .karapace .backup .poll_timeout import PollTimeout
@@ -70,17 +68,7 @@ def test_consumer_with_custom_kafka_properties_does_not_fail(
70
68
admin_client = KafkaAdminClient (bootstrap_servers = kafka_server_session_timeout .bootstrap_servers )
71
69
admin_client .new_topic (new_topic .topic , num_partitions = 1 , replication_factor = 1 )
72
70
73
- with kafka_consumer_from_config (config , new_topic .topic ) as consumer :
74
- _ = consumer
75
-
76
- # without performing the backup the exception isn't raised.
77
- create_backup (
78
- config = config ,
79
- backup_location = tmp_path / "backup" ,
80
- topic_name = new_topic .topic ,
81
- version = BackupVersion .V3 ,
82
- replication_factor = 1 ,
83
- )
71
+ produce_consume_messages (config , new_topic .topic , False )
84
72
85
73
86
74
def test_consumer_with_custom_kafka_properties_fail (
@@ -102,10 +90,16 @@ def test_consumer_with_custom_kafka_properties_fail(
102
90
config = Config ()
103
91
# the configured broker from kafka_server_session.
104
92
config .bootstrap_uri = kafka_server_session_timeout .bootstrap_servers [0 ]
93
+ # configure session timeout less than min session time
94
+ config .session_timeout_ms = INVALID_SESSION_TIMEOUT_MS
95
+
96
+ produce_consume_messages (config , new_topic .topic , True )
105
97
98
+
99
+ def produce_consume_messages (config : Config , new_topic : str , invalid_config : bool ):
106
100
with kafka_producer_from_config (config ) as producer :
107
101
producer .send (
108
- new_topic . topic ,
102
+ new_topic ,
109
103
key = b"foo" ,
110
104
value = b"bar" ,
111
105
partition = 0 ,
@@ -115,13 +109,16 @@ def test_consumer_with_custom_kafka_properties_fail(
115
109
],
116
110
timestamp = 1683474657 ,
117
111
)
118
-
119
- # configure session timeout less than min session time
120
- config .session_timeout_ms = INVALID_SESSION_TIMEOUT_MS
121
-
122
- with pytest .raises (InvalidSessionTimeoutError ):
123
- with _consumer (config , new_topic .topic ) as consumer :
124
- (partition ,) = consumer .partitions_for_topic (new_topic .topic )
125
- topic_partition = TopicPartition (new_topic .topic , partition )
126
- for _ in _consume_records (consumer , topic_partition , PollTimeout .default ()):
127
- pass
112
+ if invalid_config :
113
+ with pytest .raises (InvalidSessionTimeoutError ):
114
+ consume_messages (config , new_topic )
115
+ else :
116
+ consume_messages (config , new_topic )
117
+
118
+
119
+ def consume_messages (config , new_topic ):
120
+ with _consumer (config , new_topic ) as consumer :
121
+ (partition ,) = consumer .partitions_for_topic (new_topic )
122
+ topic_partition = TopicPartition (new_topic , partition )
123
+ for _ in _consume_records (consumer , topic_partition , PollTimeout .default ()):
124
+ pass
0 commit comments