Skip to content

Commit 4128279

Browse files
author
Harshil Goel
authored
fix(core): Fix race condition in mutation map (#9473) (#9480)
1 parent 2ad525d commit 4128279

File tree

5 files changed

+146
-9
lines changed

5 files changed

+146
-9
lines changed

posting/index_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ func TestTokensTable(t *testing.T) {
184184
attr := x.GalaxyAttr("name")
185185
key := x.DataKey(attr, 1)
186186
l, err := getNew(key, ps, math.MaxUint64)
187+
l.mutationMap.readTs = 1
187188
require.NoError(t, err)
188189

189190
edge := &pb.DirectedEdge{

posting/list.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func (mm *MutableLayer) setCurrentEntries(ts uint64, pl *pb.PostingList) {
154154
return
155155
}
156156
if mm.readTs != 0 {
157-
x.AssertTrue(mm.readTs == ts)
157+
x.AssertTruef(mm.readTs == ts, "List object reused for a different transaction %d %d", mm.readTs, ts)
158158
}
159159

160160
mm.readTs = ts
@@ -347,7 +347,7 @@ func (mm *MutableLayer) populateUidMap(pl *pb.PostingList) {
347347
// insertPosting inserts a new posting in the mutable layers. It updates the currentUids map.
348348
func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) {
349349
if mm.readTs != 0 {
350-
x.AssertTrue(mpost.StartTs == mm.readTs)
350+
x.AssertTruef(mpost.StartTs == mm.readTs, "Diffrenent read ts and start ts found %d %d", mpost.StartTs, mm.readTs)
351351
}
352352

353353
mm.readTs = mpost.StartTs
@@ -404,7 +404,7 @@ func (mm *MutableLayer) print() string {
404404
mm.deleteAllMarker)
405405
}
406406

407-
func (l *List) print() string {
407+
func (l *List) Print() string {
408408
return fmt.Sprintf("minTs: %d, plist: %+v, mutationMap: %s", l.minTs, l.plist, l.mutationMap.print())
409409
}
410410

@@ -693,6 +693,7 @@ func (it *pIterator) posting() *pb.Posting {
693693
it.pidx++
694694
}
695695
it.uidPosting.Uid = uid
696+
it.uidPosting.ValType = pb.Posting_UID
696697
return it.uidPosting
697698
}
698699

@@ -993,6 +994,14 @@ func (l *List) setMutationAfterCommit(startTs, commitTs uint64, pl *pb.PostingLi
993994
}
994995
l.mutationMap.committedUidsTime = x.Max(l.mutationMap.committedUidsTime, commitTs)
995996

997+
if refresh {
998+
newMap := make(map[uint64]*pb.Posting, len(l.mutationMap.committedUids))
999+
for uid, post := range l.mutationMap.committedUids {
1000+
newMap[uid] = post
1001+
}
1002+
l.mutationMap.committedUids = newMap
1003+
}
1004+
9961005
for _, mpost := range pl.Postings {
9971006
if hasDeleteAll(mpost) {
9981007
l.mutationMap.deleteAllMarker = commitTs
@@ -1129,7 +1138,7 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e
11291138
// pitr iterates through immutable postings
11301139
err = pitr.seek(l, afterUid, deleteBelowTs)
11311140
if err != nil {
1132-
return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate "+l.print())
1141+
return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate %v", l.Print())
11331142
}
11341143

11351144
loop:
@@ -1919,7 +1928,10 @@ func (l *List) findStaticValue(readTs uint64) *pb.PostingList {
19191928

19201929
// If we reach here, that means that there was no entry in mutation map which is less than readTs. That
19211930
// means we need to return l.plist
1922-
return l.plist
1931+
if l.plist != nil && len(l.plist.Postings) > 0 {
1932+
return l.plist
1933+
}
1934+
return nil
19231935
}
19241936

19251937
// Value returns the default value from the posting list. The default value is

posting/list_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ func TestGetSinglePosting(t *testing.T) {
135135

136136
res, err := l.StaticValue(1)
137137
require.NoError(t, err)
138+
fmt.Println(res, res == nil)
138139
require.Equal(t, res == nil, true)
139140

140141
l.plist = create_pl(1, 1)
@@ -265,7 +266,7 @@ func TestAddMutation_jchiu1(t *testing.T) {
265266
key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 12)
266267
ol, err := GetNoStore(key, math.MaxUint64)
267268
require.NoError(t, err)
268-
269+
ol.mutationMap.setTs(1)
269270
// Set value to cars and merge to BadgerDB.
270271
edge := &pb.DirectedEdge{
271272
Value: []byte("cars"),
@@ -308,6 +309,7 @@ func TestAddMutation_DelSet(t *testing.T) {
308309
key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 1534)
309310
ol, err := GetNoStore(key, math.MaxUint64)
310311
require.NoError(t, err)
312+
ol.mutationMap.setTs(1)
311313

312314
// DO sp*, don't commit
313315
// Del a value cars and but don't merge.
@@ -323,6 +325,7 @@ func TestAddMutation_DelSet(t *testing.T) {
323325
Value: []byte("newcars"),
324326
}
325327
ol1, err := GetNoStore(key, math.MaxUint64)
328+
ol1.mutationMap.setTs(2)
326329
require.NoError(t, err)
327330
txn = &Txn{StartTs: 2}
328331
addMutationHelper(t, ol1, edge, Set, txn)
@@ -334,6 +337,7 @@ func TestAddMutation_DelSet(t *testing.T) {
334337
func TestAddMutation_DelRead(t *testing.T) {
335338
key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 1543)
336339
ol, err := GetNoStore(key, math.MaxUint64)
340+
ol.mutationMap.setTs(1)
337341
require.NoError(t, err)
338342

339343
// Set value to newcars, and commit it
@@ -372,6 +376,7 @@ func TestAddMutation_DelRead(t *testing.T) {
372376
func TestAddMutation_jchiu2(t *testing.T) {
373377
key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 15)
374378
ol, err := GetNoStore(key, math.MaxUint64)
379+
ol.mutationMap.setTs(1)
375380
require.NoError(t, err)
376381

377382
// Del a value cars and but don't merge.
@@ -394,6 +399,7 @@ func TestAddMutation_jchiu2(t *testing.T) {
394399
func TestAddMutation_jchiu2_Commit(t *testing.T) {
395400
key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 16)
396401
ol, err := GetNoStore(key, math.MaxUint64)
402+
ol.mutationMap.setTs(1)
397403
require.NoError(t, err)
398404

399405
// Del a value cars and but don't merge.
@@ -419,6 +425,7 @@ func TestAddMutation_jchiu2_Commit(t *testing.T) {
419425
func TestAddMutation_jchiu3(t *testing.T) {
420426
key := x.DataKey(x.GalaxyAttr("value"), 29)
421427
ol, err := GetNoStore(key, math.MaxUint64)
428+
ol.mutationMap.setTs(1)
422429
require.NoError(t, err)
423430

424431
// Set value to cars and merge to BadgerDB.
@@ -459,6 +466,7 @@ func TestAddMutation_jchiu3(t *testing.T) {
459466
func TestAddMutation_mrjn1(t *testing.T) {
460467
key := x.DataKey(x.GalaxyAttr("value"), 21)
461468
ol, err := GetNoStore(key, math.MaxUint64)
469+
ol.mutationMap.setTs(1)
462470
require.NoError(t, err)
463471

464472
// Set a value cars and merge.

posting/mvcc.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
580580
l := new(List)
581581
l.key = key
582582
l.plist = new(pb.PostingList)
583+
l.mutationMap = newMutableLayer()
583584
l.minTs = 0
584585

585586
// We use the following block of code to trigger incremental rollup on this key.
@@ -626,9 +627,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
626627
return err
627628
}
628629
pl.CommitTs = item.Version()
629-
if l.mutationMap == nil {
630-
l.mutationMap = newMutableLayer()
631-
}
632630
l.mutationMap.insertCommittedPostings(pl)
633631
return nil
634632
})

worker/sort_test.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,124 @@ func TestEmptyTypeSchema(t *testing.T) {
101101
x.ParseNamespaceAttr(types[0].TypeName)
102102
}
103103

104+
func TestDeleteSetWithVarEdgeCorruptsData(t *testing.T) {
105+
// Setup temporary directory for Badger DB
106+
dir, err := os.MkdirTemp("", "storetest_")
107+
require.NoError(t, err)
108+
defer os.RemoveAll(dir)
109+
110+
opt := badger.DefaultOptions(dir)
111+
ps, err := badger.OpenManaged(opt)
112+
require.NoError(t, err)
113+
posting.Init(ps, 0, false)
114+
Init(ps)
115+
116+
// Set schema
117+
schemaTxt := `
118+
room: string @index(hash) @upsert .
119+
person: string @index(hash) @upsert .
120+
office: uid @reverse @count .
121+
`
122+
err = schema.ParseBytes([]byte(schemaTxt), 1)
123+
require.NoError(t, err)
124+
125+
ctx := context.Background()
126+
attrRoom := x.GalaxyAttr("room")
127+
attrPerson := x.GalaxyAttr("person")
128+
attrOffice := x.GalaxyAttr("office")
129+
130+
uidRoom := uint64(1)
131+
uidJohn := uint64(2)
132+
133+
runMutation := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) {
134+
txn := posting.Oracle().RegisterStartTs(startTs)
135+
for _, edge := range edges {
136+
require.NoError(t, runMutation(ctx, edge, txn))
137+
}
138+
txn.Update()
139+
writer := posting.NewTxnWriter(ps)
140+
require.NoError(t, txn.CommitToDisk(writer, commitTs))
141+
require.NoError(t, writer.Flush())
142+
txn.UpdateCachedKeys(commitTs)
143+
}
144+
145+
// Initial mutation: Set John → Leopard
146+
runMutation(1, 3, []*pb.DirectedEdge{
147+
{
148+
Entity: uidJohn,
149+
Attr: attrPerson,
150+
Value: []byte("John Smith"),
151+
ValueType: pb.Posting_STRING,
152+
Op: pb.DirectedEdge_SET,
153+
},
154+
{
155+
Entity: uidRoom,
156+
Attr: attrRoom,
157+
Value: []byte("Leopard"),
158+
ValueType: pb.Posting_STRING,
159+
Op: pb.DirectedEdge_SET,
160+
},
161+
{
162+
Entity: uidJohn,
163+
Attr: attrOffice,
164+
ValueId: uidRoom,
165+
ValueType: pb.Posting_UID,
166+
Op: pb.DirectedEdge_SET,
167+
},
168+
})
169+
170+
key := x.DataKey(attrOffice, uidJohn)
171+
rollup(t, key, ps, 4)
172+
173+
// Second mutation: Remove John from Leopard, assign Amanda
174+
uidAmanda := uint64(3)
175+
176+
runMutation(6, 8, []*pb.DirectedEdge{
177+
{
178+
Entity: uidJohn,
179+
Attr: attrOffice,
180+
ValueId: uidRoom,
181+
ValueType: pb.Posting_UID,
182+
Op: pb.DirectedEdge_DEL,
183+
},
184+
{
185+
Entity: uidAmanda,
186+
Attr: attrPerson,
187+
Value: []byte("Amanda Anderson"),
188+
ValueType: pb.Posting_STRING,
189+
Op: pb.DirectedEdge_SET,
190+
},
191+
{
192+
Entity: uidAmanda,
193+
Attr: attrOffice,
194+
ValueId: uidRoom,
195+
ValueType: pb.Posting_UID,
196+
Op: pb.DirectedEdge_SET,
197+
},
198+
})
199+
200+
// Read and validate: Amanda assigned, John unassigned
201+
txnRead := posting.Oracle().RegisterStartTs(10)
202+
203+
list, err := txnRead.Get(key)
204+
require.NoError(t, err)
205+
206+
uids, err := list.Uids(posting.ListOptions{ReadTs: 10})
207+
require.NoError(t, err)
208+
209+
// This assertion FAILS in the broken case where both Amanda and John are assigned
210+
require.Equal(t, 0, len(uids.Uids), "John should no longer have an office assigned")
211+
212+
keyRev := x.ReverseKey(attrOffice, uidRoom)
213+
listRev, err := txnRead.Get(keyRev)
214+
require.NoError(t, err)
215+
216+
reverseUids, err := listRev.Uids(posting.ListOptions{ReadTs: 10})
217+
require.NoError(t, err)
218+
219+
require.Equal(t, []uint64{uidAmanda}, reverseUids.Uids, "Only Amanda should be assigned on reverse edge")
220+
}
221+
104222
func TestGetScalarList(t *testing.T) {
105223
dir, err := os.MkdirTemp("", "storetest_")
106224
x.Check(err)

0 commit comments

Comments
 (0)