feat: implement unlimited pagination for Kafka message listing #2226
Conversation
Add stateless cursor-based pagination for ListMessages:
- Add page token with per-partition cursor tracking and round-robin distribution across partitions
- Wire pagination fields through proto, API, and service layers
- Handle ascending/descending direction with compaction adjustment
- Cap total messages (pageSize * partitions) at 100K to prevent OOM
- Add integration tests for pagination scenarios
Move the first-pass partition loop from calculateConsumeRequestsWithPageToken into a dedicated collectPartitionStates method, bringing cyclomatic complexity under the gocyclo limit of 30.
Replace MaxResults: -1 with PageSize: 50 in pagination tests so they correctly enter pagination mode. Update proto comments to reflect that PageSize (not MaxResults=-1) enables pagination mode.
The latest Buf updates on your PR. Results from workflow Buf CI / validate (pull_request).
@claude review
Claude finished @c-julin's task in 2m 35s

## Code Review
### Overall Assessment

This is a well-implemented feature that adds cursor-based pagination support to Kafka message listing. The code demonstrates solid engineering practices with comprehensive test coverage, good error handling, and clear separation of concerns. Below are specific findings and suggestions for improvement.

### Strengths

1. Excellent Test Coverage
2. Security and Validation
3. Stateless Design
4. Backwards Compatibility
### Issues and Suggestions

#### Critical Issues

**1. Race Condition in Page Token Direction Logic**

Location:

```go
direction := DirectionAscending // Legacy mode is ascending

// Determine the max message count for this request
// In pagination mode, use PageSize; otherwise use MessageCount (maxResults)
maxMessageCount := listReq.MessageCount
if listReq.PageSize > 0 && token != nil {
	// Pagination mode: use direction from token and limit to PageSize
	direction = token.Direction
	maxMessageCount = listReq.PageSize
}
```

Issue: The default direction is hardcoded to ascending for legacy mode, but it is not established that legacy mode actually lists messages in ascending order.

Impact: This could cause inconsistent message ordering between legacy and pagination modes.

Recommendation: Clarify the actual legacy mode direction and ensure consistency. If legacy mode should be ascending, update the code and comment accordingly.

**2. Missing Partition Compaction Handling in Descending Mode**

Issue: The code handles compaction adjustments only for ascending mode (list_messages.go:573-580), but descending mode could also encounter compaction where the cursor's next offset falls below the updated low water mark.

Current code:

```go
if token.Direction == DirectionAscending && cursor.NextOffset < updatedLowWaterMark {
	cursor.NextOffset = updatedLowWaterMark
}
```

Missing case: When in descending mode, if compaction occurs and `NextOffset` falls below the low water mark, the cursor is never adjusted.

Recommendation: Add compaction handling for both directions:

```go
// Handle compaction for both directions
if cursor.NextOffset < updatedLowWaterMark {
	s.logger.DebugContext(ctx, "adjusting nextOffset due to compaction",
		slog.String("topic", token.TopicName),
		slog.Int("partition", int(cursor.ID)),
		slog.Int64("old_next_offset", cursor.NextOffset),
		slog.Int64("new_low_watermark", updatedLowWaterMark))
	cursor.NextOffset = updatedLowWaterMark
}
```

#### High Priority Issues

**3. Inconsistent Empty Topic Handling**

Issue: For an empty partition (LWM = HWM = 0), the descending path computes:

```go
nextOffset = end.Offset - 1 // 0 - 1 = -1
if nextOffset < start.Offset {
	nextOffset = start.Offset // clamps to 0
}
```

This works correctly, but the logic is subtle and relies on the clamping; ascending mode starting from an empty partition (LWM = HWM = 0) is similarly implicit.

Recommendation: Add explicit handling for empty partitions to make the logic clearer and more maintainable:

```go
var nextOffset int64
if start.Offset == end.Offset {
	// Empty partition: no messages available
	nextOffset = start.Offset
} else if direction == DirectionDescending {
	// For descending order, start from high water mark - 1
	nextOffset = end.Offset - 1
	if nextOffset < start.Offset {
		nextOffset = start.Offset
	}
} else {
	// For ascending order, start from low water mark
	nextOffset = start.Offset
}
```

**4. No Page Token Version/Schema Evolution**

Issue: The `PageToken` format carries no version field, so its schema cannot evolve without breaking tokens that clients already hold.

Recommendation: Add a version field:

```go
type PageToken struct {
	Version   int    `json:"v"` // Schema version (default 1)
	TopicName string `json:"t"`
	// ... rest of fields
}
```

And validate it:

```go
func (pt *PageToken) Validate() error {
	if pt.Version == 0 {
		pt.Version = 1 // Default to v1 for backwards compat
	}
	if pt.Version != 1 {
		return fmt.Errorf("unsupported page token version: %d", pt.Version)
	}
	// ... rest of validation
}
```

**5. Potential Integer Overflow in Total Messages Calculation**

Location:

```go
const maxTotalMessages = 100_000
if int64(pt.PageSize)*int64(pt.PartitionCount) > maxTotalMessages {
	return fmt.Errorf("page size %d with %d partitions exceeds max total messages (%d)",
		pt.PageSize, pt.PartitionCount, maxTotalMessages)
}
```

Issue: While the current implementation uses 64-bit arithmetic and does not actually overflow, the error message does not explain what the product represents.

Recommendation: Clarify the error message or reconsider whether this check is necessary:

```go
// This check prevents unreasonably large page sizes when combined with many partitions
// The actual fetch will distribute PageSize messages across partitions
if int64(pt.PageSize)*int64(pt.PartitionCount) > maxTotalMessages {
	return fmt.Errorf("page size %d with %d partitions would request %d total messages, exceeding limit of %d",
		pt.PageSize, pt.PartitionCount, int64(pt.PageSize)*int64(pt.PartitionCount), maxTotalMessages)
}
```

#### Medium Priority Issues

**6. Unclear Proto Comment About Pagination Mode Trigger**
Issue: The comment says "Defaults to 50 if not set", but the validation shows the field is optional; pagination mode is enabled by setting the field, not by a default.

Recommendation: Clarify the comment:

```protobuf
int32 page_size = 14 [(buf.validate.field).int32 = {
	gte: 1
	lte: 500
}]; // Number of messages per page. When > 0, pagination mode is enabled and max_results is ignored. When 0 or unset, legacy mode is used.
```

**7. Missing Context Cancellation Check in Round-Robin Loop**

Issue: The round-robin distribution loop could theoretically run for a long time with many partitions and large page sizes, but it doesn't check for context cancellation.

Recommendation: Add periodic cancellation checks:

```go
for remainingMessages > 0 && yieldingPartitions > 0 {
	// Check for cancellation periodically
	select {
	case <-ctx.Done():
		return nil, "", false, ctx.Err()
	default:
	}
	progressMade := false
	// ... rest of loop
}
```

**8. Potential Memory Inefficiency in Partition States**

Issue: The code creates a new slice sized to the incoming token's partition count, even when some partitions are exhausted:

```go
nextToken := &PageToken{
	// ...
	Partitions: make([]PartitionCursor, 0, len(token.Partitions)),
	// ...
}
```

Recommendation: Pre-allocate based on actual active partitions (after the exhaustion check), or let the slice grow naturally:

```go
nextToken := &PageToken{
	TopicName:      token.TopicName,
	PartitionCount: token.PartitionCount,
	Partitions:     make([]PartitionCursor, 0), // Use 0 capacity, let it grow naturally
	Direction:      token.Direction,
	PageSize:       token.PageSize,
}
```

#### Low Priority / Nice-to-Have

**9. Documentation: Clarify NextOffset Semantics**

Issue: The semantics of `NextOffset` differ between ascending and descending mode, and the current comment does not document them.

Current:

```go
NextOffset int64 `json:"no"` // Next offset to read from
```

Suggestion:

```go
// NextOffset is the next offset to consume:
// - Descending: the highest offset to read (inclusive), next page reads from NextOffset-PageSize
// - Ascending: the lowest offset to read (inclusive), next page reads from NextOffset+PageSize
// - May be -1 when descending mode exhausts a partition (read back to offset 0)
NextOffset int64 `json:"no"`
```

**10. Integration Test Coverage: Missing Edge Cases**

Missing test scenarios:
Recommendation: Add integration tests for these edge cases to ensure robustness.

**11. Code Duplication in Exhaustion Checks**

Issue: Multiple places check partition exhaustion with similar logic.

Recommendation: Consider extracting it to a single helper method to reduce duplication and ensure consistency:

```go
func (cursor *PartitionCursor) IsExhausted(direction string) bool {
	if direction == DirectionDescending {
		return cursor.NextOffset < cursor.LowWaterMark
	}
	return cursor.NextOffset >= cursor.HighWaterMark
}
```

**12. Nolint Directives Could Be Reduced**

Issue: Multiple `nolint` directives remain in the listing code.

Recommendation: Consider further extraction of logical blocks into helper functions to naturally reduce complexity without needing lint exceptions.

### Minor Observations
### Summary

This PR implements a solid foundation for unlimited pagination. The critical issues around direction consistency and compaction handling should be addressed before merging. The medium priority issues around error messages, context cancellation, and versioning would improve robustness. The low priority suggestions would enhance maintainability and test coverage but aren't blockers.

The overall architecture is sound, and the implementation demonstrates good engineering practices with strong test coverage and clear separation of concerns.
Summary
Adds cursor-based pagination support to the Kafka message listing API. When `page_size` is set, the backend enters pagination mode — fetching messages in pages with opaque page tokens for resumption, while `max_results` is ignored. This enables clients to paginate through all messages in a topic without a hard upper limit.

- `PageToken` encoding/decoding with per-partition offsets and direction tracking
- Honors `PageSize` and generates `next_page_token` in the stream completion message
- `collectPartitionStates` helper to reduce cyclomatic complexity in the listing logic
- Tests updated to use `PageSize` to correctly trigger pagination mode
- Proto comments clarified: `page_size` controls pagination mode, not `max_results`