Skip to content

Commit

Permalink
Add prometheus metrics to the ndt8 handlers (#13)
Browse files Browse the repository at this point in the history
* Add written_files_total counter.

* Rename ClientConnection to ConnectionErrors and add help strings.

* Rename WrittenFiles -> FileWrites

* Add testsTotal metric

* Remove extra (for now) return.

* Merge branch 'main' into add-writtenfiles-metric

* Remove extra output and clarify possible errors coming from the protocol.

* Rename metrics and update definitions of ok/error for ws closures

* Add metric to count failed setCC attempts
  • Loading branch information
robertodauria authored Jan 23, 2024
1 parent 8ce8851 commit 068bb2b
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 16 deletions.
80 changes: 69 additions & 11 deletions internal/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,39 @@ var validCCAlgorithms = map[string]struct{}{
}

var (
ClientConnections = promauto.NewCounterVec(
websocketUpgrades = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "msak",
Subsystem: "throughput1",
Name: "client_connections_total",
Name: "client_websocket_upgrades_total",
Help: "Number of connections that attempted a websocket upgrade.",
},
[]string{"direction", "status"},
)
testsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "msak",
Subsystem: "throughput1",
Name: "tests_total",
Help: "Number of tests that successfully upgraded to websocket and started",
},
[]string{"direction", "status"},
)
congestionControlErrors = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "msak",
Subsystem: "throughput1",
Name: "congestion_control_errors_total",
Help: "Number of attempts to set congestion control algorithm that resulted in an error.",
},
[]string{"cc"},
)
fileWrites = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "msak",
Subsystem: "throughput1",
Name: "file_writes_total",
Help: "Number of (successful or failed) file writes.",
},
[]string{"direction", "status"},
)
Expand Down Expand Up @@ -72,7 +100,7 @@ func (h *Handler) upgradeAndRunMeasurement(kind model.TestDirection, rw http.Res
req *http.Request) {
mid, err := GetMIDFromRequest(req)
if err != nil {
ClientConnections.WithLabelValues(string(kind), "missing-mid").Inc()
websocketUpgrades.WithLabelValues(string(kind), "missing-mid").Inc()
log.Info("Received request without mid", "source", req.RemoteAddr,
"error", err)
writeBadRequest(rw)
Expand All @@ -84,7 +112,7 @@ func (h *Handler) upgradeAndRunMeasurement(kind model.TestDirection, rw http.Res
query := req.URL.Query()
requestStreams := query.Get("streams")
if requestStreams == "" {
ClientConnections.WithLabelValues(string(kind),
websocketUpgrades.WithLabelValues(string(kind),
"missing-streams").Inc()
log.Info("Received request without streams", "source", req.RemoteAddr)
writeBadRequest(rw)
Expand All @@ -102,7 +130,7 @@ func (h *Handler) upgradeAndRunMeasurement(kind model.TestDirection, rw http.Res
clientOptions = append(clientOptions,
model.NameValue{Name: "duration", Value: requestDuration})
} else {
ClientConnections.WithLabelValues(string(kind),
websocketUpgrades.WithLabelValues(string(kind),
"invalid-duration").Inc()
log.Info("Received request with an invalid duration",
"source", req.RemoteAddr, "duration", requestDuration)
Expand Down Expand Up @@ -135,7 +163,7 @@ func (h *Handler) upgradeAndRunMeasurement(kind model.TestDirection, rw http.Res
var byteLimit int
if requestByteLimit != "" {
if byteLimit, err = strconv.Atoi(requestByteLimit); err != nil {
ClientConnections.WithLabelValues(string(kind), "invalid-byte-limit").Inc()
websocketUpgrades.WithLabelValues(string(kind), "invalid-byte-limit").Inc()
log.Info("Received request with an invalid byte limit", "source", req.RemoteAddr,
"value", requestByteLimit)
writeBadRequest(rw)
Expand All @@ -149,7 +177,7 @@ func (h *Handler) upgradeAndRunMeasurement(kind model.TestDirection, rw http.Res
// option).
metadata, err := getRequestMetadata(req)
if err != nil {
ClientConnections.WithLabelValues(string(kind),
websocketUpgrades.WithLabelValues(string(kind),
"metadata-parse-error").Inc()
log.Info("Error while parsing metadata", "source", req.RemoteAddr,
"error", err)
Expand All @@ -163,7 +191,7 @@ func (h *Handler) upgradeAndRunMeasurement(kind model.TestDirection, rw http.Res
// we cannot call writeBadRequest after attempting an Upgrade.
wsConn, err := throughput1.Upgrade(rw, req)
if err != nil {
ClientConnections.WithLabelValues(string(kind),
websocketUpgrades.WithLabelValues(string(kind),
"websocket-upgrade-failed").Inc()
log.Info("Websocket upgrade failed",
"ctx", fmt.Sprintf("%p", req.Context()), "error", err)
Expand All @@ -184,12 +212,17 @@ func (h *Handler) upgradeAndRunMeasurement(kind model.TestDirection, rw http.Res
if requestCC != "" {
err = conn.SetCC(requestCC)
if err != nil {
congestionControlErrors.WithLabelValues(requestCC).Inc()
log.Info("Failed to set cc", "ctx", fmt.Sprintf("%p", req.Context()),
"source", wsConn.RemoteAddr(),
"cc", requestCC, "error", err)
}
}

// The WS upgrade succeeded, so update the websocketUpgrades metric.
websocketUpgrades.WithLabelValues(string(kind),
"ok").Inc()

uuid := conn.UUID()
archivalData := model.Throughput1Result{
MeasurementID: mid,
Expand Down Expand Up @@ -225,6 +258,8 @@ func (h *Handler) upgradeAndRunMeasurement(kind model.TestDirection, rw http.Res
for {
select {
case <-timeout.Done():
// If the test has timed out, count it as a success and return.
testsTotal.WithLabelValues(string(kind), "ok-timeout").Inc()
return
case m := <-senderCh:
// If this is a download test we are the sender, so we can populate
Expand All @@ -243,11 +278,32 @@ func (h *Handler) upgradeAndRunMeasurement(kind model.TestDirection, rw http.Res
archivalData.ClientMeasurements = append(archivalData.ClientMeasurements,
m.Measurement)
case err := <-errCh:
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
// If this is a normal WS closure, it means the client closed the
// connection and the test was successful.
// "Abnormal" closures can happen if the client does not send a
// closure message before terminating the connection on its end.
// These are not counted as errors in the following code.
if websocket.IsCloseError(err, websocket.CloseNormalClosure,
websocket.CloseAbnormalClosure) {
testsTotal.WithLabelValues(string(kind), "ok").Inc()
log.Info("Connection closed normally", "context", fmt.Sprintf("%p", timeout))
return
}

// If this is a WS closure with a code different from CloseNormalClosure
// or CloseAbnormalClosure, count it as a close error.
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure,
websocket.CloseAbnormalClosure) {
log.Info("Connection closed unexpectedly", "context",
fmt.Sprintf("%p", timeout), "error", err)
// TODO: Add Prometheus metric
fmt.Sprintf("%p", timeout), "close-error", err)
testsTotal.WithLabelValues(string(kind), "close-error").Inc()
return
}

// If the error is not a WS close, it means the test did not complete
// successfully.
testsTotal.WithLabelValues(string(kind), "error").Inc()
log.Info("Connection closed with error", "context", fmt.Sprintf("%p", timeout))
return
}
}
Expand All @@ -259,8 +315,10 @@ func (h *Handler) writeResult(uuid string, kind model.TestDirection, result *mod
result)
if err != nil {
log.Error("failed to write throughput1 result", "uuid", uuid, "error", err)
fileWrites.WithLabelValues(string(kind), "error").Inc()
return
}
fileWrites.WithLabelValues(string(kind), "ok").Inc()
}

// GetMIDFromRequest extracts the measurement id ("mid") from a given HTTP
Expand Down
5 changes: 0 additions & 5 deletions pkg/throughput1/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,10 @@ func (p *Protocol) sendWireMeasurement(ctx context.Context, m model.Measurement)
// sending.
jsonwm, err := json.Marshal(wm)
if err != nil {
log.Printf("failed to encode measurement (ctx: %p, err: %v)", ctx, err)
return nil, err
}
err = p.conn.WriteMessage(websocket.TextMessage, jsonwm)
if err != nil {
log.Printf("failed to write measurement JSON (ctx: %p, err: %v)", ctx, err)
return nil, err
}
p.applicationBytesSent.Add(int64(len(jsonwm)))
Expand Down Expand Up @@ -252,7 +250,6 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme
size := p.ScaleMessage(spec.MinMessageSize, 0)
message, err := p.makePreparedMessage(size)
if err != nil {
log.Printf("makePreparedMessage failed (ctx: %p)", ctx)
errCh <- err
return
}
Expand All @@ -277,7 +274,6 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme
default:
err = p.conn.WritePreparedMessage(message)
if err != nil {
log.Printf("failed to write prepared message (ctx: %p, err: %v)", ctx, err)
errCh <- err
return
}
Expand Down Expand Up @@ -310,7 +306,6 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme
// Create a new message for the new size.
message, err = p.makePreparedMessage(size)
if err != nil {
log.Printf("failed to make prepared message (ctx: %p, err: %v)", ctx, err)
errCh <- err
return
}
Expand Down

0 comments on commit 068bb2b

Please sign in to comment.