Skip to content

Commit 319e088

Browse files
destelsiddontang
authored andcommitted
Fixes and improvements for replication (go-mysql-org#342)
* Properly parsing empty binary values from mysqldump output
1 parent adcdf75 commit 319e088

File tree

6 files changed

+81
-38
lines changed

6 files changed

+81
-38
lines changed

canal/canal.go

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ func (c *Canal) run() error {
200200
c.cancel()
201201
}()
202202

203+
c.master.UpdateTimestamp(uint32(time.Now().Unix()))
204+
203205
if !c.dumped {
204206
c.dumped = true
205207

@@ -396,21 +398,9 @@ func (c *Canal) checkBinlogRowFormat() error {
396398
}
397399

398400
func (c *Canal) prepareSyncer() error {
399-
seps := strings.Split(c.cfg.Addr, ":")
400-
if len(seps) != 2 {
401-
return errors.Errorf("invalid mysql addr format %s, must host:port", c.cfg.Addr)
402-
}
403-
404-
port, err := strconv.ParseUint(seps[1], 10, 16)
405-
if err != nil {
406-
return errors.Trace(err)
407-
}
408-
409401
cfg := replication.BinlogSyncerConfig{
410402
ServerID: c.cfg.ServerID,
411403
Flavor: c.cfg.Flavor,
412-
Host: seps[0],
413-
Port: uint16(port),
414404
User: c.cfg.User,
415405
Password: c.cfg.Password,
416406
Charset: c.cfg.Charset,
@@ -421,6 +411,23 @@ func (c *Canal) prepareSyncer() error {
421411
SemiSyncEnabled: c.cfg.SemiSyncEnabled,
422412
}
423413

414+
if strings.Contains(c.cfg.Addr, "/") {
415+
cfg.Host = c.cfg.Addr
416+
} else {
417+
seps := strings.Split(c.cfg.Addr, ":")
418+
if len(seps) != 2 {
419+
return errors.Errorf("invalid mysql addr format %s, must host:port", c.cfg.Addr)
420+
}
421+
422+
port, err := strconv.ParseUint(seps[1], 10, 16)
423+
if err != nil {
424+
return errors.Trace(err)
425+
}
426+
427+
cfg.Host = seps[0]
428+
cfg.Port = uint16(port)
429+
}
430+
424431
c.syncer = replication.NewBinlogSyncer(cfg)
425432

426433
return nil
@@ -458,6 +465,10 @@ func (c *Canal) SyncedPosition() mysql.Position {
458465
return c.master.Position()
459466
}
460467

468+
func (c *Canal) SyncedTimestamp() uint32 {
469+
return c.master.timestamp
470+
}
471+
461472
func (c *Canal) SyncedGTIDSet() mysql.GTIDSet {
462473
return c.master.GTIDSet()
463474
}

canal/dump.go

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/juju/errors"
1111
"github.com/shopspring/decimal"
1212
"github.com/siddontang/go-log/log"
13-
"github.com/siddontang/go-mysql/dump"
1413
"github.com/siddontang/go-mysql/mysql"
1514
"github.com/siddontang/go-mysql/schema"
1615
)
@@ -50,47 +49,43 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error
5049
for i, v := range values {
5150
if v == "NULL" {
5251
vs[i] = nil
52+
} else if v == "_binary ''" {
53+
vs[i] = []byte{}
5354
} else if v[0] != '\'' {
5455
if tableInfo.Columns[i].Type == schema.TYPE_NUMBER {
5556
n, err := strconv.ParseInt(v, 10, 64)
5657
if err != nil {
57-
log.Errorf("parse row %v at %d error %v, skip", values, i, err)
58-
return dump.ErrSkip
58+
return fmt.Errorf("parse row %v at %d error %v, int expected", values, i, err)
5959
}
6060
vs[i] = n
6161
} else if tableInfo.Columns[i].Type == schema.TYPE_FLOAT {
6262
f, err := strconv.ParseFloat(v, 64)
6363
if err != nil {
64-
log.Errorf("parse row %v at %d error %v, skip", values, i, err)
65-
return dump.ErrSkip
64+
return fmt.Errorf("parse row %v at %d error %v, float expected", values, i, err)
6665
}
6766
vs[i] = f
6867
} else if tableInfo.Columns[i].Type == schema.TYPE_DECIMAL {
6968
if h.c.cfg.UseDecimal {
7069
d, err := decimal.NewFromString(v)
7170
if err != nil {
72-
log.Errorf("parse row %v at %d error %v, skip", values, i, err)
73-
return dump.ErrSkip
71+
return fmt.Errorf("parse row %v at %d error %v, decimal expected", values, i, err)
7472
}
7573
vs[i] = d
7674
} else {
7775
f, err := strconv.ParseFloat(v, 64)
7876
if err != nil {
79-
log.Errorf("parse row %v at %d error %v, skip", values, i, err)
80-
return dump.ErrSkip
77+
return fmt.Errorf("parse row %v at %d error %v, float expected", values, i, err)
8178
}
8279
vs[i] = f
8380
}
8481
} else if strings.HasPrefix(v, "0x") {
8582
buf, err := hex.DecodeString(v[2:])
8683
if err != nil {
87-
log.Errorf("parse row %v at %d error %v, skip", values, i, err)
88-
return dump.ErrSkip
84+
return fmt.Errorf("parse row %v at %d error %v, hex literal expected", values, i, err)
8985
}
9086
vs[i] = string(buf)
9187
} else {
92-
log.Errorf("parse row %v error, invalid type at %d, skip", values, i)
93-
return dump.ErrSkip
88+
return fmt.Errorf("parse row %v error, invalid type at %d", values, i)
9489
}
9590
} else {
9691
vs[i] = v[1 : len(v)-1]
@@ -130,6 +125,8 @@ func (c *Canal) dump() error {
130125
return errors.New("mysqldump does not exist")
131126
}
132127

128+
c.master.UpdateTimestamp(uint32(time.Now().Unix()))
129+
133130
h := &dumpParseHandler{c: c}
134131
// If users call StartFromGTID with empty position to start dumping with gtid,
135132
// we record the current gtid position before dump starts.
@@ -161,7 +158,9 @@ func (c *Canal) dump() error {
161158

162159
pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)}
163160
c.master.Update(pos)
164-
c.eventHandler.OnPosSynced(pos, true)
161+
if err := c.eventHandler.OnPosSynced(pos, true); err != nil {
162+
return errors.Trace(err)
163+
}
165164
var startPos fmt.Stringer = pos
166165
if h.gset != nil {
167166
c.master.UpdateGTIDSet(h.gset)

canal/master.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ type masterInfo struct {
1313
pos mysql.Position
1414

1515
gset mysql.GTIDSet
16+
17+
timestamp uint32
1618
}
1719

1820
func (m *masterInfo) Update(pos mysql.Position) {
@@ -23,6 +25,14 @@ func (m *masterInfo) Update(pos mysql.Position) {
2325
m.Unlock()
2426
}
2527

28+
func (m *masterInfo) UpdateTimestamp(ts uint32) {
29+
log.Debugf("update master timestamp %s", ts)
30+
31+
m.Lock()
32+
m.timestamp = ts
33+
m.Unlock()
34+
}
35+
2636
func (m *masterInfo) UpdateGTIDSet(gset mysql.GTIDSet) {
2737
log.Debugf("update master gtid set %s", gset)
2838

@@ -38,6 +48,13 @@ func (m *masterInfo) Position() mysql.Position {
3848
return m.pos
3949
}
4050

51+
func (m *masterInfo) Timestamp() uint32 {
52+
m.RLock()
53+
defer m.RUnlock()
54+
55+
return m.timestamp
56+
}
57+
4158
func (m *masterInfo) GTIDSet() mysql.GTIDSet {
4259
m.RLock()
4360
defer m.RUnlock()

canal/sync.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ import (
1414
)
1515

1616
var (
17-
expCreateTable = regexp.MustCompile("(?i)^CREATE\\sTABLE(\\sIF\\sNOT\\sEXISTS)?\\s`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}\\s.*")
18-
expAlterTable = regexp.MustCompile("(?i)^ALTER\\sTABLE\\s.*?`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}\\s.*")
19-
expRenameTable = regexp.MustCompile("(?i)^RENAME\\sTABLE\\s.*?`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}\\s{1,}TO\\s.*?")
20-
expDropTable = regexp.MustCompile("(?i)^DROP\\sTABLE(\\sIF\\sEXISTS){0,1}\\s`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}(?:$|\\s)")
17+
expCreateTable = regexp.MustCompile("(?i)^CREATE\\sTABLE(\\sIF\\sNOT\\sEXISTS)?\\s`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}\\s.*")
18+
expAlterTable = regexp.MustCompile("(?i)^ALTER\\sTABLE\\s.*?`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}\\s.*")
19+
expRenameTable = regexp.MustCompile("(?i)^RENAME\\sTABLE\\s.*?`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}\\s{1,}TO\\s.*?")
20+
expDropTable = regexp.MustCompile("(?i)^DROP\\sTABLE(\\sIF\\sEXISTS){0,1}\\s`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}(?:$|\\s)")
21+
expTruncateTable = regexp.MustCompile("(?i)^TRUNCATE\\s+(?:TABLE\\s+)?(?:`?([^`\\s]+)`?\\.`?)?([^`\\s]+)`?")
2122
)
2223

2324
func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
@@ -126,7 +127,7 @@ func (c *Canal) runSyncBinlog() error {
126127
db []byte
127128
table []byte
128129
)
129-
regexps := []regexp.Regexp{*expCreateTable, *expAlterTable, *expRenameTable, *expDropTable}
130+
regexps := []regexp.Regexp{*expCreateTable, *expAlterTable, *expRenameTable, *expDropTable, *expTruncateTable}
130131
for _, reg := range regexps {
131132
mb = reg.FindSubmatch(e.Query)
132133
if len(mb) != 0 {
@@ -164,7 +165,10 @@ func (c *Canal) runSyncBinlog() error {
164165

165166
if savePos {
166167
c.master.Update(pos)
167-
c.eventHandler.OnPosSynced(pos, force)
168+
c.master.UpdateTimestamp(ev.Header.Timestamp)
169+
if err := c.eventHandler.OnPosSynced(pos, force); err != nil {
170+
return errors.Trace(err)
171+
}
168172
}
169173
}
170174

dump/dump.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,14 @@ func (d *Dumper) Dump(w io.Writer) error {
121121
args := make([]string, 0, 16)
122122

123123
// Common args
124-
seps := strings.Split(d.Addr, ":")
125-
args = append(args, fmt.Sprintf("--host=%s", seps[0]))
126-
if len(seps) > 1 {
127-
args = append(args, fmt.Sprintf("--port=%s", seps[1]))
124+
if strings.Contains(d.Addr, "/") {
125+
args = append(args, fmt.Sprintf("--socket=%s", d.Addr))
126+
} else {
127+
seps := strings.SplitN(d.Addr, ":", 2)
128+
args = append(args, fmt.Sprintf("--host=%s", seps[0]))
129+
if len(seps) > 1 {
130+
args = append(args, fmt.Sprintf("--port=%s", seps[1]))
131+
}
128132
}
129133

130134
args = append(args, fmt.Sprintf("--user=%s", d.User))

replication/binlogsyncer.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"net"
99
"os"
10+
"strings"
1011
"sync"
1112
"time"
1213

@@ -192,9 +193,16 @@ func (b *BinlogSyncer) registerSlave() error {
192193
b.c.Close()
193194
}
194195

195-
log.Infof("register slave for master server %s:%d", b.cfg.Host, b.cfg.Port)
196+
addr := ""
197+
if strings.Contains(b.cfg.Host, "/") {
198+
addr = b.cfg.Host
199+
} else {
200+
addr = fmt.Sprintf("%s:%d", b.cfg.Host, b.cfg.Port)
201+
}
202+
203+
log.Infof("register slave for master server %s", addr)
196204
var err error
197-
b.c, err = client.Connect(fmt.Sprintf("%s:%d", b.cfg.Host, b.cfg.Port), b.cfg.User, b.cfg.Password, "", func(c *client.Conn) {
205+
b.c, err = client.Connect(addr, b.cfg.User, b.cfg.Password, "", func(c *client.Conn) {
198206
c.TLSConfig = b.cfg.TLSConfig
199207
})
200208
if err != nil {

0 commit comments

Comments
 (0)