Skip to content

Commit

Permalink
Propagate max capacity information to the actions back-end (#3431)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikola-jokic authored Apr 16, 2024
1 parent 8075e5e commit 4ee49fe
Show file tree
Hide file tree
Showing 18 changed files with 147 additions and 100 deletions.
16 changes: 9 additions & 7 deletions cmd/ghalistener/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
type Client interface {
GetAcquirableJobs(ctx context.Context, runnerScaleSetId int) (*actions.AcquirableJobList, error)
CreateMessageSession(ctx context.Context, runnerScaleSetId int, owner string) (*actions.RunnerScaleSetSession, error)
GetMessage(ctx context.Context, messageQueueUrl, messageQueueAccessToken string, lastMessageId int64) (*actions.RunnerScaleSetMessage, error)
GetMessage(ctx context.Context, messageQueueUrl, messageQueueAccessToken string, lastMessageId int64, maxCapacity int) (*actions.RunnerScaleSetMessage, error)
DeleteMessage(ctx context.Context, messageQueueUrl, messageQueueAccessToken string, messageId int64) error
AcquireJobs(ctx context.Context, runnerScaleSetId int, messageQueueAccessToken string, requestIds []int64) ([]int64, error)
RefreshMessageSession(ctx context.Context, runnerScaleSetId int, sessionId *uuid.UUID) (*actions.RunnerScaleSetSession, error)
Expand Down Expand Up @@ -80,6 +80,7 @@ type Listener struct {

// updated fields
lastMessageID int64 // The ID of the last processed message.
maxCapacity int // The maximum number of runners that can be created.
session *actions.RunnerScaleSetSession // The session for managing the runner scale set.
}

Expand All @@ -89,10 +90,11 @@ func New(config Config) (*Listener, error) {
}

listener := &Listener{
scaleSetID: config.ScaleSetID,
client: config.Client,
logger: config.Logger,
metrics: metrics.Discard,
scaleSetID: config.ScaleSetID,
client: config.Client,
logger: config.Logger,
metrics: metrics.Discard,
maxCapacity: config.MaxRunners,
}

if config.Metrics != nil {
Expand Down Expand Up @@ -267,7 +269,7 @@ func (l *Listener) createSession(ctx context.Context) error {

func (l *Listener) getMessage(ctx context.Context) (*actions.RunnerScaleSetMessage, error) {
l.logger.Info("Getting next message", "lastMessageID", l.lastMessageID)
msg, err := l.client.GetMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID)
msg, err := l.client.GetMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID, l.maxCapacity)
if err == nil { // if NO error
return msg, nil
}
Expand All @@ -283,7 +285,7 @@ func (l *Listener) getMessage(ctx context.Context) (*actions.RunnerScaleSetMessa

l.logger.Info("Getting next message", "lastMessageID", l.lastMessageID)

msg, err = l.client.GetMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID)
msg, err = l.client.GetMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID, l.maxCapacity)
if err != nil { // if NO error
return nil, fmt.Errorf("failed to get next message after message session refresh: %w", err)
}
Expand Down
17 changes: 11 additions & 6 deletions cmd/ghalistener/listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,14 @@ func TestListener_getMessage(t *testing.T) {
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
MaxRunners: 10,
}

client := listenermocks.NewClient(t)
want := &actions.RunnerScaleSetMessage{
MessageId: 1,
}
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(want, nil).Once()
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).Return(want, nil).Once()
config.Client = client

l, err := New(config)
Expand All @@ -148,10 +149,11 @@ func TestListener_getMessage(t *testing.T) {
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
MaxRunners: 10,
}

client := listenermocks.NewClient(t)
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.HttpClientSideError{Code: http.StatusNotFound}).Once()
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).Return(nil, &actions.HttpClientSideError{Code: http.StatusNotFound}).Once()
config.Client = client

l, err := New(config)
Expand All @@ -170,6 +172,7 @@ func TestListener_getMessage(t *testing.T) {
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
MaxRunners: 10,
}

client := listenermocks.NewClient(t)
Expand All @@ -185,12 +188,12 @@ func TestListener_getMessage(t *testing.T) {
}
client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once()

client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.MessageQueueTokenExpiredError{}).Once()
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).Return(nil, &actions.MessageQueueTokenExpiredError{}).Once()

want := &actions.RunnerScaleSetMessage{
MessageId: 1,
}
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(want, nil).Once()
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).Return(want, nil).Once()

config.Client = client

Expand All @@ -214,6 +217,7 @@ func TestListener_getMessage(t *testing.T) {
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
MaxRunners: 10,
}

client := listenermocks.NewClient(t)
Expand All @@ -229,7 +233,7 @@ func TestListener_getMessage(t *testing.T) {
}
client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once()

client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.MessageQueueTokenExpiredError{}).Twice()
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).Return(nil, &actions.MessageQueueTokenExpiredError{}).Twice()

config.Client = client

Expand Down Expand Up @@ -450,6 +454,7 @@ func TestListener_Listen(t *testing.T) {
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
MaxRunners: 10,
}

client := listenermocks.NewClient(t)
Expand All @@ -470,7 +475,7 @@ func TestListener_Listen(t *testing.T) {
MessageType: "RunnerScaleSetJobMessages",
Statistics: &actions.RunnerScaleSetStatistic{},
}
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).
Return(msg, nil).
Run(
func(mock.Arguments) {
Expand Down
18 changes: 9 additions & 9 deletions cmd/ghalistener/listener/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions cmd/githubrunnerscalesetlistener/autoScalerMessageListener.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (m *AutoScalerClient) Close() error {
return m.client.Close()
}

func (m *AutoScalerClient) GetRunnerScaleSetMessage(ctx context.Context, handler func(msg *actions.RunnerScaleSetMessage) error) error {
func (m *AutoScalerClient) GetRunnerScaleSetMessage(ctx context.Context, handler func(msg *actions.RunnerScaleSetMessage) error, maxCapacity int) error {
if m.initialMessage != nil {
err := handler(m.initialMessage)
if err != nil {
Expand All @@ -141,7 +141,7 @@ func (m *AutoScalerClient) GetRunnerScaleSetMessage(ctx context.Context, handler
}

for {
message, err := m.client.GetMessage(ctx, m.lastMessageId)
message, err := m.client.GetMessage(ctx, m.lastMessageId, maxCapacity)
if err != nil {
return fmt.Errorf("get message failed from refreshing client. %w", err)
}
Expand Down
38 changes: 19 additions & 19 deletions cmd/githubrunnerscalesetlistener/autoScalerMessageListener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func TestGetRunnerScaleSetMessage(t *testing.T) {
Statistics: &actions.RunnerScaleSetStatistic{},
}
mockActionsClient.On("CreateMessageSession", ctx, 1, mock.Anything).Return(session, nil)
mockSessionClient.On("GetMessage", ctx, int64(0)).Return(&actions.RunnerScaleSetMessage{
mockSessionClient.On("GetMessage", ctx, int64(0), mock.Anything).Return(&actions.RunnerScaleSetMessage{
MessageId: 1,
MessageType: "test",
Body: "test",
Expand All @@ -332,15 +332,15 @@ func TestGetRunnerScaleSetMessage(t *testing.T) {
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
return nil
})
}, 10)

assert.NoError(t, err, "Error getting message")
assert.Equal(t, int64(0), asClient.lastMessageId, "Initial message")

err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
return nil
})
}, 10)

assert.NoError(t, err, "Error getting message")
assert.Equal(t, int64(1), asClient.lastMessageId, "Last message id should be updated")
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestGetRunnerScaleSetMessage_HandleFailed(t *testing.T) {
Statistics: &actions.RunnerScaleSetStatistic{},
}
mockActionsClient.On("CreateMessageSession", ctx, 1, mock.Anything).Return(session, nil)
mockSessionClient.On("GetMessage", ctx, int64(0)).Return(&actions.RunnerScaleSetMessage{
mockSessionClient.On("GetMessage", ctx, int64(0), mock.Anything).Return(&actions.RunnerScaleSetMessage{
MessageId: 1,
MessageType: "test",
Body: "test",
Expand All @@ -383,14 +383,14 @@ func TestGetRunnerScaleSetMessage_HandleFailed(t *testing.T) {
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
return nil
})
}, 10)

assert.NoError(t, err, "Error getting message")

err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
return fmt.Errorf("error")
})
}, 10)

assert.ErrorContains(t, err, "handle message failed. error", "Error getting message")
assert.Equal(t, int64(0), asClient.lastMessageId, "Last message id should not be updated")
Expand Down Expand Up @@ -419,7 +419,7 @@ func TestGetRunnerScaleSetMessage_HandleInitialMessage(t *testing.T) {
TotalAssignedJobs: 2,
},
}
mockActionsClient.On("CreateMessageSession", ctx, 1, mock.Anything).Return(session, nil)
mockActionsClient.On("CreateMessageSession", ctx, 1, mock.Anything, mock.Anything).Return(session, nil)
mockActionsClient.On("GetAcquirableJobs", ctx, 1).Return(&actions.AcquirableJobList{
Count: 1,
Jobs: []actions.AcquirableJob{
Expand All @@ -439,7 +439,7 @@ func TestGetRunnerScaleSetMessage_HandleInitialMessage(t *testing.T) {
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
return nil
})
}, 10)

assert.NoError(t, err, "Error getting message")
assert.Nil(t, asClient.initialMessage, "Initial message should be nil")
Expand Down Expand Up @@ -488,7 +488,7 @@ func TestGetRunnerScaleSetMessage_HandleInitialMessageFailed(t *testing.T) {
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
return fmt.Errorf("error")
})
}, 10)

assert.ErrorContains(t, err, "fail to process initial message. error", "Error getting message")
assert.NotNil(t, asClient.initialMessage, "Initial message should be nil")
Expand Down Expand Up @@ -516,8 +516,8 @@ func TestGetRunnerScaleSetMessage_RetryUntilGetMessage(t *testing.T) {
Statistics: &actions.RunnerScaleSetStatistic{},
}
mockActionsClient.On("CreateMessageSession", ctx, 1, mock.Anything).Return(session, nil)
mockSessionClient.On("GetMessage", ctx, int64(0)).Return(nil, nil).Times(3)
mockSessionClient.On("GetMessage", ctx, int64(0)).Return(&actions.RunnerScaleSetMessage{
mockSessionClient.On("GetMessage", ctx, int64(0), mock.Anything).Return(nil, nil).Times(3)
mockSessionClient.On("GetMessage", ctx, int64(0), mock.Anything).Return(&actions.RunnerScaleSetMessage{
MessageId: 1,
MessageType: "test",
Body: "test",
Expand All @@ -532,13 +532,13 @@ func TestGetRunnerScaleSetMessage_RetryUntilGetMessage(t *testing.T) {
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
return nil
})
}, 10)
assert.NoError(t, err, "Error getting initial message")

err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
return nil
})
}, 10)

assert.NoError(t, err, "Error getting message")
assert.Equal(t, int64(1), asClient.lastMessageId, "Last message id should be updated")
Expand All @@ -565,7 +565,7 @@ func TestGetRunnerScaleSetMessage_ErrorOnGetMessage(t *testing.T) {
Statistics: &actions.RunnerScaleSetStatistic{},
}
mockActionsClient.On("CreateMessageSession", ctx, 1, mock.Anything).Return(session, nil)
mockSessionClient.On("GetMessage", ctx, int64(0)).Return(nil, fmt.Errorf("error"))
mockSessionClient.On("GetMessage", ctx, int64(0), mock.Anything).Return(nil, fmt.Errorf("error"))

asClient, err := NewAutoScalerClient(ctx, mockActionsClient, &logger, 1, func(asc *AutoScalerClient) {
asc.client = mockSessionClient
Expand All @@ -575,12 +575,12 @@ func TestGetRunnerScaleSetMessage_ErrorOnGetMessage(t *testing.T) {
// process initial message
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
return nil
})
}, 10)
assert.NoError(t, err, "Error getting initial message")

err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
return fmt.Errorf("Should not be called")
})
}, 10)

assert.ErrorContains(t, err, "get message failed from refreshing client. error", "Error should be returned")
assert.Equal(t, int64(0), asClient.lastMessageId, "Last message id should be updated")
Expand Down Expand Up @@ -608,7 +608,7 @@ func TestDeleteRunnerScaleSetMessage_Error(t *testing.T) {
Statistics: &actions.RunnerScaleSetStatistic{},
}
mockActionsClient.On("CreateMessageSession", ctx, 1, mock.Anything).Return(session, nil)
mockSessionClient.On("GetMessage", ctx, int64(0)).Return(&actions.RunnerScaleSetMessage{
mockSessionClient.On("GetMessage", ctx, int64(0), mock.Anything).Return(&actions.RunnerScaleSetMessage{
MessageId: 1,
MessageType: "test",
Body: "test",
Expand All @@ -623,13 +623,13 @@ func TestDeleteRunnerScaleSetMessage_Error(t *testing.T) {
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
return nil
})
}, 10)
assert.NoError(t, err, "Error getting initial message")

err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
return nil
})
}, 10)

assert.ErrorContains(t, err, "delete message failed from refreshing client. error", "Error getting message")
assert.Equal(t, int64(1), asClient.lastMessageId, "Last message id should be updated")
Expand Down
2 changes: 1 addition & 1 deletion cmd/githubrunnerscalesetlistener/autoScalerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (s *Service) Start() error {
s.logger.Info("service is stopped.")
return nil
default:
err := s.rsClient.GetRunnerScaleSetMessage(s.ctx, s.processMessage)
err := s.rsClient.GetRunnerScaleSetMessage(s.ctx, s.processMessage, s.settings.MaxRunners)
if err != nil {
return fmt.Errorf("could not get and process message. %w", err)
}
Expand Down
Loading

0 comments on commit 4ee49fe

Please sign in to comment.