Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumer: reuse lookupd http client #333

Merged
merged 1 commit into from
Oct 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 1 addition & 16 deletions api_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,14 @@ func (c *deadlinedConn) Write(b []byte) (n int, err error) {
return c.Conn.Write(b)
}

func newDeadlineTransport(timeout time.Duration) *http.Transport {
transport := &http.Transport{
DisableKeepAlives: true,
Dial: func(netw, addr string) (net.Conn, error) {
c, err := net.DialTimeout(netw, addr, timeout)
if err != nil {
return nil, err
}
return &deadlinedConn{timeout, c}, nil
},
}
return transport
}

type wrappedResp struct {
Status string `json:"status_txt"`
StatusCode int `json:"status_code"`
Data interface{} `json:"data"`
}

// stores the result in the value pointed to by ret(must be a pointer)
func apiRequestNegotiateV1(method string, endpoint string, headers http.Header, ret interface{}) error {
httpclient := &http.Client{Transport: newDeadlineTransport(2 * time.Second)}
func apiRequestNegotiateV1(httpclient *http.Client, method string, endpoint string, headers http.Header, ret interface{}) error {
req, err := http.NewRequest(method, endpoint, nil)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type Config struct {
// reconnection attempts
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`
LookupdPollTimeout time.Duration `opt:"lookupd_poll_timeout" default:"1m"`

// Maximum duration when REQueueing (for doubling of deferred requeue)
MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
Expand Down
25 changes: 24 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ type Consumer struct {
lookupdRecheckChan chan int
lookupdHTTPAddrs []string
lookupdQueryIndex int
lookupdHttpClient *http.Client

wg sync.WaitGroup
runningHandlers int32
Expand Down Expand Up @@ -326,6 +327,11 @@ func (r *Consumer) ChangeMaxInFlight(maxInFlight int) {
}
}

// set lookupd http client
func (r *Consumer) SetLookupdHttpClient(httpclient *http.Client) {
r.lookupdHttpClient = httpclient
}

// ConnectToNSQLookupd adds an nsqlookupd address to the list for this Consumer instance.
//
// If it is the first to be added, it initiates an HTTP request to discover nsqd
Expand Down Expand Up @@ -355,6 +361,23 @@ func (r *Consumer) ConnectToNSQLookupd(addr string) error {
}
}
r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, parsedAddr)
if r.lookupdHttpClient == nil {
transport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: r.config.LookupdPollTimeout,
KeepAlive: 30 * time.Second,
}).DialContext,
ResponseHeaderTimeout: r.config.LookupdPollTimeout,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems redundant with the http.Client Timeout but I guess it's fine.
(Dialer Timeout above could probably be a shorter default, probably matching the 10s TLSHandshakeTimeout below. But again, I guess it's fine, doesn't really hurt.)

MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
}
r.lookupdHttpClient = &http.Client{
Transport: transport,
Timeout: r.config.LookupdPollTimeout,
}
}

numLookupd := len(r.lookupdHTTPAddrs)
r.mtx.Unlock()

Expand Down Expand Up @@ -468,7 +491,7 @@ retry:
if r.config.AuthSecret != "" && r.config.LookupdAuthorization {
headers.Set("Authorization", fmt.Sprintf("Bearer %s", r.config.AuthSecret))
}
err := apiRequestNegotiateV1("GET", endpoint, headers, &data)
err := apiRequestNegotiateV1(r.lookupdHttpClient, "GET", endpoint, headers, &data)
if err != nil {
r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
retries++
Expand Down