Skip to content

Commit

Permalink
Add summary at the end of the measurement and fix RTT calc (#45)
Browse files Browse the repository at this point in the history
* Add rtt/minrtt

* Add summary at the end of the measurement

* Fix output
  • Loading branch information
robertodauria authored Feb 7, 2024
1 parent 068bb2b commit c3544e4
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 36 deletions.
2 changes: 2 additions & 0 deletions cmd/msak-client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,6 @@ func main() {
if *flagUpload {
cl.Upload(context.Background())
}

cl.PrintSummary()
}
105 changes: 75 additions & 30 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
DefaultWebSocketHandshakeTimeout = 5 * time.Second

// DefaultStreams is the default number of streams for a new client.
DefaultStreams = 3
DefaultStreams = 2

// DefaultLength is the default test duration for a new client.
DefaultLength = 5 * time.Second
Expand Down Expand Up @@ -92,10 +92,23 @@ type Throughput1Client struct {
// It is set when the first streams connects to the server and used to compute the elapsed time.
sharedStartTime time.Time
started atomic.Bool

// rtt is the latest RTT value from TCPInfo.
rtt atomic.Uint32

// minRTT is the lowest RTT value observed across all streams.
minRTT atomic.Uint32

// lastResultForSubtest contains the last recorded measurement for the
// corresponding subtest (download/upload).
lastResultForSubtest map[spec.SubtestKind]Result
lastResultForSubtestMutex sync.Mutex
}

// Result contains the aggregate metrics collected during the test.
type Result struct {
// Subtest is the subtest this Result refers to.
Subtest spec.SubtestKind
// Goodput is the average number of application-level bits per second that
// have been transferred so far across all the streams.
Goodput float64
Expand All @@ -104,8 +117,18 @@ type Result struct {
Throughput float64
// Elapsed is the total time elapsed since the test started.
Elapsed time.Duration
// MinRTT is the minimum of MinRTT values observed across all the streams.
// RTT is the latest RTT value from TCPInfo from any stream.
RTT uint32
// MinRTT is the minimum of RTT values observed across all the streams.
MinRTT uint32
// Streams is the number of streams used in the test.
Streams int
// ByteLimit is the byte limit used in the test.
ByteLimit int
// Length is the length of the test.
Length time.Duration
// CongestionControl is the congestion control used in the test.
CongestionControl string
}

// makeUserAgent creates the user agent string.
Expand All @@ -131,6 +154,8 @@ func New(clientName, clientVersion string, config Config) *Throughput1Client {

tIndex: map[string]int{},
recvByteCounters: map[int][]int64{},

lastResultForSubtest: map[spec.SubtestKind]Result{},
}
}

Expand Down Expand Up @@ -214,6 +239,7 @@ func (c *Throughput1Client) start(ctx context.Context, subtest spec.SubtestKind)

// Reset the counters.
c.recvByteCounters = map[int][]int64{}
c.rtt.Store(0)

startTimeCh := make(chan time.Time, 1)

Expand Down Expand Up @@ -253,7 +279,6 @@ func (c *Throughput1Client) start(ctx context.Context, subtest spec.SubtestKind)
}

wg.Wait()

return nil
}

Expand Down Expand Up @@ -303,40 +328,39 @@ func (c *Throughput1Client) runStream(ctx context.Context, streamID int, mURL *u
clientCh, serverCh, errCh = proto.SenderLoop(ctx)
}

var m model.WireMeasurement

for {
select {
case <-ctx.Done():
c.config.Emitter.OnComplete(streamID, mURL.Host)
c.config.Emitter.OnStreamComplete(streamID, mURL.Host)
return nil
case m := <-clientCh:
case m = <-clientCh:
// If subtest is download, store the client-side measurement.
if subtest != spec.SubtestDownload {
continue
}
c.config.Emitter.OnMeasurement(streamID, m)
c.config.Emitter.OnDebug(fmt.Sprintf("Stream #%d - application r/w: %d/%d, network r/w: %d/%d",
streamID, m.Application.BytesReceived, m.Application.BytesSent,
m.Network.BytesReceived, m.Network.BytesSent))
c.storeMeasurement(streamID, m)
if c.started.Load() {
c.emitResult(c.sharedStartTime)
}
case m := <-serverCh:
case m = <-serverCh:
// If subtest is upload, store the server-side measurement.
if subtest != spec.SubtestUpload {
continue
}
c.config.Emitter.OnMeasurement(streamID, m)
c.config.Emitter.OnDebug(fmt.Sprintf("#%d - application r/w: %d/%d, network r/w: %d/%d",
streamID, m.Application.BytesReceived, m.Application.BytesSent,
m.Network.BytesReceived, m.Network.BytesSent))
c.storeMeasurement(streamID, m)
if c.started.Load() {
c.emitResult(c.sharedStartTime)
}
case err := <-errCh:
return err
}

c.config.Emitter.OnMeasurement(streamID, m)
c.config.Emitter.OnDebug(fmt.Sprintf("Stream #%d - application r/w: %d/%d, network r/w: %d/%d",
streamID, m.Application.BytesReceived, m.Application.BytesSent,
m.Network.BytesReceived, m.Network.BytesSent))
c.storeMeasurement(streamID, m)
if c.started.Load() {
res := c.computeResult(subtest)
c.config.Emitter.OnResult(res)
c.lastResultForSubtestMutex.Lock()
c.lastResultForSubtest[subtest] = res
c.lastResultForSubtestMutex.Unlock()
}
}
}

Expand All @@ -345,6 +369,16 @@ func (c *Throughput1Client) storeMeasurement(streamID int, m model.WireMeasureme
c.recvByteCountersMutex.Lock()
c.recvByteCounters[streamID] = append(c.recvByteCounters[streamID], m.Application.BytesReceived)
c.recvByteCountersMutex.Unlock()

if m.TCPInfo != nil {
if m.TCPInfo.RTT > 0 {
c.rtt.Store(m.TCPInfo.RTT)
}
minRTT := c.minRTT.Load()
if m.TCPInfo.MinRTT > 0 && (minRTT == 0 || m.TCPInfo.MinRTT < minRTT) {
c.minRTT.Store(m.TCPInfo.MinRTT)
}
}
}

// applicationBytes returns the aggregate application-level bytes transferred by all the streams.
Expand All @@ -360,17 +394,23 @@ func (c *Throughput1Client) applicationBytes() int64 {
return sum
}

// emitResult emits the result of the current measurement via the configured Emitter.
func (c *Throughput1Client) emitResult(start time.Time) {
// computeResult returns a Result struct with the current state of the measurement.
func (c *Throughput1Client) computeResult(subtest spec.SubtestKind) Result {
applicationBytes := c.applicationBytes()
elapsed := time.Since(start)
elapsed := time.Since(c.sharedStartTime)
goodput := float64(applicationBytes) / float64(elapsed.Seconds()) * 8 // bps
result := Result{
Elapsed: elapsed,
Goodput: goodput,
Throughput: 0, // TODO
return Result{
Subtest: subtest,
Elapsed: elapsed,
Goodput: goodput,
Throughput: 0, // TODO,
MinRTT: c.minRTT.Load(),
RTT: c.rtt.Load(),
Streams: c.config.NumStreams,
ByteLimit: c.config.ByteLimit,
Length: c.config.Length,
CongestionControl: c.config.CongestionControl,
}
c.config.Emitter.OnResult(result)
}

// Download runs a download test using the settings configured for this client.
Expand All @@ -389,6 +429,11 @@ func (c *Throughput1Client) Upload(ctx context.Context) {
}
}

// PrintSummary emits a summary via the configured emitter
func (c *Throughput1Client) PrintSummary() {
c.config.Emitter.OnSummary(c.lastResultForSubtest)
}

func getPathForSubtest(subtest spec.SubtestKind) string {
switch subtest {
case spec.SubtestDownload:
Expand Down
25 changes: 19 additions & 6 deletions pkg/client/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ type Emitter interface {
OnResult(Result)
// OnError is called on errors.
OnError(err error)
// OnComplete is called after a stream completes.
OnComplete(streamID int, server string)
// OnStreamComplete is called after a stream completes.
OnStreamComplete(streamID int, server string)
// OnDebug is called to print debug information.
OnDebug(msg string)
// OnSummary is called to print summary information.
OnSummary(results map[spec.SubtestKind]Result)
}

// HumanReadable prints human-readable output to stdout.
Expand All @@ -34,8 +36,8 @@ type HumanReadable struct {

// OnResult prints the aggregate result.
func (HumanReadable) OnResult(r Result) {
fmt.Printf("Elapsed: %.2fs, Goodput: %f Mb/s, MinRTT: %d\n", r.Elapsed.Seconds(),
r.Goodput/1e6, r.MinRTT)
fmt.Printf("%s rate: %f Mb/s, rtt: %.2f, minrtt: %.2f\n",
r.Subtest, r.Goodput/1e6, float32(r.RTT)/1000, float32(r.MinRTT)/1000)
}

// OnStart is called when the stream starts and prints the subtest and server hostname.
Expand All @@ -60,11 +62,22 @@ func (HumanReadable) OnError(err error) {
}
}

// OnComplete is called after a stream completes.
func (HumanReadable) OnComplete(streamID int, server string) {
// OnStreamComplete is called after a stream completes.
func (HumanReadable) OnStreamComplete(streamID int, server string) {
fmt.Printf("Stream %d complete (server %s)\n", streamID, server)
}

func (HumanReadable) OnSummary(results map[spec.SubtestKind]Result) {
fmt.Println()
fmt.Printf("Test results:\n")
for kind, result := range results {
fmt.Printf(" %s rate: %.2f Mb/s, rtt: %.2fms, minrtt: %.2fms\n",
kind, result.Goodput/1e6, float32(result.RTT)/1000, float32(result.MinRTT)/1000)
fmt.Printf(" streams: %d, duration: %.2fs, cc algo: %s, byte limit: %d bytes\n",
result.Streams, result.Length.Seconds(), result.CongestionControl, result.ByteLimit)
}
}

// OnDebug is called to print debug information.
func (e HumanReadable) OnDebug(msg string) {
if e.Debug {
Expand Down

0 comments on commit c3544e4

Please sign in to comment.