diff --git a/agent/agent.go b/agent/agent.go index be1003fc5dee9..9a305dd86f9a0 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -766,6 +766,18 @@ func (*Agent) push(ctx context.Context, aggregator *models.RunningAggregator, ac select { case <-time.After(until): + // Go's timers may fire slightly before the deadline, especially + // on platforms with coarse timer granularity. Sleep any remaining + // time so Push always runs at or after the window end, never + // before. + if remaining := time.Until(aggregator.EndPeriod()); remaining > 0 { + select { + case <-time.After(remaining): + case <-ctx.Done(): + aggregator.Push(acc) + return + } + } aggregator.Push(acc) case <-ctx.Done(): aggregator.Push(acc) diff --git a/models/running_aggregator.go b/models/running_aggregator.go index 46d87abcf5982..224978537ff7b 100644 --- a/models/running_aggregator.go +++ b/models/running_aggregator.go @@ -182,12 +182,16 @@ func (r *RunningAggregator) Push(acc telegraf.Accumulator) { since := r.periodEnd until := r.periodEnd.Add(r.Config.Period) - // Check if the next aggregation window will contain "now". This might - // not be the case if the machine's clock was adjusted or the machine - // hibernated as in those cases the clock might be advanced before or - // after the initial aggregation window. + // Reset the window only when the current wall-clock time is outside + // the next aggregation window by at least one full period, which + // indicates the clock was adjusted or the machine hibernated. Smaller + // skew (e.g., a Go timer firing a few ms early) is not a clock jump; + // resetting in that case would recompute the same slot and cause Push + // to run twice for one period. nowWall := time.Now().Truncate(-1) - if nowWall.Before(since.Truncate(-1)) || nowWall.After(until.Truncate(-1)) { + earliest := since.Add(-r.Config.Period).Truncate(-1) + latest := until.Add(r.Config.Period).Truncate(-1) + if nowWall.Before(earliest) || nowWall.After(latest) { since = nowWall.Truncate(r.Config.Period) until = since.Add(r.Config.Period) } diff --git a/models/running_aggregator_test.go b/models/running_aggregator_test.go index 6909863889018..13da4cb1dfcb7 100644 --- a/models/running_aggregator_test.go +++ b/models/running_aggregator_test.go @@ -240,6 +240,51 @@ func TestRunningAggregatorAddDoesNotModifyMetric(t *testing.T) { testutil.RequireMetricEqual(t, expected, m) } +func TestRunningAggregatorPushAdvancesWindowWhenFiringEarly(t *testing.T) { + ra := NewRunningAggregator(&mockAggregator{}, &AggregatorConfig{ + Name: "TestRunningAggregator", + Filter: Filter{NamePass: []string{"*"}}, + Period: time.Second, + }) + require.NoError(t, ra.Config.Filter.Compile()) + acc := testutil.Accumulator{} + + // Simulate a timer firing slightly before periodEnd: set periodEnd + // 500ms in the future so time.Now() inside Push falls within the + // current window. Before the drift safeguard was loosened, this + // tripped the reset and left periodEnd unchanged, causing a second + // Push for the same period. + windowEnd := time.Now().Add(500 * time.Millisecond) + ra.UpdateWindow(windowEnd.Add(-ra.Config.Period), windowEnd) + + ra.Push(&acc) + + require.True(t, ra.EndPeriod().Equal(windowEnd.Add(ra.Config.Period)), + "expected periodEnd to advance by one period: want %v, got %v", + windowEnd.Add(ra.Config.Period), ra.EndPeriod()) +} + +func TestRunningAggregatorPushResetsWindowOnLargeForwardJump(t *testing.T) { + // Regression guard for PR #16375: the drift safeguard must still + // reset the window when the wall clock is more than one period + // beyond the expected window, such as after hibernation. + ra := NewRunningAggregator(&mockAggregator{}, &AggregatorConfig{ + Name: "TestRunningAggregator", + Filter: Filter{NamePass: []string{"*"}}, + Period: time.Second, + }) + require.NoError(t, ra.Config.Filter.Compile()) + acc := testutil.Accumulator{} + + staleEnd := time.Now().Add(-2 * time.Minute) + ra.UpdateWindow(staleEnd.Add(-ra.Config.Period), staleEnd) + + ra.Push(&acc) + + require.Less(t, time.Since(ra.EndPeriod()).Abs(), 2*ra.Config.Period, + "expected safeguard to reset window near current time, got %v", ra.EndPeriod()) +} + type mockAggregator struct { sum int64 }