Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 51 additions & 51 deletions internal/grpc/get_master_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,12 +1063,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_TOTAL_NET_SENT_TOTAL_BYTES:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.TotalMetrics != nil && p.TotalMetrics.Instrumentation != nil && p.TotalMetrics.Instrumentation.Sent != nil {
left = p.TotalMetrics.Instrumentation.Sent.TotalBytes
left = max(uint64(p.TotalMetrics.Instrumentation.Sent.TotalBytes), p.TotalMetrics.Instrumentation.Sent.TotalBytesLong)
}
if q.TotalMetrics != nil && q.TotalMetrics.Instrumentation != nil && q.TotalMetrics.Instrumentation.Sent != nil {
right = q.TotalMetrics.Instrumentation.Sent.TotalBytes
right = max(uint64(q.TotalMetrics.Instrumentation.Sent.TotalBytes), q.TotalMetrics.Instrumentation.Sent.TotalBytesLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand All @@ -1078,12 +1078,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_TOTAL_NET_SENT_TUPLE_BYTES:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.TotalMetrics != nil && p.TotalMetrics.Instrumentation != nil && p.TotalMetrics.Instrumentation.Sent != nil {
left = p.TotalMetrics.Instrumentation.Sent.TupleBytes
left = max(uint64(p.TotalMetrics.Instrumentation.Sent.TupleBytes), p.TotalMetrics.Instrumentation.Sent.TupleBytesLong)
}
if q.TotalMetrics != nil && q.TotalMetrics.Instrumentation != nil && q.TotalMetrics.Instrumentation.Sent != nil {
right = q.TotalMetrics.Instrumentation.Sent.TupleBytes
right = max(uint64(q.TotalMetrics.Instrumentation.Sent.TupleBytes), q.TotalMetrics.Instrumentation.Sent.TupleBytesLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand All @@ -1093,12 +1093,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_TOTAL_NET_SENT_CHUNKS:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.TotalMetrics != nil && p.TotalMetrics.Instrumentation != nil && p.TotalMetrics.Instrumentation.Sent != nil {
left = p.TotalMetrics.Instrumentation.Sent.Chunks
left = max(uint64(p.TotalMetrics.Instrumentation.Sent.Chunks), p.TotalMetrics.Instrumentation.Sent.ChunksLong)
}
if q.TotalMetrics != nil && q.TotalMetrics.Instrumentation != nil && q.TotalMetrics.Instrumentation.Sent != nil {
right = q.TotalMetrics.Instrumentation.Sent.Chunks
right = max(uint64(q.TotalMetrics.Instrumentation.Sent.Chunks), q.TotalMetrics.Instrumentation.Sent.ChunksLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand All @@ -1108,12 +1108,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_TOTAL_NET_RECV_TOTAL_BYTES:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.TotalMetrics != nil && p.TotalMetrics.Instrumentation != nil && p.TotalMetrics.Instrumentation.Received != nil {
left = p.TotalMetrics.Instrumentation.Received.TotalBytes
left = max(uint64(p.TotalMetrics.Instrumentation.Received.TotalBytes), p.TotalMetrics.Instrumentation.Received.TotalBytesLong)
}
if q.TotalMetrics != nil && q.TotalMetrics.Instrumentation != nil && q.TotalMetrics.Instrumentation.Received != nil {
right = q.TotalMetrics.Instrumentation.Received.TotalBytes
right = max(uint64(q.TotalMetrics.Instrumentation.Received.TotalBytes), q.TotalMetrics.Instrumentation.Received.TotalBytesLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand All @@ -1123,12 +1123,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_TOTAL_NET_RECV_TUPLE_BYTES:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.TotalMetrics != nil && p.TotalMetrics.Instrumentation != nil && p.TotalMetrics.Instrumentation.Received != nil {
left = p.TotalMetrics.Instrumentation.Received.TupleBytes
left = max(uint64(p.TotalMetrics.Instrumentation.Received.TupleBytes), p.TotalMetrics.Instrumentation.Received.TupleBytesLong)
}
if q.TotalMetrics != nil && q.TotalMetrics.Instrumentation != nil && q.TotalMetrics.Instrumentation.Received != nil {
right = q.TotalMetrics.Instrumentation.Received.TupleBytes
right = max(uint64(q.TotalMetrics.Instrumentation.Received.TupleBytes), q.TotalMetrics.Instrumentation.Received.TupleBytesLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand All @@ -1138,12 +1138,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_TOTAL_NET_RECV_CHUNKS:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.TotalMetrics != nil && p.TotalMetrics.Instrumentation != nil && p.TotalMetrics.Instrumentation.Received != nil {
left = p.TotalMetrics.Instrumentation.Received.Chunks
left = max(uint64(p.TotalMetrics.Instrumentation.Received.Chunks), p.TotalMetrics.Instrumentation.Received.ChunksLong)
}
if q.TotalMetrics != nil && q.TotalMetrics.Instrumentation != nil && q.TotalMetrics.Instrumentation.Received != nil {
right = q.TotalMetrics.Instrumentation.Received.Chunks
right = max(uint64(q.TotalMetrics.Instrumentation.Received.Chunks), q.TotalMetrics.Instrumentation.Received.ChunksLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand Down Expand Up @@ -1648,12 +1648,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_LAST_NET_SENT_TOTAL_BYTES:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.LastMetrics != nil && p.LastMetrics.Instrumentation != nil && p.LastMetrics.Instrumentation.Sent != nil {
left = p.LastMetrics.Instrumentation.Sent.TotalBytes
left = max(uint64(p.LastMetrics.Instrumentation.Sent.TotalBytes), p.LastMetrics.Instrumentation.Sent.TotalBytesLong)
}
if q.LastMetrics != nil && q.LastMetrics.Instrumentation != nil && q.LastMetrics.Instrumentation.Sent != nil {
right = q.LastMetrics.Instrumentation.Sent.TotalBytes
right = max(uint64(q.LastMetrics.Instrumentation.Sent.TotalBytes), q.LastMetrics.Instrumentation.Sent.TotalBytesLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand All @@ -1663,12 +1663,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_LAST_NET_SENT_TUPLE_BYTES:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.LastMetrics != nil && p.LastMetrics.Instrumentation != nil && p.LastMetrics.Instrumentation.Sent != nil {
left = p.LastMetrics.Instrumentation.Sent.TupleBytes
left = max(uint64(p.LastMetrics.Instrumentation.Sent.TupleBytes), p.LastMetrics.Instrumentation.Sent.TupleBytesLong)
}
if q.LastMetrics != nil && q.LastMetrics.Instrumentation != nil && q.LastMetrics.Instrumentation.Sent != nil {
right = q.LastMetrics.Instrumentation.Sent.TupleBytes
right = max(uint64(q.LastMetrics.Instrumentation.Sent.TupleBytes), q.LastMetrics.Instrumentation.Sent.TupleBytesLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand All @@ -1678,12 +1678,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_LAST_NET_SENT_CHUNKS:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.LastMetrics != nil && p.LastMetrics.Instrumentation != nil && p.LastMetrics.Instrumentation.Sent != nil {
left = p.LastMetrics.Instrumentation.Sent.Chunks
left = max(uint64(p.LastMetrics.Instrumentation.Sent.Chunks), p.LastMetrics.Instrumentation.Sent.ChunksLong)
}
if q.LastMetrics != nil && q.LastMetrics.Instrumentation != nil && q.LastMetrics.Instrumentation.Sent != nil {
right = q.LastMetrics.Instrumentation.Sent.Chunks
right = max(uint64(q.LastMetrics.Instrumentation.Sent.Chunks), q.LastMetrics.Instrumentation.Sent.ChunksLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand All @@ -1693,12 +1693,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_LAST_NET_RECV_TOTAL_BYTES:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.LastMetrics != nil && p.LastMetrics.Instrumentation != nil && p.LastMetrics.Instrumentation.Received != nil {
left = p.LastMetrics.Instrumentation.Received.TotalBytes
left = max(uint64(p.LastMetrics.Instrumentation.Received.TotalBytes), p.LastMetrics.Instrumentation.Received.TotalBytesLong)
}
if q.LastMetrics != nil && q.LastMetrics.Instrumentation != nil && q.LastMetrics.Instrumentation.Received != nil {
right = q.LastMetrics.Instrumentation.Received.TotalBytes
right = max(uint64(q.LastMetrics.Instrumentation.Received.TotalBytes), q.LastMetrics.Instrumentation.Received.TotalBytesLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand All @@ -1708,12 +1708,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_LAST_NET_RECV_TUPLE_BYTES:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.LastMetrics != nil && p.LastMetrics.Instrumentation != nil && p.LastMetrics.Instrumentation.Received != nil {
left = p.LastMetrics.Instrumentation.Received.TupleBytes
left = max(uint64(p.LastMetrics.Instrumentation.Received.TupleBytes), p.LastMetrics.Instrumentation.Received.TupleBytesLong)
}
if q.LastMetrics != nil && q.LastMetrics.Instrumentation != nil && q.LastMetrics.Instrumentation.Received != nil {
right = q.LastMetrics.Instrumentation.Received.TupleBytes
right = max(uint64(q.LastMetrics.Instrumentation.Received.TupleBytes), q.LastMetrics.Instrumentation.Received.TupleBytesLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand All @@ -1723,12 +1723,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_LAST_NET_RECV_CHUNKS:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.LastMetrics != nil && p.LastMetrics.Instrumentation != nil && p.LastMetrics.Instrumentation.Received != nil {
left = p.LastMetrics.Instrumentation.Received.Chunks
left = max(uint64(p.LastMetrics.Instrumentation.Received.Chunks), p.LastMetrics.Instrumentation.Received.ChunksLong)
}
if q.LastMetrics != nil && q.LastMetrics.Instrumentation != nil && q.LastMetrics.Instrumentation.Received != nil {
right = q.LastMetrics.Instrumentation.Received.Chunks
right = max(uint64(q.LastMetrics.Instrumentation.Received.Chunks), q.LastMetrics.Instrumentation.Received.ChunksLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand Down Expand Up @@ -2233,12 +2233,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_QUERY_NET_SENT_TOTAL_BYTES:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.QueryMetrics != nil && p.QueryMetrics.Instrumentation != nil && p.QueryMetrics.Instrumentation.Sent != nil {
left = p.QueryMetrics.Instrumentation.Sent.TotalBytes
left = max(uint64(p.QueryMetrics.Instrumentation.Sent.TotalBytes), p.QueryMetrics.Instrumentation.Sent.TotalBytesLong)
}
if q.QueryMetrics != nil && q.QueryMetrics.Instrumentation != nil && q.QueryMetrics.Instrumentation.Sent != nil {
right = q.QueryMetrics.Instrumentation.Sent.TotalBytes
right = max(uint64(q.QueryMetrics.Instrumentation.Sent.TotalBytes), q.QueryMetrics.Instrumentation.Sent.TotalBytesLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand All @@ -2248,12 +2248,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_QUERY_NET_SENT_TUPLE_BYTES:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.QueryMetrics != nil && p.QueryMetrics.Instrumentation != nil && p.QueryMetrics.Instrumentation.Sent != nil {
left = p.QueryMetrics.Instrumentation.Sent.TupleBytes
left = max(uint64(p.QueryMetrics.Instrumentation.Sent.TupleBytes), p.QueryMetrics.Instrumentation.Sent.TupleBytesLong)
}
if q.QueryMetrics != nil && q.QueryMetrics.Instrumentation != nil && q.QueryMetrics.Instrumentation.Sent != nil {
right = q.QueryMetrics.Instrumentation.Sent.TupleBytes
right = max(uint64(q.QueryMetrics.Instrumentation.Sent.TupleBytes), q.QueryMetrics.Instrumentation.Sent.TupleBytesLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand All @@ -2263,12 +2263,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_QUERY_NET_SENT_CHUNKS:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.QueryMetrics != nil && p.QueryMetrics.Instrumentation != nil && p.QueryMetrics.Instrumentation.Sent != nil {
left = p.QueryMetrics.Instrumentation.Sent.Chunks
left = max(uint64(p.QueryMetrics.Instrumentation.Sent.Chunks), p.QueryMetrics.Instrumentation.Sent.ChunksLong)
}
if q.QueryMetrics != nil && q.QueryMetrics.Instrumentation != nil && q.QueryMetrics.Instrumentation.Sent != nil {
right = q.QueryMetrics.Instrumentation.Sent.Chunks
right = max(uint64(q.QueryMetrics.Instrumentation.Sent.Chunks), q.QueryMetrics.Instrumentation.Sent.ChunksLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand All @@ -2278,12 +2278,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_QUERY_NET_RECV_TOTAL_BYTES:

left, right := uint32(0.0), uint32(0.0)
left, right := uint64(0), uint64(0)
if p.QueryMetrics != nil && p.QueryMetrics.Instrumentation != nil && p.QueryMetrics.Instrumentation.Received != nil {
left = p.QueryMetrics.Instrumentation.Received.TotalBytes
left = max(uint64(p.QueryMetrics.Instrumentation.Received.TotalBytes), p.QueryMetrics.Instrumentation.Received.TotalBytesLong)
}
if q.QueryMetrics != nil && q.QueryMetrics.Instrumentation != nil && q.QueryMetrics.Instrumentation.Received != nil {
right = q.QueryMetrics.Instrumentation.Received.TotalBytes
right = max(uint64(q.QueryMetrics.Instrumentation.Received.TotalBytes), q.QueryMetrics.Instrumentation.Received.TotalBytesLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand All @@ -2293,12 +2293,12 @@ func (ms *MultipleSorter) Less(i, j int) bool {
}
case pbm.SessionField_QUERY_NET_RECV_TUPLE_BYTES:

left, right := uint32(0), uint32(0)
left, right := uint64(0), uint64(0)
if p.QueryMetrics != nil && p.QueryMetrics.Instrumentation != nil && p.QueryMetrics.Instrumentation.Received != nil {
left = p.QueryMetrics.Instrumentation.Received.TupleBytes
left = max(uint64(p.QueryMetrics.Instrumentation.Received.TupleBytes), p.QueryMetrics.Instrumentation.Received.TupleBytesLong)
}
if q.QueryMetrics != nil && q.QueryMetrics.Instrumentation != nil && q.QueryMetrics.Instrumentation.Received != nil {
right = q.QueryMetrics.Instrumentation.Received.TupleBytes
right = max(uint64(q.QueryMetrics.Instrumentation.Received.TupleBytes), q.QueryMetrics.Instrumentation.Received.TupleBytesLong)
}
if less(ms.Fields[k].Order, left, right) {
return true
Expand Down
68 changes: 68 additions & 0 deletions internal/grpc/get_master_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,74 @@ func TestSortAndFilterSessions(t *testing.T) {
})
}

func TestSortNetworkStatLongFields(t *testing.T) {
type F = pbm.SessionField
for _, tc := range []struct {
f F
slot string
sent bool
kind string
}{
{F(pbm.SessionField_TOTAL_NET_SENT_TOTAL_BYTES), "total", true, "bytes"},
{F(pbm.SessionField_TOTAL_NET_SENT_TUPLE_BYTES), "total", true, "tuple"},
{F(pbm.SessionField_TOTAL_NET_SENT_CHUNKS), "total", true, "chunks"},
{F(pbm.SessionField_TOTAL_NET_RECV_TOTAL_BYTES), "total", false, "bytes"},
{F(pbm.SessionField_TOTAL_NET_RECV_TUPLE_BYTES), "total", false, "tuple"},
{F(pbm.SessionField_TOTAL_NET_RECV_CHUNKS), "total", false, "chunks"},
{F(pbm.SessionField_LAST_NET_SENT_TOTAL_BYTES), "last", true, "bytes"},
{F(pbm.SessionField_LAST_NET_SENT_TUPLE_BYTES), "last", true, "tuple"},
{F(pbm.SessionField_LAST_NET_SENT_CHUNKS), "last", true, "chunks"},
{F(pbm.SessionField_LAST_NET_RECV_TOTAL_BYTES), "last", false, "bytes"},
{F(pbm.SessionField_LAST_NET_RECV_TUPLE_BYTES), "last", false, "tuple"},
{F(pbm.SessionField_LAST_NET_RECV_CHUNKS), "last", false, "chunks"},
{F(pbm.SessionField_QUERY_NET_SENT_TOTAL_BYTES), "query", true, "bytes"},
{F(pbm.SessionField_QUERY_NET_SENT_TUPLE_BYTES), "query", true, "tuple"},
{F(pbm.SessionField_QUERY_NET_SENT_CHUNKS), "query", true, "chunks"},
{F(pbm.SessionField_QUERY_NET_RECV_TOTAL_BYTES), "query", false, "bytes"},
{F(pbm.SessionField_QUERY_NET_RECV_TUPLE_BYTES), "query", false, "tuple"},
} {
t.Run(tc.f.String(), func(t *testing.T) {
stat := func(u32 uint32, u64 uint64) *pbc.NetworkStat {
switch tc.kind {
case "tuple":
return &pbc.NetworkStat{TupleBytes: u32, TupleBytesLong: u64}
case "chunks":
return &pbc.NetworkStat{Chunks: u32, ChunksLong: u64}
default:
return &pbc.NetworkStat{TotalBytes: u32, TotalBytesLong: u64}
}
}
stat1, stat2 := stat(5, 5_000_000_000), stat(10, 3_000_000_000)

mk := func(id int64, s *pbc.NetworkStat) *pbc.SessionState {
instr := &pbc.MetricInstrumentation{}
if tc.sent {
instr.Sent = s
} else {
instr.Received = s
}
st := &pbc.SessionState{SessionKey: &pbc.SessionKey{SessId: id}}
m := &pbc.GPMetrics{Instrumentation: instr}
switch tc.slot {
case "total":
st.TotalMetrics = m
case "last":
st.LastMetrics = m
case "query":
st.QueryMetrics = m
}
return st
}

sessions := []*pbc.SessionState{mk(1, stat1), mk(2, stat2)}
fields := []*pbm.SessionFieldWrapper{{FieldName: tc.f, Order: pbm.SortOrder_SORT_DESC}}
require.NoError(t, grpc.SortResult(&sessions, fields))
assert.Equal(t, int64(1), sessions[0].SessionKey.SessId)
assert.Equal(t, int64(2), sessions[1].SessionKey.SessId)
})
}
}

func TestMasterMethods(t *testing.T) {
ctrl := gomock.NewController(t)
sessionMocker := NewMockStatActivityLister(ctrl)
Expand Down
Loading