Skip to content

Commit bd1aa3f

Browse files
authored
remove broker monitoring dependency on other components (#20)
1 parent 564824a commit bd1aa3f

File tree

7 files changed

+110
-85
lines changed

7 files changed

+110
-85
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
all: push
1616

17-
TAG ?= 1.2.5
17+
TAG ?= 1.2.6
1818
PREFIX ?= kesque/pulsar-monitor
1919

2020
container:

src/broker-monitor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ func EvaluateBrokers(prefixURL, token string) error {
1414
name := GetConfig().Name + "-brokers" // again this is for in-cluster monitoring only
1515

1616
cfg := GetConfig().BrokersConfig
17-
failedBrokers, err := brokers.TestBrokers(prefixURL, token)
17+
clusterName := util.AssignString(GetConfig().ClusterName, GetConfig().Name)
18+
failedBrokers, err := brokers.TestBrokers(prefixURL, clusterName, token)
1819

1920
if failedBrokers > 0 {
2021
errMsg := fmt.Sprintf("cluster %s has %d unhealthy brokers, error message %v", name, failedBrokers, err)

src/brokers/broker-stats.go

Lines changed: 88 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -18,55 +18,65 @@ import (
1818

1919
var statsLog = log.WithFields(log.Fields{"app": "broker health monitor"})
2020

21-
// BrokerMonitor is the
22-
type BrokerMonitor struct {
23-
RESTURL string
24-
token string
25-
ExpectedBrokerNumber int
26-
}
27-
2821
const (
2922
topicStatsDBTable = "topic-stats"
3023
)
3124

32-
// BrokerStats is per broker statistics
33-
type BrokerStats struct {
34-
Broker string `json:"broker"`
35-
Data map[string]map[string]map[string]map[string]interface{} `json:"data"`
36-
}
25+
// GetBrokers gets a list of brokers and ports
26+
func GetBrokers(restBaseURL, clusterName, token string) ([]string, error) {
27+
brokersURL := util.SingleSlashJoin(restBaseURL, "admin/v2/brokers/"+clusterName)
28+
newRequest, err := http.NewRequest(http.MethodGet, brokersURL, nil)
29+
if err != nil {
30+
return nil, err
31+
}
32+
newRequest.Header.Add("user-agent", "pulsar-monitor")
33+
newRequest.Header.Add("Authorization", "Bearer "+token)
34+
client := &http.Client{
35+
CheckRedirect: util.PreserveHeaderForRedirect,
36+
}
37+
resp, err := client.Do(newRequest)
38+
if resp != nil {
39+
defer resp.Body.Close()
40+
}
41+
if err != nil {
42+
return nil, err
43+
}
3744

38-
// Stats is the json response object for REST API
39-
type Stats struct {
40-
Total int `json:"total"`
41-
Offset int `json:"offset"`
42-
Data []BrokerStats `json:"data"`
43-
}
45+
if resp.StatusCode > 300 {
46+
return nil, fmt.Errorf("failed to get a list of brokers, returns incorrect status code %d", resp.StatusCode)
47+
}
4448

45-
// TopicStats is the usage for topic on each individual broker
46-
type TopicStats struct {
47-
ID string `json:"id"` // ID is the topic fullname
48-
Tenant string `json:"tenant"`
49-
Namespace string `json:"namespace"`
50-
Topic string `json:"topic"`
51-
Data interface{} `json:"data"`
52-
UpdatedAt time.Time `json:"updatedAt"`
49+
bodyBytes, err := ioutil.ReadAll(resp.Body)
50+
if err != nil {
51+
return nil, err
52+
}
53+
54+
brokers := []string{}
55+
err = json.Unmarshal(bodyBytes, &brokers)
56+
if err != nil {
57+
return nil, err
58+
}
59+
60+
return brokers, nil
5361
}
5462

55-
// brokersTopicsQuery returns a map of broker and topic full name, and error of this operation
56-
func brokersTopicsQuery(urlString, token string) (map[string][]string, error) {
57-
// key is tenant, value is partition topic name
58-
var brokerTopicMap = make(map[string][]string)
63+
// required configuration cluster name and broker url
64+
// 1. get a list of broker ips
65+
// 2. query each broker individually
66+
//
5967

60-
if !strings.HasPrefix(urlString, "http") {
61-
urlString = "http://" + urlString
68+
// BrokerTopicsQuery returns a map of broker and topic full name, or error of this operation
69+
func BrokerTopicsQuery(brokerBaseURL, token string) ([]string, error) {
70+
// key is tenant, value is partition topic name
71+
if !strings.HasPrefix(brokerBaseURL, "http") {
72+
brokerBaseURL = "http://" + brokerBaseURL
6273
}
63-
topicStatsURL := util.SingleSlashJoin(urlString, "admin/v2/broker-stats/topics")
74+
topicStatsURL := util.SingleSlashJoin(brokerBaseURL, "admin/v2/broker-stats/topics")
6475
statsLog.Debugf(" proxy request route is %s\n", topicStatsURL)
6576

6677
newRequest, err := http.NewRequest(http.MethodGet, topicStatsURL, nil)
6778
if err != nil {
68-
statsLog.Errorf("make http request %s error %v", topicStatsURL, err)
69-
return brokerTopicMap, err
79+
return nil, err
7080
}
7181
newRequest.Header.Add("user-agent", "pulsar-monitor")
7282
newRequest.Header.Add("Authorization", "Bearer "+token)
@@ -78,52 +88,46 @@ func brokersTopicsQuery(urlString, token string) (map[string][]string, error) {
7888
defer response.Body.Close()
7989
}
8090
if err != nil {
81-
statsLog.Errorf("make http request %s error %v", topicStatsURL, err)
82-
return brokerTopicMap, err
91+
return nil, err
8392
}
8493

8594
if response.StatusCode != http.StatusOK {
86-
statsLog.Errorf("GET broker topic stats %s response status code %d", topicStatsURL, response.StatusCode)
87-
return brokerTopicMap, err
95+
return nil, err
8896
}
8997

9098
body, err := ioutil.ReadAll(response.Body)
9199
if err != nil {
92100
statsLog.Errorf("GET broker topic stats request %s error %v", topicStatsURL, err)
93-
return brokerTopicMap, err
101+
return nil, err
94102
}
95103

96-
var brokers Stats
97-
if err = json.Unmarshal(body, &brokers); err != nil {
98-
statsLog.Errorf("GET broker topic stats request %s unmarshal error %v", topicStatsURL, err)
99-
return brokerTopicMap, err
104+
var namespaces map[string]map[string]map[string]map[string]interface{}
105+
if err = json.Unmarshal(body, &namespaces); err != nil {
106+
return nil, err
100107
}
101108

102-
for _, broker := range brokers.Data {
103-
var topics []string
104-
for k, v := range broker.Data {
105-
tenant := strings.Split(k, "/")[0]
106-
statsLog.Debugf("namespace %s tenant %s", k, tenant)
109+
topics := []string{}
110+
for k, v := range namespaces {
111+
tenant := strings.Split(k, "/")[0]
112+
statsLog.Debugf("namespace %s tenant %s", k, tenant)
107113

108-
for bundleKey, v2 := range v {
109-
statsLog.Debugf(" bundle %s", bundleKey)
110-
for persistentKey, v3 := range v2 {
111-
statsLog.Debugf(" %s key", persistentKey)
114+
for bundleKey, v2 := range v {
115+
statsLog.Debugf(" bundle %s", bundleKey)
116+
for persistentKey, v3 := range v2 {
117+
statsLog.Debugf(" %s key", persistentKey)
112118

113-
for topicFn := range v3 {
114-
// statsLog.Infof(" topic name %s", topicFn)
115-
topics = append(topics, topicFn)
116-
}
119+
for topicFn := range v3 {
120+
// statsLog.Infof(" topic name %s", topicFn)
121+
topics = append(topics, topicFn)
117122
}
118123
}
119124
}
120-
brokerTopicMap[broker.Broker] = topics
121125
}
122-
123-
return brokerTopicMap, nil
126+
return topics, nil
124127
}
125128

126-
func brokerHealthCheck(broker, token string) error {
129+
// BrokerHealthCheck calls broker's health endpoint
130+
func BrokerHealthCheck(broker, token string) error {
127131
// key is tenant, value is partition topic name
128132
if !strings.HasPrefix(broker, "http") {
129133
broker = "http://" + broker
@@ -192,17 +196,28 @@ func QueryTopicStats(url, token string) error {
192196
}
193197

194198
// TestBrokers evaluates all brokers' health
195-
func TestBrokers(urlPrefix, token string) (int, error) {
196-
brokerTopics, err := brokersTopicsQuery(urlPrefix, token)
199+
func TestBrokers(urlPrefix, clusterName, token string) (int, error) {
200+
brokers, err := GetBrokers(urlPrefix, clusterName, token)
197201
if err != nil {
198202
return 0, err
199203
}
200204

205+
log.Debugf("got a list of brokers %v", brokers)
201206
failedBrokers := 0
202-
errorStr := ""
203-
for brokerName, topics := range brokerTopics {
204-
if err := brokerHealthCheck(brokerName, token); err != nil {
205-
errorStr = errorStr + ";;" + err.Error()
207+
errStr := ""
208+
for _, brokerURL := range brokers {
209+
// check the broker health
210+
if err := BrokerHealthCheck(brokerURL, token); err != nil {
211+
errStr = errStr + ";;" + err.Error()
212+
failedBrokers++
213+
continue
214+
}
215+
log.Debugf("broker %s health ok", brokerURL)
216+
217+
// Get broker topic stats
218+
topics, err := BrokerTopicsQuery(brokerURL, token)
219+
if err != nil {
220+
errStr = errStr + ";;" + err.Error()
206221
failedBrokers++
207222
continue
208223
}
@@ -221,7 +236,7 @@ func TestBrokers(urlPrefix, token string) (int, error) {
221236
url = util.SingleSlashJoin(util.SingleSlashJoin(urlPrefix, "/admin/v2/"), url+"/stats")
222237
err = QueryTopicStats(url, token)
223238
if err != nil {
224-
errorStr = errorStr + ";;" + err.Error()
239+
errStr = errStr + ";;" + err.Error()
225240
failureCount++
226241
}
227242
count++
@@ -234,11 +249,12 @@ func TestBrokers(urlPrefix, token string) (int, error) {
234249
break
235250
}
236251
}
237-
statsLog.Infof("broker %s health monitor required %d topic stats test, failed %d test", brokerName, count, failureCount)
252+
statsLog.Infof("broker %s health monitor required %d topic stats test, failed %d test", brokerURL, count, failureCount)
238253
}
239-
statsLog.Infof("failed %d brokers out of total %d brokers", failedBrokers, len(brokerTopics))
240-
if errorStr != "" {
241-
return failedBrokers, errors.Errorf(errorStr)
254+
255+
statsLog.Infof("cluster %s has %d failed brokers out of total %d brokers", clusterName, failedBrokers, len(brokers))
256+
if errStr != "" {
257+
return failedBrokers, errors.Errorf(errStr)
242258
}
243259

244260
return failedBrokers, nil

src/config.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,11 @@ type TenantUsageCfg struct {
129129

130130
// Configuration - this server's configuration
131131
type Configuration struct {
132-
Name string `json:"name"`
132+
// Name is the Pulsar cluster name, it is mandatory
133+
Name string `json:"name"`
134+
// ClusterName is the Pulsar cluster name if the Name cannot be used as the Pulsar cluster name, optional
135+
ClusterName string `json:"clusterName"`
136+
// Token is a Pulsar JWT can be used for both client client or http admin client
133137
Token string `json:"token"`
134138
BrokersConfig BrokersCfg `json:"brokersConfig"`
135139
TrustStore string `json:"trustStore"`

src/heartbeat.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ import (
88
"time"
99

1010
"github.com/hashicorp/go-retryablehttp"
11-
"github.com/kafkaesque-io/pulsar-monitor/src/util"
1211
)
1312

1413
// StartHeartBeat starts heartbeat monitoring the program by OpsGenie
1514
func StartHeartBeat() {
16-
genieURL := util.AssignString(GetConfig().OpsGenieConfig.HeartBeatURL, "https://api.opsgenie.com/v2/heartbeats/latency-monitor/ping")
15+
// opsgenie url in the format of "https://api.opsgenie.com/v2/heartbeats/<component>/ping"
16+
genieURL := GetConfig().OpsGenieConfig.HeartBeatURL
1717
genieKey := GetConfig().OpsGenieConfig.HeartbeatKey
18-
if genieKey != "" {
18+
if genieURL != "" {
1919
err := HeartBeatToOpsGenie(genieURL, genieKey)
2020
if err != nil {
2121
Alert(fmt.Sprintf("OpsGenie error %v", err))

src/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ func main() {
5151
TopicLatencyTestThread()
5252
WebSocketTopicLatencyTestThread()
5353
PushToPrometheusProxyThread()
54-
BuildTenantsUsageThread()
54+
// Disable tenant usage metering, this is not a monitoring function
55+
// BuildTenantsUsageThread()
5556

5657
if cfg.PrometheusConfig.ExposeMetrics {
5758
log.Printf("start to listen to http port %s", cfg.PrometheusConfig.Port)

src/slack.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ package main
33
import (
44
"bytes"
55
"encoding/json"
6-
"errors"
7-
"log"
6+
"fmt"
87
"net/http"
98
"time"
9+
10+
"github.com/apex/log"
1011
)
1112

1213
// SlackMessage is the message struct to be posted for Slack
@@ -19,12 +20,15 @@ type SlackMessage struct {
1920

2021
// Alert alerts to slack, email, text.
2122
func Alert(msg string) {
22-
log.Println("error ", msg)
23+
log.Errorf("Alert %s", msg)
24+
if GetConfig().SlackConfig.AlertURL == "" {
25+
return
26+
}
2327
err := SendSlackNotification(GetConfig().SlackConfig.AlertURL, SlackMessage{
2428
Text: msg,
2529
})
2630
if err != nil {
27-
log.Println("slack error ", err)
31+
log.Errorf("slack error %v", err)
2832
}
2933
}
3034

@@ -51,8 +55,7 @@ func SendSlackNotification(webhookURL string, msg SlackMessage) error {
5155
buf := new(bytes.Buffer)
5256
buf.ReadFrom(resp.Body)
5357
if buf.String() != "ok" {
54-
log.Println(buf.String())
55-
return errors.New("Non-ok response returned from Slack")
58+
return fmt.Errorf("Non-ok response returned from Slack, message %s", buf.String())
5659
}
5760
return nil
5861
}

0 commit comments

Comments
 (0)