Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cappl 588/rate limit per workflow #16672

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
11 changes: 6 additions & 5 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,12 @@ func (f *outgoingConnectorFetcherFactory) NewFetcher(log logger.Logger, emitter
}

resp, err := f.outgoingConnectorHandler.HandleSingleNodeRequest(ctx, messageID, ghcapabilities.Request{
URL: req.Url,
Method: req.Method,
Headers: headersReq,
Body: req.Body,
TimeoutMs: req.TimeoutMs,
URL: req.Url,
Method: req.Method,
Headers: headersReq,
Body: req.Body,
TimeoutMs: req.TimeoutMs,
WorkflowID: req.Metadata.WorkflowId,
})
if err != nil {
return nil, err
Expand Down
6 changes: 6 additions & 0 deletions core/capabilities/compute/compute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ const (

var defaultConfig = Config{
ServiceConfig: webapi.ServiceConfig{
OutgoingRateLimiter: common.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
},
RateLimiter: common.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
Expand Down
153 changes: 126 additions & 27 deletions core/capabilities/webapi/outgoing_connector_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,41 @@ import (
)

const (
// Fallback global limits. Both rate-limiter config default helpers in this
// file substitute them when the corresponding config field is zero.
DefaultGlobalRPS = 100.0
DefaultGlobalBurst = 100
// Fallback per-sender limits, substituted by incomingRateLimiterConfigDefaults
// when the per-sender config fields are zero.
DefaultPerSenderRPS = 100.0
DefaultPerSenderBurst = 100
// Fallback per-workflow limits, substituted by outgoingRateLimiterConfigDefaults
// into the PerSender* config fields when those are zero.
DefaultWorkflowRPS = 5.0
DefaultWorkflowBurst = 50
// defaultFetchTimeoutMs is assigned to a request's TimeoutMs in
// HandleSingleNodeRequest when the caller left it at zero.
defaultFetchTimeoutMs = 20_000

// Error texts returned by HandleSingleNodeRequest when the outgoing rate
// limiter rejects a request.
errorOutgoingRatelimitGlobal = "global limit of gateways requests has been exceeded"
errorOutgoingRatelimitWorkflow = "workflow exceeded limit of gateways requests"
// Error texts placed in the JSON-RPC error payload by HandleGatewayMessage
// when the incoming rate limiter rejects a gateway message.
errorIncomingRatelimitGlobal = "message from gateway exceeded global rate limit"
errorIncomingRatelimitSender = "message from gateway exceeded per sender rate limit"
)

var _ connector.GatewayConnectorHandler = &OutgoingConnectorHandler{}

// OutgoingConnectorHandler sends requests to gateway nodes through a
// GatewayConnector (see HandleSingleNodeRequest) and passes the gateway's
// replies back to the waiting caller (see HandleGatewayMessage).
//
// NOTE(review): gc, gatewaySelector, method, lggr and responses each appear
// twice in the field list below — this looks like removed and added diff
// lines interleaved; confirm which set is current before compiling.
type OutgoingConnectorHandler struct {
services.StateMachine
gc connector.GatewayConnector
gatewaySelector *RoundRobinSelector
method string
lggr logger.Logger
rateLimiter *common.RateLimiter
responses *responses
gc connector.GatewayConnector
gatewaySelector *RoundRobinSelector
method string
lggr logger.Logger
// incomingRateLimiter is checked per message sender in HandleGatewayMessage.
incomingRateLimiter *common.RateLimiter
// outgoingRateLimiter is checked per workflow ID in HandleSingleNodeRequest.
outgoingRateLimiter *common.RateLimiter
responses *responses
}

func NewOutgoingConnectorHandler(gc connector.GatewayConnector, config ServiceConfig, method string, lgger logger.Logger) (*OutgoingConnectorHandler, error) {
rateLimiter, err := common.NewRateLimiter(config.RateLimiter)
outgoingRLCfg := outgoingRateLimiterConfigDefaults(config.OutgoingRateLimiter)
outgoingRateLimiter, err := common.NewRateLimiter(outgoingRLCfg)
if err != nil {
return nil, err
}
incomingRLCfg := incomingRateLimiterConfigDefaults(config.RateLimiter)
incomingRateLimiter, err := common.NewRateLimiter(incomingRLCfg)
if err != nil {
return nil, err
}
Expand All @@ -44,18 +62,29 @@ func NewOutgoingConnectorHandler(gc connector.GatewayConnector, config ServiceCo
}

return &OutgoingConnectorHandler{
gc: gc,
gatewaySelector: NewRoundRobinSelector(gc.GatewayIDs()),
method: method,
responses: newResponses(),
rateLimiter: rateLimiter,
lggr: lgger,
gc: gc,
gatewaySelector: NewRoundRobinSelector(gc.GatewayIDs()),
method: method,
responses: newResponses(),
outgoingRateLimiter: outgoingRateLimiter,
incomingRateLimiter: incomingRateLimiter,
lggr: lgger,
}, nil
}

// HandleSingleNodeRequest sends a request to the first available gateway node and blocks until a response is received.
// TODO: handle retries
func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context, messageID string, req capabilities.Request) (*api.Message, error) {
lggr := logger.With(c.lggr, "messageID", messageID, "workflowID", req.WorkflowID)

workflowAllow, globalAllow := c.outgoingRateLimiter.AllowVerbose(req.WorkflowID)
if !workflowAllow {
return nil, errors.New(errorOutgoingRatelimitWorkflow)
}
if !globalAllow {
return nil, errors.New(errorOutgoingRatelimitGlobal)
}

// set default timeout if not provided for all outgoing requests
if req.TimeoutMs == 0 {
req.TimeoutMs = defaultFetchTimeoutMs
Expand All @@ -78,8 +107,7 @@ func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context,
}
defer c.responses.cleanup(messageID)

l := logger.With(c.lggr, "messageID", messageID)
l.Debugw("sending request to gateway")
lggr.Debugw("sending request to gateway")

body := &api.MessageBody{
MessageId: messageID,
Expand All @@ -93,7 +121,9 @@ func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context,
return nil, fmt.Errorf("failed to select gateway: %w", err)
}

l.Infow("selected gateway, awaiting connection", "gatewayID", selectedGateway)
lggr = logger.With(lggr, "gatewayID", selectedGateway)

lggr.Infow("selected gateway, awaiting connection")

if err := c.gc.AwaitConnection(ctx, selectedGateway); err != nil {
return nil, errors.Wrap(err, "await connection canceled")
Expand All @@ -105,22 +135,65 @@ func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context,

select {
case resp := <-ch:
l.Debugw("received response from gateway", "gatewayID", selectedGateway)
return resp, nil
switch resp.Body.Method {
case api.MethodInternalError:
var errPayload api.JsonRPCError
err := json.Unmarshal(resp.Body.Payload, &errPayload)
if err != nil {
lggr.Errorw("failed to unmarshal err payload", "err", err)
return nil, errors.New("unknown internal error")
}
return nil, errors.New(errPayload.Message)
default:
lggr.Debugw("received response from gateway")
return resp, nil
}
case <-ctx.Done():
return nil, ctx.Err()
}
}

// HandleGatewayMessage processes incoming messages from the Gateway,
// which are in response to a HandleSingleNodeRequest call.
func (c *OutgoingConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) {
body := &msg.Body
l := logger.With(c.lggr, "gatewayID", gatewayID, "method", body.Method, "messageID", msg.Body.MessageId)
if !c.rateLimiter.Allow(body.Sender) {
// error is logged here instead of warning because if a message from gateway is rate-limited,
// the workflow will eventually fail with timeout as there are no retries in place yet
c.lggr.Errorw("request rate-limited")

ch, ok := c.responses.get(body.MessageId)
if !ok {
l.Warnw("no response channel found; this may indicate that the node timed out the request")
return
}

senderAllow, globalAllow := c.incomingRateLimiter.AllowVerbose(body.Sender)
errJSON := api.JsonRPCError{
Code: 500,
Message: "",
}
if !senderAllow {
errJSON.Message = errorIncomingRatelimitSender
}
if !globalAllow {
errJSON.Message += errorIncomingRatelimitGlobal
}

if errJSON.Message != "" {
l.Errorw("request rate-limited")
errPayload, err := json.Marshal(errJSON)
if err != nil {
l.Errorw("failed to marshal err payload", "err", err)
}
errMsg := api.Message{
Body: api.MessageBody{
MessageId: body.MessageId,
Method: api.MethodInternalError,
Payload: errPayload,
},
}
ch <- &errMsg
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically out of scope, but could we return an error here back to the caller? I think this results in nicer UX by telling users they were rate limited rather than returning a timeout.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Changing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

l.Debugw("handling gateway request")
switch body.Method {
case capabilities.MethodWebAPITarget, capabilities.MethodComputeAction, capabilities.MethodWorkflowSyncer:
Expand All @@ -131,11 +204,6 @@ func (c *OutgoingConnectorHandler) HandleGatewayMessage(ctx context.Context, gat
l.Errorw("failed to unmarshal payload", "err", err)
return
}
ch, ok := c.responses.get(body.MessageId)
if !ok {
l.Warnw("no response channel found; this may indicate that the node timed out the request")
return
}
select {
case ch <- msg:
return
Expand Down Expand Up @@ -167,6 +235,37 @@ func (c *OutgoingConnectorHandler) Name() string {
return c.lggr.Name()
}

// incomingRateLimiterConfigDefaults returns config with every zero-valued
// limit replaced by the package default for messages received from gateways:
// DefaultGlobalRPS/DefaultGlobalBurst for the global limits and
// DefaultPerSenderRPS/DefaultPerSenderBurst for the per-sender limits.
// Non-zero fields are returned as given.
func incomingRateLimiterConfigDefaults(config common.RateLimiterConfig) common.RateLimiterConfig {
	return rateLimiterConfigWithDefaults(config, common.RateLimiterConfig{
		GlobalRPS:      DefaultGlobalRPS,
		GlobalBurst:    DefaultGlobalBurst,
		PerSenderRPS:   DefaultPerSenderRPS,
		PerSenderBurst: DefaultPerSenderBurst,
	})
}

// outgoingRateLimiterConfigDefaults returns config with every zero-valued
// limit replaced by the package default for requests sent to gateways:
// DefaultGlobalRPS/DefaultGlobalBurst for the global limits and
// DefaultWorkflowRPS/DefaultWorkflowBurst for the PerSender* fields.
// Non-zero fields are returned as given.
func outgoingRateLimiterConfigDefaults(config common.RateLimiterConfig) common.RateLimiterConfig {
	return rateLimiterConfigWithDefaults(config, common.RateLimiterConfig{
		GlobalRPS:      DefaultGlobalRPS,
		GlobalBurst:    DefaultGlobalBurst,
		PerSenderRPS:   DefaultWorkflowRPS,
		PerSenderBurst: DefaultWorkflowBurst,
	})
}

// rateLimiterConfigWithDefaults fills each of the four limit fields of config
// from defaults when config holds zero for that field. Zero is the only value
// treated as "unset"; config is passed by value, so the caller's copy is not
// modified.
func rateLimiterConfigWithDefaults(config, defaults common.RateLimiterConfig) common.RateLimiterConfig {
	if config.GlobalBurst == 0 {
		config.GlobalBurst = defaults.GlobalBurst
	}
	if config.GlobalRPS == 0 {
		config.GlobalRPS = defaults.GlobalRPS
	}
	if config.PerSenderBurst == 0 {
		config.PerSenderBurst = defaults.PerSenderBurst
	}
	if config.PerSenderRPS == 0 {
		config.PerSenderRPS = defaults.PerSenderRPS
	}
	return config
}

func validMethod(method string) bool {
switch method {
case capabilities.MethodWebAPITarget, capabilities.MethodComputeAction, capabilities.MethodWorkflowSyncer:
Expand Down
Loading
Loading