|
13 | 13 | from karapace.core.config import Config
|
14 | 14 | from karapace.core.kafka.admin import KafkaAdminClient
|
15 | 15 |
|
16 |
| -from src.karapace.backup.api import _consumer, _consume_records |
| 16 | +from src.karapace.backup.api import _consume_records |
17 | 17 | from src.karapace.backup.poll_timeout import PollTimeout
|
18 |
| -from src.karapace.core.kafka_utils import kafka_producer_from_config |
| 18 | +from src.karapace.core.kafka_utils import kafka_producer_from_config, kafka_consumer_from_config |
19 | 19 | from tests.integration.conftest import create_kafka_server
|
20 | 20 | from tests.integration.utils.config import KafkaDescription
|
21 | 21 | from tests.integration.utils.kafka_server import KafkaServers
|
@@ -117,8 +117,11 @@ def produce_consume_messages(config: Config, new_topic: str, invalid_config: boo
|
117 | 117 |
|
118 | 118 |
|
119 | 119 | def consume_messages(config, new_topic):
|
120 |
| - with _consumer(config, new_topic) as consumer: |
| 120 | + with kafka_consumer_from_config(config, new_topic) as consumer: |
121 | 121 | (partition,) = consumer.partitions_for_topic(new_topic)
|
122 |
| - topic_partition = TopicPartition(new_topic, partition) |
123 |
| - for _ in _consume_records(consumer, topic_partition, PollTimeout.default()): |
| 122 | + for _ in _consume_records( |
| 123 | + consumer=consumer, |
| 124 | + topic_partition=TopicPartition(new_topic, partition), |
| 125 | + poll_timeout=PollTimeout.default(), |
| 126 | + ): |
124 | 127 | pass
|
0 commit comments