Skip to content

Commit

Permalink
Add metrics and clean up some code
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <[email protected]>
  • Loading branch information
codesome committed Sep 17, 2024
1 parent cf5ea61 commit a129912
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 33 deletions.
67 changes: 39 additions & 28 deletions tools/querytee/proxy_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"bytes"
"context"
"fmt"
"github.com/grafana/mimir/pkg/frontend/querymiddleware"
"io"
"math/rand"
"net/http"
Expand All @@ -21,6 +20,7 @@ import (
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go/ext"

"github.com/grafana/mimir/pkg/frontend/querymiddleware"
"github.com/grafana/mimir/pkg/util/spanlogger"
)

Expand Down Expand Up @@ -264,11 +264,27 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
preferred, secondary = secondary, preferred
}

var shiftedReq *http.Request
var (
shiftedReq *http.Request
shiftedBody []byte
)
if p.cfg.ShiftComparisonQueriesBy > 0 && rand.Float64() < p.cfg.ShiftComparisonSamplingRatio {
shiftedReq, err = shiftQueryRequest(req, p.cfg.ShiftComparisonQueriesBy)
if err != nil {
// TODO: logs & metrics
shiftedReq = p.shiftQueryRequest(req, p.cfg.ShiftComparisonQueriesBy)
if shiftedReq != nil && shiftedReq.Body != nil {
shiftedBody, err = io.ReadAll(shiftedReq.Body)
if err != nil {
level.Warn(logger).Log("msg", "Unable to read shifted request body", "err", err)
shiftedReq = nil
} else {
if err := shiftedReq.Body.Close(); err != nil {
level.Warn(logger).Log("msg", "Unable to close request body", "err", err)
}

shiftedReq.Body = io.NopCloser(bytes.NewReader(shiftedBody))
if err := shiftedReq.ParseForm(); err != nil {
level.Warn(logger).Log("msg", "Unable to parse form", "err", err)
}
}
}
}

Expand All @@ -277,31 +293,14 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
spawnRequest(preferred, req, body, true)
spawnRequest(secondary, req, body, true)
} else {
wg.Add(1)
wg.Add(3)
spawnRequest(preferred, req, body, false)

var shiftedBody []byte
if shiftedReq.Body != nil {
shiftedBody, err = io.ReadAll(shiftedReq.Body)
if err != nil {
// TODO: don't hard fail. Add logs/metrics
}
if err := shiftedReq.Body.Close(); err != nil {
level.Warn(logger).Log("msg", "Unable to close request body", "err", err)
}

shiftedReq.Body = io.NopCloser(bytes.NewReader(shiftedBody))
if err := shiftedReq.ParseForm(); err != nil {
level.Warn(logger).Log("msg", "Unable to parse form", "err", err)
}
}

// TODO: verify that the user gets the response from the above query
// and the below duplicate query to the preferred backend does not
// change anything user facing.
wg.Add(2)
spawnRequest(preferred, shiftedReq, shiftedBody, true)
spawnRequest(secondary, shiftedReq, shiftedBody, true)

}

// Wait until all backend requests completed.
Expand Down Expand Up @@ -348,9 +347,14 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
p.metrics.relativeDuration.WithLabelValues(p.route.RouteName).Observe(relativeDuration.Seconds())
p.metrics.proportionalDuration.WithLabelValues(p.route.RouteName).Observe(proportionalDurationDifference)
p.metrics.responsesComparedTotal.WithLabelValues(p.route.RouteName, string(result)).Inc()
if shiftedReq != nil {
p.metrics.shiftedComparisonsTotal.WithLabelValues(p.route.RouteName).Inc()
}
}

func shiftQueryRequest(req *http.Request, d time.Duration) (shiftedRequest *http.Request, err error) {
// shiftQueryRequest shifts the query times of the request for instant and range queries.
// If there was any error, the error is just logged and a nil request is returned.
func (p *ProxyEndpoint) shiftQueryRequest(req *http.Request, d time.Duration) (shiftedRequest *http.Request) {
if !querymiddleware.IsRangeQuery(req.URL.Path) && !querymiddleware.IsInstantQuery(req.URL.Path) {
return
}
Expand All @@ -364,15 +368,22 @@ func shiftQueryRequest(req *http.Request, d time.Duration) (shiftedRequest *http
codec := querymiddleware.NewPrometheusCodec(nil, 5*time.Minute, "protobuf")
decodedRequest, err := codec.DecodeMetricsQueryRequest(req.Context(), req)
if err != nil {
return nil, err
level.Error(p.logger).Log("msg", "Unable to decode request when shifting query", "err", err)
return nil
}
start := decodedRequest.GetStart() - d.Milliseconds()
end := decodedRequest.GetEnd() - d.Milliseconds()
decodedRequest, err = decodedRequest.WithStartEnd(start, end)
if err != nil {
return nil, err
level.Error(p.logger).Log("msg", "Unable to change times when shifting query", "err", err)
return nil
}
shiftedRequest, err = codec.EncodeMetricsQueryRequest(req.Context(), decodedRequest)
if err != nil {
level.Error(p.logger).Log("msg", "Unable to encode request when shifting query", "err", err)
return nil
}
return codec.EncodeMetricsQueryRequest(req.Context(), decodedRequest)
return shiftedRequest
}

func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *backendResponse) *backendResponse {
Expand Down
16 changes: 11 additions & 5 deletions tools/querytee/proxy_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ const (
type ComparisonResult string

type ProxyMetrics struct {
requestDuration *prometheus.HistogramVec
responsesTotal *prometheus.CounterVec
responsesComparedTotal *prometheus.CounterVec
relativeDuration *prometheus.HistogramVec
proportionalDuration *prometheus.HistogramVec
requestDuration *prometheus.HistogramVec
responsesTotal *prometheus.CounterVec
responsesComparedTotal *prometheus.CounterVec
shiftedComparisonsTotal *prometheus.CounterVec
relativeDuration *prometheus.HistogramVec
proportionalDuration *prometheus.HistogramVec
}

func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics {
Expand All @@ -45,6 +46,11 @@ func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics {
Name: "responses_compared_total",
Help: "Total number of responses compared per route name by result.",
}, []string{"route", "result"}),
shiftedComparisonsTotal: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: queryTeeMetricsNamespace,
Name: "shifted_comparisons_total",
Help: "Total number of comparisons performed with time-shifted queries per route name.",
}, []string{"route"}),
relativeDuration: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: queryTeeMetricsNamespace,
Name: "backend_response_relative_duration_seconds",
Expand Down

0 comments on commit a129912

Please sign in to comment.