Skip to content

Commit

Permalink
extending lt counter metric (#250)
Browse files Browse the repository at this point in the history
* extending loadtests counter metric
* adding testcases for CountExistingLoadtests
  • Loading branch information
s-radyuk authored Dec 21, 2022
1 parent 5c98c24 commit bad91bb
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 57 deletions.
49 changes: 24 additions & 25 deletions pkg/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,23 +157,6 @@ func (c *Client) filterLoadTestsByPhase(list *apisLoadTestV1.LoadTestList, phase
return &filteredList
}

// CountActiveLoadTests returns a number of currently running load tests
func (c *Client) CountActiveLoadTests(ctx context.Context) (int, error) {
loadTests, err := c.ltClient.List(ctx, metaV1.ListOptions{})
if err != nil {
return 0, err
}
counter := 0

// CRD-s currently don't support custom field selectors, so we have to iterate via all load tests and check status phase
for _, loadTest := range loadTests.Items {
if loadTest.Status.Phase == apisLoadTestV1.LoadTestRunning || loadTest.Status.Phase == apisLoadTestV1.LoadTestCreating {
counter++
}
}
return counter, nil
}

// GetMasterPodRequest is making an assumptions that we only care about the logs
// from the most recently created pod. It gets the pods associated with
// the master job and returns the request that is used for getting the logs
Expand Down Expand Up @@ -248,20 +231,36 @@ func BuildClientConfig(masterURL string, kubeConfigPath string, timeout time.Dur
return kubeCfg, nil
}

// CountRunningLoadtests used in metrics to report running loadtests
func (c *Client) CountRunningLoadtests() int64 {
// CountExistingLoadtests used in metrics to report running loadtests
func (c *Client) CountExistingLoadtests() (map[apisLoadTestV1.LoadTestPhase]int64, map[apisLoadTestV1.LoadTestType]int64, error) {
tt, err := c.ltClient.List(context.Background(), metaV1.ListOptions{})
if err != nil {
c.logger.Error("Couldn't list existing loadtests", zap.Error(err))
return 0
return nil, nil, err
}

var phaseCount = map[apisLoadTestV1.LoadTestPhase]int64{
apisLoadTestV1.LoadTestRunning: 0,
apisLoadTestV1.LoadTestFinished: 0,
apisLoadTestV1.LoadTestCreating: 0,
apisLoadTestV1.LoadTestErrored: 0,
apisLoadTestV1.LoadTestStarting: 0,
}

var typeCount = map[apisLoadTestV1.LoadTestType]int64{
apisLoadTestV1.LoadTestTypeK6: 0,
apisLoadTestV1.LoadTestTypeJMeter: 0,
apisLoadTestV1.LoadTestTypeLocust: 0,
apisLoadTestV1.LoadTestTypeGhz: 0,
}

var rt = 0
for _, loadTest := range tt.Items {
if loadTest.Status.Phase == apisLoadTestV1.LoadTestRunning {
rt++
}
phaseString := loadTest.Status.Phase
phaseCount[phaseString]++

typeString := loadTest.Spec.Type
typeCount[typeString]++
}

return int64(rt)
return phaseCount, typeCount, nil
}
152 changes: 127 additions & 25 deletions pkg/kubernetes/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,39 +354,141 @@ func TestClient_filterLoadTestsByPhase(t *testing.T) {
}
}

func TestCountActiveLoadTests(t *testing.T) {
ctx := context.Background()

loadtestClientset := fakeClientset.NewSimpleClientset()
kubeClientSet := fake.NewSimpleClientset()

loadtestClientset.Fake.PrependReactor("list", "loadtests", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &apisLoadTestV1.LoadTestList{
Items: []apisLoadTestV1.LoadTest{
{
Status: apisLoadTestV1.LoadTestStatus{
Phase: apisLoadTestV1.LoadTestRunning,
func TestCountExistingLoadTests(t *testing.T) {
testCases := []struct {
scenario string
opt ListOptions
result *apisLoadTestV1.LoadTestList
error error
expectedResult int
expectedError string
expectedTypes int64
}{
{
scenario: "error in client",
result: &apisLoadTestV1.LoadTestList{},
error: errors.New("client error"),
expectedError: "client error",
expectedResult: 0,
expectedTypes: 0,
},
{
scenario: "unknown phase",
result: &apisLoadTestV1.LoadTestList{
Items: []apisLoadTestV1.LoadTest{
{
Status: apisLoadTestV1.LoadTestStatus{
Phase: "",
},
Spec: apisLoadTestV1.LoadTestSpec{
Type: apisLoadTestV1.LoadTestTypeK6,
},
},
{
Status: apisLoadTestV1.LoadTestStatus{
Phase: apisLoadTestV1.LoadTestFinished,
},
Spec: apisLoadTestV1.LoadTestSpec{
Type: apisLoadTestV1.LoadTestTypeK6,
},
},
},
{
Status: apisLoadTestV1.LoadTestStatus{
Phase: apisLoadTestV1.LoadTestCreating,
},
error: nil,
expectedError: "",
expectedResult: 0,
expectedTypes: 2,
},
{
scenario: "unknown type",
result: &apisLoadTestV1.LoadTestList{
Items: []apisLoadTestV1.LoadTest{
{
Status: apisLoadTestV1.LoadTestStatus{
Phase: apisLoadTestV1.LoadTestCreating,
},
Spec: apisLoadTestV1.LoadTestSpec{
Type: "foo",
},
},
{
Status: apisLoadTestV1.LoadTestStatus{
Phase: apisLoadTestV1.LoadTestFinished,
},
Spec: apisLoadTestV1.LoadTestSpec{
Type: "bar",
},
},
},
{
Status: apisLoadTestV1.LoadTestStatus{
Phase: apisLoadTestV1.LoadTestErrored,
},
error: nil,
expectedError: "",
expectedResult: 1,
expectedTypes: 0,
},
{
scenario: "success",
result: &apisLoadTestV1.LoadTestList{
Items: []apisLoadTestV1.LoadTest{
{
Status: apisLoadTestV1.LoadTestStatus{
Phase: apisLoadTestV1.LoadTestRunning,
},
Spec: apisLoadTestV1.LoadTestSpec{
Type: apisLoadTestV1.LoadTestTypeK6,
},
},
{
Status: apisLoadTestV1.LoadTestStatus{
Phase: apisLoadTestV1.LoadTestCreating,
},
Spec: apisLoadTestV1.LoadTestSpec{
Type: apisLoadTestV1.LoadTestTypeGhz,
},
},
{
Status: apisLoadTestV1.LoadTestStatus{
Phase: apisLoadTestV1.LoadTestErrored,
},
Spec: apisLoadTestV1.LoadTestSpec{
Type: apisLoadTestV1.LoadTestTypeJMeter,
},
},
},
},
}, nil
})
error: nil,
expectedError: "",
expectedResult: 2,
expectedTypes: 1,
},
}

logger := zap.NewNop()
c := NewClient(loadtestClientset.KangalV1().LoadTests(), kubeClientSet, logger)
counter, err := c.CountActiveLoadTests(ctx)
assert.NoError(t, err)
assert.Equal(t, 2, counter)
for _, tc := range testCases {
tc := tc
t.Run(tc.scenario, func(t *testing.T) {
t.Parallel()

loadtestClientset := fakeClientset.NewSimpleClientset()
kubeClientSet := fake.NewSimpleClientset()

loadtestClientset.Fake.PrependReactor("list", "loadtests", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, tc.result, tc.error
})

logger := zap.NewNop()
c := NewClient(loadtestClientset.KangalV1().LoadTests(), kubeClientSet, logger)
states, types, err := c.CountExistingLoadtests()
if tc.expectedError == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, tc.expectedError)
}
assert.Equal(t, types[apisLoadTestV1.LoadTestTypeK6], tc.expectedTypes)

total := states["running"] + states["creating"]
assert.Equal(t, tc.expectedResult, int(total))
})
}
}

func TestGetLoadTestNoLoadTest(t *testing.T) {
Expand Down
24 changes: 17 additions & 7 deletions pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,26 @@ type MetricsReporter struct {
// NewMetricsReporter contains loadtest metrics definition
func NewMetricsReporter(meter metric.Meter, kubeClient *kube.Client) (*MetricsReporter, error) {
countRunningLoadtests, err := meter.AsyncInt64().UpDownCounter(
"kangal_running_loadtests_count",
instrument.WithDescription("The number of currently running loadtests"),
"kangal_loadtests_count",
instrument.WithDescription("Current number of loadtests in cluster, grouped by type and phase"),
instrument.WithUnit(unit.Dimensionless),
)
if err != nil {
return nil, fmt.Errorf("could not register countRunningLoadtests metric: %w", err)
}

if err := meter.RegisterCallback([]instrument.Asynchronous{countRunningLoadtests}, func(ctx context.Context) {
lt := kubeClient.CountRunningLoadtests()
countRunningLoadtests.Observe(ctx, lt, attribute.String("loadtest", "running"))
states, types, err := kubeClient.CountExistingLoadtests()
if err != nil {
fmt.Errorf("could not get metric data for CountExistingLoadtests: %w", err)
}
for k, v := range states {
countRunningLoadtests.Observe(ctx, v, attribute.String("phase", k.String()))
}

for k, v := range types {
countRunningLoadtests.Observe(ctx, v, attribute.String("type", k.String()))
}
},
); err != nil {
return nil, err
Expand Down Expand Up @@ -202,15 +211,16 @@ func (p *Proxy) Create(w http.ResponseWriter, r *http.Request) {
}

// check the number of active loadtests currently running on the cluster
activeLoadTests, err := p.kubeClient.CountActiveLoadTests(ctx)
testsByPhase, _, err := p.kubeClient.CountExistingLoadtests()
if err != nil {
logger.Error("Could not count active load tests", zap.Error(err))
render.Render(w, r, cHttp.ErrResponse(http.StatusInternalServerError, "Could not count active load tests"))
return
}
activeLoadTests := testsByPhase["running"] + testsByPhase["creating"]

if activeLoadTests >= p.maxLoadTestsRun {
logger.Warn("number of active load tests reached limit", zap.Int("current", activeLoadTests), zap.Int("limit", p.maxLoadTestsRun))
if int(activeLoadTests) >= p.maxLoadTestsRun {
logger.Warn("number of active load tests reached limit", zap.Int("current", int(activeLoadTests)), zap.Int("limit", p.maxLoadTestsRun))
render.Render(w, r, cHttp.ErrResponse(http.StatusTooManyRequests, "Number of active load tests reached limit"))
return
}
Expand Down

0 comments on commit bad91bb

Please sign in to comment.