12 changes: 12 additions & 0 deletions common/dynamicconfig/dynamicproperties/constants.go
@@ -2672,6 +2672,12 @@ const (
// Default value: 1s (1*time.Second)
// Allowed filters: N/A
TimerProcessorMaxTimeShift
// TimerProcessorInMemoryQueueMaxTimeShift is the max time shift the timer processor in-memory queue can have. If set to 0, the in-memory queue is disabled.
// KeyName: history.timerProcessorInMemoryQueueMaxTimeShift
// Value type: Duration
// Default value: 0
// Allowed filters: ShardID
TimerProcessorInMemoryQueueMaxTimeShift
// TransferProcessorFailoverMaxStartJitterInterval is the max jitter interval for starting transfer
// failover queue processing. The actual jitter interval used will be a random duration between
// 0 and the max interval so that timer failover queue across different shards won't start at
@@ -5198,6 +5204,12 @@ var DurationKeys = map[DurationKey]DynamicDuration{
Description: "TimerProcessorMaxTimeShift is the max shift timer processor can have",
DefaultValue: time.Second,
},
TimerProcessorInMemoryQueueMaxTimeShift: {
KeyName: "history.timerProcessorInMemoryQueueMaxTimeShift",
Filters: []Filter{ShardID},
Description: "TimerProcessorInMemoryQueueMaxTimeShift is the max shift timer processor in memory queue can have. If set to 0, in memory queue is disabled.",
DefaultValue: 0,
},
TransferProcessorFailoverMaxStartJitterInterval: {
KeyName: "history.transferProcessorFailoverMaxStartJitterInterval",
Description: "TransferProcessorFailoverMaxStartJitterInterval is the max jitter interval for starting transfer failover queue processing. The actual jitter interval used will be a random duration between 0 and the max interval so that timer failover queue across different shards won't start at the same time",
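For context on how this shard-filtered duration is meant to be consumed, a minimal standalone sketch (the resolver below is hypothetical and stands in for the dynamic config collection; DurationPropertyFnWithShardIDFilter is assumed to be a per-shard duration resolver): a shard resolving to a non-zero duration has the in-memory queue enabled, while the default of 0 keeps it disabled.

package main

import (
    "fmt"
    "time"
)

// DurationPropertyFnWithShardIDFilter mirrors the assumed signature of the
// dynamicproperties type used above: a duration resolved per shard ID.
type DurationPropertyFnWithShardIDFilter func(shardID int) time.Duration

func main() {
    // Hypothetical resolver standing in for the dynamic config collection:
    // shard 1 opts into the in-memory queue with a 5s max time shift, every
    // other shard keeps the default of 0, which disables the queue.
    maxTimeShift := DurationPropertyFnWithShardIDFilter(func(shardID int) time.Duration {
        if shardID == 1 {
            return 5 * time.Second
        }
        return 0
    })

    for _, shardID := range []int{1, 2} {
        shift := maxTimeShift(shardID)
        fmt.Printf("shard %d: in-memory queue enabled=%t, max time shift=%v\n", shardID, shift > 0, shift)
    }
}

Operators could then enable the in-memory queue per shard by setting history.timerProcessorInMemoryQueueMaxTimeShift to a non-zero duration in dynamic config, optionally filtered by ShardID.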
6 changes: 6 additions & 0 deletions service/history/common/type.go
@@ -36,5 +36,11 @@ type (
Activities map[int64]*persistence.ActivityInfo
History events.PersistedBlobs
PersistenceError bool

// ScheduleInMemory indicates that the task should be scheduled in memory for the current
// execution; otherwise it is only scheduled after the next DB scan. This notification is
// sometimes passed with a fake timer whose sole purpose is to reset the next scheduled DB read,
// which is why scheduling the task in memory is sometimes undesirable.
ScheduleInMemory bool
}
)
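To illustrate the distinction the ScheduleInMemory comment draws, a small standalone sketch using simplified stand-in types (not the real hcommon or persistence structs): a notification for a real timer is flagged for in-memory scheduling, while a fake timer used only to pull the next DB read earlier is not.

package main

import (
    "fmt"
    "time"
)

// Simplified stand-ins for the notification types in this diff.
type timerTask struct {
    visibilityTime time.Time
}

type notifyTaskInfo struct {
    tasks            []timerTask
    scheduleInMemory bool
}

func main() {
    now := time.Now()

    // A timer generated by the current execution: schedule it in memory so it
    // can fire without waiting for the next DB scan.
    realTimer := notifyTaskInfo{
        tasks:            []timerTask{{visibilityTime: now.Add(10 * time.Second)}},
        scheduleInMemory: true,
    }

    // A fake timer whose only purpose is to move the next scheduled DB read
    // earlier; it should not be enqueued in memory.
    fakeTimer := notifyTaskInfo{
        tasks:            []timerTask{{visibilityTime: now.Add(time.Minute)}},
        scheduleInMemory: false,
    }

    fmt.Println("real timer -> in memory:", realTimer.scheduleInMemory)
    fmt.Println("fake timer -> in memory:", fakeTimer.scheduleInMemory)
}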
2 changes: 2 additions & 0 deletions service/history/config/config.go
@@ -154,6 +154,7 @@ type Config struct {
TimerProcessorSplitQueueIntervalJitterCoefficient dynamicproperties.FloatPropertyFn
TimerProcessorMaxRedispatchQueueSize dynamicproperties.IntPropertyFn
TimerProcessorMaxTimeShift dynamicproperties.DurationPropertyFn
TimerProcessorInMemoryQueueMaxTimeShift dynamicproperties.DurationPropertyFnWithShardIDFilter
TimerProcessorHistoryArchivalSizeLimit dynamicproperties.IntPropertyFn
TimerProcessorArchivalTimeLimit dynamicproperties.DurationPropertyFn
DisableTimerFailoverQueue dynamicproperties.BoolPropertyFn
@@ -453,6 +454,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i
TimerProcessorSplitQueueIntervalJitterCoefficient: dc.GetFloat64Property(dynamicproperties.TimerProcessorSplitQueueIntervalJitterCoefficient),
TimerProcessorMaxRedispatchQueueSize: dc.GetIntProperty(dynamicproperties.TimerProcessorMaxRedispatchQueueSize),
TimerProcessorMaxTimeShift: dc.GetDurationProperty(dynamicproperties.TimerProcessorMaxTimeShift),
TimerProcessorInMemoryQueueMaxTimeShift: dc.GetDurationPropertyFilteredByShardID(dynamicproperties.TimerProcessorInMemoryQueueMaxTimeShift),
TimerProcessorHistoryArchivalSizeLimit: dc.GetIntProperty(dynamicproperties.TimerProcessorHistoryArchivalSizeLimit),
TimerProcessorArchivalTimeLimit: dc.GetDurationProperty(dynamicproperties.TimerProcessorArchivalTimeLimit),
DisableTimerFailoverQueue: dc.GetBoolProperty(dynamicproperties.DisableTimerFailoverQueue),
1 change: 1 addition & 0 deletions service/history/config/config_test.go
@@ -139,6 +139,7 @@ func TestNewConfig(t *testing.T) {
"TimerProcessorSplitQueueIntervalJitterCoefficient": {dynamicproperties.TimerProcessorSplitQueueIntervalJitterCoefficient, 4.0},
"TimerProcessorMaxRedispatchQueueSize": {dynamicproperties.TimerProcessorMaxRedispatchQueueSize, 45},
"TimerProcessorMaxTimeShift": {dynamicproperties.TimerProcessorMaxTimeShift, time.Second},
"TimerProcessorInMemoryQueueMaxTimeShift": {dynamicproperties.TimerProcessorInMemoryQueueMaxTimeShift, time.Duration(0)},
"TimerProcessorHistoryArchivalSizeLimit": {dynamicproperties.TimerProcessorHistoryArchivalSizeLimit, 46},
"TimerProcessorArchivalTimeLimit": {dynamicproperties.TimerProcessorArchivalTimeLimit, time.Second},
"TransferTaskBatchSize": {dynamicproperties.TransferTaskBatchSize, 47},
1 change: 1 addition & 0 deletions service/history/execution/context.go
@@ -992,6 +992,7 @@ func notifyTasks(
ExecutionInfo: executionInfo,
Tasks: tasksByCategory[persistence.HistoryTaskCategoryTimer],
PersistenceError: persistenceError,
ScheduleInMemory: true,
}
replicationTaskInfo := &hcommon.NotifyTaskInfo{
ExecutionInfo: executionInfo,
2 changes: 2 additions & 0 deletions service/history/execution/context_test.go
@@ -289,6 +289,7 @@ func TestNotifyTasksFromWorkflowSnapshot(t *testing.T) {
},
},
PersistenceError: true,
ScheduleInMemory: true,
})
mockEngine.EXPECT().NotifyNewReplicationTasks(&hcommon.NotifyTaskInfo{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
@@ -419,6 +420,7 @@ func TestNotifyTasksFromWorkflowMutation(t *testing.T) {
},
},
PersistenceError: true,
ScheduleInMemory: true,
})
mockEngine.EXPECT().NotifyNewReplicationTasks(&hcommon.NotifyTaskInfo{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
8 changes: 8 additions & 0 deletions service/history/queuev2/queue_base.go
@@ -274,6 +274,14 @@ func (q *queueBase) processNewTasks() bool {
return true
}

func (q *queueBase) insertSingleTask(task task.Task) bool {
return q.virtualQueueManager.InsertSingleTask(task)
}

func (q *queueBase) resetProgress(key persistence.HistoryTaskKey) {
q.virtualQueueManager.ResetProgress(key)
}

func (q *queueBase) updateQueueState(ctx context.Context) {
q.metricsScope.IncCounter(metrics.AckLevelUpdateCounter)
queueState := &QueueState{
39 changes: 33 additions & 6 deletions service/history/queuev2/queue_scheduled.go
@@ -143,15 +143,42 @@ func (q *scheduledQueue) NotifyNewTask(clusterName string, info *hcommon.NotifyT
return
}

nextTime := info.Tasks[0].GetVisibilityTimestamp()
for i := 1; i < numTasks; i++ {
ts := info.Tasks[i].GetVisibilityTimestamp()
if ts.Before(nextTime) {
nextTime = ts
q.base.logger.Debug(
"New timer task notification received",
tag.Dynamic("numTasks", numTasks),
tag.Dynamic("scheduleInMemory", info.ScheduleInMemory),
tag.Dynamic("persistenceError", info.PersistenceError),
tag.Dynamic("shardId", q.base.shard.GetShardID()),
)

tasksToBeReadFromDB := make([]persistence.Task, 0)

if info.ScheduleInMemory && !info.PersistenceError {
for _, task := range info.Tasks {
ts := task.GetVisibilityTimestamp()
q.base.logger.Debug("Submitting task to an in-memory queue", tag.Dynamic("scheduledTime", ts), tag.Dynamic("shardId", q.base.shard.GetShardID()))

if !q.base.insertSingleTask(q.base.taskInitializer(task)) {
tasksToBeReadFromDB = append(tasksToBeReadFromDB, task)
}
}
} else {
tasksToBeReadFromDB = info.Tasks
}

var nextReadTime time.Time
for _, task := range tasksToBeReadFromDB {
ts := task.GetVisibilityTimestamp()
if nextReadTime.IsZero() || ts.Before(nextReadTime) {
nextReadTime = ts
}
}

if !nextReadTime.IsZero() {
q.base.resetProgress(persistence.NewHistoryTaskKey(nextReadTime, 0))
q.notify(nextReadTime)
}

q.notify(nextTime)
q.base.metricsScope.AddCounter(metrics.NewHistoryTaskCounter, int64(numTasks))
}

1 change: 1 addition & 0 deletions service/history/queuev2/queue_scheduled_test.go
@@ -37,6 +37,7 @@ func TestScheduledQueue_LifeCycle(t *testing.T) {
mockExecutionManager := persistence.NewMockExecutionManager(ctrl)

// Setup mock expectations
mockShard.EXPECT().GetShardID().Return(1).AnyTimes()
mockShard.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).AnyTimes()
mockShard.EXPECT().GetTimeSource().Return(mockTimeSource).AnyTimes()
mockShard.EXPECT().GetQueueState(persistence.HistoryTaskCategoryTimer).Return(&types.QueueState{
93 changes: 71 additions & 22 deletions service/history/queuev2/virtual_queue.go
@@ -67,6 +67,10 @@ type (
SplitSlices(func(VirtualSlice) (remaining []VirtualSlice, split bool))
// Pause pauses the virtual queue for a while
Pause(time.Duration)
// InsertSingleTask inserts a single task into the virtual queue. Returns false if the task's timestamp is out of range of the current queue slice or the task does not satisfy the predicate
InsertSingleTask(task task.Task) bool
// ResetProgress removes the scheduled tasks after the given key
ResetProgress(persistence.HistoryTaskKey)
}

VirtualQueueOptions struct {
@@ -394,35 +398,14 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() {

now := q.timeSource.Now()
for _, task := range tasks {
if persistence.IsTaskCorrupted(task) {
q.logger.Error("Virtual queue encountered a corrupted task", tag.Dynamic("task", task))
q.metricsScope.IncCounter(metrics.CorruptedHistoryTaskCounter)
task.Ack()
continue
}

scheduledTime := task.GetTaskKey().GetScheduledTime()
// if the scheduled time is in the future, we need to reschedule the task
if now.Before(scheduledTime) {
q.rescheduler.RescheduleTask(task, scheduledTime)
continue
}
// shard level metrics for the duration between a task being written to a queue and being fetched from it
q.metricsScope.RecordHistogramDuration(metrics.TaskEnqueueToFetchLatency, now.Sub(task.GetVisibilityTimestamp()))
task.SetInitialSubmitTime(now)
submitted, err := q.processor.TrySubmit(task)
if err != nil {
if err := q.submitOrRescheduleTask(now, task); err != nil {
select {
case <-q.ctx.Done():
return
default:
q.logger.Error("Virtual queue failed to submit task", tag.Error(err))
}
}
if !submitted {
q.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter)
q.rescheduler.RescheduleTask(task, q.timeSource.Now().Add(taskSchedulerThrottleBackoffInterval))
}
}

if sliceToRead.HasMoreTasks() {
@@ -436,6 +419,72 @@
}
}

func (q *virtualQueueImpl) InsertSingleTask(task task.Task) bool {
q.Lock()
defer q.Unlock()

for e := q.virtualSlices.Front(); e != nil; e = e.Next() {
slice := e.Value.(VirtualSlice)
inserted := slice.InsertTask(task)
if inserted {
q.monitor.SetSlicePendingTaskCount(slice, slice.GetPendingTaskCount())
now := q.timeSource.Now()
if err := q.submitOrRescheduleTask(now, task); err != nil {
q.logger.Error("Error submitting task to virtual queue", tag.Error(err))
}

return true
}
}

return false
}

func (q *virtualQueueImpl) ResetProgress(key persistence.HistoryTaskKey) {
q.Lock()
defer q.Unlock()

for e := q.virtualSlices.Front(); e != nil; e = e.Next() {
slice := e.Value.(VirtualSlice)
slice.ResetProgress(key)
q.monitor.SetSlicePendingTaskCount(slice, slice.GetPendingTaskCount())

if e == q.sliceToRead {
break
}
}

q.resetNextReadSliceLocked()
}

func (q *virtualQueueImpl) submitOrRescheduleTask(now time.Time, task task.Task) error {
if persistence.IsTaskCorrupted(task) {
q.logger.Error("Virtual queue encountered a corrupted task", tag.Dynamic("task", task))
q.metricsScope.IncCounter(metrics.CorruptedHistoryTaskCounter)

task.Ack()
return nil
}

scheduledTime := task.GetTaskKey().GetScheduledTime()
// if the scheduled time is in the future, we need to reschedule the task
if now.Before(scheduledTime) {
q.rescheduler.RescheduleTask(task, scheduledTime)
return nil
}
// shard level metrics for the duration between a task being written to a queue and being fetched from it
q.metricsScope.RecordHistogramDuration(metrics.TaskEnqueueToFetchLatency, now.Sub(task.GetVisibilityTimestamp()))
task.SetInitialSubmitTime(now)
submitted, err := q.processor.TrySubmit(task)

if !submitted {
q.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter)
q.rescheduler.RescheduleTask(task, q.timeSource.Now().Add(taskSchedulerThrottleBackoffInterval))
}

return err
}

func (q *virtualQueueImpl) resetNextReadSliceLocked() {
q.sliceToRead = nil
for element := q.virtualSlices.Front(); element != nil; element = element.Next() {
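To make the ResetProgress contract concrete, a toy model of the behavior (not the real VirtualSlice implementation; the key type and ordering below are simplified stand-ins): pending keys from the reset key onward are dropped so the next DB read picks those tasks up again.

package main

import (
    "fmt"
    "time"
)

// taskKey is a toy stand-in for persistence.HistoryTaskKey, ordered by
// scheduled time and then task ID.
type taskKey struct {
    scheduledTime time.Time
    taskID        int64
}

func (k taskKey) before(o taskKey) bool {
    if !k.scheduledTime.Equal(o.scheduledTime) {
        return k.scheduledTime.Before(o.scheduledTime)
    }
    return k.taskID < o.taskID
}

// resetProgress drops every pending key that is not before the reset key,
// mimicking "cancel in-memory work from this point on; the DB read will
// pick it up again".
func resetProgress(pending []taskKey, reset taskKey) []taskKey {
    kept := make([]taskKey, 0, len(pending))
    for _, k := range pending {
        if k.before(reset) {
            kept = append(kept, k)
        }
    }
    return kept
}

func main() {
    base := time.Now()
    pending := []taskKey{
        {scheduledTime: base.Add(1 * time.Second), taskID: 10},
        {scheduledTime: base.Add(5 * time.Second), taskID: 11},
        {scheduledTime: base.Add(9 * time.Second), taskID: 12},
    }

    // Reset everything scheduled at or after base+5s; task ID 0 acts as the
    // lowest possible ID at that timestamp, as in NewHistoryTaskKey(t, 0).
    pending = resetProgress(pending, taskKey{scheduledTime: base.Add(5 * time.Second)})
    fmt.Println("pending after reset:", len(pending)) // only the base+1s task remains
}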
27 changes: 27 additions & 0 deletions service/history/queuev2/virtual_queue_manager.go
@@ -34,6 +34,7 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/service/history/task"
)
@@ -56,6 +57,10 @@ type (
// Add a new virtual slice to the root queue. This is used when new tasks are generated and max read level is updated.
// By default, all new tasks belong to the root queue, so we need to add a new virtual slice to the root queue.
AddNewVirtualSliceToRootQueue(VirtualSlice)
// Insert a single task into the current slice. Returns false if the task's timestamp is out of range of the current slice.
InsertSingleTask(task.Task) bool
// ResetProgress resets the progress of the virtual queue to the given key. Pending tasks after the given key are cancelled.
ResetProgress(persistence.HistoryTaskKey)
}

virtualQueueManagerImpl struct {
@@ -218,6 +223,28 @@ func (m *virtualQueueManagerImpl) AddNewVirtualSliceToRootQueue(s VirtualSlice)
m.virtualQueues[rootQueueID].Start()
}

func (m *virtualQueueManagerImpl) InsertSingleTask(t task.Task) bool {
m.Lock()
defer m.Unlock()

inserted := false
for _, vq := range m.virtualQueues {
if vq.InsertSingleTask(t) {
inserted = true
[Reviewer comment] should we break here? The task only needs to be processed once, and it should only belong to one virtual queue.
}
}

return inserted
}
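
Regarding the reviewer comment above, a hypothetical variant of InsertSingleTask that applies the suggestion (a sketch written against the types in this diff, not a standalone program), assuming a task can be accepted by at most one virtual queue:

func (m *virtualQueueManagerImpl) InsertSingleTask(t task.Task) bool {
    m.Lock()
    defer m.Unlock()

    for _, vq := range m.virtualQueues {
        if vq.InsertSingleTask(t) {
            // The task belongs to at most one virtual queue, so stop scanning
            // as soon as one accepts it.
            return true
        }
    }
    return false
}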

func (m *virtualQueueManagerImpl) ResetProgress(key persistence.HistoryTaskKey) {
m.Lock()
defer m.Unlock()
for _, vq := range m.virtualQueues {
vq.ResetProgress(key)
}
}

func (m *virtualQueueManagerImpl) appendOrMergeSlice(vq VirtualQueue, s VirtualSlice) {
now := m.timeSource.Now()
newVirtualSliceState := s.GetState()
29 changes: 29 additions & 0 deletions service/history/queuev2/virtual_queue_manager_mock.go

(Generated file; contents not rendered by default.)