@@ -11,15 +11,21 @@ import (
11
11
12
12
"github.com/smartcontractkit/chainlink-common/pkg/logger"
13
13
"github.com/smartcontractkit/chainlink-common/pkg/services"
14
- common "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/common"
15
14
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
16
15
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
17
16
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
18
- gwcommon "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
17
+ "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
19
18
)
20
19
21
20
const (
22
- defaultFetchTimeoutMs = 20_000
21
+ DefaultGlobalRPS = 100.0
22
+ DefaultGlobalBurst = 100
23
+ DefaultPerSenderRPS = 100.0
24
+ DefaultPerSenderBurst = 100
25
+ DefaultWorkflowRPS = 5.0
26
+ DefaultWorkflowBurst = 50
27
+ defaultFetchTimeoutMs = 20_000
28
+
23
29
errorOutgoingRatelimitGlobal = "global limit of gateways requests has been exceeded"
24
30
errorOutgoingRatelimitWorkflow = "workflow exceeded limit of gateways requests"
25
31
errorIncomingRatelimitGlobal = "message from gateway exceeded global rate limit"
@@ -34,18 +40,19 @@ type OutgoingConnectorHandler struct {
34
40
gatewaySelector * RoundRobinSelector
35
41
method string
36
42
lggr logger.Logger
37
- incomingRateLimiter * gwcommon .RateLimiter
43
+ incomingRateLimiter * common .RateLimiter
38
44
outgoingRateLimiter * common.RateLimiter
39
45
responses * responses
40
46
}
41
47
42
48
func NewOutgoingConnectorHandler (gc connector.GatewayConnector , config ServiceConfig , method string , lgger logger.Logger ) (* OutgoingConnectorHandler , error ) {
43
- outgoingRateLimiter , err := common .NewRateLimiter (config .OutgoingRateLimiter )
49
+ outgoingRLCfg := outgoingRateLimiterConfigDefaults (config .OutgoingRateLimiter )
50
+ outgoingRateLimiter , err := common .NewRateLimiter (outgoingRLCfg )
44
51
if err != nil {
45
52
return nil , err
46
53
}
47
-
48
- incomingRateLimiter , err := gwcommon .NewRateLimiter (config . IncomingRateLimiter )
54
+ incomingRLCfg := incomingRateLimiterConfigDefaults ( config . RateLimiter )
55
+ incomingRateLimiter , err := common .NewRateLimiter (incomingRLCfg )
49
56
if err != nil {
50
57
return nil , err
51
58
}
@@ -70,7 +77,7 @@ func NewOutgoingConnectorHandler(gc connector.GatewayConnector, config ServiceCo
70
77
func (c * OutgoingConnectorHandler ) HandleSingleNodeRequest (ctx context.Context , messageID string , req capabilities.Request ) (* api.Message , error ) {
71
78
lggr := logger .With (c .lggr , "messageID" , messageID , "workflowID" , req .WorkflowID )
72
79
73
- workflowAllow , globalAllow := c .outgoingRateLimiter .Allow (req .WorkflowID )
80
+ workflowAllow , globalAllow := c .outgoingRateLimiter .AllowVerbose (req .WorkflowID )
74
81
if ! workflowAllow {
75
82
return nil , errors .New (errorOutgoingRatelimitWorkflow )
76
83
}
@@ -129,7 +136,7 @@ func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context,
129
136
select {
130
137
case resp := <- ch :
131
138
switch resp .Body .Method {
132
- case api .Method_InternalError :
139
+ case api .MethodInternalError :
133
140
var errPayload api.JsonRPCError
134
141
err := json .Unmarshal (resp .Body .Payload , & errPayload )
135
142
if err != nil {
@@ -179,7 +186,7 @@ func (c *OutgoingConnectorHandler) HandleGatewayMessage(ctx context.Context, gat
179
186
errMsg := api.Message {
180
187
Body : api.MessageBody {
181
188
MessageId : body .MessageId ,
182
- Method : api .Method_InternalError ,
189
+ Method : api .MethodInternalError ,
183
190
Payload : errPayload ,
184
191
},
185
192
}
@@ -228,6 +235,37 @@ func (c *OutgoingConnectorHandler) Name() string {
228
235
return c .lggr .Name ()
229
236
}
230
237
238
+ func incomingRateLimiterConfigDefaults (config common.RateLimiterConfig ) common.RateLimiterConfig {
239
+ if config .GlobalBurst == 0 {
240
+ config .GlobalBurst = DefaultGlobalBurst
241
+ }
242
+ if config .GlobalRPS == 0 {
243
+ config .GlobalRPS = DefaultGlobalRPS
244
+ }
245
+ if config .PerSenderBurst == 0 {
246
+ config .PerSenderBurst = DefaultPerSenderBurst
247
+ }
248
+ if config .PerSenderRPS == 0 {
249
+ config .PerSenderRPS = DefaultPerSenderRPS
250
+ }
251
+ return config
252
+ }
253
+ func outgoingRateLimiterConfigDefaults (config common.RateLimiterConfig ) common.RateLimiterConfig {
254
+ if config .GlobalBurst == 0 {
255
+ config .GlobalBurst = DefaultGlobalBurst
256
+ }
257
+ if config .GlobalRPS == 0 {
258
+ config .GlobalRPS = DefaultGlobalRPS
259
+ }
260
+ if config .PerSenderBurst == 0 {
261
+ config .PerSenderBurst = DefaultWorkflowBurst
262
+ }
263
+ if config .PerSenderRPS == 0 {
264
+ config .PerSenderRPS = DefaultWorkflowRPS
265
+ }
266
+ return config
267
+ }
268
+
231
269
func validMethod (method string ) bool {
232
270
switch method {
233
271
case capabilities .MethodWebAPITarget , capabilities .MethodComputeAction , capabilities .MethodWorkflowSyncer :
0 commit comments