Skip to content

Commit

Permalink
Fix timeline backfills (grafana#2200)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanhuhta authored Aug 3, 2023
1 parent 806f47e commit ac03f8b
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 56 deletions.
67 changes: 30 additions & 37 deletions pkg/querier/timeline/timeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,56 +18,57 @@ func New(series *v1.Series, startMs int64, endMs int64, durationDeltaSec int64)
points := series.GetPoints()
res := make([]uint64, len(points))

timeline := &flamebearer.FlamebearerTimelineV1{
StartTime: startSec,
DurationDelta: durationDeltaSec,
Samples: []uint64{},
}

if len(points) < 1 {
return &flamebearer.FlamebearerTimelineV1{
StartTime: startSec,
DurationDelta: durationDeltaSec,
Samples: backfill(startMs, endMs, durationDeltaSec),
if n := sizeToBackfill(startMs, endMs, durationDeltaSec); n > 0 {
timeline.Samples = make([]uint64, n)
}
return timeline
}

i := 0
prev := points[0]
for _, curr := range points {
backfillNum := sizeToBackfill(prev.Timestamp, curr.Timestamp, durationDeltaSec)
res[i] = uint64(curr.Value)

backfillNum := sizeToBackfill(prev.Timestamp, curr.Timestamp, durationDeltaSec)
if backfillNum > 0 {
// backfill + newValue
bf := append(backfill(prev.Timestamp, curr.Timestamp, durationDeltaSec), uint64(curr.Value))

// break the slice
first := res[:i]
second := res[i:]
// Subtract 1 to account for the current value already being added
// to the result slice.
backfillNum--

// add new backfilled items
first = append(first, bf...)
// Insert backfill.
bf := make([]uint64, backfillNum)
res = append(res[:i], append(bf, res[i:]...)...)

// concatenate the three slices to form the new slice
res = append(first, second...)
prev = curr
i = i + int(backfillNum)
} else {
res[i] = uint64(curr.Value)
prev = curr
i = i + 1
backfillNum = 0
}

i += int(backfillNum) + 1
prev = curr
}

// Backfill with 0s for data that's not available
firstAvailableData := points[0]
lastAvailableData := points[len(points)-1]
backFillHead := backfill(startMs, firstAvailableData.Timestamp, durationDeltaSec)
backFillTail := backfill(lastAvailableData.Timestamp, endMs, durationDeltaSec)

res = append(backFillHead, res...)
res = append(res, backFillTail...)
if n := sizeToBackfill(startMs, firstAvailableData.Timestamp, durationDeltaSec); n > 0 {
bf := make([]uint64, n)
res = append(bf, res...)
}

timeline := &flamebearer.FlamebearerTimelineV1{
StartTime: startSec,
DurationDelta: durationDeltaSec,
Samples: res,
if n := sizeToBackfill(lastAvailableData.Timestamp, endMs, durationDeltaSec) - 1; n > 0 {
bf := make([]uint64, n)
res = append(res, bf...)
}

timeline.Samples = res
return timeline
}

Expand All @@ -76,14 +77,6 @@ func New(series *v1.Series, startMs int64, endMs int64, durationDeltaSec int64)
func sizeToBackfill(startMs int64, endMs int64, stepSec int64) int64 {
startSec := startMs / 1000
endSec := endMs / 1000
size := ((endSec - startSec) / stepSec) - 1
size := (endSec - startSec) / stepSec
return size
}

func backfill(startMs int64, endMs int64, stepSec int64) []uint64 {
size := sizeToBackfill(startMs, endMs, stepSec)
if size <= 0 {
size = 0
}
return make([]uint64, size)
}
119 changes: 100 additions & 19 deletions pkg/querier/timeline/timeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/grafana/pyroscope/pkg/querier/timeline"
)

const TimelineStepSec = 10
const timelineStepSec = 10

func Test_No_Backfill(t *testing.T) {
TestDate := time.Now()
Expand All @@ -20,12 +20,10 @@ func Test_No_Backfill(t *testing.T) {
},
}

timeline := timeline.New(points, TestDate.UnixMilli(), TestDate.UnixMilli(), TimelineStepSec)
timeline := timeline.New(points, TestDate.UnixMilli(), TestDate.UnixMilli(), timelineStepSec)

assert.Equal(t, TestDate.UnixMilli()/1000, timeline.StartTime)
assert.Equal(t, []uint64{
99,
}, timeline.Samples)
assert.Equal(t, []uint64{99}, timeline.Samples)
}

func Test_Backfill_Data_Start_End(t *testing.T) {
Expand All @@ -35,17 +33,38 @@ func Test_Backfill_Data_Start_End(t *testing.T) {

points := &typesv1.Series{
Points: []*typesv1.Point{
// 0 ms -60000 ms
// 10000 ms -50000 ms
// 20000 ms -40000 ms
// 30000 ms -30000 ms
// 40000 ms -20000 ms
// 50000 ms -10000 ms
{Timestamp: TestDate.UnixMilli(), Value: 99},
// 70000 ms +10000 ms
// 80000 ms +20000 ms
// 90000 ms +30000 ms
// 100000 ms +40000 ms
// 110000 ms +50000 ms

},
}

timeline := timeline.New(points, startTime, endTime, TimelineStepSec)
timeline := timeline.New(points, startTime, endTime, timelineStepSec)

assert.Equal(t, startTime/1000, timeline.StartTime)
assert.Equal(t, []uint64{
0, 0, 0, 0, 0,
99,
0, 0, 0, 0, 0,
0, // 0 ms -60000 ms
0, // 10000 ms -50000 ms
0, // 20000 ms -40000 ms
0, // 30000 ms -30000 ms
0, // 40000 ms -20000 ms
0, // 50000 ms -10000 ms
99, // 60000 ms 0 ms (now)
0, // 70000 ms +10000 ms
0, // 80000 ms +20000 ms
0, // 90000 ms +30000 ms
0, // 100000 ms +40000 ms
0, // 110000 ms +50000 ms
}, timeline.Samples)
}

Expand All @@ -56,20 +75,37 @@ func Test_Backfill_Data_Middle(t *testing.T) {

points := &typesv1.Series{
Points: []*typesv1.Point{
// 0 ms -60000 ms
// 10000 ms -50000 ms
// 20000 ms -40000 ms
// 30000 ms -30000 ms
// 40000 ms -20000 ms
// 50000 ms -10000 ms
{Timestamp: TestDate.UnixMilli(), Value: 99},
// 70000 ms +10000 ms
{Timestamp: TestDate.Add(20 * time.Second).UnixMilli(), Value: 98},
// 90000 ms +30000 ms
// 100000 ms +40000 ms
// 110000 ms +50000 ms
},
}

timeline := timeline.New(points, startTime, endTime, TimelineStepSec)
timeline := timeline.New(points, startTime, endTime, timelineStepSec)

assert.Equal(t, startTime/1000, timeline.StartTime)
assert.Equal(t, []uint64{
0, 0, 0, 0, 0,
99,
0,
98,
0, 0, 0, 0,
0, // 0 ms -60000 ms
0, // 10000 ms -50000 ms
0, // 20000 ms -40000 ms
0, // 30000 ms -30000 ms
0, // 40000 ms -20000 ms
0, // 50000 ms -10000 ms
99, // 60000 ms 0 ms (now)
0, // 70000 ms +10000 ms
98, // 80000 ms +20000 ms
0, // 90000 ms +30000 ms
0, // 100000 ms +40000 ms
0, // 110000 ms +50000 ms
}, timeline.Samples)
}

Expand All @@ -82,12 +118,57 @@ func Test_Backfill_All(t *testing.T) {
Points: []*typesv1.Point{},
}

timeline := timeline.New(points, startTime, endTime, TimelineStepSec)
timeline := timeline.New(points, startTime, endTime, timelineStepSec)

assert.Equal(t, startTime/1000, timeline.StartTime)
assert.Equal(t, []uint64{
0, 0, 0, 0, 0,
0, 0, 0, 0, 0,
0,
0, // 0 ms -60000 ms
0, // 10000 ms -50000 ms
0, // 20000 ms -40000 ms
0, // 30000 ms -30000 ms
0, // 40000 ms -20000 ms
0, // 50000 ms -10000 ms
0, // 60000 ms 0 ms (now)
0, // 70000 ms +10000 ms
0, // 80000 ms +20000 ms
0, // 90000 ms +30000 ms
0, // 100000 ms +40000 ms
0, // 110000 ms +50000 ms
}, timeline.Samples)
}

func Test_Backfill_Arbitrary(t *testing.T) {
startMs := int64(0)
endMs := int64(10 * time.Second / time.Millisecond)
step := int64(1)
series := &typesv1.Series{
Points: []*typesv1.Point{
// 0 ms
// 1000 ms
{Timestamp: 2000, Value: 69},
{Timestamp: 3000, Value: 83},
// 4000 ms
// 5000 ms
{Timestamp: 6000, Value: 85},
// 7000 ms
{Timestamp: 8000, Value: 91},
// 9000 ms
},
}

tl := timeline.New(series, startMs, endMs, step)
assert.Equal(t, startMs/1000, tl.StartTime)

assert.Equal(t, []uint64{
0, // 0 ms
0, // 1000 ms
69, // 2000 ms
83, // 3000 ms
0, // 4000 ms
0, // 5000 ms
85, // 6000 ms
0, // 7000 ms
91, // 8000 ms
0, // 9000 ms
}, tl.Samples)
}

0 comments on commit ac03f8b

Please sign in to comment.