diff --git a/posting/index.go b/posting/index.go index 87b1bcfd015..ba975069b6a 100644 --- a/posting/index.go +++ b/posting/index.go @@ -33,6 +33,8 @@ import ( "github.com/hypermodeinc/dgraph/v25/schema" "github.com/hypermodeinc/dgraph/v25/tok" "github.com/hypermodeinc/dgraph/v25/tok/hnsw" + tokIndex "github.com/hypermodeinc/dgraph/v25/tok/index" + "github.com/hypermodeinc/dgraph/v25/types" "github.com/hypermodeinc/dgraph/v25/x" ) @@ -161,21 +163,19 @@ func (txn *Txn) addIndexMutations(ctx context.Context, info *indexMutationInfo) len(info.factorySpecs) > 0 { // retrieve vector from inUuid save as inVec inVec := types.BytesAsFloatArray(data[0].Value.([]byte)) - tc := hnsw.NewTxnCache(NewViTxn(txn), txn.StartTs) indexer, err := info.factorySpecs[0].CreateIndex(attr) if err != nil { return []*pb.DirectedEdge{}, err } - edges, err := indexer.Insert(ctx, tc, uid, inVec) - if err != nil { - return []*pb.DirectedEdge{}, err + caches := GetVectorTransactions(indexer.NumThreads(), txn.StartTs, attr) + for i := range caches { + caches[i].(*VectorTransaction).txn = txn } - pbEdges := []*pb.DirectedEdge{} - for _, e := range edges { - pbe := indexEdgeToPbEdge(e) - pbEdges = append(pbEdges, pbe) + indexer.SetCaches(caches) + if err := indexer.Insert(ctx, uid, inVec); err != nil { + return []*pb.DirectedEdge{}, err } - return pbEdges, nil + return []*pb.DirectedEdge{}, nil } } @@ -794,6 +794,54 @@ func (r *rebuilder) RunWithoutTemp(ctx context.Context) error { }) } +func printTreeStatsDeltas(txn *Txn) { + txn.cache.Lock() + + numLevels := 20 + numNodes := make([]int, numLevels) + numConnections := make([]int, numLevels) + + var temp [][]uint64 + for key, plMarshalled := range txn.cache.deltas { + pk, _ := x.Parse([]byte(key)) + var data pb.PostingList + proto.Unmarshal(plMarshalled, &data) + if strings.Contains(pk.Attr, "__vector_") && !strings.Contains(pk.Attr, hnsw.VecEntry) { + err := decodeUint64MatrixUnsafe(data.Postings[0].Value, &temp) + if err != nil { + fmt.Println("Error while decoding", err) + } + + for i := range temp { + if len(temp[i]) > 0 { + numNodes[i] += 1 + } + numConnections[i] += len(temp[i]) + } + + } + } + + for i := range numLevels { + fmt.Printf("%d, ", numNodes[i]) + } + fmt.Println("") + for i := range numLevels { + fmt.Printf("%d, ", numConnections[i]) + } + fmt.Println("") + for i := range numLevels { + if numNodes[i] == 0 { + fmt.Printf("0, ") + continue + } + fmt.Printf("%d, ", numConnections[i]/numNodes[i]) + } + fmt.Println("") + + txn.cache.Unlock() +} + func printTreeStats(txn *Txn) { txn.cache.Lock() @@ -1361,6 +1409,438 @@ func (rb *indexRebuildInfo) prefixesForTokIndexes() ([][]byte, error) { return prefixes, nil } +const numCentroids = 1000 + +func GetVectorTransactions(n int, startTs uint64, pred string) []tokIndex.CacheType { + retVal := make([]tokIndex.CacheType, n) + for i := 0; i < n; i++ { + retVal[i] = &VectorTransaction{} + retVal[i].(*VectorTransaction).NewVT(startTs) + if n > 1 { + retVal[i].(*VectorTransaction).UpdateSplit(i, pred) + } + } + return retVal +} + +type VectorTransaction struct { + txn *Txn + startTs uint64 + vecPred string + edgePred string + vector map[uint64]*[]byte + edges map[uint64]*[]byte + others map[string]*[]byte + + readDisk bool +} + +func (vt *VectorTransaction) SetCache(cache *LocalCache) { + vt.txn = NewTxn(vt.startTs) + vt.txn.cache = cache +} + +func (vt *VectorTransaction) UpdateSplit(i int, pred string) { + vt.vecPred = pred + vt.edgePred = fmt.Sprintf("%s%s_%d", pred, hnsw.VecKeyword, i) +} + +func (vt 
*VectorTransaction) NewVT(startTs uint64) { + vt.txn = &Txn{ + StartTs: startTs, + cache: NewLocalCache(startTs), + } + vt.startTs = startTs + vt.vector = make(map[uint64]*[]byte) + vt.edges = make(map[uint64]*[]byte) + vt.others = make(map[string]*[]byte) + vt.readDisk = true +} + +func (vt *VectorTransaction) Find(prefix []byte, filter func(val []byte) bool) (uint64, error) { + return vt.txn.cache.Find(prefix, filter) +} + +func (vt *VectorTransaction) SetVector(uid uint64, vec *[]byte) { + vt.vector[uid] = vec +} + +func (vt *VectorTransaction) SetEdge(uid uint64, edge *[]byte) { + //fmt.Println("SETTING EDGE", uid, edge) + vt.edges[uid] = edge +} + +func (vt *VectorTransaction) SetOther(key string, val *[]byte) { + vt.others[key] = val +} + +func (vt *VectorTransaction) GetVector(uid uint64) *[]byte { + val, ok := vt.vector[uid] + if ok { + return val + } + if !vt.readDisk { + return nil + } + pl, err := vt.txn.Get(x.DataKey(vt.vecPred, uid)) + if err != nil { + return nil + } + rval, err := pl.Value(vt.startTs) + if err != nil { + return nil + } + value := rval.Value.([]byte) + //fmt.Println("GET VECTOR: ", uid, value, vt.vecPred) + vt.vector[uid] = &value + return &value +} + +func (vt *VectorTransaction) GetEdge(uid uint64) *[]byte { + val, ok := vt.edges[uid] + if ok { + return val + } + if !vt.readDisk { + return nil + } + pl, err := vt.txn.Get(x.DataKey(vt.edgePred, uid)) + if err != nil { + return nil + } + rval, err := pl.Value(vt.startTs) + //fmt.Println("GET EDGE: ", uid, pl, err, vt.edgePred, rval) + if err != nil { + return nil + } + value := rval.Value.([]byte) + if len(value) == 0 { + return nil + } + vt.edges[uid] = &value + return &value +} + +func (vt *VectorTransaction) GetOther(key string) *[]byte { + val, ok := vt.others[key] + if ok { + return val + } + if !vt.readDisk { + return nil + } + pl, err := vt.txn.Get(x.DataKey(key, 1)) + if err != nil { + return nil + } + rval, err := pl.Value(vt.startTs) + //fmt.Println("GET OTHER: ", key, pl, rval, err) + if err != nil { + return nil + } + value := rval.Value.([]byte) + //fmt.Println("GET OTHER: ", key, value) + if len(value) == 0 { + return nil + } + vt.others[key] = &value + return &value +} + +func (vt *VectorTransaction) Update() { + for uid, edges := range vt.edges { + posting := &pb.Posting{ + Op: Set, + Value: *edges, + ValType: pb.Posting_BINARY, + Uid: math.MaxUint64, + } + pl := &pb.PostingList{ + Postings: []*pb.Posting{posting}, + } + data, err := proto.Marshal(pl) + if err != nil { + return + } + vt.txn.cache.deltas[string(x.DataKey(vt.edgePred, uid))] = data + } + + for str, edges := range vt.others { + posting := &pb.Posting{ + Op: Set, + Value: *edges, + ValType: pb.Posting_BINARY, + Uid: math.MaxUint64, + } + pl := &pb.PostingList{ + Postings: []*pb.Posting{posting}, + } + data, err := proto.Marshal(pl) + if err != nil { + return + } + vt.txn.cache.deltas[string(x.DataKey(str, 1))] = data + } + + fmt.Println("AFTER UPDATE", len(vt.txn.cache.deltas)) + + vt.vector = nil + vt.edges = nil + vt.others = nil +} + +func (vt *VectorTransaction) LockKey(key []byte) { +} + +func (vt *VectorTransaction) UnlockKey(key []byte) { +} + +func (vt *VectorTransaction) Ts() uint64 { + return vt.startTs +} + +func rebuildVectorIndex(ctx context.Context, factorySpecs []*tok.FactoryCreateSpec, rb *IndexRebuild) error { + pk := x.ParsedKey{Attr: rb.Attr} + + indexer, err := factorySpecs[0].CreateIndex(pk.Attr) + if err != nil { + return err + } + + if indexer.NumSeedVectors() > 0 { + count := 0 + 
MemLayerInstance.IterateDisk(ctx, IterateDiskArgs{ + Prefix: pk.DataPrefix(), + ReadTs: rb.StartTs, + AllVersions: false, + Reverse: false, + CheckInclusion: func(uid uint64) error { + return nil + }, + Function: func(l *List, pk x.ParsedKey) error { + val, err := l.Value(rb.StartTs) + if err != nil { + return err + } + inVec := types.BytesAsFloatArray(val.Value.([]byte)) + count += 1 + indexer.AddSeedVector(inVec) + if count == indexer.NumSeedVectors() { + return ErrStopIteration + } + return nil + }, + StartKey: x.DataKey(rb.Attr, 0), + }) + } + + txns := make([]*Txn, indexer.NumThreads()) + for i := range txns { + txns[i] = NewTxn(rb.StartTs) + } + caches := GetVectorTransactions(indexer.NumThreads(), rb.StartTs, rb.Attr) + for i := range len(caches) { + caches[i].(*VectorTransaction).txn = txns[i] + caches[i].(*VectorTransaction).readDisk = false + } + indexer.SetCaches(caches) + + for pass_idx := range indexer.NumBuildPasses() { + fmt.Println("Building pass", pass_idx) + + indexer.StartBuild() + + builder := rebuilder{attr: rb.Attr, prefix: pk.DataPrefix(), startTs: rb.StartTs} + builder.fn = func(uid uint64, pl *List, txn *Txn) ([]*pb.DirectedEdge, error) { + edges := []*pb.DirectedEdge{} + val, err := pl.Value(rb.StartTs) + if err != nil { + return []*pb.DirectedEdge{}, err + } + + inVec := types.BytesAsFloatArray(val.Value.([]byte)) + indexer.BuildInsert(ctx, uid, inVec) + return edges, nil + } + + err := builder.RunWithoutTemp(ctx) + if err != nil { + return err + } + + indexer.EndBuild() + } + + for pass_idx := range indexer.NumIndexPasses() { + fmt.Println("Indexing pass", pass_idx) + + indexer.StartBuild() + + builder := rebuilder{attr: rb.Attr, prefix: pk.DataPrefix(), startTs: rb.StartTs} + builder.fn = func(uid uint64, pl *List, txn *Txn) ([]*pb.DirectedEdge, error) { + edges := []*pb.DirectedEdge{} + val, err := pl.Value(rb.StartTs) + if err != nil { + return []*pb.DirectedEdge{}, err + } + + inVec := types.BytesAsFloatArray(val.Value.([]byte)) + indexer.BuildInsert(ctx, uid, inVec) + return edges, nil + } + + err := builder.RunWithoutTemp(ctx) + if err != nil { + return err + } + + for _, idx := range indexer.EndBuild() { + txns[idx].Update() + caches[idx].(*VectorTransaction).Update() + writer := NewTxnWriter(pstore) + + x.ExponentialRetry(int(x.Config.MaxRetries), + 20*time.Millisecond, func() error { + err := txns[idx].CommitToDisk(writer, rb.StartTs) + if err == badger.ErrBannedKey { + glog.Errorf("Error while writing to banned namespace.") + return nil + } + return err + }) + + printTreeStatsDeltas(txns[idx]) + txns[idx].cache.deltas = nil + txns[idx].cache.plists = nil + txns[idx] = nil + } + } + + return nil + + // MemLayerInstance.IterateDisk(ctx, IterateDiskArgs{ + // Prefix: pk.DataPrefix(), + // ReadTs: rb.StartTs, + // AllVersions: false, + // Reverse: false, + // CheckInclusion: func(uid uint64) error { + // return nil + // }, + // Function: func(l *List, pk x.ParsedKey) error { + // val, err := l.Value(rb.StartTs) + // if err != nil { + // return err + // } + // inVec := types.BytesAsFloatArray(val.Value.([]byte)) + // vc.addSeedCentroid(inVec) + // if len(vc.centroids) == numCentroids { + // return ErrStopIteration + // } + // return nil + // }, + // StartKey: x.DataKey(rb.Attr, 0), + // }) + + // vc.randomInit() + + // fmt.Println("Clustering Vectors") + // for range 5 { + // builder := rebuilder{attr: rb.Attr, prefix: pk.DataPrefix(), startTs: rb.StartTs} + // builder.fn = func(uid uint64, pl *List, txn *Txn) ([]*pb.DirectedEdge, error) { + // edges := 
[]*pb.DirectedEdge{} + // val, err := pl.Value(txn.StartTs) + // if err != nil { + // return []*pb.DirectedEdge{}, err + // } + + // inVec := types.BytesAsFloatArray(val.Value.([]byte)) + // vc.addVector(inVec) + // return edges, nil + // } + + // err := builder.RunWithoutTemp(ctx) + // if err != nil { + // return err + // } + + // vc.updateCentroids() + // } + + // tcs := make([]*hnsw.TxnCache, vc.numCenters) + // txns := make([]*Txn, vc.numCenters) + // indexers := make([]index.VectorIndex[float32], vc.numCenters) + // for i := 0; i < vc.numCenters; i++ { + // txns[i] = NewTxn(rb.StartTs) + // tcs[i] = hnsw.NewTxnCache(NewViTxn(txns[i]), rb.StartTs) + // indexers_i, err := factorySpecs[0].CreateIndex(pk.Attr, i) + // if err != nil { + // return err + // } + // vc.mutexs[i] = &sync.Mutex{} + // indexers[i] = indexers_i + // } + + // var edgesCreated atomic.Int64 + + // numPasses := vc.numCenters / 100 + // for pass_idx := range numPasses { + // builder := rebuilder{attr: rb.Attr, prefix: pk.DataPrefix(), startTs: rb.StartTs} + // builder.fn = func(uid uint64, pl *List, txn *Txn) ([]*pb.DirectedEdge, error) { + // val, err := pl.Value(txn.StartTs) + // if err != nil { + // return []*pb.DirectedEdge{}, err + // } + + // inVec := types.BytesAsFloatArray(val.Value.([]byte)) + // idx := vc.findCentroid(inVec) + // if idx%numPasses != pass_idx { + // return []*pb.DirectedEdge{}, nil + // } + // vc.mutexs[idx].Lock() + // defer vc.mutexs[idx].Unlock() + // _, err = indexers[idx].Insert(ctx, tcs[idx], uid, inVec) + // if err != nil { + // return []*pb.DirectedEdge{}, err + // } + + // edgesCreated.Add(int64(1)) + // return nil, nil + // } + + // err := builder.RunWithoutTemp(ctx) + // if err != nil { + // return err + // } + + // for idx := range vc.counts { + // if idx%numPasses != pass_idx { + // continue + // } + // txns[idx].Update() + // writer := NewTxnWriter(pstore) + + // x.ExponentialRetry(int(x.Config.MaxRetries), + // 20*time.Millisecond, func() error { + // err := txns[idx].CommitToDisk(writer, rb.StartTs) + // if err == badger.ErrBannedKey { + // glog.Errorf("Error while writing to banned namespace.") + // return nil + // } + // return err + // }) + + // txns[idx].cache.plists = nil + // txns[idx] = nil + // tcs[idx] = nil + // indexers[idx] = nil + // } + + // fmt.Printf("Created %d edges in pass %d out of %d\n", edgesCreated.Load(), pass_idx, numPasses) + // } + + // return nil +} + // rebuildTokIndex rebuilds index for a given attribute. // We commit mutations with startTs and ignore the errors. 
 func rebuildTokIndex(ctx context.Context, rb *IndexRebuild) error {
@@ -1392,6 +1872,9 @@ func rebuildTokIndex(ctx context.Context, rb *IndexRebuild) error {
 	}
 
 	runForVectors := (len(factorySpecs) != 0)
+	if runForVectors {
+		return rebuildVectorIndex(ctx, factorySpecs, rb)
+	}
 	pk := x.ParsedKey{Attr: rb.Attr}
 	builder := rebuilder{attr: rb.Attr, prefix: pk.DataPrefix(), startTs: rb.StartTs}
@@ -1652,10 +2135,11 @@ func prefixesToDropVectorIndexEdges(ctx context.Context, rb *IndexRebuild) [][]byte {
 	prefixes = append(prefixes, x.PredicatePrefix(hnsw.ConcatStrings(rb.Attr, hnsw.VecDead)))
 	prefixes = append(prefixes, x.PredicatePrefix(hnsw.ConcatStrings(rb.Attr, hnsw.VecKeyword)))
-	for i := range hnsw.VectorIndexMaxLevels {
-		prefixes = append(prefixes, x.PredicatePrefix(hnsw.ConcatStrings(rb.Attr, hnsw.VecKeyword, fmt.Sprint(i))))
+	for i := range 1000 {
+		prefixes = append(prefixes, x.PredicatePrefix(hnsw.ConcatStrings(rb.Attr, hnsw.VecKeyword, fmt.Sprintf("_%d", i))))
+		prefixes = append(prefixes, x.PredicatePrefix(hnsw.ConcatStrings(rb.Attr, hnsw.VecDead, fmt.Sprintf("_%d", i))))
+		prefixes = append(prefixes, x.PredicatePrefix(hnsw.ConcatStrings(rb.Attr, hnsw.VecEntry, fmt.Sprintf("_%d", i))))
 	}
-
 	return prefixes
 }
diff --git a/posting/list.go b/posting/list.go
index 754a093a141..1471bf03af9 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -711,6 +711,14 @@ type ListOptions struct {
 	First int
 }
 
+func NewVectorPosting(uid uint64, vec *[]byte) *pb.Posting {
+	return &pb.Posting{
+		Value:   *vec,
+		ValType: pb.Posting_BINARY,
+		Op:      Set,
+	}
+}
+
 // NewPosting takes the given edge and returns its equivalent representation as a posting.
 func NewPosting(t *pb.DirectedEdge) *pb.Posting {
 	var op uint32
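The VectorTransaction type introduced above is the concrete implementation of the reworked index.CacheType contract (revised later in this patch in tok/index/index.go): instead of going through index.Txn with raw keys, the HNSW code now reads and writes vectors, per-node edge matrices, and singleton keys (entry point, dead nodes) through typed accessors. A minimal, non-persistent sketch of that contract follows; mapCache is a hypothetical illustration only, not part of the patch, and its Find is a stub because the toy never scans posting lists.

```go
// mapCache is a toy in-memory index.CacheType, assuming the interface as
// revised in tok/index/index.go in this patch. It illustrates the contract
// VectorTransaction implements; it is not meant for real use.
type mapCache struct {
	ts      uint64
	vectors map[uint64][]byte // uid -> serialized float vector
	edges   map[uint64][]byte // uid -> encoded [][]uint64 edge matrix
	others  map[string][]byte // e.g. entry-point and dead-node keys
}

func newMapCache(ts uint64) *mapCache {
	return &mapCache{
		ts:      ts,
		vectors: map[uint64][]byte{},
		edges:   map[uint64][]byte{},
		others:  map[string][]byte{},
	}
}

func (m *mapCache) Ts() uint64                        { return m.ts }
func (m *mapCache) SetVector(uid uint64, vec *[]byte) { m.vectors[uid] = *vec }
func (m *mapCache) SetEdge(uid uint64, edge *[]byte)  { m.edges[uid] = *edge }
func (m *mapCache) SetOther(key string, val *[]byte)  { m.others[key] = *val }

func (m *mapCache) GetVector(uid uint64) *[]byte {
	if v, ok := m.vectors[uid]; ok {
		return &v
	}
	return nil
}

func (m *mapCache) GetEdge(uid uint64) *[]byte {
	if v, ok := m.edges[uid]; ok {
		return &v
	}
	return nil
}

func (m *mapCache) GetOther(key string) *[]byte {
	if v, ok := m.others[key]; ok {
		return &v
	}
	return nil
}

// Find is a stub here: the real cache (VectorTransaction) resolves this
// against posting lists; the toy never needs it.
func (m *mapCache) Find(prefix []byte, filter func(val []byte) bool) (uint64, error) {
	return 0, nil
}
```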
diff --git a/schema/schema.go b/schema/schema.go index 0069f728c2a..6d5b12f977b 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -473,6 +473,11 @@ func (s *state) PredicatesToDelete(pred string) []string { preds = append(preds, pred+hnsw.VecEntry) preds = append(preds, pred+hnsw.VecKeyword) preds = append(preds, pred+hnsw.VecDead) + for i := range 1000 { + preds = append(preds, fmt.Sprintf("%s%s_%d", pred, hnsw.VecEntry, i)) + preds = append(preds, fmt.Sprintf("%s%s_%d", pred, hnsw.VecKeyword, i)) + preds = append(preds, fmt.Sprintf("%s%s_%d", pred, hnsw.VecDead, i)) + } } } return preds diff --git a/tok/hnsw/helper.go b/tok/hnsw/helper.go index 477f5bc9b27..d57c8b1b5b7 100644 --- a/tok/hnsw/helper.go +++ b/tok/hnsw/helper.go @@ -114,6 +114,10 @@ func euclideanDistanceSq[T c.Float](a, b []T, floatBits int) (T, error) { return applyDistanceFunction(a, b, floatBits, "euclidean distance", vek32.Distance, vek.Distance) } +func EuclideanDistanceSq[T c.Float](a, b []T, floatBits int) (T, error) { + return applyDistanceFunction(a, b, floatBits, "euclidean distance", vek32.Distance, vek.Distance) +} + // Used for distance, since shorter distance is better func insortPersistentHeapAscending[T c.Float]( slice []minPersistentHeapElement[T], @@ -229,67 +233,6 @@ func GetSimType[T c.Float](indexType string, floatBits int) SimilarityType[T] { } } -// TxnCache implements CacheType interface -type TxnCache struct { - txn index.Txn - startTs uint64 -} - -func (tc *TxnCache) Get(key []byte) (rval []byte, rerr error) { - return tc.txn.Get(key) -} - -func (tc *TxnCache) Ts() uint64 { - return tc.startTs -} - -func (tc *TxnCache) Find(prefix []byte, filter func([]byte) bool) (uint64, error) { - return tc.txn.Find(prefix, filter) -} - -func NewTxnCache(txn index.Txn, startTs uint64) *TxnCache { - return &TxnCache{ - txn: txn, - startTs: startTs, - } -} - -// QueryCache implements index.CacheType interface -type QueryCache struct { - cache index.LocalCache - readTs uint64 -} - -func (qc *QueryCache) Find(prefix []byte, filter func([]byte) bool) (uint64, error) { - return qc.cache.Find(prefix, filter) -} - -func (qc *QueryCache) Get(key []byte) (rval []byte, rerr error) { - return qc.cache.Get(key) -} - -func (qc *QueryCache) Ts() uint64 { - return qc.readTs -} - -func NewQueryCache(cache index.LocalCache, readTs uint64) *QueryCache { - return &QueryCache{ - cache: cache, - readTs: readTs, - } -} - -// getDataFromKeyWithCacheType(keyString, uid, c) looks up data in c -// associated with keyString and uid. -func getDataFromKeyWithCacheType(keyString string, uid uint64, c index.CacheType) ([]byte, error) { - key := DataKey(keyString, uid) - data, err := c.Get(key) - if err != nil { - return nil, fmt.Errorf("%w: %w; %s", err, errFetchingPostingList, keyString+" with uid "+strconv.FormatUint(uid, 10)) - } - return data, nil -} - // populateEdgeDataFromStore(keyString, uid, c, edgeData) // will fill edgeData with the contents of the neighboring edges for // a given DataKey by looking into the given cache (which may result @@ -302,34 +245,22 @@ func populateEdgeDataFromKeyWithCacheType( uid uint64, c index.CacheType, edgeData *[][]uint64) (bool, error) { - data, err := getDataFromKeyWithCacheType(keyString, uid, c) - // Note that posting list fetching errors are treated as just not having - // found the data -- no harm, no foul, as it is probably a - // dead reference that we can ignore. 
- if err != nil && !errors.Is(err, errFetchingPostingList) { - return false, err - } + data := c.GetEdge(uid) if data == nil { return false, nil } - err = decodeUint64MatrixUnsafe(data, edgeData) + err := decodeUint64MatrixUnsafe(*data, edgeData) return true, err } // entryUuidInsert adds the entry uuid to the given key func entryUuidInsert( ctx context.Context, - key []byte, - txn index.Txn, + c index.CacheType, predEntryKey string, - entryUuid []byte) (*index.KeyValue, error) { - edge := &index.KeyValue{ - Entity: 1, - Attr: predEntryKey, - Value: entryUuid, - } - err := txn.AddMutationWithLockHeld(ctx, key, edge) - return edge, err + entryUuid []byte) error { + c.SetOther(predEntryKey, &entryUuid) + return nil } func ConcatStrings(strs ...string) string { @@ -359,22 +290,14 @@ var emptyVec = []byte{} // adds the data corresponding to a uid to the given vec variable in the form of []T // this does not allocate memory for vec, so it must be allocated before calling this function func (ph *persistentHNSW[T]) getVecFromUid(uid uint64, c index.CacheType, vec *[]T) error { - data, err := getDataFromKeyWithCacheType(ph.pred, uid, c) - if err != nil { - if errors.Is(err, errFetchingPostingList) { - // no vector. Return empty array of floats - index.BytesAsFloatArray(emptyVec, vec, ph.floatBits) - return fmt.Errorf("%w; %w", errNilVector, err) - } - return err - } - if data != nil { - index.BytesAsFloatArray(data, vec, ph.floatBits) - return nil - } else { + data := c.GetVector(uid) + if data == nil { + // no vector. Return empty array of floats index.BytesAsFloatArray(emptyVec, vec, ph.floatBits) - return errNilVector + return fmt.Errorf("%w; %w", errNilVector, errFetchingPostingList) } + index.BytesAsFloatArray(*data, vec, ph.floatBits) + return nil } // chooses whether to create the entry and start nodes based on if it already @@ -382,24 +305,19 @@ func (ph *persistentHNSW[T]) getVecFromUid(uid uint64, c index.CacheType, vec *[ // levels. func (ph *persistentHNSW[T]) createEntryAndStartNodes( ctx context.Context, - c *TxnCache, + c index.CacheType, inUuid uint64, - vec *[]T) (uint64, []*index.KeyValue, error) { - txn := c.txn - edges := []*index.KeyValue{} - entryKey := DataKey(ph.vecEntryKey, 1) // 0-profile_vector_entry - txn.LockKey(entryKey) - defer txn.UnlockKey(entryKey) - data, _ := txn.GetWithLockHeld(entryKey) - - create_edges := func(inUuid uint64) (uint64, []*index.KeyValue, error) { - startEdges, err := ph.addStartNodeToAllLevels(ctx, entryKey, txn, inUuid) + vec *[]T) (uint64, error) { + + data := c.GetOther(ph.vecEntryKey) + + create_edges := func(inUuid uint64) (uint64, error) { + err := ph.addStartNodeToAllLevels(ctx, c, inUuid) if err != nil { - return 0, []*index.KeyValue{}, err + return 0, err } // return entry node at all levels - edges = append(edges, startEdges...) - return 0, edges, nil + return 0, nil } if data == nil { @@ -407,7 +325,7 @@ func (ph *persistentHNSW[T]) createEntryAndStartNodes( return create_edges(inUuid) } - entry := BytesToUint64(data) // convert entry Uuid returned from Get to uint64 + entry := BytesToUint64(*data) // convert entry Uuid returned from Get to uint64 err := ph.getVecFromUid(entry, c, vec) if err != nil || len(*vec) == 0 { // The entry vector has been deleted. We have to create a new entry vector. 
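With getDataFromKeyWithCacheType removed, a node's neighbour lists are fetched directly via CacheType.GetEdge: each node stores a single binary value, a [][]uint64 with one row per HNSW level, produced by encodeUint64MatrixUnsafe. A rough sketch of that round trip, assuming it runs inside package hnsw (both codecs are unexported) with a cache c and a uid already in scope:

```go
// One row per HNSW level; row i holds this node's neighbour uids at level i.
allLayerEdges := [][]uint64{
	{12, 57, 903}, // level 0
	{12},          // level 1
	{},            // level 2 (empty levels are allowed)
}

// Write side, as in newPersistentEdgeKeyValueEntry / addNeighbors:
buf := encodeUint64MatrixUnsafe(allLayerEdges)
c.SetEdge(uid, &buf)

// Read side, as in populateEdgeDataFromKeyWithCacheType / fillNeighborEdges:
var decoded [][]uint64
if data := c.GetEdge(uid); data != nil {
	if err := decodeUint64MatrixUnsafe(*data, &decoded); err != nil {
		// a decode failure means the stored value is corrupt
	}
}
```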
@@ -419,7 +337,7 @@ func (ph *persistentHNSW[T]) createEntryAndStartNodes( return create_edges(entry) } - return entry, edges, nil + return entry, nil } // Converts the matrix into linear array that looks like @@ -489,47 +407,33 @@ func decodeUint64MatrixUnsafe(data []byte, matrix *[][]uint64) error { // adds empty layers to all levels func (ph *persistentHNSW[T]) addStartNodeToAllLevels( ctx context.Context, - entryKey []byte, - txn index.Txn, - inUuid uint64) ([]*index.KeyValue, error) { - edges := []*index.KeyValue{} - key := DataKey(ph.vecKey, inUuid) + c index.CacheType, + inUuid uint64) error { emptyEdgesBytes := encodeUint64MatrixUnsafe(make([][]uint64, ph.maxLevels)) // creates empty at all levels only for entry node - edge, err := ph.newPersistentEdgeKeyValueEntry(ctx, key, txn, inUuid, emptyEdgesBytes) + err := ph.newPersistentEdgeKeyValueEntry(ctx, c, inUuid, emptyEdgesBytes) if err != nil { - return []*index.KeyValue{}, err + return err } - edges = append(edges, edge) inUuidByte := Uint64ToBytes(inUuid) // add inUuid as entry for this structure from now on - edge, err = entryUuidInsert(ctx, entryKey, txn, ph.vecEntryKey, inUuidByte) + err = entryUuidInsert(ctx, c, ph.vecEntryKey, inUuidByte) if err != nil { - return []*index.KeyValue{}, err + return err } - edges = append(edges, edge) - return edges, nil + return nil } // creates a new edge with the given uuid and edges. Lock must be held before calling this function -func (ph *persistentHNSW[T]) newPersistentEdgeKeyValueEntry(ctx context.Context, key []byte, - txn index.Txn, uuid uint64, edges []byte) (*index.KeyValue, error) { - txn.LockKey(key) - defer txn.UnlockKey(key) - edge := &index.KeyValue{ - Entity: uuid, - Attr: ph.vecKey, - Value: edges, - } - if err := txn.AddMutationWithLockHeld(ctx, key, edge); err != nil { - return nil, err - } - return edge, nil +func (ph *persistentHNSW[T]) newPersistentEdgeKeyValueEntry(ctx context.Context, + c index.CacheType, uuid uint64, edges []byte) error { + c.SetEdge(uuid, &edges) + return nil } -func (ph *persistentHNSW[T]) distance_betw(ctx context.Context, tc *TxnCache, inUuid, outUuid uint64, inVec, +func (ph *persistentHNSW[T]) distance_betw(ctx context.Context, c index.CacheType, inUuid, outUuid uint64, inVec, outVec *[]T) T { - err := ph.getVecFromUid(outUuid, tc, outVec) + err := ph.getVecFromUid(outUuid, c, outVec) if err != nil { log.Printf("[ERROR] While getting vector %s", err) return -1 @@ -579,87 +483,67 @@ func (h *HeapDataHolder) Pop() interface{} { // addNeighbors adds the neighbors of the given uuid to the given level. // It returns the edge created and the error if any. 
-func (ph *persistentHNSW[T]) addNeighbors(ctx context.Context, tc *TxnCache, - uuid uint64, allLayerNeighbors [][]uint64) (*index.KeyValue, error) { - - txn := tc.txn - keyPred := ph.vecKey - key := DataKey(keyPred, uuid) - txn.LockKey(key) - defer txn.UnlockKey(key) - var nnEdgesErr error +func (ph *persistentHNSW[T]) addNeighbors(ctx context.Context, c index.CacheType, + uuid uint64, allLayerNeighbors [][]uint64) error { + + // Lock the vector key + edges_data := c.GetEdge(uuid) var allLayerEdges [][]uint64 - var ok bool - allLayerEdges, ok = ph.nodeAllEdges[uuid] - if !ok { - data, _ := txn.GetWithLockHeld(key) - if data == nil { - allLayerEdges = allLayerNeighbors - } else { - // all edges of nearest neighbor - err := decodeUint64MatrixUnsafe(data, &allLayerEdges) - if err != nil { - return nil, err - } - } + if edges_data != nil { + decodeUint64MatrixUnsafe(*edges_data, &allLayerEdges) + } else { + encodedData := encodeUint64MatrixUnsafe(allLayerNeighbors) + c.SetEdge(uuid, &encodedData) + return nil } + var inVec, outVec []T for level := range ph.maxLevels { - allLayerEdges[level], nnEdgesErr = ph.removeDeadNodes(allLayerEdges[level], tc) - if nnEdgesErr != nil { - return nil, nnEdgesErr + var err error + allLayerEdges[level], err = ph.removeDeadNodes(allLayerEdges[level], c) + if err != nil { + return err } // This adds at most efConstruction number of edges for each layer for this node allLayerEdges[level] = append(allLayerEdges[level], allLayerNeighbors[level]...) if len(allLayerEdges[level]) > ph.efConstruction { - err := ph.getVecFromUid(uuid, tc, &inVec) + err := ph.getVecFromUid(uuid, c, &inVec) if err != nil { log.Printf("[ERROR] While getting vector %s", err) } else { h := &HeapDataHolder{ data: allLayerEdges[level], compare: func(i, j uint64) bool { - return ph.distance_betw(ctx, tc, uuid, i, &inVec, &outVec) > - ph.distance_betw(ctx, tc, uuid, j, &inVec, &outVec) + return ph.distance_betw(ctx, c, uuid, i, &inVec, &outVec) > + ph.distance_betw(ctx, c, uuid, j, &inVec, &outVec) }} for _, e := range allLayerNeighbors[level] { heap.Push(h, e) - heap.Pop(h) } } allLayerEdges[level] = allLayerEdges[level][:ph.efConstruction] } } - // on every modification of the layer edges, add it to in mem map so you dont have to always be reading - // from persistent storage - ph.nodeAllEdges[uuid] = allLayerEdges inboundEdgesBytes := encodeUint64MatrixUnsafe(allLayerEdges) - - edge := &index.KeyValue{ - Entity: uuid, - Attr: ph.vecKey, - Value: inboundEdgesBytes, - } - if err := txn.AddMutationWithLockHeld(ctx, key, edge); err != nil { - return nil, err - } - return edge, nil + c.SetEdge(uuid, &inboundEdgesBytes) + return nil } // removeDeadNodes(nnEdges, tc) removes dead nodes from nnEdges and returns the new nnEdges -func (ph *persistentHNSW[T]) removeDeadNodes(nnEdges []uint64, tc *TxnCache) ([]uint64, error) { +func (ph *persistentHNSW[T]) removeDeadNodes(nnEdges []uint64, c index.CacheType) ([]uint64, error) { // TODO add a path to delete deadNodes if ph.deadNodes == nil { - data, err := getDataFromKeyWithCacheType(ph.vecDead, 1, tc) - if err != nil && !errors.Is(err, errFetchingPostingList) { - return []uint64{}, err + data := c.GetOther(ph.vecDead) + if data == nil { + return nnEdges, nil } var deadNodes []uint64 if data != nil { // if dead nodes exist, convert to []uint64 - deadNodes, err = ParseEdges(string(data)) + var err error + deadNodes, err = ParseEdges(string(*data)) if err != nil { return []uint64{}, err } diff --git a/tok/hnsw/persistent_factory.go 
b/tok/hnsw/persistent_factory.go index ff4c622f218..5704a8e511b 100644 --- a/tok/hnsw/persistent_factory.go +++ b/tok/hnsw/persistent_factory.go @@ -78,6 +78,17 @@ func (hf *persistentIndexFactory[T]) AllowedOptions() opt.AllowedOptions { return retVal } +func UpdateIndexSplit[T c.Float](vi index.VectorIndex[T], split int) error { + hnsw, ok := vi.(*persistentHNSW[T]) + if !ok { + return errors.New("index is not a persistent HNSW index") + } + hnsw.vecEntryKey = ConcatStrings(hnsw.pred, fmt.Sprintf("%s_%d", VecEntry, split)) + hnsw.vecKey = ConcatStrings(hnsw.pred, fmt.Sprintf("%s_%d", VecKeyword, split)) + hnsw.vecDead = ConcatStrings(hnsw.pred, fmt.Sprintf("%s_%d", VecDead, split)) + return nil +} + // Create is an implementation of the IndexFactory interface function, invoked by an HNSWIndexFactory // instance. It takes in a string name and a VectorSource implementation, and returns a VectorIndex and error // flag. It creates an HNSW instance using the index name and populates other parts of the HNSW struct such as @@ -102,12 +113,11 @@ func (hf *persistentIndexFactory[T]) createWithLock( return nil, err } retVal := &persistentHNSW[T]{ - pred: name, - vecEntryKey: ConcatStrings(name, VecEntry), - vecKey: ConcatStrings(name, VecKeyword), - vecDead: ConcatStrings(name, VecDead), - floatBits: floatBits, - nodeAllEdges: map[uint64][][]uint64{}, + pred: name, + vecEntryKey: ConcatStrings(name, VecEntry), + vecKey: ConcatStrings(name, VecKeyword), + vecDead: ConcatStrings(name, VecDead), + floatBits: floatBits, } err := retVal.applyOptions(o) if err != nil { diff --git a/tok/hnsw/persistent_hnsw.go b/tok/hnsw/persistent_hnsw.go index e13ddddaf89..af9f17068ec 100644 --- a/tok/hnsw/persistent_hnsw.go +++ b/tok/hnsw/persistent_hnsw.go @@ -8,13 +8,17 @@ package hnsw import ( "context" "fmt" + "sort" "strings" + "sync/atomic" "time" + "unsafe" "github.com/golang/glog" c "github.com/hypermodeinc/dgraph/v25/tok/constraints" "github.com/hypermodeinc/dgraph/v25/tok/index" opt "github.com/hypermodeinc/dgraph/v25/tok/options" + "github.com/hypermodeinc/dgraph/v25/types" "github.com/pkg/errors" ) @@ -30,8 +34,11 @@ type persistentHNSW[T c.Float] struct { floatBits int // nodeAllEdges[65443][1][3] indicates the 3rd neighbor in the first // layer for uuid 65443. The result will be a neighboring uuid. 
- nodeAllEdges map[uint64][][]uint64 - deadNodes map[uint64]struct{} + deadNodes map[uint64]struct{} + cache index.CacheType + + entryVec []T + isEntryVecSet atomic.Bool } func GetPersistantOptions[T c.Float](o opt.Options) string { @@ -111,6 +118,56 @@ func (ph *persistentHNSW[T]) applyOptions(o opt.Options) error { return nil } +func (ph *persistentHNSW[T]) NumBuildPasses() int { + return 0 +} + +func (ph *persistentHNSW[T]) NumIndexPasses() int { + return 1 +} + +func (ph *persistentHNSW[T]) NumSeedVectors() int { + return 0 +} + +func (ph *persistentHNSW[T]) StartBuild() { +} + +func (ph *persistentHNSW[T]) SetCaches(c []index.CacheType) { + ph.cache = c[0] +} + +func (ph *persistentHNSW[T]) EndBuild() []int { + ph.cache = nil + return []int{0} +} + +func (ph *persistentHNSW[T]) NumThreads() int { + return 1 +} + +func (ph *persistentHNSW[T]) BuildInsert(ctx context.Context, uid uint64, vec []T) error { + floatVec := *(*[]float32)(unsafe.Pointer(&vec)) + vecBytes := types.FloatArrayAsBytes(floatVec) + ph.cache.SetVector(uid, &vecBytes) + newPh := &persistentHNSW[T]{ + maxLevels: ph.maxLevels, + efConstruction: ph.efConstruction, + efSearch: ph.efSearch, + pred: ph.pred, + vecEntryKey: ph.vecEntryKey, + vecKey: ph.vecKey, + vecDead: ph.vecDead, + simType: ph.simType, + floatBits: ph.floatBits, + cache: ph.cache, + } + return newPh.Insert(ctx, uid, vec) +} + +func (ph *persistentHNSW[T]) AddSeedVector(vec []T) { +} + func (ph *persistentHNSW[T]) emptyFinalResultWithError(e error) ( *index.SearchPathResult, error) { return index.NewSearchPathResult(), e @@ -126,22 +183,13 @@ func (ph *persistentHNSW[T]) emptySearchResultWithError(e error) (*searchLayerRe // in persistent store) and false otherwise. // (Of course, it may also return an error if a problem was encountered). 
 func (ph *persistentHNSW[T]) fillNeighborEdges(uuid uint64, c index.CacheType, edges *[][]uint64) (bool, error) {
-	var ok bool
-	*edges, ok = ph.nodeAllEdges[uuid]
-	if ok {
-		return true, nil
+	edge := ph.cache.GetEdge(uuid)
+	if edge == nil {
+		return false, nil
 	}
-
-	ok, err := populateEdgeDataFromKeyWithCacheType(ph.vecKey, uuid, c, edges)
-	if err != nil {
+	if err := decodeUint64MatrixUnsafe(*edge, edges); err != nil {
 		return false, err
 	}
-	if !ok {
-		return false, nil
-	}
-
-	// add this to in mem storage of uid -> edges
-	ph.nodeAllEdges[uuid] = *edges
 	return true, nil
 }
@@ -248,12 +296,52 @@ func (ph *persistentHNSW[T]) searchPersistentLayer(
 
 // Search searches the hnsw graph for the nearest neighbors of the query vector
 // and returns the traversal path and the nearest neighbors
-func (ph *persistentHNSW[T]) Search(ctx context.Context, c index.CacheType, query []T,
+func (ph *persistentHNSW[T]) Search(ctx context.Context, query []T,
 	maxResults int,
 	filter index.SearchFilter[T]) (nnUids []uint64, err error) {
-	r, err := ph.SearchWithPath(ctx, c, query, maxResults, filter)
+	r, err := ph.SearchWithPath(ctx, ph.cache, query, maxResults, filter)
 	return r.Neighbors, err
 }
 
+type resultRow[T c.Float] struct {
+	uid  uint64
+	dist T
+}
+
+func (ph *persistentHNSW[T]) MergeResults(ctx context.Context, c index.CacheType, list []uint64, query []T, maxResults int, filter index.SearchFilter[T]) ([]uint64, error) {
+	var result []resultRow[T]
+
+	for i := range list {
+		var vec []T
+		err := ph.getVecFromUid(list[i], c, &vec)
+		if err != nil {
+			return nil, err
+		}
+
+		dist, err := ph.simType.distanceScore(vec, query, ph.floatBits)
+		if err != nil {
+			return nil, err
+		}
+		result = append(result, resultRow[T]{
+			uid:  list[i],
+			dist: dist,
+		})
+	}
+
+	sort.Slice(result, func(i, j int) bool {
+		return result[i].dist < result[j].dist
+	})
+
+	uids := []uint64{}
+	for i := range maxResults {
+		if i >= len(result) {
+			break
+		}
+		uids = append(uids, result[i].uid)
+	}
+
+	return uids, nil
+}
+
 // SearchWithUid searches the hnsw graph for the nearest neighbors of the query uid
 // and returns the traversal path and the nearest neighbors
 func (ph *persistentHNSW[T]) SearchWithUid(_ context.Context, c index.CacheType, queryUid uint64,
@@ -315,24 +403,20 @@ func (ph *persistentHNSW[T]) PickStartNode(
 	c index.CacheType,
 	startVec *[]T) (uint64, error) {
 
-	data, err := getDataFromKeyWithCacheType(ph.vecEntryKey, 1, c)
-	if err != nil {
-		if errors.Is(err, errFetchingPostingList) {
-			// The index might be empty
-			return ph.calculateNewEntryVec(ctx, c, startVec)
-		}
-		return 0, err
+	data := c.GetOther(ph.vecEntryKey)
+	if data == nil {
+		return ph.calculateNewEntryVec(ctx, c, startVec)
 	}
-	entry := BytesToUint64(data)
-	if err = ph.getVecFromUid(entry, c, startVec); err != nil && !errors.Is(err, errNilVector) {
+	entry := BytesToUint64(*data)
+	if err := ph.getVecFromUid(entry, c, startVec); err != nil && !errors.Is(err, errNilVector) {
 		return 0, err
 	}
 	if len(*startVec) == 0 {
 		return ph.calculateNewEntryVec(ctx, c, startVec)
 	}
-	return entry, err
+	return entry, nil
 }
 
 // SearchWithPath allows persistentHNSW to implement index.OptionalIndexSupport.
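MergeResults is new in this interface: it re-ranks an arbitrary candidate set by recomputing the exact distance of each stored vector to the query, and the partitioned index later in this patch uses it to combine results coming back from several per-cluster searches. A small usage sketch, with vi, cache, ctx and query assumed to be in scope and the candidate uids purely hypothetical:

```go
// uids gathered from several per-cluster searches (hypothetical values)
candidates := []uint64{17, 42, 99, 7}

// Keep the two uids whose stored vectors are closest to query, nearest first.
// Vectors are resolved through the supplied cache.
best, err := vi.MergeResults(ctx, cache, candidates, query, 2, index.AcceptAll[float32])
if err != nil {
	return err
}
// best now holds at most two uids, e.g. []uint64{42, 7}.
```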
@@ -391,26 +475,22 @@ func (ph *persistentHNSW[T]) SearchWithPath( // InsertToPersistentStorage inserts a node into the hnsw graph and returns the // traversal path and the edges created -func (ph *persistentHNSW[T]) Insert(ctx context.Context, c index.CacheType, - inUuid uint64, inVec []T) ([]*index.KeyValue, error) { - tc, ok := c.(*TxnCache) - if !ok { - return []*index.KeyValue{}, nil - } - _, edges, err := ph.insertHelper(ctx, tc, inUuid, inVec) - return edges, err +func (ph *persistentHNSW[T]) Insert(ctx context.Context, + inUuid uint64, inVec []T) error { + _, err := ph.insertHelper(ctx, ph.cache, inUuid, inVec) + return err } // InsertToPersistentStorage inserts a node into the hnsw graph and returns the // traversal path and the edges created -func (ph *persistentHNSW[T]) insertHelper(ctx context.Context, tc *TxnCache, - inUuid uint64, inVec []T) ([]minPersistentHeapElement[T], []*index.KeyValue, error) { +func (ph *persistentHNSW[T]) insertHelper(ctx context.Context, c index.CacheType, + inUuid uint64, inVec []T) ([]minPersistentHeapElement[T], error) { // return all the new edges created at all HNSW levels var startVec []T - entry, edges, err := ph.createEntryAndStartNodes(ctx, tc, inUuid, &startVec) - if err != nil || len(edges) > 0 { - return []minPersistentHeapElement[T]{}, edges, err + entry, err := ph.createEntryAndStartNodes(ctx, c, inUuid, &startVec) + if err != nil { + return []minPersistentHeapElement[T]{}, err } if entry == inUuid { @@ -418,7 +498,7 @@ func (ph *persistentHNSW[T]) insertHelper(ctx context.Context, tc *TxnCache, // it'll just overwrite w the same info // only situation where you can add duplicate nodes is if your // mutation adds the same node as entry - return []minPersistentHeapElement[T]{}, []*index.KeyValue{}, nil + return []minPersistentHeapElement[T]{}, nil } // startVecs: vectors used to calc where to start up until inLevel, @@ -427,40 +507,33 @@ func (ph *persistentHNSW[T]) insertHelper(ctx context.Context, tc *TxnCache, // var nns []minPersistentHeapElement[T] visited := []minPersistentHeapElement[T]{} inLevel := getInsertLayer(ph.maxLevels) // calculate layer to insert node at (randomized every time) - var layerErr error for level := range inLevel { // perform insertion for layers [level, max_level) only, when level < inLevel just find better start - err := ph.getVecFromUid(entry, tc, &startVec) + err := ph.getVecFromUid(entry, c, &startVec) if err != nil { - return []minPersistentHeapElement[T]{}, []*index.KeyValue{}, err + return []minPersistentHeapElement[T]{}, err } - layerResult, err := ph.searchPersistentLayer(tc, level, entry, startVec, + layerResult, err := ph.searchPersistentLayer(c, level, entry, startVec, inVec, false, ph.efSearch, index.AcceptAll[T]) if err != nil { - return []minPersistentHeapElement[T]{}, []*index.KeyValue{}, err + return []minPersistentHeapElement[T]{}, err } entry = layerResult.bestNeighbor().index } - emptyEdges := make([][]uint64, ph.maxLevels) - _, err = ph.addNeighbors(ctx, tc, inUuid, emptyEdges) - if err != nil { - return []minPersistentHeapElement[T]{}, []*index.KeyValue{}, err - } - var outboundEdgesAllLayers = make([][]uint64, ph.maxLevels) var inboundEdgesAllLayersMap = make(map[uint64][][]uint64) nnUidArray := []uint64{} for level := inLevel; level < ph.maxLevels; level++ { - err := ph.getVecFromUid(entry, tc, &startVec) + err := ph.getVecFromUid(entry, c, &startVec) if err != nil { - return []minPersistentHeapElement[T]{}, []*index.KeyValue{}, err + return []minPersistentHeapElement[T]{}, err } - 
layerResult, err := ph.searchPersistentLayer(tc, level, entry, startVec, + layerResult, err := ph.searchPersistentLayer(c, level, entry, startVec, inVec, false, ph.efConstruction, index.AcceptAll[T]) if err != nil { - return []minPersistentHeapElement[T]{}, []*index.KeyValue{}, layerErr + return []minPersistentHeapElement[T]{}, err } entry = layerResult.bestNeighbor().index @@ -479,19 +552,17 @@ func (ph *persistentHNSW[T]) insertHelper(ctx context.Context, tc *TxnCache, append(outboundEdgesAllLayers[level], nns[i].index) } } - edge, err := ph.addNeighbors(ctx, tc, inUuid, outboundEdgesAllLayers) + err = ph.addNeighbors(ctx, c, inUuid, outboundEdgesAllLayers) + if err != nil { + return []minPersistentHeapElement[T]{}, err + } for i := range nnUidArray { - edge, err := ph.addNeighbors( - ctx, tc, nnUidArray[i], inboundEdgesAllLayersMap[nnUidArray[i]]) + err = ph.addNeighbors( + ctx, c, nnUidArray[i], inboundEdgesAllLayersMap[nnUidArray[i]]) if err != nil { - return []minPersistentHeapElement[T]{}, []*index.KeyValue{}, err + return []minPersistentHeapElement[T]{}, err } - edges = append(edges, edge) - } - if err != nil { - return []minPersistentHeapElement[T]{}, []*index.KeyValue{}, err } - edges = append(edges, edge) - return visited, edges, nil + return visited, nil } diff --git a/tok/index/index.go b/tok/index/index.go index e0a62255ce1..c4836094a2f 100644 --- a/tok/index/index.go +++ b/tok/index/index.go @@ -89,17 +89,31 @@ type OptionalIndexSupport[T c.Float] interface { filter SearchFilter[T]) (*SearchPathResult, error) } +type VectorPartitionStrat[T c.Float] interface { + FindIndexForSearch(vec []T) ([]int, error) + FindIndexForInsert(vec []T) (int, error) + NumPasses() int + NumSeedVectors() int + StartBuildPass() + EndBuildPass() + AddSeedVector(vec []T) + AddVector(vec []T) error +} + // A VectorIndex can be used to Search for vectors and add vectors to an index. type VectorIndex[T c.Float] interface { OptionalIndexSupport[T] + MergeResults(ctx context.Context, c CacheType, list []uint64, query []T, maxResults int, + filter SearchFilter[T]) ([]uint64, error) + // Search will find the uids for a given set of vectors based on the // input query, limiting to the specified maximum number of results. // The filter parameter indicates that we might discard certain parameters // based on some input criteria. The maxResults count is counted *after* // being filtered. In other words, we only count those results that had not // been filtered out. - Search(ctx context.Context, c CacheType, query []T, + Search(ctx context.Context, query []T, maxResults int, filter SearchFilter[T]) ([]uint64, error) @@ -115,24 +129,32 @@ type VectorIndex[T c.Float] interface { // Insert will add a vector and uuid into the existing VectorIndex. 
If // uuid already exists, it should throw an error to not insert duplicate uuids - Insert(ctx context.Context, c CacheType, uuid uint64, vec []T) ([]*KeyValue, error) + Insert(ctx context.Context, uuid uint64, vec []T) error + + BuildInsert(ctx context.Context, uuid uint64, vec []T) error + AddSeedVector(vec []T) + NumBuildPasses() int + NumIndexPasses() int + NumSeedVectors() int + StartBuild() + EndBuild() []int + NumThreads() int + SetCaches(caches []CacheType) } // A Txn is an interface representation of a persistent storage transaction, // where multiple operations are performed on a database type Txn interface { - // StartTs gets the exact time that the transaction started, returned in uint64 format - StartTs() uint64 // Get uses a []byte key to return the Value corresponding to the key - Get(key []byte) (rval []byte, rerr error) + // Get(key []byte) (rval []byte, rerr error) // GetWithLockHeld uses a []byte key to return the Value corresponding to the key with a mutex lock held - GetWithLockHeld(key []byte) (rval []byte, rerr error) + // GetWithLockHeld(key []byte) (rval []byte, rerr error) Find(prefix []byte, filter func(val []byte) bool) (uint64, error) // Adds a mutation operation on a index.Txn interface, where the mutation // is represented in the form of an index.DirectedEdge - AddMutation(ctx context.Context, key []byte, t *KeyValue) error + // AddMutation(ctx context.Context, key []byte, t *KeyValue) error // Same as AddMutation but with a mutex lock held - AddMutationWithLockHeld(ctx context.Context, key []byte, t *KeyValue) error + // AddMutationWithLockHeld(ctx context.Context, key []byte, t *KeyValue) error // mutex lock LockKey(key []byte) // mutex unlock @@ -150,7 +172,13 @@ type LocalCache interface { // CacheType is an interface representation of the cache of a persistent storage system type CacheType interface { - Get(key []byte) (rval []byte, rerr error) + // Get(key []byte) (rval []byte, rerr error) Ts() uint64 Find(prefix []byte, filter func(val []byte) bool) (uint64, error) + SetVector(uid uint64, vec *[]byte) + SetEdge(uid uint64, edge *[]byte) + SetOther(key string, val *[]byte) + GetVector(uid uint64) *[]byte + GetEdge(uid uint64) *[]byte + GetOther(key string) *[]byte } diff --git a/tok/kmeans/kmeans.go b/tok/kmeans/kmeans.go new file mode 100644 index 00000000000..6f6693d9300 --- /dev/null +++ b/tok/kmeans/kmeans.go @@ -0,0 +1,135 @@ +package kmeans + +import ( + "fmt" + "math" + "sync" + + c "github.com/hypermodeinc/dgraph/v25/tok/constraints" + "github.com/hypermodeinc/dgraph/v25/tok/index" +) + +type Kmeans[T c.Float] struct { + floatBits int + centroids *vectorCentroids[T] +} + +func CreateKMeans[T c.Float](floatBits int, distFunc func(a, b []T, floatBits int) (T, error)) index.VectorPartitionStrat[T] { + return &Kmeans[T]{ + floatBits: floatBits, + centroids: &vectorCentroids[T]{ + distFunc: distFunc, + floatBits: floatBits, + }, + } +} + +func (km *Kmeans[T]) AddSeedVector(vec []T) { + km.centroids.addSeedCentroid(vec) +} + +func (km *Kmeans[T]) AddVector(vec []T) error { + return km.centroids.addVector(vec) +} + +func (km *Kmeans[T]) FindIndexForSearch(vec []T) ([]int, error) { + res := make([]int, km.NumSeedVectors()) + for i := range res { + res[i] = i + } + return res, nil +} + +func (km *Kmeans[T]) FindIndexForInsert(vec []T) (int, error) { + return km.centroids.findCentroid(vec) +} + +func (km *Kmeans[T]) NumPasses() int { + return 5 +} + +func (km *Kmeans[T]) NumSeedVectors() int { + return 1000 +} + +func (km *Kmeans[T]) StartBuildPass() { + 
if km.centroids.weights == nil { + km.centroids.randomInit() + } +} + +func (km *Kmeans[T]) EndBuildPass() { + km.centroids.updateCentroids() +} + +type vectorCentroids[T c.Float] struct { + dimension int + numCenters int + + distFunc func(a, b []T, floatBits int) (T, error) + + centroids [][]T + counts []int64 + weights [][]T + mutexs []*sync.Mutex + floatBits int +} + +func (vc *vectorCentroids[T]) findCentroid(input []T) (int, error) { + minIdx := 0 + minDist := math.MaxFloat32 + for i, centroid := range vc.centroids { + dist, err := vc.distFunc(centroid, input, vc.floatBits) + if err != nil { + return 0, err + } + if float64(dist) < minDist { + minDist = float64(dist) + minIdx = i + } + } + return minIdx, nil +} + +func (vc *vectorCentroids[T]) addVector(vec []T) error { + idx, err := vc.findCentroid(vec) + if err != nil { + return err + } + vc.mutexs[idx].Lock() + defer vc.mutexs[idx].Unlock() + for i := 0; i < vc.dimension; i++ { + vc.weights[idx][i] += vec[i] + } + vc.counts[idx]++ + return nil +} + +func (vc *vectorCentroids[T]) updateCentroids() { + for i := 0; i < vc.numCenters; i++ { + for j := 0; j < vc.dimension; j++ { + vc.centroids[i][j] = vc.weights[i][j] / T(vc.counts[i]) + vc.weights[i][j] = 0 + } + fmt.Printf("%d, ", vc.counts[i]) + vc.counts[i] = 0 + } + fmt.Println() +} + +func (vc *vectorCentroids[T]) randomInit() { + vc.dimension = len(vc.centroids[0]) + vc.numCenters = len(vc.centroids) + vc.counts = make([]int64, vc.numCenters) + vc.weights = make([][]T, vc.numCenters) + vc.mutexs = make([]*sync.Mutex, vc.numCenters) + for i := 0; i < vc.numCenters; i++ { + vc.weights[i] = make([]T, vc.dimension) + vc.counts[i] = 0 + vc.mutexs[i] = &sync.Mutex{} + } +} + +func (vc *vectorCentroids[T]) addSeedCentroid(vec []T) { + vc.centroids = append(vc.centroids, vec) +} diff --git a/tok/partitioned_hnsw/partitioned_factory.go b/tok/partitioned_hnsw/partitioned_factory.go new file mode 100644 index 00000000000..8c925ca003a --- /dev/null +++ b/tok/partitioned_hnsw/partitioned_factory.go @@ -0,0 +1,161 @@ +/* + * SPDX-FileCopyrightText: © Hypermode Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package partitioned_hnsw + +import ( + "errors" + "fmt" + "sync" + + c "github.com/hypermodeinc/dgraph/v25/tok/constraints" + "github.com/hypermodeinc/dgraph/v25/tok/hnsw" + "github.com/hypermodeinc/dgraph/v25/tok/index" + opt "github.com/hypermodeinc/dgraph/v25/tok/options" +) + +const ( + NumClustersOpt string = "numClusters" + PartitionStratOpt string = "partitionStratOpt" + PartitionedHNSW string = "partionedhnsw" +) + +type partitionedHNSWIndexFactory[T c.Float] struct { + indexMap map[string]index.VectorIndex[T] + floatBits int + mu sync.RWMutex +} + +// CreateFactory creates an instance of the private struct persistentIndexFactory. +// NOTE: if T and floatBits do not match in # of bits, there will be consequences. +func CreateFactory[T c.Float](floatBits int) index.IndexFactory[T] { + return &partitionedHNSWIndexFactory[T]{ + indexMap: map[string]index.VectorIndex[T]{}, + floatBits: floatBits, + } +} + +// Implements NamedFactory interface for use as a plugin. 
+func (hf *partitionedHNSWIndexFactory[T]) Name() string { return PartitionedHNSW } + +func (hf *partitionedHNSWIndexFactory[T]) GetOptions(o opt.Options) string { + return hnsw.GetPersistantOptions[T](o) +} + +func (hf *partitionedHNSWIndexFactory[T]) isNameAvailableWithLock(name string) bool { + _, nameUsed := hf.indexMap[name] + return !nameUsed +} + +// hf.AllowedOptions() allows persistentIndexFactory to implement the +// IndexFactory interface (see vector-indexer/index/index.go for details). +// We define here options for exponent, maxLevels, efSearch, efConstruction, +// and metric. +func (hf *partitionedHNSWIndexFactory[T]) AllowedOptions() opt.AllowedOptions { + retVal := opt.NewAllowedOptions() + retVal.AddIntOption(hnsw.ExponentOpt). + AddIntOption(hnsw.MaxLevelsOpt). + AddIntOption(hnsw.EfConstructionOpt). + AddIntOption(hnsw.EfSearchOpt). + AddIntOption(NumClustersOpt). + AddStringOption(PartitionStratOpt) + getSimFunc := func(optValue string) (any, error) { + if optValue != hnsw.Euclidean && optValue != hnsw.Cosine && optValue != hnsw.DotProd { + return nil, errors.New(fmt.Sprintf("Can't create a vector index for %s", optValue)) + } + return hnsw.GetSimType[T](optValue, hf.floatBits), nil + } + + retVal.AddCustomOption(hnsw.MetricOpt, getSimFunc) + return retVal +} + +// Create is an implementation of the IndexFactory interface function, invoked by an HNSWIndexFactory +// instance. It takes in a string name and a VectorSource implementation, and returns a VectorIndex and error +// flag. It creates an HNSW instance using the index name and populates other parts of the HNSW struct such as +// multFactor, maxLevels, efConstruction, maxNeighbors, and efSearch using struct parameters. +// It then populates the HNSW graphs using the InsertChunk function until there are no more items to populate. +// Finally, the function adds the name and hnsw object to the in memory map and returns the object. +func (hf *partitionedHNSWIndexFactory[T]) Create( + name string, + o opt.Options, + floatBits int) (index.VectorIndex[T], error) { + hf.mu.Lock() + defer hf.mu.Unlock() + return hf.createWithLock(name, o, floatBits) +} + +func (hf *partitionedHNSWIndexFactory[T]) createWithLock( + name string, + o opt.Options, + floatBits int) (index.VectorIndex[T], error) { + if !hf.isNameAvailableWithLock(name) { + err := errors.New("index with name " + name + " already exists") + return nil, err + } + retVal := &partitionedHNSW[T]{ + pred: name, + floatBits: floatBits, + clusterMap: map[int]index.VectorIndex[T]{}, + buildSyncMaps: map[int]*sync.Mutex{}, + } + err := retVal.applyOptions(o) + if err != nil { + return nil, err + } + hf.indexMap[name] = retVal + return retVal, nil +} + +// Find is an implementation of the IndexFactory interface function, invoked by an persistentIndexFactory +// instance. It returns the VectorIndex corresponding with a string name using the in memory map. +func (hf *partitionedHNSWIndexFactory[T]) Find(name string) (index.VectorIndex[T], error) { + hf.mu.RLock() + defer hf.mu.RUnlock() + return hf.findWithLock(name) +} + +func (hf *partitionedHNSWIndexFactory[T]) findWithLock(name string) (index.VectorIndex[T], error) { + vecInd := hf.indexMap[name] + return vecInd, nil +} + +// Remove is an implementation of the IndexFactory interface function, invoked by an persistentIndexFactory +// instance. It removes the VectorIndex corresponding with a string name using the in memory map. 
+func (hf *partitionedHNSWIndexFactory[T]) Remove(name string) error { + hf.mu.Lock() + defer hf.mu.Unlock() + return hf.removeWithLock(name) +} + +func (hf *partitionedHNSWIndexFactory[T]) removeWithLock(name string) error { + delete(hf.indexMap, name) + return nil +} + +// CreateOrReplace is an implementation of the IndexFactory interface funciton, +// invoked by an persistentIndexFactory. It checks if a VectorIndex +// correpsonding with name exists. If it does, it removes it, and replaces it +// via the Create function using the passed VectorSource. If the VectorIndex +// does not exist, it creates that VectorIndex corresponding with the name using +// the VectorSource. +func (hf *partitionedHNSWIndexFactory[T]) CreateOrReplace( + name string, + o opt.Options, + floatBits int) (index.VectorIndex[T], error) { + hf.mu.Lock() + defer hf.mu.Unlock() + vi, err := hf.findWithLock(name) + if err != nil { + return nil, err + } + if vi != nil { + err = hf.removeWithLock(name) + if err != nil { + return nil, err + } + } + return hf.createWithLock(name, o, floatBits) +} diff --git a/tok/partitioned_hnsw/partitioned_hnsw.go b/tok/partitioned_hnsw/partitioned_hnsw.go new file mode 100644 index 00000000000..fa430dedfd3 --- /dev/null +++ b/tok/partitioned_hnsw/partitioned_hnsw.go @@ -0,0 +1,197 @@ +// CreateFactory creates an instance of the private struct persistentIndexFactory. +// NOTE: if T and floatBits do not match in # of bits, there will be consequences. + +package partitioned_hnsw + +import ( + "context" + "errors" + "sync" + + c "github.com/hypermodeinc/dgraph/v25/tok/constraints" + hnsw "github.com/hypermodeinc/dgraph/v25/tok/hnsw" + "github.com/hypermodeinc/dgraph/v25/tok/index" + "github.com/hypermodeinc/dgraph/v25/tok/kmeans" + opt "github.com/hypermodeinc/dgraph/v25/tok/options" +) + +type partitionedHNSW[T c.Float] struct { + floatBits int + pred string + + clusterMap map[int]index.VectorIndex[T] + numClusters int + partition index.VectorPartitionStrat[T] + + hnswOptions opt.Options + partitionStrat string + + caches []index.CacheType + buildPass int + buildSyncMaps map[int]*sync.Mutex +} + +func (ph *partitionedHNSW[T]) applyOptions(o opt.Options) error { + ph.numClusters, _, _ = opt.GetOpt(o, NumClustersOpt, 1000) + ph.partitionStrat, _, _ = opt.GetOpt(o, PartitionStratOpt, "kmeans") + + if ph.partitionStrat != "kmeans" && ph.partitionStrat != "query" { + return errors.New("partition strategy must be kmeans or query") + } + + if ph.partitionStrat == "kmeans" { + ph.partition = kmeans.CreateKMeans(ph.floatBits, hnsw.EuclideanDistanceSq[T]) + } + + ph.buildPass = 0 + ph.hnswOptions = o + for i := range ph.numClusters { + factory := hnsw.CreateFactory[T](ph.floatBits) + vi, err := factory.Create(ph.pred, ph.hnswOptions, ph.floatBits) + if err != nil { + return err + } + err = hnsw.UpdateIndexSplit(vi, i) + if err != nil { + return err + } + ph.clusterMap[i] = vi + } + return nil +} + +func (ph *partitionedHNSW[T]) AddSeedVector(vec []T) { + ph.partition.AddSeedVector(vec) +} + +func (ph *partitionedHNSW[T]) BuildInsert(ctx context.Context, uuid uint64, vec []T) error { + passIdx := ph.buildPass - ph.partition.NumPasses() + if passIdx < 0 { + return ph.partition.AddVector(vec) + } + index, err := ph.partition.FindIndexForInsert(vec) + if err != nil { + return err + } + if index%NUM_PASSES != passIdx { + return nil + } + ph.buildSyncMaps[index].Lock() + defer ph.buildSyncMaps[index].Unlock() + return ph.clusterMap[index].BuildInsert(ctx, uuid, vec) +} + +const NUM_PASSES = 5 + +func (ph 
*partitionedHNSW[T]) NumBuildPasses() int { + return ph.partition.NumPasses() +} + +func (ph *partitionedHNSW[T]) NumIndexPasses() int { + return NUM_PASSES +} + +func (ph *partitionedHNSW[T]) NumThreads() int { + return ph.numClusters +} + +func (ph *partitionedHNSW[T]) NumSeedVectors() int { + return ph.partition.NumSeedVectors() +} + +func (ph *partitionedHNSW[T]) SetCaches(caches []index.CacheType) { + ph.caches = caches + for i := range ph.clusterMap { + ph.clusterMap[i].SetCaches([]index.CacheType{ph.caches[i]}) + } +} + +func (ph *partitionedHNSW[T]) StartBuild() { + if ph.buildPass < ph.partition.NumPasses() { + ph.partition.StartBuildPass() + return + } + + for i := range ph.clusterMap { + ph.buildSyncMaps[i] = &sync.Mutex{} + if i%NUM_PASSES != (ph.buildPass - ph.partition.NumPasses()) { + continue + } + ph.clusterMap[i].StartBuild() + } +} + +func (ph *partitionedHNSW[T]) EndBuild() []int { + res := []int{} + + if ph.buildPass >= ph.partition.NumPasses() { + for i := range ph.clusterMap { + if i%NUM_PASSES != (ph.buildPass - ph.partition.NumPasses()) { + continue + } + ph.clusterMap[i].EndBuild() + res = append(res, i) + } + } + + ph.buildPass += 1 + + if len(res) > 0 { + return res + } + + if ph.buildPass < ph.partition.NumPasses() { + ph.partition.EndBuildPass() + } + return []int{} +} + +func (ph *partitionedHNSW[T]) Insert(ctx context.Context, uid uint64, vec []T) error { + index, err := ph.partition.FindIndexForInsert(vec) + if err != nil { + return err + } + return ph.clusterMap[index].Insert(ctx, uid, vec) +} + +func (ph *partitionedHNSW[T]) Search(ctx context.Context, query []T, maxResults int, filter index.SearchFilter[T]) ([]uint64, error) { + indexes, err := ph.partition.FindIndexForSearch(query) + if err != nil { + return nil, err + } + res := []uint64{} + mutex := &sync.Mutex{} + var wg sync.WaitGroup + for _, index := range indexes { + wg.Add(1) + go func(i int) { + defer wg.Done() + ids, err := ph.clusterMap[i].Search(ctx, query, maxResults, filter) + if err != nil { + return + } + mutex.Lock() + res = append(res, ids...) 
+ mutex.Unlock() + }(index) + } + wg.Wait() + return ph.clusterMap[0].MergeResults(ctx, ph.caches[0], res, query, maxResults, filter) +} + +func (ph *partitionedHNSW[T]) SearchWithPath(ctx context.Context, txn index.CacheType, query []T, maxResults int, filter index.SearchFilter[T]) (*index.SearchPathResult, error) { + indexes, err := ph.partition.FindIndexForSearch(query) + if err != nil { + return nil, err + } + return ph.clusterMap[indexes[0]].SearchWithPath(ctx, txn, query, maxResults, filter) +} + +func (ph *partitionedHNSW[T]) SearchWithUid(ctx context.Context, txn index.CacheType, uid uint64, maxResults int, filter index.SearchFilter[T]) ([]uint64, error) { + // #TODO + return ph.clusterMap[0].SearchWithUid(ctx, txn, uid, maxResults, filter) +} + +func (ph *partitionedHNSW[T]) MergeResults(ctx context.Context, txn index.CacheType, list []uint64, query []T, maxResults int, filter index.SearchFilter[T]) ([]uint64, error) { + return ph.clusterMap[0].MergeResults(ctx, txn, list, query, maxResults, filter) +} diff --git a/tok/tok.go b/tok/tok.go index c74c7a9d10b..e20a647c43b 100644 --- a/tok/tok.go +++ b/tok/tok.go @@ -21,6 +21,7 @@ import ( "github.com/hypermodeinc/dgraph/v25/protos/pb" "github.com/hypermodeinc/dgraph/v25/tok/hnsw" opts "github.com/hypermodeinc/dgraph/v25/tok/options" + "github.com/hypermodeinc/dgraph/v25/tok/partitioned_hnsw" "github.com/hypermodeinc/dgraph/v25/types" "github.com/hypermodeinc/dgraph/v25/x" ) @@ -85,6 +86,7 @@ var indexFactories = make(map[string]IndexFactory) func init() { registerTokenizer(BigFloatTokenizer{}) registerIndexFactory(createIndexFactory(hnsw.CreateFactory[float32](32))) + registerIndexFactory(createIndexFactory(partitioned_hnsw.CreateFactory[float32](32))) registerTokenizer(GeoTokenizer{}) registerTokenizer(IntTokenizer{}) registerTokenizer(FloatTokenizer{}) diff --git a/worker/backup.go b/worker/backup.go index 0d803f47c06..1a58f6081e7 100644 --- a/worker/backup.go +++ b/worker/backup.go @@ -300,6 +300,11 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error { if pred.Type == "float32vector" && len(pred.IndexSpecs) != 0 { vecPredMap[gid] = append(predMap[gid], pred.Predicate+hnsw.VecEntry, pred.Predicate+hnsw.VecKeyword, pred.Predicate+hnsw.VecDead) + for i := range 1000 { + vecPredMap[gid] = append(vecPredMap[gid], fmt.Sprintf("%s%s_%d", pred.Predicate, hnsw.VecEntry, i)) + vecPredMap[gid] = append(vecPredMap[gid], fmt.Sprintf("%s%s_%d", pred.Predicate, hnsw.VecKeyword, i)) + vecPredMap[gid] = append(vecPredMap[gid], fmt.Sprintf("%s%s_%d", pred.Predicate, hnsw.VecDead, i)) + } } } } diff --git a/worker/task.go b/worker/task.go index 92c1d02350f..92c399b9468 100644 --- a/worker/task.go +++ b/worker/task.go @@ -33,7 +33,6 @@ import ( "github.com/hypermodeinc/dgraph/v25/schema" ctask "github.com/hypermodeinc/dgraph/v25/task" "github.com/hypermodeinc/dgraph/v25/tok" - "github.com/hypermodeinc/dgraph/v25/tok/hnsw" "github.com/hypermodeinc/dgraph/v25/tok/index" "github.com/hypermodeinc/dgraph/v25/types" "github.com/hypermodeinc/dgraph/v25/types/facets" @@ -355,27 +354,25 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er if err != nil { return err } + //TODO: generate maxLevels from schema, filter, etc. 
- qc := hnsw.NewQueryCache( - posting.NewViLocalCache(qs.cache), - args.q.ReadTs, - ) indexer, err := cspec.CreateIndex(args.q.Attr) if err != nil { return err } - var nnUids []uint64 - if srcFn.vectorInfo != nil { - nnUids, err = indexer.Search(ctx, qc, srcFn.vectorInfo, - int(numNeighbors), index.AcceptAll[float32]) - } else { - nnUids, err = indexer.SearchWithUid(ctx, qc, srcFn.vectorUid, - int(numNeighbors), index.AcceptAll[float32]) + + caches := posting.GetVectorTransactions(indexer.NumThreads(), q.ReadTs, args.q.Attr) + for i := range caches { + caches[i].(*posting.VectorTransaction).SetCache(qs.cache) } + indexer.SetCaches(caches) - if err != nil && !strings.Contains(err.Error(), hnsw.EmptyHNSWTreeError+": "+badger.ErrKeyNotFound.Error()) { + nnUids, err := indexer.Search(ctx, srcFn.vectorInfo, + int(numNeighbors), index.AcceptAll[float32]) + if err != nil { return err } + sort.Slice(nnUids, func(i, j int) bool { return nnUids[i] < nnUids[j] }) args.out.UidMatrix = append(args.out.UidMatrix, &pb.List{Uids: nnUids}) return nil
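Putting the pieces together: the partitioned factory is registered in tok/tok.go alongside the plain HNSW factory and is driven through the same VectorIndex interface. A rough end-to-end sketch of how a caller would exercise it, assuming an opt.Options value o already parsed from the index directive, a read timestamp readTs, a query vector queryVec, and the predicate name "product_embedding" (all hypothetical; the real wiring lives in rebuildVectorIndex and handleValuePostings above, and this assumes the index has already been built):

```go
factory := partitioned_hnsw.CreateFactory[float32](32)

vi, err := factory.Create("product_embedding", o, 32)
if err != nil {
	return err
}

// One cache per cluster, as handleValuePostings now does before searching.
caches := posting.GetVectorTransactions(vi.NumThreads(), readTs, "product_embedding")
vi.SetCaches(caches)

// Fan-out search: each selected cluster is searched concurrently and the
// per-cluster results are re-ranked through MergeResults.
nn, err := vi.Search(ctx, queryVec, 10, index.AcceptAll[float32])
if err != nil {
	return err
}
// nn holds the uids of the (approximately) 10 nearest vectors across clusters.
```

Note that the factory registers under the literal token "partionedhnsw" (the PartitionedHNSW constant above), so any schema directive or test that selects this index has to use that exact spelling.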