Skip to content

Commit 5675261

Browse files
committed
Consumer commit fix
1 parent 6209526 commit 5675261

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

src/karapace/kafka_rest_apis/consumer_manager.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22
Copyright (c) 2023 Aiven Ltd
33
See LICENSE for details
44
"""
5+
import traceback
56

67
from aiokafka.errors import (
78
GroupAuthorizationFailedError,
89
IllegalStateError,
910
KafkaConfigurationError,
1011
KafkaError,
1112
TopicAuthorizationFailedError,
12-
UnknownTopicOrPartitionError,
13+
UnknownTopicOrPartitionError, UnknownError,
1314
)
1415
from asyncio import Lock
1516
from collections import defaultdict, namedtuple
@@ -291,6 +292,13 @@ async def commit_offsets(
291292
payload = payload or None
292293
try:
293294
await consumer.commit(offsets=payload)
295+
except UnknownError as ue:
296+
error_trace = traceback.format_exc() # Capture the full stack trace
297+
offset_stored_err_message = "Commit failed: Local: No offset stored"
298+
if offset_stored_err_message in error_trace:
299+
LOG.warning("Ignoring KafkaError: No offset stored.")
300+
else:
301+
KarapaceBase.internal_error(message=f"error sending commit request: {ue}", content_type=content_type)
294302
except KafkaError as e:
295303
KarapaceBase.internal_error(message=f"error sending commit request: {e}", content_type=content_type)
296304
empty_response()

tests/integration/kafka_rest_apis/test_rest_consumer.py

+8
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,14 @@ async def test_offsets_no_payload(rest_async_client, admin_client, producer, tra
293293
producer.send(topic_name, value=b"message-value")
294294
producer.flush()
295295

296+
# Commit should not throw any error, even before consuming events
297+
res = await rest_async_client.post(
298+
offsets_path,
299+
headers=header,
300+
json={}
301+
)
302+
assert res.ok, f"Expected a successful response: {res}"
303+
296304
resp = await rest_async_client.get(consume_path, headers=header)
297305
assert resp.ok, f"Expected a successful response: {resp}"
298306

0 commit comments

Comments
 (0)