Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Out-of-order Flag : Added option for sending out of order samples #68

Merged
merged 3 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/avalanche.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
remoteTenant = kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String()
tlsClientInsecure = kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool()
remoteTenantHeader = kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String()
outOfOrder = kingpin.Flag("out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true").Bool()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

One idea, instead of a bool should we have a flag like out-of-order.min-time which would control the minimum time this can go back in time?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine to do it in another PR though, thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okkay! i will raise another PR for that. thank you !

)

func main() {
Expand Down Expand Up @@ -115,6 +116,7 @@ func main() {
InsecureSkipVerify: *tlsClientInsecure,
},
TenantHeader: *remoteTenantHeader,
OutOfOrder: *outOfOrder,
}

// Collect Pprof during the write only if not collecting within a regular interval.
Expand All @@ -128,7 +130,7 @@ func main() {
)
if *remotePprofInterval > 0 {
if len(*remotePprofURLs) == 0 {
log.Fatal("remote profiling interval specified wihout any remote pprof urls")
log.Fatal("remote profiling interval specified without any remote pprof urls")
}
suffix := rand.Intn(1000)
go func() {
Expand Down Expand Up @@ -162,7 +164,7 @@ func main() {
return
}

fmt.Printf("Serving ur metrics at localhost:%v/metrics\n", *port)
fmt.Printf("Serving your metrics at localhost:%v/metrics\n", *port)
err = metrics.ServeMetrics(*port)
if err != nil {
log.Fatal(err)
Expand Down
23 changes: 19 additions & 4 deletions metrics/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type ConfigWrite struct {
Tenant string
TLSClientConfig tls.Config
TenantHeader string
OutOfOrder bool
}

// Client for the remote write requests.
Expand Down Expand Up @@ -108,7 +109,7 @@ func cloneRequest(r *http.Request) *http.Request {
}

func (c *Client) write() error {
tss, err := collectMetrics()
tss, err := collectMetrics(c.config.OutOfOrder)
if err != nil {
return err
}
Expand Down Expand Up @@ -140,7 +141,7 @@ func (c *Client) write() error {
select {
case <-c.config.UpdateNotify:
log.Println("updating remote write metrics")
tss, err = collectMetrics()
tss, err = collectMetrics(c.config.OutOfOrder)
if err != nil {
merr.Add(err)
}
Expand Down Expand Up @@ -193,14 +194,28 @@ func updateTimetamps(tss []prompb.TimeSeries) []prompb.TimeSeries {
return tss
}

func collectMetrics() ([]prompb.TimeSeries, error) {
func collectMetrics(outOfOrder bool) ([]prompb.TimeSeries, error) {
metricsMux.Lock()
defer metricsMux.Unlock()
metricFamilies, err := promRegistry.Gather()
if err != nil {
return nil, err
}
return ToTimeSeriesSlice(metricFamilies), nil
tss := ToTimeSeriesSlice(metricFamilies)
if outOfOrder {
tss = shuffleTimestamps(tss)
}
return tss, nil
}
func shuffleTimestamps(tss []prompb.TimeSeries) []prompb.TimeSeries {
now := time.Now().UnixMilli()
offsets := []int64{0, -60 * 1000, -5 * 60 * 1000}

for i := range tss {
offset := offsets[i%len(offsets)]
tss[i].Samples[0].Timestamp = now + offset
}
return tss
}

// ToTimeSeriesSlice converts a slice of metricFamilies containing samples into a slice of TimeSeries
Expand Down
60 changes: 60 additions & 0 deletions metrics/write_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"testing"
"time"

"github.com/prometheus/prometheus/prompb"
)

func TestShuffleTimestamps(t *testing.T) {
now := time.Now().UnixMilli()

tss := []prompb.TimeSeries{
{Samples: []prompb.Sample{{Timestamp: now}}},
{Samples: []prompb.Sample{{Timestamp: now}}},
{Samples: []prompb.Sample{{Timestamp: now}}},
}

shuffledTSS := shuffleTimestamps(tss)

offsets := []int64{0, -60 * 1000, -5 * 60 * 1000}
for _, ts := range shuffledTSS {
timestampValid := false
for _, offset := range offsets {
expectedTimestamp := now + offset
if ts.Samples[0].Timestamp == expectedTimestamp {
timestampValid = true
break
}
}
if !timestampValid {
t.Errorf("Timestamp %v is not in the expected offsets: %v", ts.Samples[0].Timestamp, offsets)
}
}

outOfOrder := false
for i := 1; i < len(shuffledTSS); i++ {
if shuffledTSS[i].Samples[0].Timestamp < shuffledTSS[i-1].Samples[0].Timestamp {
outOfOrder = true
break
}
}

if !outOfOrder {
t.Error("Timestamps are not out of order")
}
}