Skip to content

Commit

Permalink
refactor status with prometheus (#340)
Browse files Browse the repository at this point in the history
* refactor status with prometheus
  • Loading branch information
mxlxm authored and siddontang committed Nov 11, 2019
1 parent 7b48b8c commit d4e8812
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 94 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: go

go:
- "1.11"
- "1.12"

services:
- elasticsearch
Expand Down Expand Up @@ -35,4 +35,4 @@ script:
- go test --race ./...

env:
- GO111MODULE=on
- GO111MODULE=on
1 change: 1 addition & 0 deletions etc/river.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ data_dir = "./var"

# Inner Http status address
stat_addr = "127.0.0.1:12800"
stat_path = "/metrics"

# pseudo server id like a slave
server_id = 1001
Expand Down
8 changes: 6 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
module github.com/siddontang/go-mysql-elasticsearch

go 1.12

require (
github.com/BurntSushi/toml v0.3.1
github.com/juju/errors v0.0.0-20190207033735-e65537c515d7
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/prometheus/client_golang v0.9.3
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07
github.com/siddontang/go-mysql v0.0.0-20190303113352-670f74e8daf5
github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed
github.com/siddontang/go-mysql v0.0.0-20190524062908-de6c3a84bcbe
)
1 change: 1 addition & 0 deletions river/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Config struct {
ESPassword string `toml:"es_pass"`

StatAddr string `toml:"stat_addr"`
StatPath string `toml:"stat_path"`

ServerID uint32 `toml:"server_id"`
Flavor string `toml:"flavor"`
Expand Down
54 changes: 54 additions & 0 deletions river/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package river

import (
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
esInsertNum = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "mysql2es_inserted_num",
Help: "The number of docs inserted to elasticsearch",
}, []string{"index"},
)
esUpdateNum = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "mysql2es_updated_num",
Help: "The number of docs updated to elasticsearch",
}, []string{"index"},
)
esDeleteNum = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "mysql2es_deleted_num",
Help: "The number of docs deleted from elasticsearch",
}, []string{"index"},
)
canalSyncState = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "mysql2es_canal_state",
Help: "The canal slave running state: 0=stopped, 1=ok",
},
)
canalDelay = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "mysql2es_canal_delay",
Help: "The canal slave lag",
},
)
)

func (r *River) collectMetrics() {
for range time.Tick(10 * time.Second) {
canalDelay.Set(float64(r.canal.GetDelay()))
}
}

func InitStatus(addr string, path string) {
http.Handle(path, promhttp.Handler())
http.ListenAndServe(addr, nil)
}
7 changes: 3 additions & 4 deletions river/river.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ type River struct {

es *elastic.Client

st *stat

master *masterInfo

syncCh chan interface{}
Expand Down Expand Up @@ -78,8 +76,7 @@ func NewRiver(c *Config) (*River, error) {
cfg.HTTPS = r.c.ESHttps
r.es = elastic.NewClient(cfg)

r.st = &stat{r: r}
go r.st.Run(r.c.StatAddr)
go InitStatus(r.c.StatAddr, r.c.StatPath)

return r, nil
}
Expand Down Expand Up @@ -292,11 +289,13 @@ func ruleKey(schema string, table string) string {
// Run syncs the data from MySQL and inserts to ES.
func (r *River) Run() error {
r.wg.Add(1)
canalSyncState.Set(float64(1))
go r.syncLoop()

pos := r.master.Position()
if err := r.canal.RunFrom(pos); err != nil {
log.Errorf("start canal err %v", err)
canalSyncState.Set(0)
return errors.Trace(err)
}

Expand Down
2 changes: 2 additions & 0 deletions river/river_extra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func (s *riverTestSuite) setupExtra(c *C) (r *River) {
cfg.DumpExec = "mysqldump"

cfg.StatAddr = "127.0.0.1:12800"
cfg.StatPath = "/metrics2"

cfg.BulkSize = 1
cfg.FlushBulkTime = TomlDuration{3 * time.Millisecond}

Expand Down
2 changes: 2 additions & 0 deletions river/river_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
cfg.DumpExec = "mysqldump"

cfg.StatAddr = "127.0.0.1:12800"
cfg.StatPath = "/metrics1"

cfg.BulkSize = 1
cfg.FlushBulkTime = TomlDuration{3 * time.Millisecond}

Expand Down
74 changes: 0 additions & 74 deletions river/status.go

This file was deleted.

18 changes: 6 additions & 12 deletions river/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ import (
"github.com/siddontang/go-mysql/schema"
)

const (
syncInsertDoc = iota
syncDeleteDoc
syncUpdateDoc
)

const (
fieldTypeList = "list"
// for the mysql int type to es date type
Expand Down Expand Up @@ -103,7 +97,7 @@ func (h *eventHandler) OnGTID(gtid mysql.GTIDSet) error {
return nil
}

func (h *eventHandler) OnPosSynced(pos mysql.Position, force bool) error {
func (h *eventHandler) OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error {
return nil
}

Expand Down Expand Up @@ -197,10 +191,10 @@ func (r *River) makeRequest(rule *Rule, action string, rows [][]interface{}) ([]

if action == canal.DeleteAction {
req.Action = elastic.ActionDelete
r.st.DeleteNum.Add(1)
esDeleteNum.WithLabelValues(rule.Index).Inc()
} else {
r.makeInsertReqData(req, rule, values)
r.st.InsertNum.Add(1)
esInsertNum.WithLabelValues(rule.Index).Inc()
}

reqs = append(reqs, req)
Expand Down Expand Up @@ -255,8 +249,8 @@ func (r *River) makeUpdateRequest(rule *Rule, rows [][]interface{}) ([]*elastic.
req = &elastic.BulkRequest{Index: rule.Index, Type: rule.Type, ID: afterID, Parent: afterParentID, Pipeline: rule.Pipeline}
r.makeInsertReqData(req, rule, rows[i+1])

r.st.DeleteNum.Add(1)
r.st.InsertNum.Add(1)
esDeleteNum.WithLabelValues(rule.Index).Inc()
esInsertNum.WithLabelValues(rule.Index).Inc()
} else {
if len(rule.Pipeline) > 0 {
// Pipelines can only be specified on index action
Expand All @@ -267,7 +261,7 @@ func (r *River) makeUpdateRequest(rule *Rule, rows [][]interface{}) ([]*elastic.
} else {
r.makeUpdateReqData(req, rule, rows[i], rows[i+1])
}
r.st.UpdateNum.Add(1)
esUpdateNum.WithLabelValues(rule.Index).Inc()
}

reqs = append(reqs, req)
Expand Down

0 comments on commit d4e8812

Please sign in to comment.