Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,18 @@ func (*Agent) push(ctx context.Context, aggregator *models.RunningAggregator, ac

select {
case <-time.After(until):
// Go's timers may fire slightly before the deadline, especially
// on platforms with coarse timer granularity. Sleep any remaining
// time so Push always runs at or after the window end, never
// before.
if remaining := time.Until(aggregator.EndPeriod()); remaining > 0 {
select {
case <-time.After(remaining):
case <-ctx.Done():
aggregator.Push(acc)
return
}
}
aggregator.Push(acc)
case <-ctx.Done():
aggregator.Push(acc)
Expand Down
14 changes: 9 additions & 5 deletions models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,16 @@ func (r *RunningAggregator) Push(acc telegraf.Accumulator) {
since := r.periodEnd
until := r.periodEnd.Add(r.Config.Period)

// Check if the next aggregation window will contain "now". This might
// not be the case if the machine's clock was adjusted or the machine
// hibernated as in those cases the clock might be advanced before or
// after the initial aggregation window.
// Reset the window only when the current wall-clock time is outside
// the next aggregation window by at least one full period, which
// indicates the clock was adjusted or the machine hibernated. Smaller
// skew (e.g., a Go timer firing a few ms early) is not a clock jump;
// resetting in that case would recompute the same slot and cause Push
// to run twice for one period.
nowWall := time.Now().Truncate(-1)
if nowWall.Before(since.Truncate(-1)) || nowWall.After(until.Truncate(-1)) {
earliest := since.Add(-r.Config.Period).Truncate(-1)
latest := until.Add(r.Config.Period).Truncate(-1)
if nowWall.Before(earliest) || nowWall.After(latest) {
since = nowWall.Truncate(r.Config.Period)
until = since.Add(r.Config.Period)
}
Expand Down
45 changes: 45 additions & 0 deletions models/running_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,51 @@ func TestRunningAggregatorAddDoesNotModifyMetric(t *testing.T) {
testutil.RequireMetricEqual(t, expected, m)
}

func TestRunningAggregatorPushAdvancesWindowWhenFiringEarly(t *testing.T) {
ra := NewRunningAggregator(&mockAggregator{}, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{NamePass: []string{"*"}},
Period: time.Second,
})
require.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}

// Simulate a timer firing slightly before periodEnd: set periodEnd
// 500ms in the future so time.Now() inside Push falls within the
// current window. Before the drift safeguard was loosened, this
// tripped the reset and left periodEnd unchanged, causing a second
// Push for the same period.
windowEnd := time.Now().Add(500 * time.Millisecond)
ra.UpdateWindow(windowEnd.Add(-ra.Config.Period), windowEnd)

ra.Push(&acc)

require.True(t, ra.EndPeriod().Equal(windowEnd.Add(ra.Config.Period)),
"expected periodEnd to advance by one period: want %v, got %v",
windowEnd.Add(ra.Config.Period), ra.EndPeriod())
}

func TestRunningAggregatorPushResetsWindowOnLargeForwardJump(t *testing.T) {
// Regression guard for PR #16375: the drift safeguard must still
// reset the window when the wall clock is more than one period
// beyond the expected window, such as after hibernation.
ra := NewRunningAggregator(&mockAggregator{}, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{NamePass: []string{"*"}},
Period: time.Second,
})
require.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}

staleEnd := time.Now().Add(-2 * time.Minute)
ra.UpdateWindow(staleEnd.Add(-ra.Config.Period), staleEnd)

ra.Push(&acc)

require.Less(t, time.Since(ra.EndPeriod()).Abs(), 2*ra.Config.Period,
"expected safeguard to reset window near current time, got %v", ra.EndPeriod())
}

// mockAggregator is the stand-in aggregator that the tests in this file pass
// to NewRunningAggregator. It holds a single int64 field, sum.
// NOTE(review): sum presumably accumulates values handed to the mock; its
// methods are not visible in this view, so confirm against them.
type mockAggregator struct {
sum int64
}
Expand Down
Loading