Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions pkg/epp/flowcontrol/controller/internal/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -165,16 +164,19 @@ func (sp *ShardProcessor) Run(ctx context.Context) {
sp.wg.Add(1)
go sp.runCleanupSweep(ctx)

// Create a ticker for periodic dispatch attempts to avoid tight loops
dispatchTicker := sp.clock.NewTicker(time.Millisecond)
defer dispatchTicker.Stop()

// This is the main worker loop. It continuously processes incoming requests and dispatches queued requests until the
// context is cancelled. The `select` statement has three cases:
//
// 1. Context Cancellation: The highest priority is shutting down. If the context's `Done` channel is closed, the
// loop will drain all queues and exit. This is the primary exit condition.
// 2. New Item Arrival: If an item is available on `enqueueChan`, it will be processed. This ensures that the
// processor is responsive to new work.
// 3. Default (Dispatch): If neither of the above cases is ready, the `default` case executes, ensuring the loop is
// non-blocking. It continuously attempts to dispatch items from the existing backlog, preventing starvation and
// ensuring queues are drained.
// 3. Dispatch Ticker: Periodically triggers a dispatch cycle to attempt to dispatch items from existing queues,
// ensuring that queued work is processed even when no new items arrive.
for {
select {
case <-ctx.Done():
Expand All @@ -193,14 +195,9 @@ func (sp *ShardProcessor) Run(ctx context.Context) {
continue
}
sp.enqueue(item)
sp.dispatchCycle(ctx)
default:
// If no new items are arriving, continuously try to dispatch from the backlog.
if !sp.dispatchCycle(ctx) {
// If no work was done, yield to the scheduler to prevent a tight, busy-loop when idle, while still allowing for
// immediate rescheduling.
runtime.Gosched()
}
sp.dispatchCycle(ctx) // Process immediately when an item arrives
case <-dispatchTicker.C():
sp.dispatchCycle(ctx) // Periodically attempt to dispatch from queues
}
}
}
Expand Down
22 changes: 17 additions & 5 deletions pkg/epp/flowcontrol/controller/internal/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,11 +480,23 @@ func TestShardProcessor(t *testing.T) {
defer h.Stop()
h.Go()

// Wait for the dispatch cycle to select our item and pause inside our mock `RemoveFunc`.
select {
case <-itemIsBeingDispatched:
case <-time.After(testWaitTimeout):
t.Fatal("Timed out waiting for item to be dispatched")
// Advance the test clock in small increments until the item is being dispatched or timeout
// This is a more reliable way to ensure the processor has started and run the dispatch cycle
timeout := time.After(testWaitTimeout)
ticker := time.NewTicker(1 * time.Millisecond)
defer ticker.Stop()

dispatched := false
for !dispatched {
select {
case <-itemIsBeingDispatched:
dispatched = true
case <-timeout:
t.Fatal("Timed out waiting for item to be dispatched")
case <-ticker.C:
// Advance the test clock to trigger the dispatch ticker
h.clock.Step(1 * time.Millisecond)
}
}

// 3. The dispatch goroutine is now paused. We can now safely win the "race" by running cleanup logic.
Expand Down