Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 55 additions & 40 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type changeCache struct {
initialSequence uint64 // DB's current sequence at startup time.
receivedSeqs map[uint64]struct{} // Set of all sequences received
pendingLogs LogPriorityQueue // Out-of-sequence entries waiting to be cached
notifyChange func(context.Context, channels.Set) // Client callback that notifies of channel changes
notifyChangeFunc func(context.Context, channels.Set) // Client callback that notifies of channel changes
started base.AtomicBool // Set by the Start method
stopped base.AtomicBool // Set by the Stop method
skippedSeqs *SkippedSequenceSkiplist // Skipped sequences still pending on the DCP caching feed
Expand Down Expand Up @@ -145,15 +145,15 @@ func DefaultCacheOptions() CacheOptions {

// Initializes a new changeCache.
// lastSequence is the last known database sequence assigned.
// notifyChange is an optional function that will be called to notify of channel changes.
// notifyChangeFunc is an optional function that will be called to notify of channel changes.
// After calling Init(), you must call .Start() to start using the cache, otherwise it will be in a locked state
// and callers will block on trying to obtain the lock.

func (c *changeCache) Init(ctx context.Context, dbContext *DatabaseContext, channelCache ChannelCache, notifyChange func(context.Context, channels.Set), options *CacheOptions, metaKeys *base.MetadataKeys) error {
c.db = dbContext
c.logCtx = ctx

c.notifyChange = notifyChange
c.notifyChangeFunc = notifyChange
c.receivedSeqs = make(map[uint64]struct{})
c.terminator = make(chan bool)
c.initTime = time.Now()
Expand Down Expand Up @@ -277,11 +277,10 @@ func (c *changeCache) InsertPendingEntries(ctx context.Context) error {
// Trigger _addPendingLogs to process any entries that have been pending too long:
c.lock.Lock()
changedChannels := c._addPendingLogs(ctx)
if c.notifyChange != nil && len(changedChannels) > 0 {
c.notifyChange(ctx, changedChannels)
}
c.lock.Unlock()

c.notifyChange(ctx, changedChannels)

return nil
}

Expand Down Expand Up @@ -450,7 +449,8 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent, docType DocumentType)
UnusedSequence: true,
}
changedChannels := c.processEntry(ctx, change)
changedChannelsCombined = changedChannelsCombined.Update(changedChannels)
channelSet := channels.SetFromArrayNoValidate(changedChannels)
changedChannelsCombined = changedChannelsCombined.Update(channelSet)
}
base.DebugfCtx(ctx, base.KeyCache, "Received unused sequences in unused_sequences property for (%q / %q): %v", base.UD(docID), syncData.GetRevTreeID(), syncData.UnusedSequences)
}
Expand Down Expand Up @@ -498,7 +498,8 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent, docType DocumentType)
}

changedChannels := c.processEntry(ctx, change)
changedChannelsCombined = changedChannelsCombined.Update(changedChannels)
channelSet := channels.SetFromArrayNoValidate(changedChannels)
changedChannelsCombined = changedChannelsCombined.Update(channelSet)
}
}
if len(seqsCached) > 0 {
Expand Down Expand Up @@ -544,11 +545,12 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent, docType DocumentType)
}

changedChannels := c.processEntry(ctx, change)
changedChannelsCombined = changedChannelsCombined.Update(changedChannels)
channelSet := channels.SetFromArrayNoValidate(changedChannels)
changedChannelsCombined = changedChannelsCombined.Update(channelSet)

// Notify change listeners for all of the changed channels
if c.notifyChange != nil && len(changedChannelsCombined) > 0 {
c.notifyChange(ctx, changedChannelsCombined)
if c.notifyChangeFunc != nil && len(changedChannelsCombined) > 0 {
c.notifyChangeFunc(ctx, changedChannelsCombined)
}

}
Expand All @@ -559,6 +561,13 @@ type cachePrincipal struct {
Sequence uint64 `json:"sequence"`
}

// notifyChange forwards the given changed channels to the registered
// notifyChangeFunc callback. The slice is converted to a channels.Set first
// (so repeated IDs in chs collapse to a single entry). Nothing happens when
// no callback has been registered or when chs is empty.
func (c *changeCache) notifyChange(ctx context.Context, chs []channels.ID) {
	if c.notifyChangeFunc != nil && len(chs) > 0 {
		c.notifyChangeFunc(ctx, channels.SetFromArrayNoValidate(chs))
	}
}

// Remove delegates to the underlying channel cache's Remove, passing through
// collectionID, docIDs and startTime unchanged, and returns the count that
// the channel cache reports.
func (c *changeCache) Remove(ctx context.Context, collectionID uint32, docIDs []string, startTime time.Time) (count int) {
	count = c.channelCache.Remove(ctx, collectionID, docIDs, startTime)
	return count
}
Expand Down Expand Up @@ -592,14 +601,16 @@ func (c *changeCache) releaseUnusedSequence(ctx context.Context, sequence uint64

// Since processEntry may unblock pending sequences, if there were any changed channels we need
// to notify any change listeners that are working changes feeds for these channels
var channelSet channels.Set
changedChannels := c.processEntry(ctx, change)
if changedChannels == nil {
changedChannels = channels.SetOfNoValidate(unusedSeqChannelID)
channelSet = channels.SetOfNoValidate(unusedSeqChannelID)
} else {
changedChannels.Add(unusedSeqChannelID)
channelSet = channels.SetFromArrayNoValidate(changedChannels)
channelSet.Add(unusedSeqChannelID)
}
if c.notifyChange != nil && len(changedChannels) > 0 {
c.notifyChange(ctx, changedChannels)
if c.notifyChangeFunc != nil && len(channelSet) > 0 {
c.notifyChangeFunc(ctx, channelSet)
}
}

Expand All @@ -619,36 +630,38 @@ func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequen
UnusedSequence: true,
}
changedChannels := c.processEntry(ctx, change)
allChangedChannels = allChangedChannels.Update(changedChannels)
if c.notifyChange != nil {
c.notifyChange(ctx, allChangedChannels)
channelSet := channels.SetFromArrayNoValidate(changedChannels)
allChangedChannels = allChangedChannels.Update(channelSet)
if c.notifyChangeFunc != nil {
c.notifyChangeFunc(ctx, allChangedChannels)
}
return
}

// push unused range to either pending or skipped lists based on current state of the change cache
allChangedChannels = c.processUnusedRange(ctx, fromSequence, toSequence, allChangedChannels, timeReceived)
changedChannels := c.processUnusedRange(ctx, fromSequence, toSequence, timeReceived)
allChangedChannels.Update(channels.SetFromArrayNoValidate(changedChannels))

if c.notifyChange != nil {
c.notifyChange(ctx, allChangedChannels)
if c.notifyChangeFunc != nil {
c.notifyChangeFunc(ctx, allChangedChannels)
}
}

// processUnusedRange handles pushing unused range to pending or skipped lists
func (c *changeCache) processUnusedRange(ctx context.Context, fromSequence, toSequence uint64, allChangedChannels channels.Set, timeReceived channels.FeedTimestamp) channels.Set {
func (c *changeCache) processUnusedRange(ctx context.Context, fromSequence, toSequence uint64, timeReceived channels.FeedTimestamp) []channels.ID {
c.lock.Lock()
defer c.lock.Unlock()

var numSkipped int64
var changedChannels []channels.ID
if toSequence < c.nextSequence {
// batch remove from skipped
numSkipped = c.skippedSeqs.processUnusedSequenceRangeAtSkipped(ctx, fromSequence, toSequence)
} else if fromSequence >= c.nextSequence {
// whole range to pending
c._pushRangeToPending(fromSequence, toSequence, timeReceived)
// unblock any pending sequences we can after new range(s) have been pushed to pending
changedChannels := c._addPendingLogs(ctx)
allChangedChannels = allChangedChannels.Update(changedChannels)
changedChannels = append(changedChannels, c._addPendingLogs(ctx)...)
c.internalStats.pendingSeqLen = len(c.pendingLogs)
} else {
// An unused sequence range that includes c.nextSequence in the middle of the range
Expand All @@ -662,7 +675,7 @@ func (c *changeCache) processUnusedRange(ctx context.Context, fromSequence, toSe
if numSkipped == 0 {
c.db.BroadcastSlowMode.CompareAndSwap(true, false)
}
return allChangedChannels
return changedChannels
}

// _pushRangeToPending will push an unused sequence range to pendingLogs
Expand Down Expand Up @@ -732,13 +745,14 @@ func (c *changeCache) processPrincipalDoc(ctx context.Context, docID string, doc
base.InfofCtx(ctx, base.KeyChanges, "Received #%d (%q)", change.Sequence, base.UD(change.DocID))

changedChannels := c.processEntry(ctx, change)
if c.notifyChange != nil && len(changedChannels) > 0 {
c.notifyChange(ctx, changedChannels)
}

c.notifyChange(ctx, changedChannels)
}

// Handles a newly-arrived LogEntry.
func (c *changeCache) processEntry(ctx context.Context, change *LogEntry) channels.Set {
// processEntry handles a newly-arrived LogEntry and returns the changed channels from this revision.
// These can be any existing, removed or newly added channels. It's possible for the returned channels slice to contain
// duplicates. It is the caller's responsibility to de-duplicate before notifying any changes.
func (c *changeCache) processEntry(ctx context.Context, change *LogEntry) []channels.ID {
c.lock.Lock()
defer c.lock.Unlock()
if c.logsDisabled {
Expand Down Expand Up @@ -773,12 +787,12 @@ func (c *changeCache) processEntry(ctx context.Context, change *LogEntry) channe
}
c.receivedSeqs[sequence] = struct{}{}

var changedChannels channels.Set
var changedChannels []channels.ID
if sequence == c.nextSequence || c.nextSequence == 0 {
// This is the expected next sequence so we can add it now:
changedChannels = c._addToCache(ctx, change)
// Also add any pending sequences that are now contiguous:
changedChannels = changedChannels.Update(c._addPendingLogs(ctx))
changedChannels = append(changedChannels, c._addPendingLogs(ctx)...)
} else if sequence > c.nextSequence {
// There's a missing sequence (or several), so put this one on ice until it arrives:
heap.Push(&c.pendingLogs, change)
Expand All @@ -795,7 +809,7 @@ func (c *changeCache) processEntry(ctx context.Context, change *LogEntry) channe

if numPending > c.options.CachePendingSeqMaxNum {
// Too many pending; add the oldest one:
changedChannels = c._addPendingLogs(ctx)
changedChannels = append(changedChannels, c._addPendingLogs(ctx)...)
}
} else if sequence > c.initialSequence {
// Out-of-order sequence received!
Expand All @@ -807,7 +821,7 @@ func (c *changeCache) processEntry(ctx context.Context, change *LogEntry) channe
base.DebugfCtx(ctx, base.KeyCache, " Received previously skipped out-of-order change (seq %d, expecting %d) doc %q / %q ", sequence, c.nextSequence, base.UD(change.DocID), change.RevID)
}

changedChannels = changedChannels.Update(c._addToCache(ctx, change))
changedChannels = append(changedChannels, c._addToCache(ctx, change)...)
// Add to cache before removing from skipped, to ensure lowSequence doesn't get incremented until results are available
// in cache
err := c.RemoveSkipped(sequence)
Expand All @@ -820,7 +834,7 @@ func (c *changeCache) processEntry(ctx context.Context, change *LogEntry) channe

// Adds an entry to the appropriate channels' caches, returning the affected channels. lateSequence
// flag indicates whether it was a change arriving out of sequence
func (c *changeCache) _addToCache(ctx context.Context, change *LogEntry) channels.Set {
func (c *changeCache) _addToCache(ctx context.Context, change *LogEntry) []channels.ID {

if change.Sequence >= c.nextSequence {
c.nextSequence = change.Sequence + 1
Expand Down Expand Up @@ -857,11 +871,12 @@ func (c *changeCache) _addToCache(ctx context.Context, change *LogEntry) channel
return updatedChannels
}

// Add the first change(s) from pendingLogs if they're the next sequence. If not, and we've been
// _addPendingLogs adds the first change(s) from pendingLogs if they're the next sequence. If not, and we've been
// waiting too long for nextSequence, it moves nextSequence to the skipped queue.
// Returns the channels that changed.
func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
var changedChannels channels.Set
// Returns the channels that changed. This may return the same channel more than once, channels should be deduplicated
// before notifying the changes.
func (c *changeCache) _addPendingLogs(ctx context.Context) []channels.ID {
var changedChannels []channels.ID
var isNext bool

for len(c.pendingLogs) > 0 {
Expand All @@ -870,7 +885,7 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {

if isNext {
oldestPending = c._popPendingLog(ctx)
changedChannels = changedChannels.Update(c._addToCache(ctx, oldestPending))
changedChannels = append(changedChannels, c._addToCache(ctx, oldestPending)...)
} else if oldestPending.Sequence < c.nextSequence {
// oldest pending is lower than next sequence, should be ignored
base.InfofCtx(ctx, base.KeyCache, "Oldest entry in pending logs %v (%d, %d) is earlier than cache next sequence (%d), ignoring as sequence has already been cached", base.UD(oldestPending.DocID), oldestPending.Sequence, oldestPending.EndSequence, c.nextSequence)
Expand Down
24 changes: 12 additions & 12 deletions db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1338,10 +1338,10 @@ func readNextFromFeed(feed <-chan (*ChangeEntry), timeout time.Duration) (*Chang
//
// Create doc1 w/ unused sequences 1, actual sequence 3.
// Create doc2 w/ sequence 2, channel ABC
// Send feed event for doc2. This won't trigger notifyChange, as buffering is waiting for seq 1
// Send feed event for doc1. This should trigger caching for doc2, and trigger notifyChange for channel ABC.
// Send feed event for doc2. This won't trigger notifyChangeFunc, as buffering is waiting for seq 1
// Send feed event for doc1. This should trigger caching for doc2, and trigger notifyChangeFunc for channel ABC.
//
// Verify that notifyChange for channel ABC was sent.
// Verify that notifyChangeFunc for channel ABC was sent.
func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {
base.LongRunningTest(t)

Expand All @@ -1359,12 +1359,12 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {
collection := GetSingleDatabaseCollection(t, db.DatabaseContext)
collectionID := collection.GetCollectionID()

// -------- Setup notifyChange callback ----------------
// -------- Setup notifyChangeFunc callback ----------------

// Detect whether the 2nd was ignored using an notifyChange listener callback and make sure it was not added to the ABC channel
// Detect whether the 2nd was ignored using a notifyChangeFunc listener callback and make sure it was not added to the ABC channel
waitForOnChangeCallback := sync.WaitGroup{}
waitForOnChangeCallback.Add(1)
db.changeCache.notifyChange = func(_ context.Context, chans channels.Set) {
db.changeCache.notifyChangeFunc = func(_ context.Context, chans channels.Set) {
expectedChan := channels.NewID("ABC", collectionID)
for ch := range chans {
if ch == expectedChan {
Expand Down Expand Up @@ -1445,7 +1445,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {
require.NoError(t, err)
}

// Send feed event for doc2. This won't trigger notifyChange, as buffering is waiting for seq 1
// Send feed event for doc2. This won't trigger notifyChangeFunc, as buffering is waiting for seq 1
feedEventDoc2 := sgbucket.FeedEvent{
Synchronous: true,
Key: []byte(doc2Id),
Expand All @@ -1455,7 +1455,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {
}
db.changeCache.DocChanged(feedEventDoc2, DocTypeDocument)

// Send feed event for doc1. This should trigger caching for doc2, and trigger notifyChange for channel ABC.
// Send feed event for doc1. This should trigger caching for doc2, and trigger notifyChangeFunc for channel ABC.
feedEventDoc1 := sgbucket.FeedEvent{
Synchronous: true,
Key: []byte(doc1Id),
Expand All @@ -1466,7 +1466,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {

// -------- Wait for waitgroup ----------------

// Block until the notifyChange callback was invoked with the expected channels.
// Block until the notifyChangeFunc callback was invoked with the expected channels.
// If the callback is never called back with expected, will block forever.
waitForOnChangeCallback.Wait()

Expand Down Expand Up @@ -1620,7 +1620,7 @@ func TestInitializeCacheUnderLoad(t *testing.T) {

}

// Verify that notifyChange for channel zero is sent even when the channel isn't active in the cache.
// Verify that notifyChangeFunc for channel zero is sent even when the channel isn't active in the cache.
func TestNotifyForInactiveChannel(t *testing.T) {

// Enable relevant logging
Expand All @@ -1633,10 +1633,10 @@ func TestNotifyForInactiveChannel(t *testing.T) {
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
collectionID := collection.GetCollectionID()

// -------- Setup notifyChange callback ----------------
// -------- Setup notifyChangeFunc callback ----------------

notifyChannel := make(chan struct{})
db.changeCache.notifyChange = func(_ context.Context, chans channels.Set) {
db.changeCache.notifyChangeFunc = func(_ context.Context, chans channels.Set) {
expectedChan := channels.NewID("zero", collectionID)
if chans.Contains(expectedChan) {
notifyChannel <- struct{}{}
Expand Down
10 changes: 5 additions & 5 deletions db/channel_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type ChannelCache interface {
Init(initialSequence uint64)

// Adds an entry to the cache, returns set of channels it was added to
AddToCache(ctx context.Context, change *LogEntry) channels.Set
AddToCache(ctx context.Context, change *LogEntry) []channels.ID

// Notifies the cache of a principal update. Updates the cache's high sequence
AddPrincipal(change *LogEntry)
Expand Down Expand Up @@ -197,14 +197,14 @@ func (c *channelCacheImpl) AddUnusedSequence(change *LogEntry) {

// Adds an entry to the appropriate channels' caches, returning the affected channels. lateSequence
// flag indicates whether it was a change arriving out of sequence
func (c *channelCacheImpl) AddToCache(ctx context.Context, change *LogEntry) channels.Set {
func (c *channelCacheImpl) AddToCache(ctx context.Context, change *LogEntry) []channels.ID {

ch := change.Channels
change.Channels = nil // not needed anymore, so free some memory

// updatedChannels tracks the set of channels that should be notified of the change. This includes
// the change's active channels, as well as any channel removals for the active revision.
updatedChannels := make(channels.Set, len(ch)+1) // +1 for the star channel
updatedChannels := make([]channels.ID, 0, len(ch)+1) // +1 for the star channel

// If it's a late sequence, we want to add to all channel late queues within a single write lock,
// to avoid a changes feed seeing the same late sequence in different iteration loops (and sending
Expand Down Expand Up @@ -234,7 +234,7 @@ func (c *channelCacheImpl) AddToCache(ctx context.Context, change *LogEntry) cha
}
}
// Need to notify even if channel isn't active, for case where number of connected changes channels exceeds cache capacity
updatedChannels.Add(channelID)
updatedChannels = append(updatedChannels, channelID)
}
}

Expand All @@ -247,7 +247,7 @@ func (c *channelCacheImpl) AddToCache(ctx context.Context, change *LogEntry) cha
channelCache.AddLateSequence(change)
}
}
updatedChannels.Add(starChannelID)
updatedChannels = append(updatedChannels, starChannelID)
}

c.updateHighCacheSequence(change.Sequence)
Expand Down
2 changes: 1 addition & 1 deletion tools/cache_perf_tool/dcpDataGeneration.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (dcp *dcpDataGen) vBucketCreation(ctx context.Context) {
delayIndex := 0
// vBucket creation logic
for i := range 1024 {
time.Sleep(500 * time.Millisecond) // we need a slight delay each iteration otherwise many vBuckets end up writing at the same times
time.Sleep(100 * time.Millisecond) // we need a slight delay each iteration otherwise many vBuckets end up writing at the same times
if i == 520 {
go dcp.syncSeqVBucketCreation(ctx, uint16(i), 2*time.Second) // sync seq hot vBucket so high delay
} else {
Expand Down
Loading