Conversation

@izzyharker (Contributor) commented Oct 7, 2025

The group coordinator has been having issues with unknown errors. The
theory is that this is caused by optimistic compression estimates which
cause unchecked batch overflows when trying to write.

This PR adds a check for uncompressed record size to flush batches more
eagerly and avoid overfilling partially-full batches. This should make
the group coordinator errors less frequent.

Also added tests to ensure this change does not impact desired behavior
for large compressible records.

Reviewers: Sean Quah [email protected], David Jacot [email protected]
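
A minimal sketch of the eager-flush idea described above; only AbstractRecords.estimateSizeInBytes, CompressionType.NONE, currentBatch, and recordsToAppend come from the actual change, while maxBatchSize, bytesAlreadyInBatch, and flushCurrentBatch are placeholder names:

```java
// Sketch only: size the records as if they were uncompressed. This is an upper
// bound, so a flush decision based on it cannot be fooled by an optimistic
// compression estimate that later turns out to be wrong.
int uncompressedSizeUpperBound = AbstractRecords.estimateSizeInBytes(
    currentBatch.builder.magic(),
    CompressionType.NONE,
    recordsToAppend
);

// Hypothetical names (maxBatchSize, bytesAlreadyInBatch, flushCurrentBatch):
// if the upper bound does not fit in the partially-full batch, flush it eagerly
// and start a fresh batch before appending.
if (uncompressedSizeUpperBound > maxBatchSize - bytesAlreadyInBatch) {
    flushCurrentBatch(currentTimeMs);
}
```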

@github-actions bot added the triage (PRs from the community) and group-coordinator labels Oct 7, 2025
@izzyharker marked this pull request as ready for review October 7, 2025 14:58
@dajac self-requested a review October 7, 2025 15:00
@dajac added the ci-approved label and removed the triage (PRs from the community) label Oct 7, 2025
@dajac changed the title from "[KAFKA-19760]: RecordTooLargeExceptions in group coordinator when offsets.topic.compression.codec is used" to "KAFKA-19760: RecordTooLargeExceptions in group coordinator when offsets.topic.compression.codec is used" Oct 7, 2025
@squah-confluent (Contributor) left a comment:

Thanks for the patch! I left a few comments.

@squah-confluent (Contributor) left a comment:

Thanks for updating the PR!

```java
}

@Test
public void testLargeCompressibleRecordTriggersFlushAndSucceeds() throws Exception {
```
Contributor:

Could we add a variant of this test that would try to pack the large record into the same batch as write1 under the old implementation? 3x of the max batch size is too large for that and we'd need something like 0.75 of the max batch size. We can use @ParameterizedTest, @ValueSource and add a double parameter for the fraction of the max batch size to use.

Contributor Author:

Added a variant but separated it out into a different test because they behave slightly differently. When the record is smaller than the batch size but won't fit in the current batch, it flushes the current batch but doesn't trigger a second flush on the new batch. The 3x-batch-size record triggers a second flush as well, so having two tests to check both behaviors is good.
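
For reference, the @ParameterizedTest / @ValueSource pattern suggested above would look roughly like the sketch below; the class name, size constant, fractions, and assertion are illustrative only and are not the tests added in this PR:

```java
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class LargeCompressibleRecordExample {
    // Illustrative stand-in for the coordinator's configured max batch size.
    private static final int MAX_BATCH_SIZE = 16 * 1024;

    @ParameterizedTest
    @ValueSource(doubles = {0.75, 3.0}) // fractions of the max batch size to exercise
    void largeCompressibleRecordTriggersFlush(double fraction) {
        int recordSize = (int) (MAX_BATCH_SIZE * fraction);
        // A real test would append a record of this size after write1 and assert
        // how many flushes occur; here we only demonstrate the parameterization.
        Assertions.assertTrue(recordSize > 0);
    }
}
```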

Comment on lines 1003 to 1007
```java
int estimatedSizeUpperBound = AbstractRecords.estimateSizeInBytes(
    currentBatch.builder.magic(),
    CompressionType.NONE,
    recordsToAppend
);
```
Member:

It is kind of annoying that we compute the size twice, especially since estimatedSize is just estimatedSizeUpperBound scaled by a fixed factor. Could we combine them?

Otherwise, I wonder whether we should just remove the check on estimatedSize below and rely on the check from the log layer. What do you think?

Contributor:

I don't think we can combine them. But we can remove the estimatedSize check and rely on the log layer. Then when we have an overly large atomic write, we will 1) flush the current batch, 2) write a new batch, 3) flush it immediately, which fails. The downside is that we will do a bunch of extra work for oversized writes.

@izzyharker what do you think about removing the estimatedSize check?

@izzyharker (Contributor Author) commented Oct 14, 2025:

It's probably fine either way? If the issue is doing the size check twice, we could use the estimated ratio on the uncompressed size as an estimate rather than making two method calls.

Member:

The estimatedSize is likely wrong anyway. I am for removing it. @squah-confluent Is it ok for you?

Contributor:

Yes, I'm fine with removing the check against estimatedSize entirely.

Contributor Author:

Sounds good.
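
For illustration, the alternative floated above (before the thread settled on dropping the check entirely) would derive the compressed-size estimate from the single uncompressed upper bound instead of a second estimateSizeInBytes call; the ratio value below is made up:

```java
// Made-up constant for this sketch: an assumed average compression ratio.
final double estimatedCompressionRatio = 0.5;

int estimatedSizeUpperBound = AbstractRecords.estimateSizeInBytes(
    currentBatch.builder.magic(),
    CompressionType.NONE,
    recordsToAppend
);
// Scale the uncompressed upper bound instead of computing the size a second
// time with the configured compression type.
int estimatedSize = (int) (estimatedSizeUpperBound * estimatedCompressionRatio);
```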

```java
// If flushing fails, we don't catch the exception in order to let
// the caller fail the current operation.
maybeFlushCurrentBatch(currentTimeMs);
if (isAtomic && !currentBatch.builder.hasRoomFor(0)) {
```
Member:

I am not sure I understand the isAtomic here. Let's say that we have two records that must be written atomically, and we still have space in the batch for others. Why do we force a flush?

Contributor:

We don't force a flush. We only flush if we took the atomic path and uncompressed batch size >= max.message.size, which means the next atomic write will flush the current batch before writing.

Member:

Ok. Would it be possible to put !currentBatch.builder.hasRoomFor(0) within maybeFlushCurrentBatch or do we really need to rely on isAtomic?

Contributor Author:

I don't see the harm in moving it to maybeFlushCurrentBatch.

@github-actions bot added the streams, core (Kafka Broker), tools, connect, dependencies (Pull requests that update a dependency file), storage (Pull requests that target the storage module), build (Gradle build or GitHub Actions), and docker (Official Docker image) labels Oct 14, 2025
@airlock-confluentinc bot force-pushed the unknown_server_errors_iharker branch from 9bfdfdf to d17a9ff October 14, 2025 15:26
@izzyharker requested a review from dajac October 15, 2025 15:27
@dajac (Member) left a comment:

lgtm, thanks

@dajac merged commit 388739f into apache:trunk Oct 16, 2025 (22 checks passed)
@dajac deleted the unknown_server_errors_iharker branch October 16, 2025 09:10
dajac pushed a commit that referenced this pull request Oct 16, 2025
…ts.topic.compression.codec is used (#20653)

@dajac (Member) commented Oct 16, 2025:

Merged to trunk and 4.1. I was not able to cherry-pick it to 4.0 due to some conflicts. @izzyharker Could you please open a PR for 4.0 branch?

dajac pushed a commit that referenced this pull request Oct 17, 2025
…ts.topic.compression.codec is used (4.0) (#20715)

This PR backports the change from #20653 to 4.0.

```diff
 private void maybeFlushCurrentBatch(long currentTimeMs) {
     if (currentBatch != null) {
-        if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+        if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
```
Member:

@majialoong and I were discussing the condition (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs. The correct version seems to be (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs. Or we can remove it, since a lingerTimeoutTask already exists

WDYT?
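
A quick numeric check of the point above, with made-up values:

```java
public class LingerConditionExample {
    public static void main(String[] args) {
        // Made-up numbers: a batch appended at t=1000ms, evaluated at t=1015ms,
        // with appendLingerMs = 10.
        long appendTimeMs = 1000, currentTimeMs = 1015, appendLingerMs = 10;
        // Condition as written: appendTimeMs - currentTimeMs is -15, which can
        // never reach a positive linger, so this clause never fires.
        System.out.println((appendTimeMs - currentTimeMs) >= appendLingerMs); // false
        // Swapped orientation suggested above: 15 >= 10, so the batch flushes
        // once it has lingered long enough.
        System.out.println((currentTimeMs - appendTimeMs) >= appendLingerMs); // true
    }
}
```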
