-
Notifications
You must be signed in to change notification settings - Fork 282
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Rate Limit support #1442
base: development
Are you sure you want to change the base?
Add Rate Limit support #1442
Conversation
…miter merging changes
I got these logs when trying to run and test this functionality. |
…miter merging logger support
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good code and approach, but I would like to go a bit further if with a few things I'm suggesting.
I think it will work and will make code cleaner and easier to read and maintain
select { | ||
case <-req.ctx.Done(): | ||
r.Log(&ErrorLog{ | ||
Log: &Log{ | ||
Timestamp: time.Now(), | ||
CorrelationID: traceID, | ||
ResponseTime: time.Since(startTime).Microseconds(), | ||
ResponseCode: http.StatusGatewayTimeout, | ||
}, | ||
ErrorMessage: fmt.Sprintf("request canceled in queue: %v", req.ctx.Err()), | ||
}) | ||
req.respCh <- &requestResponse{nil, fmt.Errorf("request canceled in queue: %w", req.ctx.Err())} | ||
|
||
return | ||
default: | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is something I don't get here
Using a select with a default, so you are waiting for an event, but if you don't get it,you continue, without even a timeout wait.
I read it again, and I think you simply want to check the request cancelation, right?
So I would suggest to use this
select { | |
case <-req.ctx.Done(): | |
r.Log(&ErrorLog{ | |
Log: &Log{ | |
Timestamp: time.Now(), | |
CorrelationID: traceID, | |
ResponseTime: time.Since(startTime).Microseconds(), | |
ResponseCode: http.StatusGatewayTimeout, | |
}, | |
ErrorMessage: fmt.Sprintf("request canceled in queue: %v", req.ctx.Err()), | |
}) | |
req.respCh <- &requestResponse{nil, fmt.Errorf("request canceled in queue: %w", req.ctx.Err())} | |
return | |
default: | |
} | |
if err := req.ctx.Err() { | |
r.Log(&ErrorLog{ | |
Log: &Log{ | |
Timestamp: time.Now(), | |
CorrelationID: traceID, | |
ResponseTime: time.Since(startTime).Microseconds(), | |
ResponseCode: http.StatusGatewayTimeout, | |
}, | |
ErrorMessage: fmt.Sprintf("request canceled in queue: %v", err), | |
}) | |
req.respCh <- &requestResponse{nil, fmt.Errorf("request canceled in queue: %w", err)} | |
return | |
} |
Also, you can simplify a bit further
select { | |
case <-req.ctx.Done(): | |
r.Log(&ErrorLog{ | |
Log: &Log{ | |
Timestamp: time.Now(), | |
CorrelationID: traceID, | |
ResponseTime: time.Since(startTime).Microseconds(), | |
ResponseCode: http.StatusGatewayTimeout, | |
}, | |
ErrorMessage: fmt.Sprintf("request canceled in queue: %v", req.ctx.Err()), | |
}) | |
req.respCh <- &requestResponse{nil, fmt.Errorf("request canceled in queue: %w", req.ctx.Err())} | |
return | |
default: | |
} | |
if err := req.ctx.Err() { | |
err = fmt.Errorf("request canceled in queue: %w", err) | |
r.Log(&ErrorLog{ | |
Log: &Log{ | |
Timestamp: time.Now(), | |
CorrelationID: traceID, | |
ResponseTime: time.Since(startTime).Microseconds(), | |
ResponseCode: http.StatusGatewayTimeout, | |
}, | |
ErrorMessage: err.Error(), | |
}) | |
req.respCh <- &requestResponse{nil, err} | |
return | |
} |
if err := r.limiter.Wait(req.ctx); err != nil { | ||
r.Log(&ErrorLog{ | ||
Log: &Log{ | ||
Timestamp: time.Now(), | ||
CorrelationID: traceID, | ||
ResponseTime: time.Since(startTime).Microseconds(), | ||
ResponseCode: http.StatusTooManyRequests, | ||
}, | ||
ErrorMessage: fmt.Sprintf("rate limit wait error: %v", err), | ||
}) | ||
req.respCh <- &requestResponse{nil, fmt.Errorf("rate limit wait error: %w", err)} | ||
|
||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same suggestion about using this
if err := r.limiter.Wait(req.ctx); err != nil { | |
r.Log(&ErrorLog{ | |
Log: &Log{ | |
Timestamp: time.Now(), | |
CorrelationID: traceID, | |
ResponseTime: time.Since(startTime).Microseconds(), | |
ResponseCode: http.StatusTooManyRequests, | |
}, | |
ErrorMessage: fmt.Sprintf("rate limit wait error: %v", err), | |
}) | |
req.respCh <- &requestResponse{nil, fmt.Errorf("rate limit wait error: %w", err)} | |
return | |
} | |
if err := r.limiter.Wait(req.ctx); err != nil { | |
err = fmt.Errorf("rate limit wait error: %w", err)} | |
r.Log(&ErrorLog{ | |
Log: &Log{ | |
Timestamp: time.Now(), | |
CorrelationID: traceID, | |
ResponseTime: time.Since(startTime).Microseconds(), | |
ResponseCode: http.StatusTooManyRequests, | |
}, | |
ErrorMessage: err.Error(), | |
}) | |
req.respCh <- &requestResponse{nil, err) | |
return | |
} |
The idea is to make the code simpler to read, but also to maintain. The risk of updating only one of the error message is then limited
You can apply the following pattern almost everywhere in the code you are adding, except the panic recovery where you will use a %v because recover returns any
resp, err := req.execute() | ||
if err != nil { | ||
r.Log(&ErrorLog{ | ||
Log: &Log{ | ||
Timestamp: time.Now(), | ||
CorrelationID: traceID, | ||
ResponseTime: time.Since(startTime).Microseconds(), | ||
ResponseCode: http.StatusInternalServerError, | ||
}, | ||
ErrorMessage: fmt.Sprintf("request execution error: %v", err), | ||
}) | ||
req.respCh <- &requestResponse{nil, fmt.Errorf("request execution error: %w", err)} | ||
|
||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resp, err := req.execute() | |
if err != nil { | |
r.Log(&ErrorLog{ | |
Log: &Log{ | |
Timestamp: time.Now(), | |
CorrelationID: traceID, | |
ResponseTime: time.Since(startTime).Microseconds(), | |
ResponseCode: http.StatusInternalServerError, | |
}, | |
ErrorMessage: fmt.Sprintf("request execution error: %v", err), | |
}) | |
req.respCh <- &requestResponse{nil, fmt.Errorf("request execution error: %w", err)} | |
return | |
} | |
resp, err := req.execute() | |
if err != nil { | |
err = fmt.Errorf("request execution error: %w", err) | |
r.Log(&ErrorLog{ | |
Log: &Log{ | |
Timestamp: time.Now(), | |
CorrelationID: traceID, | |
ResponseTime: time.Since(startTime).Microseconds(), | |
ResponseCode: http.StatusInternalServerError, | |
}, | |
ErrorMessage: err.Error(), | |
}) | |
req.respCh <- &requestResponse{nil, err} | |
return | |
} |
func (r *RateLimiter) Put(ctx context.Context, api string, queryParams map[string]any, | ||
body []byte) (*http.Response, error) { | ||
return r.enqueueRequest(ctx, func() (*http.Response, error) { | ||
return r.HTTP.Put(ctx, api, queryParams, body) | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are using path
everywhere before and here you start using api
func (r *RateLimiter) Put(ctx context.Context, api string, queryParams map[string]any, | |
body []byte) (*http.Response, error) { | |
return r.enqueueRequest(ctx, func() (*http.Response, error) { | |
return r.HTTP.Put(ctx, api, queryParams, body) | |
}) | |
} | |
func (r *RateLimiter) Put(ctx context.Context, path string, queryParams map[string]any, | |
body []byte) (*http.Response, error) { | |
return r.enqueueRequest(ctx, func() (*http.Response, error) { | |
return r.HTTP.Put(ctx, path, queryParams, body) | |
}) | |
} |
Please also consider updating the others
// AddOption creates a new rate limiter with the config settings and wraps the provided HTTP client. | ||
func (r *RateLimiterConfig) AddOption(h HTTP) HTTP { | ||
switch v := h.(type) { | ||
case *httpService: | ||
rl := NewRateLimiter(context.Background(), r.Limit, r.Duration, r.MaxQueue) | ||
rl.HTTP = h | ||
rl.Logger = v.Logger | ||
|
||
return rl | ||
|
||
case *RateLimiter: | ||
return h | ||
default: | ||
rl := NewRateLimiter(context.Background(), r.Limit, r.Duration, r.MaxQueue) | ||
rl.HTTP = h | ||
|
||
return rl | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for applying the refactoring I asked for here
There is one last thing that may be considered as a problem to me.
Maybe @Umang01-hash @coolwednesday may help here.
The method allows you to pass a ratelimiter, and it's great, but then there is no way to pass the context, so you are creating a new one
Maybe the problem is the method signature, maybe I'm simply wrong, and current code is OK
Pull Request Template
Description:
Checklist:
goimport
andgolangci-lint
.Thank you for your contribution!