Skip to content

Commit

Permalink
Added support metrics, ssl_skip flag
Browse files Browse the repository at this point in the history
  • Loading branch information
gabber12 committed Aug 12, 2024
1 parent b6cc3c7 commit 45b17a7
Show file tree
Hide file tree
Showing 14 changed files with 206 additions and 118 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ go.work.sum

# env file
.env
Corefile
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ The [manual](https://coredns.io/manual/toc/#what-is-coredns) will have more info
A simple way to consume this plugin, is by adding the following on [plugin.cfg](https://github.com/coredns/coredns/blob/master/plugin.cfg), and recompile it as [detailed on coredns.io](https://coredns.io/2017/07/25/compile-time-enabling-or-disabling-plugins/#build-with-compile-time-configuration-file).

~~~
example:github.com/santanusinha/coredns
drove:github.com/PhonePe/coredns-drove
~~~

Put this early in the plugin list, so that *drovedns* is executed before any of the other plugins.
Expand All @@ -43,16 +43,30 @@ drovedns {
accesstoken [TOKEN]
user [USERNAME]
pass [PASSWORD]
skip_ssl_check
}
~~~
* `URL` - Comma seperated list of drove controllers
* `TOKEN` - In case drove controllers are using bearer auth Complete Authorization header "Bearer ..."
* `user` `pass` - In case drove is using basic auth
* `skip_ssl_check` - To skip client side ssl certificate validation

## Ready

This plugin reports readiness to the ready plugin. It will be immediately ready.

## Metrics

If monitoring is enabled (via the *prometheus* plugin) then the following metrics are exported:

* `coredns_drove_controller_health{host}` - Exports the health of controller at any given point
The following are client level metrics to monitor apiserver request latency & status codes. `verb` identifies the apiserver [request type](https://kubernetes.io/docs/reference/using-api/api-concepts/#single-resource-api) and `host` denotes the apiserver endpoint.
* `coredns_drove_sync_total` - captures total app syncs from drove.
* `coredns_drove_sync_failure` - captures failed app syncs from drove.
* `coredns_drove_api_total{status_code, method, host}` - captures drove request grouped by `status_code`, `method` & `host`.



## Examples

In this configuration, we resolve queries through the plugin and enrich the answers with A record from servers listed in local resolv.conf
Expand Down
77 changes: 67 additions & 10 deletions api.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package drovedns

import (
"fmt"
"sync"
)

// Host struct
type Host struct {
Host string
Expand Down Expand Up @@ -27,16 +32,18 @@ type DroveServiceHost struct {
PortType string `json:"portType"`
}

// DroveApps struct for our apps nested with tasks.
type DroveApps struct {
Status string `json:"status"`
Apps []struct {
ID string `json:"appId"`
Vhost string `json:"vhost"`
Tags map[string]string `json:"tags"`
Hosts []DroveServiceHost `json:"hosts"`
} `json:"data"`
Message string `json:"message"`
// DroveAppsResponse struct for our apps nested with tasks.
type DroveAppsResponse struct {
Status string `json:"status"`
Apps []DroveApp `json:"data"`
Message string `json:"message"`
}

type DroveApp struct {
ID string `json:"appId"`
Vhost string `json:"vhost"`
Tags map[string]string `json:"tags"`
Hosts []DroveServiceHost `json:"hosts"`
}

type DroveEventSummary struct {
Expand All @@ -49,3 +56,53 @@ type DroveEventsApiResponse struct {
EventSummary DroveEventSummary `json:"data"`
Message string `json:"message"`
}

type LeaderController struct {
Endpoint string
Host string
Port int32
}

type EndpointStatus struct {
Endpoint string
Healthy bool
Message string
}

type CurrSyncPoint struct {
sync.RWMutex
LastSyncTime int64
}

type DroveAuthConfig struct {
User string
Pass string
AccessToken string
}

func (dc DroveAuthConfig) Validate() error {
if dc.User == "" && dc.Pass == "" && dc.AccessToken == "" {
return fmt.Errorf("User-pass or AccessToken should be set")
}
if (dc.Pass != "" || dc.User != "") && dc.AccessToken != "" {
return fmt.Errorf("Both user-pass and access token should not be set")
}
return nil
}

type DroveConfig struct {
Endpoint string
AuthConfig DroveAuthConfig
SkipSSL bool
}

func (dc DroveConfig) Validate() error {
if dc.Endpoint == "" {
return fmt.Errorf("Endpoint Cant be empty")
}
return dc.AuthConfig.Validate()
}

func NewDroveConfig() DroveConfig {
return DroveConfig{SkipSSL: false, AuthConfig: DroveAuthConfig{}}
}
3 changes: 2 additions & 1 deletion cmd/coredns/example/Corefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
cache 30
ready
forward . /etc/resolv.conf
metrics
drove {
endpoint "https://drovecontrol001.exmaple.com:8080,https://drovecontrol002.exmaple.com:8080,https://drovecontrol003.exmaple.com:8080:8080"
access_token "O-Bearer <token>"
access_token "Bearer <token>"
}
}
74 changes: 27 additions & 47 deletions drove.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,17 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"
)

type LeaderController struct {
Endpoint string
Host string
Port int32
}

type EndpointStatus struct {
Endpoint string
Healthy bool
Message string
}
const (
FETCH_APP_TIMEOUT time.Duration = time.Duration(5) * time.Second
FETCH_EVENTS_TIMEOUT time.Duration = time.Duration(5) * time.Second
PING_TIMEOUT time.Duration = time.Duration(5) * time.Second
)

type CurrSyncPoint struct {
sync.RWMutex
LastSyncTime int64
}
type IDroveClient interface {
FetchApps() (*DroveApps, error)
FetchApps() (*DroveAppsResponse, error)
FetchRecentEvents(syncPoint *CurrSyncPoint) (*DroveEventSummary, error)
PollEvents(callback func(event *DroveEventSummary))
}
Expand All @@ -43,27 +32,21 @@ type DroveClient struct {
client *http.Client
}

type DroveAuthConfig struct {
User string
Pass string
AccessToken string
}

func NewDroveClient(endpoint string, authConfig DroveAuthConfig) DroveClient {
controllerEndpoints := strings.Split(endpoint, ",")
func NewDroveClient(config DroveConfig) DroveClient {
controllerEndpoints := strings.Split(config.Endpoint, ",")
endpoints := make([]EndpointStatus, len(controllerEndpoints))
for i, e := range controllerEndpoints {
endpoints[i] = EndpointStatus{e, true, ""}
}
tr := &http.Transport{MaxIdleConnsPerHost: 10, TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
tr := &http.Transport{MaxIdleConnsPerHost: 10, TLSClientConfig: &tls.Config{InsecureSkipVerify: config.SkipSSL}}
httpClient := &http.Client{
Timeout: 0 * time.Second,
Transport: tr,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
return DroveClient{Endpoint: endpoints, AuthConfig: &authConfig, client: httpClient}
return DroveClient{Endpoint: endpoints, AuthConfig: &config.AuthConfig, client: httpClient}
}

func (c *DroveClient) Init() error {
Expand All @@ -72,19 +55,26 @@ func (c *DroveClient) Init() error {
_, err := c.endpoint()
return err
}
func (c *DroveClient) getRequest(endpoint string, timeout int, obj any) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
func (c *DroveClient) getRequest(path string, timeout time.Duration, obj any) error {
host, err := c.endpoint()
if err != nil {
return err
}
endpoint := host + path
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
if err != nil {
return err
}

setHeaders(*c.AuthConfig, req)
resp, err := c.client.Do(req)
if err != nil {
DroveApiRequests.WithLabelValues("err", "GET", host).Inc()
return err
}

DroveApiRequests.WithLabelValues(strconv.Itoa(resp.StatusCode), "GET", host).Inc()
defer resp.Body.Close()
decoder := json.NewDecoder(resp.Body)

Expand All @@ -95,26 +85,18 @@ func (c *DroveClient) getRequest(endpoint string, timeout int, obj any) error {
return nil
}

func (c *DroveClient) FetchApps() (*DroveApps, error) {
endpoint, err := c.endpoint()
if err != nil {
return nil, err
}
func (c *DroveClient) FetchApps() (*DroveAppsResponse, error) {

var timeout int = 5
jsonapps := &DroveApps{}
err = c.getRequest(endpoint+"/apis/v1/endpoints", timeout, jsonapps)
jsonapps := &DroveAppsResponse{}
err := c.getRequest("/apis/v1/endpoints", FETCH_APP_TIMEOUT, jsonapps)
return jsonapps, err

}

func (c *DroveClient) FetchRecentEvents(syncPoint *CurrSyncPoint) (*DroveEventSummary, error) {
endpoint, err := c.endpoint()
if err != nil {
return nil, err
}

var newEventsApiResponse = DroveEventsApiResponse{}
err = c.getRequest(endpoint+"/apis/v1/cluster/events/summary?lastSyncTime="+fmt.Sprint(syncPoint.LastSyncTime), 5, &newEventsApiResponse)
err := c.getRequest("/apis/v1/cluster/events/summary?lastSyncTime="+fmt.Sprint(syncPoint.LastSyncTime), FETCH_EVENTS_TIMEOUT, &newEventsApiResponse)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -195,6 +177,7 @@ func (c *DroveClient) endpoint() (string, error) {
func (c *DroveClient) refreshLeaderData() {
var endpoint string
for _, es := range c.Endpoint {
DroveControllerHealth.WithLabelValues(es.Endpoint).Set(boolToDouble(es.Healthy))
if es.Healthy {
endpoint = es.Endpoint
}
Expand All @@ -219,9 +202,6 @@ func (c *DroveClient) endpointHealth() {
for {
select {
case <-ticker.C:
//logger.WithFields(logrus.Fields{
// "health": health,
//}).Info("Reloading endpoint health")
shouldReturn := c.updateHealth()
if shouldReturn {
return
Expand All @@ -234,7 +214,7 @@ func (c *DroveClient) endpointHealth() {
func (c *DroveClient) updateHealth() bool {
log.Debugf("Updating health %+v", c.Endpoint)
for i, es := range c.Endpoint {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), PING_TIMEOUT)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", es.Endpoint+"/apis/v1/ping", nil)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions drove_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestAppFetch(t *testing.T) {
defer server.Close()

// Use Client & URL from our local test server
client := NewDroveClient(server.URL, DroveAuthConfig{AccessToken: ""})
client := NewDroveClient(DroveConfig{Endpoint: server.URL, AuthConfig: DroveAuthConfig{AccessToken: ""}})
client.Init()
assert.NotNil(t, client.Leader)

Expand Down Expand Up @@ -63,15 +63,15 @@ func TestLeaderElection(t *testing.T) {
})

// Use Client & URL from our local test server
client := NewDroveClient(fmt.Sprintf("%s", server.URL), DroveAuthConfig{AccessToken: ""})
client := NewDroveClient(DroveConfig{Endpoint: server.URL, AuthConfig: DroveAuthConfig{AccessToken: ""}})
client.Init()
assert.Nil(t, client.Leader)

client = NewDroveClient("http://random.blah.endpoint.non-existent", DroveAuthConfig{AccessToken: ""})
client = NewDroveClient(DroveConfig{Endpoint: "http://random.blah.endpoint.non-existent", AuthConfig: DroveAuthConfig{AccessToken: ""}})
client.Init()
assert.Nil(t, client.Leader)

client = NewDroveClient(fmt.Sprintf("%s,%s", server.URL, server2.URL), DroveAuthConfig{AccessToken: ""})
client = NewDroveClient(DroveConfig{Endpoint: fmt.Sprintf("%s,%s", server.URL, server2.URL), AuthConfig: DroveAuthConfig{AccessToken: ""}})
client.Init()
assert.NotNil(t, client.Leader)
assert.Equal(t, server2.URL, client.Leader.Endpoint)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestLeaderFailover(t *testing.T) {
rw.WriteHeader(status2)
})

client := NewDroveClient(fmt.Sprintf("%s,%s", server.URL, server2.URL), DroveAuthConfig{AccessToken: ""})
client := NewDroveClient(DroveConfig{Endpoint: fmt.Sprintf("%s,%s", server.URL, server2.URL), AuthConfig: DroveAuthConfig{AccessToken: ""}})
client.Init()
assert.NotNil(t, client.Leader)
assert.Equal(t, server.URL, client.Leader.Endpoint)
Expand Down
23 changes: 19 additions & 4 deletions endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,36 @@ import (

type DroveEndpoints struct {
appsMutext *sync.RWMutex
AppsDB *DroveApps
AppsDB *DroveAppsResponse
DroveClient IDroveClient
}

func (dr *DroveEndpoints) setApps(appDB *DroveApps) {
func (dr *DroveEndpoints) setApps(appDB *DroveAppsResponse) {
dr.appsMutext.Lock()
dr.AppsDB = appDB
dr.appsMutext.Unlock()
}

func (dr *DroveEndpoints) getApps() DroveApps {
func (dr *DroveEndpoints) getApps() DroveAppsResponse {
dr.appsMutext.RLock()
defer dr.appsMutext.RUnlock()
if dr.AppsDB == nil {
return DroveApps{}
return DroveAppsResponse{}
}
return *dr.AppsDB
}

func (dr *DroveEndpoints) searchApps(questionName string) *DroveApp {
dr.appsMutext.RLock()
defer dr.appsMutext.RUnlock()
for _, app := range dr.AppsDB.Apps {
if app.Vhost+"." == questionName {
return &app
}
}
return nil
}

func newDroveEndpoints(client IDroveClient) *DroveEndpoints {
endpoints := DroveEndpoints{DroveClient: client, appsMutext: &sync.RWMutex{}}
ticker := time.NewTicker(10 * time.Second)
Expand All @@ -47,10 +58,14 @@ func newDroveEndpoints(client IDroveClient) *DroveEndpoints {
})
go func() {
var syncApp = func() {
DroveQueryTotal.Inc()
apps, err := endpoints.DroveClient.FetchApps()
if err != nil {
DroveQueryFailure.Inc()
log.Errorf("Error refreshing nodes data %+v", endpoints.AppsDB)
return
}

endpoints.setApps(apps)
}
syncApp()
Expand Down
Loading

0 comments on commit 45b17a7

Please sign in to comment.