Skip to content

Commit fb0f6a6

Browse files
authored
clean some code in the workspace. (#21725)
Clean some code in the workspace. 1. gen new row id 2. deletion type. Approved by: @aunjgr, @XuPeng-SH, @ouyuanning
1 parent d447fcf commit fb0f6a6

File tree

15 files changed

+457
-178
lines changed

15 files changed

+457
-178
lines changed

pkg/container/batch/batch.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,19 @@ func (bat *Batch) SetAttributes(attrs []string) {
362362
bat.Attrs = attrs
363363
}
364364

365+
func (bat *Batch) InsertVector(
366+
pos int32,
367+
attr string,
368+
vec *vector.Vector,
369+
) {
370+
bat.Vecs = append(bat.Vecs, nil)
371+
copy(bat.Vecs[pos+1:], bat.Vecs[pos:])
372+
bat.Vecs[pos] = vec
373+
bat.Attrs = append(bat.Attrs, "")
374+
copy(bat.Attrs[pos+1:], bat.Attrs[pos:])
375+
bat.Attrs[pos] = attr
376+
}
377+
365378
func (bat *Batch) SetVector(pos int32, vec *vector.Vector) {
366379
bat.Vecs[pos] = vec
367380
if vec != nil {

pkg/container/types/rowid.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ import (
1818
"bytes"
1919
"encoding/hex"
2020
"fmt"
21+
"math"
2122
"unsafe"
2223

2324
"github.com/google/uuid"
25+
"github.com/matrixorigin/matrixone/pkg/common/moerr"
2426
"github.com/matrixorigin/matrixone/pkg/common/util"
2527
)
2628

@@ -198,6 +200,18 @@ func (r *Rowid) SetRowOffset(offset uint32) {
198200
copy(r[BlockidSize:], EncodeUint32(&offset))
199201
}
200202

203+
func (r *Rowid) SetBlkOffset(offset uint16) {
204+
copy(r[ObjectidSize:], EncodeUint16(&offset))
205+
}
206+
207+
func (r *Rowid) SetObjOffset(offset uint16) {
208+
copy(r[SegmentidSize:], EncodeUint16(&offset))
209+
}
210+
211+
func (r *Rowid) SetSegment(seg Segmentid) {
212+
copy(r[:SegmentidSize], seg[:])
213+
}
214+
201215
func (r *Rowid) GetRowOffset() uint32 {
202216
return DecodeUint32(r[BlockidSize:])
203217
}
@@ -206,6 +220,10 @@ func (r *Rowid) GetBlockOffset() uint16 {
206220
return DecodeUint16(r[ObjectBytesSize:BlockidSize])
207221
}
208222

223+
func (r *Rowid) GetObjectOffset() uint16 {
224+
return DecodeUint16(r[SegmentidSize:ObjectBytesSize])
225+
}
226+
209227
func (r *Rowid) GetObjectString() string {
210228
uuid := (*uuid.UUID)(r[:UuidSize])
211229
s := DecodeUint16(r[UuidSize:ObjectBytesSize])
@@ -224,6 +242,38 @@ func (r *Rowid) ShortStringEx() string {
224242
return fmt.Sprintf("%s-%d", b.ShortStringEx(), s)
225243
}
226244

245+
func (r *Rowid) IncrBlk() error {
246+
blkOffset := r.GetBlockOffset()
247+
if blkOffset == math.MaxUint16 {
248+
if err := r.IncrObj(); err != nil {
249+
return err
250+
}
251+
blkOffset = 0
252+
} else {
253+
blkOffset++
254+
}
255+
256+
r.SetBlkOffset(blkOffset)
257+
r.SetRowOffset(0)
258+
259+
return nil
260+
}
261+
262+
func (r *Rowid) IncrObj() error {
263+
objOffset := r.GetObjectOffset()
264+
if objOffset == math.MaxUint16 {
265+
// we expect that the segment id of a rowId is immutable, so incr the segment
266+
// cannot fix the object overflow.
267+
return moerr.NewInternalErrorNoCtxf("rowId object offset overflow, curr rowId: %s", r.String())
268+
}
269+
270+
objOffset++
271+
r.SetObjOffset(objOffset)
272+
r.SetBlkOffset(0)
273+
r.SetRowOffset(0)
274+
return nil
275+
}
276+
227277
func (b *Blockid) EQ(than *Blockid) bool {
228278
return b.Compare(than) == 0
229279
}

pkg/container/types/types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package types
1616

1717
import (
18+
"bytes"
1819
"encoding/binary"
1920
"fmt"
2021

@@ -257,6 +258,10 @@ type Rowid [RowidSize]byte
257258
// Segmentid
258259
type Segmentid = Uuid
259260

261+
func (d *Segmentid) EQ(oth *Segmentid) bool {
262+
return bytes.Equal(d[:], oth[:])
263+
}
264+
260265
// Objectid
261266
type Objectid [ObjectidSize]byte
262267

pkg/sql/colexec/cn_segment_map.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2025 Matrix Origin
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package colexec
16+
17+
import (
18+
"sync"
19+
20+
"github.com/matrixorigin/matrixone/pkg/container/types"
21+
"github.com/matrixorigin/matrixone/pkg/objectio"
22+
)
23+
24+
const (
25+
TxnWorkspaceUnCommitType = 1
26+
)
27+
28+
var (
29+
// TxnWorkspaceSegment includes a dedicated header which indicates it is a workspace segment.
30+
TxnWorkspaceSegment = types.Segmentid([16]byte{
31+
0x00, 0x00, 0x00, 0x00,
32+
0x00, 0x00, 0x00, 0x00,
33+
0x00, 0x00, 0x00, 0x00,
34+
0x00, 0x00, 0x00, 0xFF,
35+
})
36+
)
37+
38+
type CnSegmentMap struct {
39+
sync.Mutex
40+
// tag whether a segment is generated by this txn
41+
mp map[objectio.Segmentid]int32
42+
}
43+
44+
func IsDeletionOnTxnUnCommit(
45+
segmentMap map[string]int32,
46+
segId *types.Segmentid,
47+
) bool {
48+
// raw row id generated by txn.
49+
// WriteBatch()
50+
if segId.EQ(&TxnWorkspaceSegment) {
51+
return true
52+
}
53+
54+
// check if it is a persisted segment generated by a workspace.
55+
if segmentMap != nil {
56+
return segmentMap[string(segId[:])] == TxnWorkspaceUnCommitType
57+
} else {
58+
ss := Get()
59+
if ss == nil {
60+
return false
61+
}
62+
63+
return ss.GetCnSegmentType(segId) == TxnWorkspaceUnCommitType
64+
}
65+
}
66+
67+
// IsDeletionOnTxnUnCommitPersisted check if the segId
68+
// is an UnCommit and flushed segment in the workspace.
69+
// if the input map is nil, this check will be done by the colexec.Get.
70+
func IsDeletionOnTxnUnCommitPersisted(
71+
segmentMap map[string]int32,
72+
segId *types.Segmentid,
73+
) bool {
74+
if IsDeletionOnTxnUnCommit(segmentMap, segId) {
75+
return !segId.EQ(&TxnWorkspaceSegment)
76+
}
77+
return false
78+
}
79+
80+
// IsDeletionOnTxnUnCommitInMem check if the segId
81+
// is an UnCommit inmem segment in the workspace.
82+
// if the input map is nil, this check will be done by the colexec.Get.
83+
func IsDeletionOnTxnUnCommitInMem(
84+
segmentMap map[string]int32,
85+
segId *types.Segmentid,
86+
) bool {
87+
if IsDeletionOnTxnUnCommit(segmentMap, segId) {
88+
return segId.EQ(&TxnWorkspaceSegment)
89+
}
90+
return false
91+
}
92+
93+
func RecordTxnUnCommitSegment(segId *types.Segmentid) {
94+
Get().PutCnSegment(segId, TxnWorkspaceUnCommitType)
95+
}

pkg/sql/colexec/deletion/deletion.go

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,11 @@ import (
3131
"github.com/matrixorigin/matrixone/pkg/vm/process"
3232
)
3333

34-
//row id be divided into four types:
35-
// 1. RawBatchOffset : belong to txn's workspace
36-
// 2. CNBlockOffset : belong to txn's workspace
37-
38-
// 3. RawRowIdBatch : belong to txn's snapshot data.
39-
// 4. FlushDeltaLoc : belong to txn's snapshot data, which on S3 and pointed by delta location.
4034
const (
41-
RawRowIdBatch = iota
42-
// remember that, for one block,
43-
// when it sends the info to mergedeletes,
44-
// either it's Compaction or not.
45-
Compaction
46-
CNBlockOffset
47-
RawBatchOffset
48-
FlushDeltaLoc
35+
FlushDeltaLoc = iota
36+
37+
DeletionOnTxnUnCommit
38+
DeletionOnCommitted
4939
)
5040

5141
const opName = "deletion"

pkg/sql/colexec/deletion/types.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func (ctr *container) flush(proc *process.Process, analyzer process.Analyzer) (u
236236
blkids := make([]types.Blockid, 0, len(blockId_rowIdBatch))
237237
for blkid := range blockId_rowIdBatch {
238238
//Don't flush rowids belong to uncommitted cn block and raw data batch in txn's workspace.
239-
if ctr.blockId_type[blkid] != RawRowIdBatch {
239+
if ctr.blockId_type[blkid] == DeletionOnTxnUnCommit {
240240
continue
241241
}
242242
blkids = append(blkids, blkid)
@@ -329,12 +329,10 @@ func collectBatchInfo(proc *process.Process, deletion *Deletion, destBatch *batc
329329

330330
deletion.ctr.deleted_length += 1
331331

332-
if deletion.SegmentMap[string(segid[:])] == colexec.TxnWorkSpaceIdType {
333-
deletion.ctr.blockId_type[blkid] = RawBatchOffset
334-
} else if deletion.SegmentMap[string(segid[:])] == colexec.CnBlockIdType {
335-
deletion.ctr.blockId_type[blkid] = CNBlockOffset
332+
if colexec.IsDeletionOnTxnUnCommit(deletion.SegmentMap, &segid) {
333+
deletion.ctr.blockId_type[blkid] = DeletionOnTxnUnCommit
336334
} else {
337-
deletion.ctr.blockId_type[blkid] = RawRowIdBatch
335+
deletion.ctr.blockId_type[blkid] = DeletionOnCommitted
338336
}
339337

340338
if _, ok := deletion.ctr.partitionId_blockId_rowIdBatch[pIdx]; !ok {

pkg/sql/colexec/mergedelete/mergedelete_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"github.com/matrixorigin/matrixone/pkg/container/vector"
2828
"github.com/matrixorigin/matrixone/pkg/objectio"
2929
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
30-
"github.com/matrixorigin/matrixone/pkg/sql/colexec/deletion"
3130
"github.com/matrixorigin/matrixone/pkg/testutil"
3231
"github.com/matrixorigin/matrixone/pkg/vm"
3332
"github.com/matrixorigin/matrixone/pkg/vm/engine"
@@ -81,7 +80,7 @@ func TestMergeDelete(t *testing.T) {
8180
Vecs: []*vector.Vector{
8281
testutil.MakeTextVector([]string{"mock_block_id0"}, nil),
8382
testutil.MakeTextVector([]string{string(bytes)}, nil),
84-
testutil.MakeInt8Vector([]int8{deletion.RawBatchOffset}, nil),
83+
testutil.MakeInt8Vector([]int8{0}, nil),
8584
testutil.MakeInt32Vector([]int32{0}, nil),
8685
vcu32,
8786
},
@@ -150,7 +149,7 @@ func TestMergeDelete(t *testing.T) {
150149
Vecs: []*vector.Vector{
151150
testutil.MakeTextVector([]string{"mock_block_id1", "mock_block_id2", "mock_block_id3"}, nil),
152151
testutil.MakeTextVector([]string{string(bytes1), string(bytes2), string(bytes3)}, nil),
153-
testutil.MakeInt8Vector([]int8{deletion.RawRowIdBatch, deletion.CNBlockOffset, deletion.FlushDeltaLoc}, nil),
152+
testutil.MakeInt8Vector([]int8{0, 1, 2}, nil),
154153
testutil.MakeInt32Vector([]int32{0, 0, 0}, nil),
155154
vcu32_2,
156155
},

pkg/sql/colexec/multi_update/s3writer_delegate.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ type deleteBlockInfo struct {
7070
func newDeleteBlockData(inputBatch *batch.Batch, pkIdx int) *deleteBlockData {
7171
data := &deleteBlockData{
7272
bitmap: nulls.NewWithSize(int(options.DefaultBlockMaxRows)),
73-
typ: deletion.RawRowIdBatch,
73+
typ: deletion.DeletionOnCommitted,
7474
bat: newDeleteBatch(inputBatch, pkIdx),
7575
}
7676
return data
@@ -218,13 +218,11 @@ func (writer *s3WriterDelegate) prepareDeleteBatchs(
218218
if err != nil {
219219
return nil, err
220220
}
221-
strSegid := string(segid[:])
222-
if writer.segmentMap[strSegid] == colexec.TxnWorkSpaceIdType {
223-
blockMap[blkid].typ = deletion.RawBatchOffset
224-
} else if writer.segmentMap[strSegid] == colexec.CnBlockIdType {
225-
blockMap[blkid].typ = deletion.CNBlockOffset
221+
222+
if colexec.IsDeletionOnTxnUnCommit(writer.segmentMap, &segid) {
223+
blockMap[blkid].typ = deletion.DeletionOnTxnUnCommit
226224
} else {
227-
blockMap[blkid].typ = deletion.RawRowIdBatch
225+
blockMap[blkid].typ = deletion.DeletionOnCommitted
228226
}
229227
}
230228

@@ -247,7 +245,7 @@ func (writer *s3WriterDelegate) prepareDeleteBatchs(
247245
blkids := make([]types.Blockid, 0, len(blockMap))
248246
for blkid, data := range blockMap {
249247
//Don't flush rowids belong to uncommitted cn block and raw data batch in txn's workspace.
250-
if data.typ != deletion.RawRowIdBatch {
248+
if data.typ == deletion.DeletionOnTxnUnCommit {
251249
continue
252250
}
253251
blkids = append(blkids, blkid)

pkg/sql/colexec/server.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,6 @@ import (
2727
// FIXME: shit design
2828
var srv atomic.Pointer[Server]
2929

30-
const (
31-
TxnWorkSpaceIdType = 1
32-
CnBlockIdType = 2
33-
)
34-
3530
func Get() *Server {
3631
return srv.Load()
3732
}

pkg/sql/colexec/types.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626

2727
"github.com/google/uuid"
2828
"github.com/matrixorigin/matrixone/pkg/logservice"
29-
"github.com/matrixorigin/matrixone/pkg/objectio"
3029
"github.com/matrixorigin/matrixone/pkg/vm/process"
3130
)
3231

@@ -107,15 +106,6 @@ type UuidProcMap struct {
107106
mp map[uuid.UUID]uuidProcMapItem
108107
}
109108

110-
type CnSegmentMap struct {
111-
sync.Mutex
112-
// tag whether a segment is generated by this txn
113-
// segmentName => uuid + file number
114-
// 1.mp[segmentName] = 1 => txnWorkSpace
115-
// 2.mp[segmentName] = 2 => Cn Blcok
116-
mp map[objectio.Segmentid]int32
117-
}
118-
119109
const (
120110
DefaultBatchSize = 8192
121111
)

0 commit comments

Comments
 (0)