Skip to content

Commit 978a0d4

Browse files
Add mutation pipeline
1 parent 9d4cb77 commit 978a0d4

File tree

15 files changed

+1835
-128
lines changed

15 files changed

+1835
-128
lines changed

posting/index.go

Lines changed: 835 additions & 9 deletions
Large diffs are not rendered by default.

posting/index_test.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,19 +139,27 @@ func addMutation(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32,
139139
}
140140
txn := Oracle().RegisterStartTs(startTs)
141141
txn.cache.SetIfAbsent(string(l.key), l)
142-
if index {
143-
require.NoError(t, l.AddMutationWithIndex(context.Background(), edge, txn))
144-
} else {
145-
require.NoError(t, l.addMutation(context.Background(), txn, edge))
146-
}
147142

143+
mp := NewMutationPipeline(txn)
144+
err := mp.Process(context.Background(), []*pb.DirectedEdge{edge})
145+
require.NoError(t, err)
148146
txn.Update()
149147
txn.UpdateCachedKeys(commitTs)
150148
writer := NewTxnWriter(pstore)
151149
require.NoError(t, txn.CommitToDisk(writer, commitTs))
152150
require.NoError(t, writer.Flush())
151+
newTxn := Oracle().RegisterStartTs(commitTs + 1)
152+
l, err = newTxn.Get(l.key)
153+
require.NoError(t, err)
153154
}
154155

156+
const schemaPreVal = `
157+
name: string .
158+
name2: string .
159+
dob: dateTime .
160+
friend: [uid] .
161+
`
162+
155163
const schemaVal = `
156164
name: string @index(term) .
157165
name2: string @index(term) .
@@ -263,6 +271,9 @@ func addEdgeToUID(t *testing.T, attr string, src uint64,
263271
func TestCountReverseIndexWithData(t *testing.T) {
264272
require.NoError(t, pstore.DropAll())
265273
MemLayerInstance.clear()
274+
preIndex := "testcount: [uid] ."
275+
require.NoError(t, schema.ParseBytes([]byte(preIndex), 1))
276+
266277
indexNameCountVal := "testcount: [uid] @count @reverse ."
267278

268279
attr := x.AttrInRootNamespace("testcount")
@@ -298,6 +309,9 @@ func TestCountReverseIndexWithData(t *testing.T) {
298309
func TestCountReverseIndexEmptyPosting(t *testing.T) {
299310
require.NoError(t, pstore.DropAll())
300311
MemLayerInstance.clear()
312+
preIndex := "testcount: [uid] ."
313+
require.NoError(t, schema.ParseBytes([]byte(preIndex), 1))
314+
301315
indexNameCountVal := "testcount: [uid] @count @reverse ."
302316

303317
attr := x.AttrInRootNamespace("testcount")
@@ -330,6 +344,8 @@ func TestCountReverseIndexEmptyPosting(t *testing.T) {
330344
}
331345

332346
func TestRebuildTokIndex(t *testing.T) {
347+
require.NoError(t, schema.ParseBytes([]byte(schemaPreVal), 1))
348+
333349
addEdgeToValue(t, x.AttrInRootNamespace("name2"), 91, "Michonne", uint64(1), uint64(2))
334350
addEdgeToValue(t, x.AttrInRootNamespace("name2"), 92, "David", uint64(3), uint64(4))
335351

posting/list.go

Lines changed: 86 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -404,13 +404,18 @@ func (mm *MutableLayer) print() string {
404404
if mm == nil {
405405
return ""
406406
}
407-
return fmt.Sprintf("Committed List: %+v Proposed list: %+v Delete all marker: %d \n",
407+
return fmt.Sprintf("Committed List: %+v Proposed list: %+v Delete all marker: %d. Count: %d \n",
408408
mm.committedEntries,
409409
mm.currentEntries,
410-
mm.deleteAllMarker)
410+
mm.deleteAllMarker,
411+
mm.length)
411412
}
412413

413414
func (l *List) Print() string {
415+
if l.plist.Pack != nil {
416+
uids := codec.Decode(l.plist.Pack, 0)
417+
return fmt.Sprintf("minTs: %d, committed uids: %+v, mutationMap: %s", l.minTs, uids, l.mutationMap.print())
418+
}
414419
return fmt.Sprintf("minTs: %d, plist: %+v, mutationMap: %s", l.minTs, l.plist, l.mutationMap.print())
415420
}
416421

@@ -712,6 +717,53 @@ type ListOptions struct {
712717
First int
713718
}
714719

720+
func NewPostingExisting(p *pb.Posting, t *pb.DirectedEdge) {
721+
var op uint32
722+
switch t.Op {
723+
case pb.DirectedEdge_SET:
724+
op = Set
725+
case pb.DirectedEdge_OVR:
726+
op = Ovr
727+
case pb.DirectedEdge_DEL:
728+
op = Del
729+
default:
730+
x.Fatalf("Unhandled operation: %+v", t)
731+
}
732+
733+
var postingType pb.Posting_PostingType
734+
switch {
735+
case len(t.Lang) > 0:
736+
postingType = pb.Posting_VALUE_LANG
737+
case t.ValueId == 0:
738+
postingType = pb.Posting_VALUE
739+
default:
740+
postingType = pb.Posting_REF
741+
}
742+
743+
p.Uid = t.ValueId
744+
p.Value = t.Value
745+
p.ValType = t.ValueType
746+
p.PostingType = postingType
747+
p.LangTag = []byte(t.Lang)
748+
p.Op = op
749+
p.Facets = t.Facets
750+
}
751+
752+
func GetPostingOp(top uint32) pb.DirectedEdge_Op {
753+
var op pb.DirectedEdge_Op
754+
switch top {
755+
case Set:
756+
op = pb.DirectedEdge_SET
757+
case Del:
758+
op = pb.DirectedEdge_DEL
759+
case Ovr:
760+
op = pb.DirectedEdge_OVR
761+
default:
762+
x.Fatalf("Unhandled operation: %+v", top)
763+
}
764+
return op
765+
}
766+
715767
// NewPosting takes the given edge and returns its equivalent representation as a posting.
716768
func NewPosting(t *pb.DirectedEdge) *pb.Posting {
717769
var op uint32
@@ -789,12 +841,12 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate, hasCountI
789841
// The current value should be deleted in favor of this value. This needs to
790842
// be done because the fingerprint for the value is not math.MaxUint64 as is
791843
// the case with the rest of the scalar predicates.
792-
newPlist := &pb.PostingList{}
844+
newPlist := &pb.PostingList{
845+
Postings: []*pb.Posting{createDeleteAllPosting()},
846+
}
793847
if mpost.Op != Del {
794-
// If we are setting a new value then we can just delete all the older values.
795-
newPlist.Postings = append(newPlist.Postings, createDeleteAllPosting())
848+
newPlist.Postings = append(newPlist.Postings, mpost)
796849
}
797-
newPlist.Postings = append(newPlist.Postings, mpost)
798850
l.mutationMap.setCurrentEntries(mpost.StartTs, newPlist)
799851
return nil
800852
}
@@ -833,6 +885,10 @@ func fingerprintEdge(t *pb.DirectedEdge) uint64 {
833885
return id
834886
}
835887

888+
func FingerprintEdge(t *pb.DirectedEdge) uint64 {
889+
return fingerprintEdge(t)
890+
}
891+
836892
func (l *List) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) error {
837893
l.Lock()
838894
defer l.Unlock()
@@ -1110,6 +1166,13 @@ func (l *List) pickPostings(readTs uint64) (uint64, []*pb.Posting) {
11101166
}
11111167
return pi.Uid < pj.Uid
11121168
})
1169+
1170+
if len(posts) > 0 {
1171+
if hasDeleteAll(posts[0]) {
1172+
posts = posts[1:]
1173+
}
1174+
}
1175+
11131176
return deleteAllMarker, posts
11141177
}
11151178

@@ -1258,6 +1321,11 @@ func (l *List) GetLength(readTs uint64) int {
12581321
length += immutLen
12591322
}
12601323

1324+
pureLength := l.length(readTs, 0)
1325+
if pureLength != length {
1326+
panic(fmt.Sprintf("pure length != length %d %d %s", pureLength, length, l.Print()))
1327+
}
1328+
12611329
return length
12621330
}
12631331

@@ -2007,6 +2075,18 @@ func (l *List) findStaticValue(readTs uint64) *pb.PostingList {
20072075
if l.plist != nil && len(l.plist.Postings) > 0 {
20082076
return l.plist
20092077
}
2078+
if l.plist != nil && l.plist.Pack != nil {
2079+
uids := codec.Decode(l.plist.Pack, 0)
2080+
return &pb.PostingList{
2081+
Postings: []*pb.Posting{
2082+
{
2083+
Uid: uids[0],
2084+
ValType: pb.Posting_UID,
2085+
Op: Set,
2086+
},
2087+
},
2088+
}
2089+
}
20102090
return nil
20112091
}
20122092

posting/list_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ func TestGetSinglePosting(t *testing.T) {
124124

125125
res, err := l.StaticValue(1)
126126
require.NoError(t, err)
127-
fmt.Println(res, res == nil)
128127
require.Equal(t, res == nil, true)
129128

130129
l.plist = create_pl(1, 1)

0 commit comments

Comments
 (0)