Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion db/background_mgr_resync_dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
collectionCtx := databaseCollection.AddCollectionContext(ctx)
_, unusedSequences, err := (&DatabaseCollectionWithUser{
DatabaseCollection: databaseCollection,
}).resyncDocument(collectionCtx, docID, key, regenerateSequences, []uint64{})
}).ResyncDocument(collectionCtx, docID, key, regenerateSequences, []uint64{})

databaseCollection.releaseSequences(collectionCtx, unusedSequences)

Expand Down
7 changes: 4 additions & 3 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1822,7 +1822,9 @@ func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, d
return doc, shouldUpdate, updatedExpiry, doc.Sequence, updatedUnusedSequences, nil
}

func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, key string, regenerateSequences bool, unusedSequences []uint64) (updatedHighSeq uint64, updatedUnusedSequences []uint64, err error) {
// ResyncDocument will re-run the sync function on the document and write an updated version to the bucket. If
// the sync function doesn't change any channels or access grants, no write will be performed.
func (db *DatabaseCollectionWithUser) ResyncDocument(ctx context.Context, docid, key string, regenerateSequences bool, unusedSequences []uint64) (updatedHighSeq uint64, updatedUnusedSequences []uint64, err error) {
var updatedDoc *Document
var shouldUpdate bool
var updatedExpiry *uint32
Expand Down Expand Up @@ -1855,12 +1857,11 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid,
doc.MetadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.MetadataOnlyUpdate)
}

_, rawSyncXattr, rawVvXattr, rawMouXattr, rawGlobalXattr, err := updatedDoc.MarshalWithXattrs()
_, rawSyncXattr, _, rawMouXattr, rawGlobalXattr, err := updatedDoc.MarshalWithXattrs()
updatedDoc := sgbucket.UpdatedDoc{
Doc: nil, // Resync does not require document body update
Xattrs: map[string][]byte{
base.SyncXattrName: rawSyncXattr,
base.VvXattrName: rawVvXattr,
},
Expiry: updatedExpiry,
}
Expand Down
108 changes: 75 additions & 33 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3657,61 +3657,103 @@ func Test_invalidateAllPrincipalsCache(t *testing.T) {
}

func Test_resyncDocument(t *testing.T) {
if !base.TestUseXattrs() {
t.Skip("Walrus doesn't support xattr")
}
db, ctx := setupTestDB(t)
defer db.Close(ctx)

db.Options.EnableXattr = true
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)

syncFn := `
testCases := []struct {
name string
useHLV bool
}{
{
name: "pre 4.0",
useHLV: false,
},
{
name: "has hlv",
useHLV: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
startingSyncFnCount := int(db.DbStats.Database().SyncFunctionCount.Value())
syncFn := `
function sync(doc, oldDoc){
channel("channel." + "ABC");
}
`
_, err := collection.UpdateSyncFun(ctx, syncFn)
require.NoError(t, err)
_, err := collection.UpdateSyncFun(ctx, syncFn)
require.NoError(t, err)

docID := uuid.NewString()
docID := uuid.NewString()

updateBody := make(map[string]any)
updateBody["val"] = "value"
_, doc, err := collection.Put(ctx, docID, updateBody)
require.NoError(t, err)
assert.NotNil(t, doc)
updateBody := make(map[string]any)
updateBody["val"] = "value"
if tc.useHLV {
_, _, err := collection.Put(ctx, docID, updateBody)
require.NoError(t, err)
} else {
collection.CreateDocNoHLV(t, ctx, docID, updateBody)
}

syncFn = `
syncFn = `
function sync(doc, oldDoc){
channel("channel." + "ABC12332423234");
}
`
_, err = collection.UpdateSyncFun(ctx, syncFn)
require.NoError(t, err)

_, _, err = collection.resyncDocument(ctx, docID, realDocID(docID), false, []uint64{10})
require.NoError(t, err)
err = collection.WaitForPendingChanges(ctx)
require.NoError(t, err)
_, err = collection.UpdateSyncFun(ctx, syncFn)
require.NoError(t, err)

syncData, err := collection.GetDocSyncData(ctx, docID)
assert.NoError(t, err)
preResyncDoc, err := collection.GetDocument(ctx, docID, DocUnmarshalAll)
require.NoError(t, err)
if !tc.useHLV {
require.Nil(t, preResyncDoc.HLV)
}
_, _, err = collection.ResyncDocument(ctx, docID, realDocID(docID), false, []uint64{10})
require.NoError(t, err)
err = collection.WaitForPendingChanges(ctx)
require.NoError(t, err)

assert.Len(t, syncData.ChannelSet, 2)
assert.Len(t, syncData.Channels, 2)
found := false
postResyncDoc, _, err := collection.getDocWithXattrs(ctx, docID, collection.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), DocUnmarshalAll)
assert.NoError(t, err)

for _, chSet := range syncData.ChannelSet {
if chSet.Name == "channel.ABC12332423234" {
found = true
break
}
}
assert.Len(t, postResyncDoc.ChannelSet, 2)
assert.Len(t, postResyncDoc.Channels, 2)
found := false

assert.True(t, found)
assert.Equal(t, 2, int(db.DbStats.Database().SyncFunctionCount.Value()))
for _, chSet := range postResyncDoc.ChannelSet {
if chSet.Name == "channel.ABC12332423234" {
found = true
break
}
}
assert.True(t, found)

require.NoError(t, err)
if tc.useHLV {
require.NotNil(t, postResyncDoc.HLV)
require.Equal(t, Version{
SourceID: db.EncodedSourceID,
Value: preResyncDoc.Cas,
}, Version{
SourceID: postResyncDoc.HLV.SourceID,
Value: postResyncDoc.HLV.Version,
})
} else {
require.Nil(t, postResyncDoc.HLV)
}
require.NotNil(t, postResyncDoc.MetadataOnlyUpdate)
require.Equal(t, MetadataOnlyUpdate{
HexCAS: base.CasToString(postResyncDoc.Cas),
PreviousHexCAS: base.CasToString(preResyncDoc.Cas),
PreviousRevSeqNo: preResyncDoc.RevSeqNo,
}, *postResyncDoc.MetadataOnlyUpdate)
assert.Equal(t, startingSyncFnCount+2, int(db.DbStats.Database().SyncFunctionCount.Value()))
})
}
}

func Test_getUpdatedDocument(t *testing.T) {
Expand Down
79 changes: 79 additions & 0 deletions rest/legacy_rev_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2025-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package rest

import (
"fmt"
"net/http"
"testing"

"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/db"
"github.com/stretchr/testify/require"
)

// TestResyncLegacyRev makes sure that running resync on a legacy rev will send future blip messages as a legacy rev and not as an HLV
func TestResyncLegacyRev(t *testing.T) {
rt := NewRestTesterPersistentConfig(t)
defer rt.Close()

const (
alice = "alice"
channelName = "A"
)
collection, ctx := rt.GetSingleTestDatabaseCollectionWithUser()
// default collection will use channel Name and named collection will use collection.Name
rt.CreateUser(alice, []string{channelName, collection.Name})

docID := db.SafeDocumentName(t, t.Name())
doc := rt.CreateDocNoHLV(docID, db.Body{"channels": channelName})

btcRunner := NewBlipTesterClientRunner(t)
btcRunner.Run(func(t *testing.T) {
btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, &BlipTesterClientOpts{Username: alice})
defer btc.Close()

btcRunner.StartOneshotPull(btc.id)

msg := btcRunner.WaitForPullRevMessage(btc.id, docID, DocVersion{RevTreeID: doc.GetRevTreeID()})
require.Equal(t, msg.Properties[db.RevMessageRev], doc.GetRevTreeID())
})
previousChannel := collection.Name
if base.IsDefaultCollection(collection.ScopeName, collection.Name) {
previousChannel = "A"
}

_, err := collection.UpdateSyncFun(ctx, fmt.Sprintf(`function() {channel("B", "%s")}`, previousChannel))
require.NoError(t, err)

// use ResyncDocument and TakeDbOffline/Online instead of /ks/_config/sync && /db/_resync to work under rosmar which
// doesn't yet support DCP resync or updating config on an existing bucket.
regenerateSequences := false
var unusedSequences []uint64
_, _, err = collection.ResyncDocument(ctx, docID, docID, regenerateSequences, unusedSequences)
require.NoError(t, err)

rt.TakeDbOffline()
rt.TakeDbOnline()

resp := rt.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/_raw/"+docID, "")
RequireStatus(t, resp, http.StatusOK)

btcRunner = NewBlipTesterClientRunner(t)
btcRunner.Run(func(t *testing.T) {
btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, &BlipTesterClientOpts{Username: alice})
defer btc.Close()

btcRunner.StartOneshotPull(btc.id)

msg := btcRunner.WaitForPullRevMessage(btc.id, docID, DocVersion{RevTreeID: doc.GetRevTreeID()})
// make sure second rev message after resync is still legacy rev format
require.Equal(t, msg.Properties[db.RevMessageRev], doc.GetRevTreeID())
})
}
2 changes: 1 addition & 1 deletion rest/utilities_testing_blip_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1982,7 +1982,7 @@ func (btcc *BlipTesterCollectionClient) WaitForPullRevMessage(docID string, vers
return
}
var lookupVersion DocVersion
if btcc.UseHLV() {
if btcc.UseHLV() && !version.CV.IsEmpty() {
lookupVersion = DocVersion{CV: version.CV}
} else {
lookupVersion = DocVersion{RevTreeID: version.RevTreeID}
Expand Down
Loading