Commit 08b9f70

Merge branch 'main' into add-ut-for-disttae-1
2 parents 7719e8a + 1cfcd1a

330 files changed: +17368 additions, -6610 deletions

pkg/backup/backup_test.go

Lines changed: 5 additions & 5 deletions
@@ -93,8 +93,8 @@ func TestBackupData(t *testing.T) {
 	txn, rel := testutil.GetDefaultRelation(t, db.DB, schema.Name)
 	testutil.CheckAllColRowsByScan(t, rel, int(totalRows), false)

-	obj := testutil.GetOneObject(rel)
-	id := obj.GetMeta().(*catalog.ObjectEntry).AsCommonID()
+	obj := testutil.GetOneBlockMeta(rel)
+	id := obj.AsCommonID()
 	err := rel.RangeDelete(id, 0, 0, handle.DT_Normal)
 	require.NoError(t, err)
 	deletedRows = 1
@@ -118,7 +118,7 @@ func TestBackupData(t *testing.T) {
 		v := testutil.GetSingleSortKeyValue(data, schema, 2)
 		filter := handle.NewEQFilter(v)
 		err := rel.DeleteByFilter(context.Background(), filter)
-		assert.NoError(t, err)
+		assert.NoError(t, err, v)
 		assert.NoError(t, txn.Commit(context.Background()))
 	}
 	backupTime := time.Now().UTC()
@@ -472,8 +472,8 @@ func TestBackupData5(t *testing.T) {
 	txn, rel := testutil.GetDefaultRelation(t, db.DB, schema.Name)
 	testutil.CheckAllColRowsByScan(t, rel, int(totalRows), false)

-	obj := testutil.GetOneObject(rel)
-	id := obj.GetMeta().(*catalog.ObjectEntry).AsCommonID()
+	obj := testutil.GetOneBlockMeta(rel)
+	id := obj.AsCommonID()
 	err := rel.RangeDelete(id, 0, 0, handle.DT_Normal)
 	require.NoError(t, err)
 	deletedRows = 1

pkg/backup/tae.go

Lines changed: 8 additions & 1 deletion
@@ -293,6 +293,10 @@ func execBackup(
 	}()
 	now := time.Now()
 	baseTS := ts
+	// When rewriting the checkpoint and trimming the aobject,
+	// you need to collect the atombstone in the last checkpoint.
+	// Before this, only the last special checkpoint needs to be collected.
+	var lastData *logtail.CheckpointData
 	for i, name := range names {
 		if len(name) == 0 {
 			continue
@@ -322,6 +326,9 @@ func execBackup(
 		}
 		defer data.Close()
 		oNames = append(oNames, oneNames...)
+		if i == len(names)-1 {
+			lastData = data
+		}
 	}
 	loadDuration += time.Since(now)

@@ -402,7 +409,7 @@ func execBackup(
 		tnLocation objectio.Location
 	)
 	cnLocation, tnLocation, checkpointFiles, err = logtail.ReWriteCheckpointAndBlockFromKey(ctx, sid, srcFs, dstFs,
-		cnLocation, uint32(version), start)
+		cnLocation, lastData, uint32(version), start)
 	for _, name := range checkpointFiles {
 		dentry, err := dstFs.StatFile(ctx, name)
 		if err != nil {

pkg/cdc/sinker.go

Lines changed: 3 additions & 2 deletions
@@ -566,10 +566,11 @@ func (s *mysqlSinker) appendSqlBuf(rowType RowType) (err error) {
 	// if s.sqlBuf has no enough space
 	if len(s.sqlBuf)+len(s.rowBuf)+suffixLen > cap(s.sqlBuf) {
 		// complete sql statement
-		if rowType == InsertRow {
+		if s.isNonEmptyInsertStmt() {
 			s.sqlBuf = appendString(s.sqlBuf, ";")
 			s.preSqlBufLen = len(s.sqlBuf)
-		} else {
+		}
+		if s.isNonEmptyDeleteStmt() {
 			s.sqlBuf = appendString(s.sqlBuf, ");")
 			s.preSqlBufLen = len(s.sqlBuf)
 		}
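This change decouples the statement terminator from the incoming row's type: the terminators ";" and ");" are now appended only when the buffer actually holds a non-empty insert or delete statement, and both cases are checked independently rather than through an if/else on rowType. The bodies of isNonEmptyInsertStmt and isNonEmptyDeleteStmt are not part of this diff; the following is a hypothetical sketch of one way such helpers could look, with all fields and constants assumed for illustration, not taken from pkg/cdc.

package cdcsketch

// Hypothetical sketch only: the real helper implementations are not shown in
// this commit. RowType, the field names, and their meanings are assumptions.

type RowType int

const (
	InsertRow RowType = iota
	DeleteRow
)

type mysqlSinker struct {
	sqlBuf       []byte  // statement being assembled
	preSqlBufLen int     // length of the statement prefix, before any row data
	rowType      RowType // kind of statement currently held in sqlBuf
}

// A plausible check: the buffer holds an INSERT statement and at least one
// row has been appended after the prefix.
func (s *mysqlSinker) isNonEmptyInsertStmt() bool {
	return s.rowType == InsertRow && len(s.sqlBuf) > s.preSqlBufLen
}

// Same idea for the DELETE ... WHERE pk IN (...) form, which is why the diff
// closes it with ");" rather than ";".
func (s *mysqlSinker) isNonEmptyDeleteStmt() bool {
	return s.rowType == DeleteRow && len(s.sqlBuf) > s.preSqlBufLen
}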

pkg/cdc/types.go

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ const (
 	TableLevel   = "table"
 	DbLevel      = "database"
 	AccountLevel = "account"
+	ClusterLevel = "cluster"
 	MatchAll     = "*"

 	MysqlSink = "mysql"

pkg/cnservice/server.go

Lines changed: 1 addition & 0 deletions
@@ -1037,6 +1037,7 @@ func (s *service) initProcessCodecService() {
 		s._txnClient,
 		s.fileService,
 		s.lockService,
+		s.partitionService,
 		s.queryClient,
 		s._hakeeperClient,
 		s.udfService,

pkg/common/moerr/error.go

Lines changed: 47 additions & 7 deletions
@@ -220,12 +220,14 @@ const (
 	ErrTxnCannotRetry             uint16 = 20630
 	ErrTxnNeedRetryWithDefChanged uint16 = 20631
 	ErrTxnStale                   uint16 = 20632
-	ErrRetryForCNRollingRestart   uint16 = 20633
-	ErrNewTxnInCNRollingRestart   uint16 = 20634
-	ErrPrevCheckpointNotFinished  uint16 = 20635
-	ErrCantDelGCChecker           uint16 = 20636
-	ErrTxnUnknown                 uint16 = 20637
-	ErrTxnControl                 uint16 = 20638
+	// ErrRetryForCNRollingRestart rolling upgrade related, do not modify
+	ErrRetryForCNRollingRestart uint16 = 20634
+	// ErrNewTxnInCNRollingRestart rolling upgrade related, do not modify
+	ErrNewTxnInCNRollingRestart  uint16 = 20635
+	ErrPrevCheckpointNotFinished uint16 = 20636
+	ErrCantDelGCChecker          uint16 = 20637
+	ErrTxnUnknown                uint16 = 20638
+	ErrTxnControl                uint16 = 20639

 	// Group 7: lock service
 	// ErrDeadLockDetected lockservice has detected a deadlock and should abort the transaction if it receives this error
@@ -569,7 +571,7 @@ type Error struct {
 }

 func (e *Error) Error() string {
-	return e.message
+	return e.Display()
 }

 func (e *Error) Detail() string {
@@ -595,6 +597,10 @@ func (e *Error) SqlState() string {
 	return e.sqlState
 }

+func (e *Error) SetDetail(detail string) {
+	e.detail = detail
+}
+
 var _ encoding.BinaryMarshaler = new(Error)

 func (e *Error) MarshalBinary() ([]byte, error) {
@@ -638,6 +644,40 @@ func IsMoErrCode(e error, rc uint16) bool {
 	return me.code == rc
 }

+func GetMoErrCode(e error) (uint16, bool) {
+	if e == nil {
+		return 0, false
+	}
+
+	me, ok := e.(*Error)
+	if !ok {
+		return 0, false
+	}
+
+	return me.code, true
+}
+
+func IsSameMoErr(a error, b error) bool {
+	if a == nil || b == nil {
+		return false
+	}
+
+	var (
+		ok     bool
+		aa, bb *Error
+	)
+
+	if aa, ok = a.(*Error); !ok {
+		return false
+	}
+
+	if bb, ok = b.(*Error); !ok {
+		return false
+	}
+
+	return aa.code == bb.code
+}
+
 func DowncastError(e error) *Error {
 	if err, ok := e.(*Error); ok {
 		return err
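Alongside the renumbered codes, the commit adds two helpers next to IsMoErrCode: GetMoErrCode extracts the code and reports whether the error is a *moerr.Error at all, and IsSameMoErr compares two errors by code without the caller naming a specific constant. A minimal usage sketch, reusing constructors that the new test below also exercises; the printed values are what the functions above imply, not verified output.

package main

import (
	"fmt"

	"github.com/matrixorigin/matrixone/pkg/common/moerr"
)

func main() {
	a := moerr.GetOkExpectedEOB()
	b := moerr.GetOkExpectedEOB()

	// GetMoErrCode returns the code plus an ok flag; ok is false for nil or
	// non-moerr errors.
	if code, ok := moerr.GetMoErrCode(a); ok {
		fmt.Println("code:", code)
	}

	// IsSameMoErr is true only when both sides are *moerr.Error with equal codes.
	fmt.Println(moerr.IsSameMoErr(a, b))   // true
	fmt.Println(moerr.IsSameMoErr(a, nil)) // false
}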

pkg/common/moerr/error_test.go

Lines changed: 50 additions & 0 deletions
@@ -110,3 +110,53 @@ func TestEncoding(t *testing.T) {
 	require.Nil(t, err)
 	require.Equal(t, e, e2)
 }
+
+type fakeErr struct {
+}
+
+func (f *fakeErr) Error() string {
+	return "fake error"
+}
+
+func TestIsSameMoErr(t *testing.T) {
+	var a, b error
+	require.False(t, IsSameMoErr(a, b))
+
+	_, ok := GetMoErrCode(a)
+	require.False(t, ok)
+
+	_, ok = GetMoErrCode(b)
+	require.False(t, ok)
+
+	a = &fakeErr{}
+	require.False(t, IsSameMoErr(a, b))
+
+	_, ok = GetMoErrCode(a)
+	require.False(t, ok)
+
+	b = &fakeErr{}
+	require.False(t, IsSameMoErr(a, b))
+
+	_, ok = GetMoErrCode(b)
+	require.False(t, ok)
+
+	a = GetOkExpectedEOB()
+	require.False(t, IsSameMoErr(a, b))
+
+	code, ok := GetMoErrCode(a)
+	require.True(t, ok)
+	require.Equal(t, OkExpectedEOB, code)
+
+	b = GetOkExpectedDup()
+	require.False(t, IsSameMoErr(a, b))
+
+	code, ok = GetMoErrCode(b)
+	require.True(t, ok)
+	require.Equal(t, OkExpectedDup, code)
+
+	b = nil
+	require.False(t, IsSameMoErr(a, b))
+
+	b = GetOkExpectedEOB()
+	require.True(t, IsSameMoErr(a, b))
+}

pkg/common/morpc/backend_test.go

Lines changed: 10 additions & 9 deletions
@@ -808,21 +808,22 @@ func TestCannotBusyLoopIfWriteCIsFull(t *testing.T) {
 			return conn.Write(msg, goetty.WriteOptions{Flush: true})
 		},
 		func(b *remoteBackend) {
-			ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
-			defer cancel()
-
 			var wg sync.WaitGroup
 			for i := 0; i < 10; i++ {
 				wg.Add(1)
 				go func() {
 					defer wg.Done()
 					for i := 0; i < 10; i++ {
-						req := newTestMessage(1)
-						f, err := b.Send(ctx, req)
-						if err == nil { //ignore timeout
-							_, err = f.Get()
-							assert.NoError(t, err)
-						}
+						func() {
+							ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+							defer cancel()
+							req := newTestMessage(0)
+							f, err := b.Send(ctx, req)
+							if err == nil { //ignore timeout
+								_, _ = f.Get()
+								f.Close()
+							}
+						}()
 					}
 				}()
 			}
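The rewritten test body swaps one 60-second context shared by every goroutine for a fresh 10-second context created and cancelled per request, and it now closes each future instead of asserting on its result. A minimal sketch of the per-iteration context pattern; sendOnce is a stand-in for the morpc Send/Get calls (an assumption for illustration, not the real API).

package main

import (
	"context"
	"fmt"
	"time"
)

// sendOnce stands in for b.Send / f.Get; illustrative only.
func sendOnce(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Millisecond):
		return nil // "response" arrived
	case <-ctx.Done():
		return ctx.Err() // per-request timeout
	}
}

func main() {
	for i := 0; i < 3; i++ {
		func() {
			// A short-lived context per request, released at the end of each
			// iteration rather than held for the whole test run.
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			defer cancel()
			fmt.Println(sendOnce(ctx))
		}()
	}
}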

pkg/container/batch/batch.go

Lines changed: 2 additions & 3 deletions
@@ -18,14 +18,13 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"github.com/matrixorigin/matrixone/pkg/common/bitmap"
-
-	"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"

+	"github.com/matrixorigin/matrixone/pkg/common/bitmap"
 	"github.com/matrixorigin/matrixone/pkg/common/moerr"
 	"github.com/matrixorigin/matrixone/pkg/common/mpool"
 	"github.com/matrixorigin/matrixone/pkg/container/types"
 	"github.com/matrixorigin/matrixone/pkg/container/vector"
+	"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
 )

 func New(attrs []string) *Batch {

pkg/container/nulls/nulls.go

Lines changed: 0 additions & 1 deletion
@@ -26,7 +26,6 @@ import (
 )

 type Bitmap = Nulls
-type Grouping = Nulls

 type Nulls struct {
 	np bitmap.Bitmap

pkg/container/vector/search.go

Lines changed: 2 additions & 2 deletions
@@ -644,7 +644,7 @@ func CollectOffsetsByBetweenFactory[T types.BuiltinNumber | types.Times | types.
 		//return cols[i] > rval
 		return cmpRight(cols[i], rval)
 	})
-	if start == end {
+	if start >= end {
 		return nil
 	}
 	sels := make([]int64, end-start)
@@ -695,7 +695,7 @@ func CollectOffsetsByBetweenString(lval, rval string, hint int) func(*Vector) []
 		//return cols[i] > rval
 		return cmpRight(col[i].UnsafeGetString(area), rval)
 	})
-	if start == end {
+	if start >= end {
 		return nil
 	}
 	sels := make([]int64, end-start)
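The guard change from start == end to start >= end matters because the two sort.Search calls can cross when the BETWEEN range is empty (for example when the lower bound exceeds the upper bound): with only the equality check, make([]int64, end-start) would receive a negative length and panic. A minimal sketch of that failure mode, using plain >= / > comparisons as illustrative stand-ins for the package's comparator closures.

package main

import (
	"fmt"
	"sort"
)

// collectBetween mirrors the guard in the diff with simplified comparisons;
// the values and comparators are illustrative, not the real factory setup.
func collectBetween(cols []int64, lval, rval int64) []int64 {
	start := sort.Search(len(cols), func(i int) bool { return cols[i] >= lval })
	end := sort.Search(len(cols), func(i int) bool { return cols[i] > rval })
	// With the old start == end check, a crossed range (start > end) reached
	// make([]int64, end-start) with a negative length and panicked.
	if start >= end {
		return nil
	}
	sels := make([]int64, end-start)
	for i := range sels {
		sels[i] = int64(start + i)
	}
	return sels
}

func main() {
	cols := []int64{1, 2, 3, 4, 5}
	fmt.Println(collectBetween(cols, 2, 4)) // [1 2 3]: offsets of 2, 3, 4
	fmt.Println(collectBetween(cols, 4, 2)) // []: empty BETWEEN, start > end
}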

pkg/container/vector/vector.go

Lines changed: 2 additions & 2 deletions
@@ -55,8 +55,8 @@ type Vector struct {
 	capacity int
 	length   int

-	nsp nulls.Nulls    // nulls list
-	gsp nulls.Grouping // grouping list
+	nsp nulls.Nulls // nulls list
+	gsp nulls.Nulls // grouping list

 	cantFreeData bool
 	cantFreeArea bool

pkg/datasync/consumer_test.go

Lines changed: 2 additions & 2 deletions
@@ -551,7 +551,7 @@ func TestConsumeEntries(t *testing.T) {
 	err := c.consumeEntries(ctx, []logservice.LogRecord{
 		{
 			Lsn:  10,
-			Data: make([]byte, 100),
+			Data: dataWithValidVersion(make([]byte, 100)),
 		},
 	}, false)
 	assert.NoError(t, err)
@@ -573,7 +573,7 @@ func TestConsumeEntries(t *testing.T) {
 	err := c.consumeEntries(ctx, []logservice.LogRecord{
 		{
 			Lsn:  10,
-			Data: make([]byte, 100),
+			Data: dataWithValidVersion(make([]byte, 100)),
 		},
 	}, true)
 	assert.Error(t, err)

pkg/datasync/entry.go

Lines changed: 7 additions & 20 deletions
@@ -15,7 +15,6 @@
 package datasync

 import (
-	"bytes"
 	"strings"

 	"github.com/matrixorigin/matrixone/pkg/container/types"
@@ -51,31 +50,19 @@ func getLocations(rec logservice.LogRecord, tag string) []string {
 		logutil.Errorf("invalid data size %d", len(data))
 		return nil
 	}
-	buffer := bytes.NewBuffer(data[dataHeaderSize:])
-	m := &logservicedriver.Meta{}
-	_, err := m.ReadFrom(buffer)
-	if err != nil {
-		logutil.Errorf("failed to read data from buffer: %v", err)
-		return nil
-	}
-	if m.GetType() != logservicedriver.TNormal {
-		return nil
-	}
 	var locations []string
-	for range m.GetAddr() {
-		e := entry.NewEmptyEntry()
-		_, err := e.ReadFrom(buffer)
-		if err != nil {
-			logutil.Errorf("failed to read data from buffer: %v", err)
-			return nil
-		}
-		ei := e.Entry.GetInfo().(*entry2.Info)
-		payload := e.Entry.GetPayload()
+	_, err := logservicedriver.DecodeLogEntry(data[headerSize+replicaIDSize:], func(en *entry.Entry) {
+		ei := en.Entry.GetInfo().(*entry2.Info)
+		payload := en.Entry.GetPayload()
 		if ei.Group == wal.GroupPrepare {
 			locations = append(locations, parseCommonFiles(payload, tag)...)
 		} else if ei.Group == store.GroupFiles {
 			locations = append(locations, parseMetaFiles(payload, tag)...)
 		}
+	})
+	if err != nil {
+		logutil.Errorf("decode logentry error %s", err.Error())
+		return nil
 	}
 	return locations
 }
