diff --git a/common/dynamicconfig/dynamicproperties/constants.go b/common/dynamicconfig/dynamicproperties/constants.go index 02e7a35add6..8f1a6c42214 100644 --- a/common/dynamicconfig/dynamicproperties/constants.go +++ b/common/dynamicconfig/dynamicproperties/constants.go @@ -2688,6 +2688,12 @@ const ( // Default value: 1s (1*time.Second) // Allowed filters: N/A TimerProcessorMaxTimeShift + // TimerProcessorInMemoryQueueMaxTimeShift is the max shift the timer processor in-memory queue can have. If set to 0, the in-memory queue is disabled. + // KeyName: history.timerProcessorInMemoryQueueMaxTimeShift + // Value type: Duration + // Default value: 0 + // Allowed filters: ShardID + TimerProcessorInMemoryQueueMaxTimeShift // TransferProcessorFailoverMaxStartJitterInterval is the max jitter interval for starting transfer // failover queue processing. The actual jitter interval used will be a random duration between // 0 and the max interval so that timer failover queue across different shards won't start at @@ -5226,6 +5232,12 @@ var DurationKeys = map[DurationKey]DynamicDuration{ Description: "TimerProcessorMaxTimeShift is the max shift timer processor can have", DefaultValue: time.Second, }, + TimerProcessorInMemoryQueueMaxTimeShift: { + KeyName: "history.timerProcessorInMemoryQueueMaxTimeShift", + Filters: []Filter{ShardID}, + Description: "TimerProcessorInMemoryQueueMaxTimeShift is the max shift the timer processor in-memory queue can have. If set to 0, the in-memory queue is disabled.", + DefaultValue: 0, + }, TransferProcessorFailoverMaxStartJitterInterval: { KeyName: "history.transferProcessorFailoverMaxStartJitterInterval", Description: "TransferProcessorFailoverMaxStartJitterInterval is the max jitter interval for starting transfer failover queue processing. The actual jitter interval used will be a random duration between 0 and the max interval so that timer failover queue across different shards won't start at the same time", diff --git a/service/history/common/type.go b/service/history/common/type.go index def10db08d0..59ce485e49b 100644 --- a/service/history/common/type.go +++ b/service/history/common/type.go @@ -36,5 +36,11 @@ type ( Activities map[int64]*persistence.ActivityInfo History events.PersistedBlobs PersistenceError bool + + // If true, the task will be scheduled in memory for the current execution; otherwise + // it will only be scheduled after the next DB scan. This notification is sometimes passed with a fake + // timer whose sole purpose is to reset the next scheduled DB read, which is why we sometimes want to + // avoid scheduling the task in memory.
+ ScheduleInMemory bool } ) diff --git a/service/history/config/config.go b/service/history/config/config.go index d39933d0d01..0d77af885fe 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -154,6 +154,7 @@ type Config struct { TimerProcessorSplitQueueIntervalJitterCoefficient dynamicproperties.FloatPropertyFn TimerProcessorMaxRedispatchQueueSize dynamicproperties.IntPropertyFn TimerProcessorMaxTimeShift dynamicproperties.DurationPropertyFn + TimerProcessorInMemoryQueueMaxTimeShift dynamicproperties.DurationPropertyFnWithShardIDFilter TimerProcessorHistoryArchivalSizeLimit dynamicproperties.IntPropertyFn TimerProcessorArchivalTimeLimit dynamicproperties.DurationPropertyFn DisableTimerFailoverQueue dynamicproperties.BoolPropertyFn @@ -453,6 +454,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i TimerProcessorSplitQueueIntervalJitterCoefficient: dc.GetFloat64Property(dynamicproperties.TimerProcessorSplitQueueIntervalJitterCoefficient), TimerProcessorMaxRedispatchQueueSize: dc.GetIntProperty(dynamicproperties.TimerProcessorMaxRedispatchQueueSize), TimerProcessorMaxTimeShift: dc.GetDurationProperty(dynamicproperties.TimerProcessorMaxTimeShift), + TimerProcessorInMemoryQueueMaxTimeShift: dc.GetDurationPropertyFilteredByShardID(dynamicproperties.TimerProcessorInMemoryQueueMaxTimeShift), TimerProcessorHistoryArchivalSizeLimit: dc.GetIntProperty(dynamicproperties.TimerProcessorHistoryArchivalSizeLimit), TimerProcessorArchivalTimeLimit: dc.GetDurationProperty(dynamicproperties.TimerProcessorArchivalTimeLimit), DisableTimerFailoverQueue: dc.GetBoolProperty(dynamicproperties.DisableTimerFailoverQueue), diff --git a/service/history/config/config_test.go b/service/history/config/config_test.go index 3c498f5c72c..3e7baf57962 100644 --- a/service/history/config/config_test.go +++ b/service/history/config/config_test.go @@ -139,6 +139,7 @@ func TestNewConfig(t *testing.T) { "TimerProcessorSplitQueueIntervalJitterCoefficient": {dynamicproperties.TimerProcessorSplitQueueIntervalJitterCoefficient, 4.0}, "TimerProcessorMaxRedispatchQueueSize": {dynamicproperties.TimerProcessorMaxRedispatchQueueSize, 45}, "TimerProcessorMaxTimeShift": {dynamicproperties.TimerProcessorMaxTimeShift, time.Second}, + "TimerProcessorInMemoryQueueMaxTimeShift": {dynamicproperties.TimerProcessorInMemoryQueueMaxTimeShift, time.Duration(0)}, "TimerProcessorHistoryArchivalSizeLimit": {dynamicproperties.TimerProcessorHistoryArchivalSizeLimit, 46}, "TimerProcessorArchivalTimeLimit": {dynamicproperties.TimerProcessorArchivalTimeLimit, time.Second}, "TransferTaskBatchSize": {dynamicproperties.TransferTaskBatchSize, 47}, diff --git a/service/history/execution/context.go b/service/history/execution/context.go index 413855b3b9d..a7c1f34516d 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -992,6 +992,7 @@ func notifyTasks( ExecutionInfo: executionInfo, Tasks: tasksByCategory[persistence.HistoryTaskCategoryTimer], PersistenceError: persistenceError, + ScheduleInMemory: true, } replicationTaskInfo := &hcommon.NotifyTaskInfo{ ExecutionInfo: executionInfo, diff --git a/service/history/execution/context_test.go b/service/history/execution/context_test.go index ca20f0fce58..4428c62e745 100644 --- a/service/history/execution/context_test.go +++ b/service/history/execution/context_test.go @@ -289,6 +289,7 @@ func TestNotifyTasksFromWorkflowSnapshot(t *testing.T) { }, }, PersistenceError: true, + ScheduleInMemory: true, }) 
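Aside (not part of the diff): a minimal sketch of how the new NotifyTaskInfo.ScheduleInMemory flag is meant to be used, assuming only the hcommon and persistence packages already referenced above; the helper name and variables are hypothetical. A notification produced by a live workflow update (as in notifyTasks above) sets the flag so the timers can be offered to the in-memory queue, while a notification whose only purpose is to reset the next scheduled DB read leaves it unset.

package example

import (
	"github.com/uber/cadence/common/persistence"
	hcommon "github.com/uber/cadence/service/history/common"
)

// buildTimerNotifications is a hypothetical helper illustrating the two uses of ScheduleInMemory:
// real timer tasks are offered to the in-memory queue, while a reset-only ("fake timer")
// notification is left to be picked up by the next DB scan.
func buildTimerNotifications(realTasks []persistence.Task, fakeResetTask persistence.Task) (live, reset *hcommon.NotifyTaskInfo) {
	live = &hcommon.NotifyTaskInfo{
		Tasks:            realTasks,
		ScheduleInMemory: true, // attempt in-memory scheduling for the current execution
	}
	reset = &hcommon.NotifyTaskInfo{
		Tasks:            []persistence.Task{fakeResetTask},
		ScheduleInMemory: false, // only reschedule the next DB read; do not enqueue in memory
	}
	return live, reset
}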
mockEngine.EXPECT().NotifyNewReplicationTasks(&hcommon.NotifyTaskInfo{ ExecutionInfo: &persistence.WorkflowExecutionInfo{ @@ -419,6 +420,7 @@ func TestNotifyTasksFromWorkflowMutation(t *testing.T) { }, }, PersistenceError: true, + ScheduleInMemory: true, }) mockEngine.EXPECT().NotifyNewReplicationTasks(&hcommon.NotifyTaskInfo{ ExecutionInfo: &persistence.WorkflowExecutionInfo{ diff --git a/service/history/queuev2/queue_base.go b/service/history/queuev2/queue_base.go index c45e00bcc2f..8db0ef8d1a5 100644 --- a/service/history/queuev2/queue_base.go +++ b/service/history/queuev2/queue_base.go @@ -274,6 +274,14 @@ func (q *queueBase) processNewTasks() bool { return true } +func (q *queueBase) insertSingleTask(task task.Task) bool { + return q.virtualQueueManager.InsertSingleTask(task) +} + +func (q *queueBase) resetProgress(key persistence.HistoryTaskKey) { + q.virtualQueueManager.ResetProgress(key) +} + func (q *queueBase) updateQueueState(ctx context.Context) { q.metricsScope.IncCounter(metrics.AckLevelUpdateCounter) queueState := &QueueState{ diff --git a/service/history/queuev2/queue_scheduled.go b/service/history/queuev2/queue_scheduled.go index fff1d0bef7d..1b439c488cf 100644 --- a/service/history/queuev2/queue_scheduled.go +++ b/service/history/queuev2/queue_scheduled.go @@ -143,15 +143,42 @@ func (q *scheduledQueue) NotifyNewTask(clusterName string, info *hcommon.NotifyT return } - nextTime := info.Tasks[0].GetVisibilityTimestamp() - for i := 1; i < numTasks; i++ { - ts := info.Tasks[i].GetVisibilityTimestamp() - if ts.Before(nextTime) { - nextTime = ts + q.base.logger.Debug( + "New timer task notification received", + tag.Dynamic("numTasks", numTasks), + tag.Dynamic("scheduleInMemory", info.ScheduleInMemory), + tag.Dynamic("persistenceError", info.PersistenceError), + tag.Dynamic("shardId", q.base.shard.GetShardID()), + ) + + tasksToBeReadFromDB := make([]persistence.Task, 0) + + if info.ScheduleInMemory && !info.PersistenceError { + for _, task := range info.Tasks { + ts := task.GetVisibilityTimestamp() + q.base.logger.Debug("Submitting task to an in-memory queue", tag.Dynamic("scheduledTime", ts), tag.Dynamic("shardId", q.base.shard.GetShardID())) + + if !q.base.insertSingleTask(q.base.taskInitializer(task)) { + tasksToBeReadFromDB = append(tasksToBeReadFromDB, task) + } } + } else { + tasksToBeReadFromDB = info.Tasks + } + + var nextReadTime time.Time + for _, task := range tasksToBeReadFromDB { + ts := task.GetVisibilityTimestamp() + if nextReadTime.IsZero() || ts.Before(nextReadTime) { + nextReadTime = ts + } + } + + if !nextReadTime.IsZero() { + q.base.resetProgress(persistence.NewHistoryTaskKey(nextReadTime, 0)) + q.notify(nextReadTime) } - q.notify(nextTime) q.base.metricsScope.AddCounter(metrics.NewHistoryTaskCounter, int64(numTasks)) } diff --git a/service/history/queuev2/queue_scheduled_test.go b/service/history/queuev2/queue_scheduled_test.go index cc96ac7588a..2cdb9b5c79f 100644 --- a/service/history/queuev2/queue_scheduled_test.go +++ b/service/history/queuev2/queue_scheduled_test.go @@ -16,13 +16,50 @@ import ( "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/mocks" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" hcommon "github.com/uber/cadence/service/history/common" + historyconfig "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/shard" 
"github.com/uber/cadence/service/history/task" ) +func setupBasicShardMocks(mockShard *shard.MockContext, mockTimeSource clock.TimeSource, mockExecutionManager *persistence.MockExecutionManager) { + mockShard.EXPECT().GetShardID().Return(1).AnyTimes() + mockShard.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).AnyTimes() + mockShard.EXPECT().GetTimeSource().Return(mockTimeSource).AnyTimes() + mockShard.EXPECT().GetConfig().Return(&historyconfig.Config{ + TaskCriticalRetryCount: dynamicproperties.GetIntPropertyFn(3), + }).AnyTimes() + mockShard.EXPECT().GetQueueState(persistence.HistoryTaskCategoryTimer).Return(&types.QueueState{ + VirtualQueueStates: map[int64]*types.VirtualQueueState{ + 0: { + VirtualSliceStates: []*types.VirtualSliceState{ + { + TaskRange: &types.TaskRange{ + InclusiveMin: &types.TaskKey{ + ScheduledTimeNano: mockTimeSource.Now().Add(-1 * time.Hour).UnixNano(), + }, + ExclusiveMax: &types.TaskKey{ + ScheduledTimeNano: mockTimeSource.Now().UnixNano(), + }, + }, + }, + }, + }, + }, + ExclusiveMaxReadLevel: &types.TaskKey{ + ScheduledTimeNano: mockTimeSource.Now().UnixNano(), + }, + }, nil).AnyTimes() + mockShard.EXPECT().GetExecutionManager().Return(mockExecutionManager).AnyTimes() + mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), gomock.Any()).Return(&persistence.GetHistoryTasksResponse{}, nil).AnyTimes() + mockExecutionManager.EXPECT().RangeCompleteHistoryTask(gomock.Any(), gomock.Any()).Return(&persistence.RangeCompleteHistoryTaskResponse{}, nil).AnyTimes() + mockShard.EXPECT().UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryTimer, cluster.TestCurrentClusterName).Return(persistence.NewHistoryTaskKey(time.Now(), 10)).AnyTimes() + mockShard.EXPECT().UpdateQueueState(persistence.HistoryTaskCategoryTimer, gomock.Any()).Return(nil).AnyTimes() +} + func TestScheduledQueue_LifeCycle(t *testing.T) { defer goleak.VerifyNone(t) ctrl := gomock.NewController(t) @@ -37,6 +74,7 @@ func TestScheduledQueue_LifeCycle(t *testing.T) { mockExecutionManager := persistence.NewMockExecutionManager(ctrl) // Setup mock expectations + mockShard.EXPECT().GetShardID().Return(1).AnyTimes() mockShard.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).AnyTimes() mockShard.EXPECT().GetTimeSource().Return(mockTimeSource).AnyTimes() mockShard.EXPECT().GetQueueState(persistence.HistoryTaskCategoryTimer).Return(&types.QueueState{ @@ -120,6 +158,482 @@ func TestScheduledQueue_LifeCycle(t *testing.T) { assert.Equal(t, common.DaemonStatusStopped, atomic.LoadInt32(&queue.status)) } +func TestScheduledQueue_NotifyNewTask_EmptyTasks(t *testing.T) { + defer goleak.VerifyNone(t) + ctrl := gomock.NewController(t) + + mockShard := shard.NewMockContext(ctrl) + mockTaskProcessor := task.NewMockProcessor(ctrl) + mockTaskExecutor := task.NewMockExecutor(ctrl) + mockLogger := testlogger.New(t) + mockMetricsClient := metrics.NoopClient + mockMetricsScope := &mocks.Scope{} + mockTimeSource := clock.NewMockedTimeSource() + mockExecutionManager := persistence.NewMockExecutionManager(ctrl) + + setupBasicShardMocks(mockShard, mockTimeSource, mockExecutionManager) + + options := &Options{ + DeleteBatchSize: dynamicproperties.GetIntPropertyFn(100), + RedispatchInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PageSize: dynamicproperties.GetIntPropertyFn(100), + PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + 
MaxPollIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + UpdateAckInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + UpdateAckIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + MaxPollRPS: dynamicproperties.GetIntPropertyFn(100), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0), + VirtualSliceForceAppendInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + CriticalPendingTaskCount: dynamicproperties.GetIntPropertyFn(90), + EnablePendingTaskCountAlert: func() bool { return true }, + MaxVirtualQueueCount: dynamicproperties.GetIntPropertyFn(2), + } + + queue := NewScheduledQueue( + mockShard, + persistence.HistoryTaskCategoryTimer, + mockTaskProcessor, + mockTaskExecutor, + mockLogger, + mockMetricsClient, + mockMetricsScope, + options, + ).(*scheduledQueue) + + assert.NotPanics(t, func() { + queue.NotifyNewTask("test-cluster", &hcommon.NotifyTaskInfo{ + Tasks: []persistence.Task{}, + }) + }) +} + +func TestScheduledQueue_NotifyNewTask_FieldCombinations(t *testing.T) { + defer goleak.VerifyNone(t) + + tests := []struct { + name string + scheduleInMemory bool + persistenceError bool + expectInMemory bool + expectDBRead bool + }{ + { + name: "ScheduleInMemory=true, PersistenceError=false - should try in-memory", + scheduleInMemory: true, + persistenceError: false, + expectInMemory: true, + expectDBRead: false, + }, + { + name: "ScheduleInMemory=true, PersistenceError=true - should skip in-memory", + scheduleInMemory: true, + persistenceError: true, + expectInMemory: false, + expectDBRead: true, + }, + { + name: "ScheduleInMemory=false, PersistenceError=false - should skip in-memory", + scheduleInMemory: false, + persistenceError: false, + expectInMemory: false, + expectDBRead: true, + }, + { + name: "ScheduleInMemory=false, PersistenceError=true - should skip in-memory", + scheduleInMemory: false, + persistenceError: true, + expectInMemory: false, + expectDBRead: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + + mockShard := shard.NewMockContext(ctrl) + mockTaskProcessor := task.NewMockProcessor(ctrl) + mockTaskExecutor := task.NewMockExecutor(ctrl) + mockLogger := testlogger.New(t) + mockMetricsClient := metrics.NoopClient + mockMetricsScope := &mocks.Scope{} + mockTimeSource := clock.NewMockedTimeSource() + mockExecutionManager := persistence.NewMockExecutionManager(ctrl) + mockVirtualQueueManager := NewMockVirtualQueueManager(ctrl) + + setupBasicShardMocks(mockShard, mockTimeSource, mockExecutionManager) + + mockMetricsScope.On("AddCounter", metrics.NewHistoryTaskCounter, int64(1)) + + options := &Options{ + DeleteBatchSize: dynamicproperties.GetIntPropertyFn(100), + RedispatchInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PageSize: dynamicproperties.GetIntPropertyFn(100), + PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + UpdateAckInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + UpdateAckIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + MaxPollRPS: dynamicproperties.GetIntPropertyFn(100), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + PollBackoffIntervalJitterCoefficient: 
dynamicproperties.GetFloatPropertyFn(0.0), + VirtualSliceForceAppendInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + CriticalPendingTaskCount: dynamicproperties.GetIntPropertyFn(90), + EnablePendingTaskCountAlert: func() bool { return true }, + MaxVirtualQueueCount: dynamicproperties.GetIntPropertyFn(2), + } + + queue := NewScheduledQueue( + mockShard, + persistence.HistoryTaskCategoryTimer, + mockTaskProcessor, + mockTaskExecutor, + mockLogger, + mockMetricsClient, + mockMetricsScope, + options, + ).(*scheduledQueue) + + queue.base.virtualQueueManager = mockVirtualQueueManager + + testTask := &persistence.DecisionTimeoutTask{ + TaskData: persistence.TaskData{ + VisibilityTimestamp: mockTimeSource.Now().Add(time.Hour), + }, + } + + if tt.expectInMemory { + mockVirtualQueueManager.EXPECT().InsertSingleTask(gomock.Any()).Return(true) + } + if tt.expectDBRead { + mockVirtualQueueManager.EXPECT().ResetProgress(gomock.Any()) + } + + queue.NotifyNewTask("test-cluster", &hcommon.NotifyTaskInfo{ + Tasks: []persistence.Task{testTask}, + ScheduleInMemory: tt.scheduleInMemory, + PersistenceError: tt.persistenceError, + }) + }) + } +} + +func TestScheduledQueue_NotifyNewTask_InsertionScenarios(t *testing.T) { + defer goleak.VerifyNone(t) + + tests := []struct { + name string + numTasks int + insertionResults []bool + expectedTasksToReadFromDB int + }{ + { + name: "All tasks inserted successfully", + numTasks: 3, + insertionResults: []bool{true, true, true}, + expectedTasksToReadFromDB: 0, + }, + { + name: "Some tasks fail insertion", + numTasks: 3, + insertionResults: []bool{true, false, true}, + expectedTasksToReadFromDB: 1, + }, + { + name: "All tasks fail insertion", + numTasks: 3, + insertionResults: []bool{false, false, false}, + expectedTasksToReadFromDB: 3, + }, + { + name: "Single task success", + numTasks: 1, + insertionResults: []bool{true}, + expectedTasksToReadFromDB: 0, + }, + { + name: "Single task failure", + numTasks: 1, + insertionResults: []bool{false}, + expectedTasksToReadFromDB: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + + mockShard := shard.NewMockContext(ctrl) + mockTaskProcessor := task.NewMockProcessor(ctrl) + mockTaskExecutor := task.NewMockExecutor(ctrl) + mockLogger := testlogger.New(t) + mockMetricsClient := metrics.NoopClient + mockMetricsScope := &mocks.Scope{} + mockTimeSource := clock.NewMockedTimeSource() + mockExecutionManager := persistence.NewMockExecutionManager(ctrl) + mockVirtualQueueManager := NewMockVirtualQueueManager(ctrl) + + setupBasicShardMocks(mockShard, mockTimeSource, mockExecutionManager) + + mockMetricsScope.On("AddCounter", metrics.NewHistoryTaskCounter, int64(tt.numTasks)) + + options := &Options{ + DeleteBatchSize: dynamicproperties.GetIntPropertyFn(100), + RedispatchInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PageSize: dynamicproperties.GetIntPropertyFn(100), + PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + UpdateAckInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + UpdateAckIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + MaxPollRPS: dynamicproperties.GetIntPropertyFn(100), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + PollBackoffIntervalJitterCoefficient: 
dynamicproperties.GetFloatPropertyFn(0.0), + VirtualSliceForceAppendInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + CriticalPendingTaskCount: dynamicproperties.GetIntPropertyFn(90), + EnablePendingTaskCountAlert: func() bool { return true }, + MaxVirtualQueueCount: dynamicproperties.GetIntPropertyFn(2), + } + + queue := NewScheduledQueue( + mockShard, + persistence.HistoryTaskCategoryTimer, + mockTaskProcessor, + mockTaskExecutor, + mockLogger, + mockMetricsClient, + mockMetricsScope, + options, + ).(*scheduledQueue) + + queue.base.virtualQueueManager = mockVirtualQueueManager + + tasks := make([]persistence.Task, tt.numTasks) + for i := 0; i < tt.numTasks; i++ { + tasks[i] = &persistence.DecisionTimeoutTask{ + TaskData: persistence.TaskData{ + VisibilityTimestamp: mockTimeSource.Now().Add(time.Duration(i+1) * time.Hour), + }, + } + } + + for _, result := range tt.insertionResults { + mockVirtualQueueManager.EXPECT().InsertSingleTask(gomock.Any()).Return(result).Times(1) + } + + if tt.expectedTasksToReadFromDB > 0 { + mockVirtualQueueManager.EXPECT().ResetProgress(gomock.Any()).Times(1) + } + + queue.NotifyNewTask("test-cluster", &hcommon.NotifyTaskInfo{ + Tasks: tasks, + ScheduleInMemory: true, + PersistenceError: false, + }) + }) + } +} + +func TestScheduledQueue_NotifyNewTask_TimestampCalculation(t *testing.T) { + defer goleak.VerifyNone(t) + + tests := []struct { + name string + taskTimestamps []time.Time + expectedEarliest time.Time + allInsertionsFail bool + }{ + { + name: "Multiple tasks with different timestamps", + taskTimestamps: []time.Time{ + time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), + time.Date(2023, 1, 1, 10, 0, 0, 0, time.UTC), + time.Date(2023, 1, 1, 14, 0, 0, 0, time.UTC), + }, + expectedEarliest: time.Date(2023, 1, 1, 10, 0, 0, 0, time.UTC), + allInsertionsFail: true, + }, + { + name: "Tasks with same timestamps", + taskTimestamps: []time.Time{ + time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), + time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), + time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), + }, + expectedEarliest: time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), + allInsertionsFail: true, + }, + { + name: "Single task", + taskTimestamps: []time.Time{ + time.Date(2023, 1, 1, 15, 30, 0, 0, time.UTC), + }, + expectedEarliest: time.Date(2023, 1, 1, 15, 30, 0, 0, time.UTC), + allInsertionsFail: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + + mockShard := shard.NewMockContext(ctrl) + mockTaskProcessor := task.NewMockProcessor(ctrl) + mockTaskExecutor := task.NewMockExecutor(ctrl) + mockLogger := testlogger.New(t) + mockMetricsClient := metrics.NoopClient + mockMetricsScope := &mocks.Scope{} + mockTimeSource := clock.NewMockedTimeSource() + mockExecutionManager := persistence.NewMockExecutionManager(ctrl) + mockVirtualQueueManager := NewMockVirtualQueueManager(ctrl) + + setupBasicShardMocks(mockShard, mockTimeSource, mockExecutionManager) + + mockMetricsScope.On("AddCounter", metrics.NewHistoryTaskCounter, int64(len(tt.taskTimestamps))) + + options := &Options{ + DeleteBatchSize: dynamicproperties.GetIntPropertyFn(100), + RedispatchInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PageSize: dynamicproperties.GetIntPropertyFn(100), + PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), 
+ UpdateAckInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + UpdateAckIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + MaxPollRPS: dynamicproperties.GetIntPropertyFn(100), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0), + VirtualSliceForceAppendInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + CriticalPendingTaskCount: dynamicproperties.GetIntPropertyFn(90), + EnablePendingTaskCountAlert: func() bool { return true }, + MaxVirtualQueueCount: dynamicproperties.GetIntPropertyFn(2), + } + + queue := NewScheduledQueue( + mockShard, + persistence.HistoryTaskCategoryTimer, + mockTaskProcessor, + mockTaskExecutor, + mockLogger, + mockMetricsClient, + mockMetricsScope, + options, + ).(*scheduledQueue) + + queue.base.virtualQueueManager = mockVirtualQueueManager + + tasks := make([]persistence.Task, len(tt.taskTimestamps)) + for i, ts := range tt.taskTimestamps { + tasks[i] = &persistence.DecisionTimeoutTask{ + TaskData: persistence.TaskData{ + VisibilityTimestamp: ts, + }, + } + } + + if tt.allInsertionsFail { + mockVirtualQueueManager.EXPECT().InsertSingleTask(gomock.Any()).Return(false).Times(len(tasks)) + expectedKey := persistence.NewHistoryTaskKey(tt.expectedEarliest, 0) + mockVirtualQueueManager.EXPECT().ResetProgress(expectedKey).Times(1) + } + + queue.NotifyNewTask("test-cluster", &hcommon.NotifyTaskInfo{ + Tasks: tasks, + ScheduleInMemory: true, + PersistenceError: false, + }) + }) + } +} + +func TestScheduledQueue_NotifyNewTask_MultipleTaskTypes(t *testing.T) { + defer goleak.VerifyNone(t) + ctrl := gomock.NewController(t) + + mockShard := shard.NewMockContext(ctrl) + mockTaskProcessor := task.NewMockProcessor(ctrl) + mockTaskExecutor := task.NewMockExecutor(ctrl) + mockLogger := testlogger.New(t) + mockMetricsClient := metrics.NoopClient + mockMetricsScope := &mocks.Scope{} + mockTimeSource := clock.NewMockedTimeSource() + mockExecutionManager := persistence.NewMockExecutionManager(ctrl) + mockVirtualQueueManager := NewMockVirtualQueueManager(ctrl) + + setupBasicShardMocks(mockShard, mockTimeSource, mockExecutionManager) + + mockMetricsScope.On("AddCounter", metrics.NewHistoryTaskCounter, int64(4)) + + options := &Options{ + DeleteBatchSize: dynamicproperties.GetIntPropertyFn(100), + RedispatchInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PageSize: dynamicproperties.GetIntPropertyFn(100), + PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + UpdateAckInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + UpdateAckIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + MaxPollRPS: dynamicproperties.GetIntPropertyFn(100), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0), + VirtualSliceForceAppendInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + CriticalPendingTaskCount: dynamicproperties.GetIntPropertyFn(90), + EnablePendingTaskCountAlert: func() bool { return true }, + MaxVirtualQueueCount: dynamicproperties.GetIntPropertyFn(2), + } + + queue := NewScheduledQueue( + mockShard, + persistence.HistoryTaskCategoryTimer, + mockTaskProcessor, + mockTaskExecutor, + mockLogger, 
+ mockMetricsClient, + mockMetricsScope, + options, + ).(*scheduledQueue) + + queue.base.virtualQueueManager = mockVirtualQueueManager + + baseTime := mockTimeSource.Now() + + tasks := []persistence.Task{ + &persistence.DecisionTimeoutTask{ + TaskData: persistence.TaskData{ + VisibilityTimestamp: baseTime.Add(1 * time.Hour), + }, + }, + &persistence.ActivityTimeoutTask{ + TaskData: persistence.TaskData{ + VisibilityTimestamp: baseTime.Add(30 * time.Minute), // Earliest + }, + }, + &persistence.WorkflowTimeoutTask{ + TaskData: persistence.TaskData{ + VisibilityTimestamp: baseTime.Add(2 * time.Hour), + }, + }, + &persistence.UserTimerTask{ + TaskData: persistence.TaskData{ + VisibilityTimestamp: baseTime.Add(45 * time.Minute), + }, + }, + } + + mockVirtualQueueManager.EXPECT().InsertSingleTask(gomock.Any()).Return(false).Times(4) + + expectedKey := persistence.NewHistoryTaskKey(baseTime.Add(30*time.Minute), 0) + mockVirtualQueueManager.EXPECT().ResetProgress(expectedKey).Times(1) + + queue.NotifyNewTask("test-cluster", &hcommon.NotifyTaskInfo{ + Tasks: tasks, + ScheduleInMemory: true, + PersistenceError: false, + }) +} + func TestScheduledQueue_LookAheadTask(t *testing.T) { defer goleak.VerifyNone(t) diff --git a/service/history/queuev2/virtual_queue.go b/service/history/queuev2/virtual_queue.go index 06ce7863138..88d33a71d96 100644 --- a/service/history/queuev2/virtual_queue.go +++ b/service/history/queuev2/virtual_queue.go @@ -67,6 +67,10 @@ type ( SplitSlices(func(VirtualSlice) (remaining []VirtualSlice, split bool)) // Pause pauses the virtual queue for a while Pause(time.Duration) + // InsertSingleTask inserts a single task to the virtual queue. Return false if the task's timestamp is out of range of the current queue slice or the task does not satisfy the predicate + InsertSingleTask(task task.Task) bool + // ResetProgress removes the scheduled tasks after the given time + ResetProgress(persistence.HistoryTaskKey) } VirtualQueueOptions struct { @@ -394,24 +398,7 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() { now := q.timeSource.Now() for _, task := range tasks { - if persistence.IsTaskCorrupted(task) { - q.logger.Error("Virtual queue encountered a corrupted task", tag.Dynamic("task", task)) - q.metricsScope.IncCounter(metrics.CorruptedHistoryTaskCounter) - task.Ack() - continue - } - - scheduledTime := task.GetTaskKey().GetScheduledTime() - // if the scheduled time is in the future, we need to reschedule the task - if now.Before(scheduledTime) { - q.rescheduler.RescheduleTask(task, scheduledTime) - continue - } - // shard level metrics for the duration between a task being written to a queue and being fetched from it - q.metricsScope.RecordHistogramDuration(metrics.TaskEnqueueToFetchLatency, now.Sub(task.GetVisibilityTimestamp())) - task.SetInitialSubmitTime(now) - submitted, err := q.processor.TrySubmit(task) - if err != nil { + if err := q.submitOrRescheduleTask(now, task); err != nil { select { case <-q.ctx.Done(): return @@ -419,10 +406,6 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() { q.logger.Error("Virtual queue failed to submit task", tag.Error(err)) } } - if !submitted { - q.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter) - q.rescheduler.RescheduleTask(task, q.timeSource.Now().Add(taskSchedulerThrottleBackoffInterval)) - } } if sliceToRead.HasMoreTasks() { @@ -436,6 +419,72 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() { } } +func (q *virtualQueueImpl) InsertSingleTask(task task.Task) bool { + q.Lock() + defer q.Unlock() + + for e := 
q.virtualSlices.Front(); e != nil; e = e.Next() { + slice := e.Value.(VirtualSlice) + inserted := slice.InsertTask(task) + if inserted { + q.monitor.SetSlicePendingTaskCount(slice, slice.GetPendingTaskCount()) + now := q.timeSource.Now() + if err := q.submitOrRescheduleTask(now, task); err != nil { + q.logger.Error("Error submitting task to virtual queue", tag.Error(err)) + } + + return true + } + } + + return false +} + +func (q *virtualQueueImpl) ResetProgress(key persistence.HistoryTaskKey) { + q.Lock() + defer q.Unlock() + + for e := q.virtualSlices.Front(); e != nil; e = e.Next() { + slice := e.Value.(VirtualSlice) + slice.ResetProgress(key) + q.monitor.SetSlicePendingTaskCount(slice, slice.GetPendingTaskCount()) + + if e == q.sliceToRead { + break + } + } + + q.resetNextReadSliceLocked() +} + +func (q *virtualQueueImpl) submitOrRescheduleTask(now time.Time, task task.Task) error { + if persistence.IsTaskCorrupted(task) { + q.logger.Error("Virtual queue encountered a corrupted task", tag.Dynamic("task", task)) + q.metricsScope.IncCounter(metrics.CorruptedHistoryTaskCounter) + + task.Ack() + return nil + } + + scheduledTime := task.GetTaskKey().GetScheduledTime() + // if the scheduled time is in the future, we need to reschedule the task + if now.Before(scheduledTime) { + q.rescheduler.RescheduleTask(task, scheduledTime) + return nil + } + // shard level metrics for the duration between a task being written to a queue and being fetched from it + q.metricsScope.RecordHistogramDuration(metrics.TaskEnqueueToFetchLatency, now.Sub(task.GetVisibilityTimestamp())) + task.SetInitialSubmitTime(now) + submitted, err := q.processor.TrySubmit(task) + + if !submitted { + q.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter) + q.rescheduler.RescheduleTask(task, q.timeSource.Now().Add(taskSchedulerThrottleBackoffInterval)) + } + + return err +} + func (q *virtualQueueImpl) resetNextReadSliceLocked() { q.sliceToRead = nil for element := q.virtualSlices.Front(); element != nil; element = element.Next() { diff --git a/service/history/queuev2/virtual_queue_manager.go b/service/history/queuev2/virtual_queue_manager.go index 5629c12d488..68afba4867f 100644 --- a/service/history/queuev2/virtual_queue_manager.go +++ b/service/history/queuev2/virtual_queue_manager.go @@ -34,6 +34,7 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/service/history/task" ) @@ -56,6 +57,10 @@ type ( // Add a new virtual slice to the root queue. This is used when new tasks are generated and max read level is updated. // By default, all new tasks belong to the root queue, so we need to add a new virtual slice to the root queue. AddNewVirtualSliceToRootQueue(VirtualSlice) + // Insert a single task to the current slice. Return false if the task's timestamp is out of range of the current slice. + InsertSingleTask(task.Task) bool + // ResetProgress resets the progress of the virtual queue to the given key. Pending tasks after the given key are cancelled. 
+ ResetProgress(persistence.HistoryTaskKey) } virtualQueueManagerImpl struct { @@ -218,6 +223,27 @@ func (m *virtualQueueManagerImpl) AddNewVirtualSliceToRootQueue(s VirtualSlice) m.virtualQueues[rootQueueID].Start() } +func (m *virtualQueueManagerImpl) InsertSingleTask(t task.Task) bool { + m.Lock() + defer m.Unlock() + + for _, vq := range m.virtualQueues { + if vq.InsertSingleTask(t) { + return true + } + } + + return false +} + +func (m *virtualQueueManagerImpl) ResetProgress(key persistence.HistoryTaskKey) { + m.Lock() + defer m.Unlock() + for _, vq := range m.virtualQueues { + vq.ResetProgress(key) + } +} + func (m *virtualQueueManagerImpl) appendOrMergeSlice(vq VirtualQueue, s VirtualSlice) { now := m.timeSource.Now() newVirtualSliceState := s.GetState() diff --git a/service/history/queuev2/virtual_queue_manager_mock.go b/service/history/queuev2/virtual_queue_manager_mock.go index 62771415933..29136036ccd 100644 --- a/service/history/queuev2/virtual_queue_manager_mock.go +++ b/service/history/queuev2/virtual_queue_manager_mock.go @@ -13,6 +13,9 @@ import ( reflect "reflect" gomock "go.uber.org/mock/gomock" + + persistence "github.com/uber/cadence/common/persistence" + task "github.com/uber/cadence/service/history/task" ) // MockVirtualQueueManager is a mock of VirtualQueueManager interface. @@ -65,6 +68,32 @@ func (mr *MockVirtualQueueManagerMockRecorder) GetOrCreateVirtualQueue(arg0 any) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateVirtualQueue", reflect.TypeOf((*MockVirtualQueueManager)(nil).GetOrCreateVirtualQueue), arg0) } +// InsertSingleTask mocks base method. +func (m *MockVirtualQueueManager) InsertSingleTask(arg0 task.Task) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InsertSingleTask", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// InsertSingleTask indicates an expected call of InsertSingleTask. +func (mr *MockVirtualQueueManagerMockRecorder) InsertSingleTask(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertSingleTask", reflect.TypeOf((*MockVirtualQueueManager)(nil).InsertSingleTask), arg0) +} + +// ResetProgress mocks base method. +func (m *MockVirtualQueueManager) ResetProgress(arg0 persistence.HistoryTaskKey) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ResetProgress", arg0) +} + +// ResetProgress indicates an expected call of ResetProgress. +func (mr *MockVirtualQueueManagerMockRecorder) ResetProgress(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetProgress", reflect.TypeOf((*MockVirtualQueueManager)(nil).ResetProgress), arg0) +} + // Start mocks base method. func (m *MockVirtualQueueManager) Start() { m.ctrl.T.Helper() diff --git a/service/history/queuev2/virtual_queue_mock.go b/service/history/queuev2/virtual_queue_mock.go index 517a8411ce8..70b45c83429 100644 --- a/service/history/queuev2/virtual_queue_mock.go +++ b/service/history/queuev2/virtual_queue_mock.go @@ -14,6 +14,9 @@ import ( time "time" gomock "go.uber.org/mock/gomock" + + persistence "github.com/uber/cadence/common/persistence" + task "github.com/uber/cadence/service/history/task" ) // MockVirtualQueue is a mock of VirtualQueue interface. @@ -82,6 +85,20 @@ func (mr *MockVirtualQueueMockRecorder) GetState() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetState", reflect.TypeOf((*MockVirtualQueue)(nil).GetState)) } +// InsertSingleTask mocks base method. 
+func (m *MockVirtualQueue) InsertSingleTask(task task.Task) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InsertSingleTask", task) + ret0, _ := ret[0].(bool) + return ret0 +} + +// InsertSingleTask indicates an expected call of InsertSingleTask. +func (mr *MockVirtualQueueMockRecorder) InsertSingleTask(task any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertSingleTask", reflect.TypeOf((*MockVirtualQueue)(nil).InsertSingleTask), task) +} + // IterateSlices mocks base method. func (m *MockVirtualQueue) IterateSlices(arg0 func(VirtualSlice)) { m.ctrl.T.Helper() @@ -134,6 +151,18 @@ func (mr *MockVirtualQueueMockRecorder) Pause(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Pause", reflect.TypeOf((*MockVirtualQueue)(nil).Pause), arg0) } +// ResetProgress mocks base method. +func (m *MockVirtualQueue) ResetProgress(arg0 persistence.HistoryTaskKey) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ResetProgress", arg0) +} + +// ResetProgress indicates an expected call of ResetProgress. +func (mr *MockVirtualQueueMockRecorder) ResetProgress(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetProgress", reflect.TypeOf((*MockVirtualQueue)(nil).ResetProgress), arg0) +} + // SplitSlices mocks base method. func (m *MockVirtualQueue) SplitSlices(arg0 func(VirtualSlice) ([]VirtualSlice, bool)) { m.ctrl.T.Helper() diff --git a/service/history/queuev2/virtual_queue_test.go b/service/history/queuev2/virtual_queue_test.go index ccb5ccb2e7e..5aa46a4a13e 100644 --- a/service/history/queuev2/virtual_queue_test.go +++ b/service/history/queuev2/virtual_queue_test.go @@ -1791,3 +1791,496 @@ func TestVirtualQueue_MergeWithLastSlice(t *testing.T) { }) } } + +func TestVirtualQueue_InsertSingleTask(t *testing.T) { + tests := []struct { + name string + setupMocks func(ctrl *gomock.Controller) ([]VirtualSlice, task.Task, task.Processor, task.Rescheduler, Monitor, clock.TimeSource) + expectedResult bool + }{ + { + name: "Successfully insert task into first slice", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, task.Task, task.Processor, task.Rescheduler, Monitor, clock.TimeSource) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockSlice2 := NewMockVirtualSlice(ctrl) + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMonitor := NewMockMonitor(ctrl) + mockTimeSource := clock.NewMockedTimeSource() + + mockSlice1.EXPECT().InsertTask(mockTask).Return(true) + mockSlice1.EXPECT().GetPendingTaskCount().Return(5) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice1, 5) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(mockTimeSource.Now().Add(-time.Second), 1)) + mockTask.EXPECT().GetVisibilityTimestamp().Return(mockTimeSource.Now().Add(-time.Second)) + mockTask.EXPECT().SetInitialSubmitTime(gomock.Any()) + mockProcessor.EXPECT().TrySubmit(mockTask).Return(true, nil) + + return []VirtualSlice{mockSlice1, mockSlice2}, mockTask, mockProcessor, mockRescheduler, mockMonitor, mockTimeSource + }, + expectedResult: true, + }, + { + name: "Successfully insert task into second slice", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, task.Task, 
task.Processor, task.Rescheduler, Monitor, clock.TimeSource) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockSlice2 := NewMockVirtualSlice(ctrl) + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMonitor := NewMockMonitor(ctrl) + mockTimeSource := clock.NewMockedTimeSource() + + mockSlice1.EXPECT().InsertTask(mockTask).Return(false) + mockSlice2.EXPECT().InsertTask(mockTask).Return(true) + mockSlice2.EXPECT().GetPendingTaskCount().Return(3) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice2, 3) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(mockTimeSource.Now().Add(-time.Second), 1)) + mockTask.EXPECT().GetVisibilityTimestamp().Return(mockTimeSource.Now().Add(-time.Second)) + mockTask.EXPECT().SetInitialSubmitTime(gomock.Any()) + mockProcessor.EXPECT().TrySubmit(mockTask).Return(true, nil) + + return []VirtualSlice{mockSlice1, mockSlice2}, mockTask, mockProcessor, mockRescheduler, mockMonitor, mockTimeSource + }, + expectedResult: true, + }, + { + name: "No slice accepts the task", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, task.Task, task.Processor, task.Rescheduler, Monitor, clock.TimeSource) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockSlice2 := NewMockVirtualSlice(ctrl) + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMonitor := NewMockMonitor(ctrl) + mockTimeSource := clock.NewMockedTimeSource() + + mockSlice1.EXPECT().InsertTask(mockTask).Return(false) + mockSlice2.EXPECT().InsertTask(mockTask).Return(false) + + return []VirtualSlice{mockSlice1, mockSlice2}, mockTask, mockProcessor, mockRescheduler, mockMonitor, mockTimeSource + }, + expectedResult: false, + }, + { + name: "Insert task with submission error", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, task.Task, task.Processor, task.Rescheduler, Monitor, clock.TimeSource) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMonitor := NewMockMonitor(ctrl) + mockTimeSource := clock.NewMockedTimeSource() + + mockSlice1.EXPECT().InsertTask(mockTask).Return(true) + mockSlice1.EXPECT().GetPendingTaskCount().Return(1) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice1, 1) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(mockTimeSource.Now().Add(-time.Second), 1)) + mockTask.EXPECT().GetVisibilityTimestamp().Return(mockTimeSource.Now().Add(-time.Second)) + mockTask.EXPECT().SetInitialSubmitTime(gomock.Any()) + mockProcessor.EXPECT().TrySubmit(mockTask).Return(true, assert.AnError) + + return []VirtualSlice{mockSlice1}, mockTask, mockProcessor, mockRescheduler, mockMonitor, mockTimeSource + }, + expectedResult: true, + }, + { + name: "Insert task with empty slice list", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, task.Task, task.Processor, task.Rescheduler, Monitor, clock.TimeSource) { + mockTask := task.NewMockTask(ctrl) 
+ mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMonitor := NewMockMonitor(ctrl) + mockTimeSource := clock.NewMockedTimeSource() + + return []VirtualSlice{}, mockTask, mockProcessor, mockRescheduler, mockMonitor, mockTimeSource + }, + expectedResult: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + slices, mockTask, mockProcessor, mockRescheduler, mockMonitor, mockTimeSource := tt.setupMocks(ctrl) + mockLogger := testlogger.New(t) + mockMetricsScope := metrics.NoopScope + mockRateLimiter := quotas.NewMockLimiter(ctrl) + + queue := NewVirtualQueue( + mockProcessor, + mockRescheduler, + mockLogger, + mockMetricsScope, + mockTimeSource, + mockRateLimiter, + mockMonitor, + slices, + &VirtualQueueOptions{ + PageSize: dynamicproperties.GetIntPropertyFn(10), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0), + }, + ) + + result := queue.InsertSingleTask(mockTask) + assert.Equal(t, tt.expectedResult, result) + }) + } +} + +func TestVirtualQueue_ResetProgress(t *testing.T) { + tests := []struct { + name string + setupMocks func(ctrl *gomock.Controller) ([]VirtualSlice, Monitor, persistence.HistoryTaskKey) + expectedSliceToReadIdx *int + }{ + { + name: "Reset progress on all slices before sliceToRead", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, Monitor, persistence.HistoryTaskKey) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockSlice2 := NewMockVirtualSlice(ctrl) + mockSlice3 := NewMockVirtualSlice(ctrl) + mockMonitor := NewMockMonitor(ctrl) + resetKey := persistence.NewHistoryTaskKey(time.Now(), 5) + + mockSlice1.EXPECT().ResetProgress(resetKey) + mockSlice1.EXPECT().GetPendingTaskCount().Return(2) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice1, 2) + mockSlice1.EXPECT().HasMoreTasks().Return(false) + + mockSlice2.EXPECT().ResetProgress(resetKey) + mockSlice2.EXPECT().GetPendingTaskCount().Return(3) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice2, 3) + mockSlice2.EXPECT().HasMoreTasks().Return(true) + + return []VirtualSlice{mockSlice1, mockSlice2, mockSlice3}, mockMonitor, resetKey + }, + expectedSliceToReadIdx: common.Ptr(1), + }, + { + name: "Reset progress with no sliceToRead set", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, Monitor, persistence.HistoryTaskKey) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockSlice2 := NewMockVirtualSlice(ctrl) + mockMonitor := NewMockMonitor(ctrl) + resetKey := persistence.NewHistoryTaskKey(time.Now(), 10) + + mockSlice1.EXPECT().ResetProgress(resetKey) + mockSlice1.EXPECT().GetPendingTaskCount().Return(0) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice1, 0) + mockSlice1.EXPECT().HasMoreTasks().Return(false) + + mockSlice2.EXPECT().ResetProgress(resetKey) + mockSlice2.EXPECT().GetPendingTaskCount().Return(1) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice2, 1) + mockSlice2.EXPECT().HasMoreTasks().Return(true) + + return []VirtualSlice{mockSlice1, mockSlice2}, mockMonitor, resetKey + }, + expectedSliceToReadIdx: common.Ptr(1), + }, + { + name: "Reset progress with no slices having more tasks", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, Monitor, persistence.HistoryTaskKey) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockSlice2 := 
NewMockVirtualSlice(ctrl) + mockMonitor := NewMockMonitor(ctrl) + resetKey := persistence.NewHistoryTaskKey(time.Now(), 15) + + mockSlice1.EXPECT().ResetProgress(resetKey) + mockSlice1.EXPECT().GetPendingTaskCount().Return(0) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice1, 0) + mockSlice1.EXPECT().HasMoreTasks().Return(false) + + mockSlice2.EXPECT().ResetProgress(resetKey) + mockSlice2.EXPECT().GetPendingTaskCount().Return(0) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice2, 0) + mockSlice2.EXPECT().HasMoreTasks().Return(false) + + return []VirtualSlice{mockSlice1, mockSlice2}, mockMonitor, resetKey + }, + expectedSliceToReadIdx: nil, + }, + { + name: "Reset progress with empty slice list", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, Monitor, persistence.HistoryTaskKey) { + mockMonitor := NewMockMonitor(ctrl) + resetKey := persistence.NewHistoryTaskKey(time.Now(), 20) + + return []VirtualSlice{}, mockMonitor, resetKey + }, + expectedSliceToReadIdx: nil, + }, + { + name: "Reset progress stops at first slice when it's sliceToRead", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, Monitor, persistence.HistoryTaskKey) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockSlice2 := NewMockVirtualSlice(ctrl) + mockMonitor := NewMockMonitor(ctrl) + resetKey := persistence.NewHistoryTaskKey(time.Now(), 25) + + mockSlice1.EXPECT().ResetProgress(resetKey) + mockSlice1.EXPECT().GetPendingTaskCount().Return(4) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice1, 4) + mockSlice1.EXPECT().HasMoreTasks().Return(true) + + return []VirtualSlice{mockSlice1, mockSlice2}, mockMonitor, resetKey + }, + expectedSliceToReadIdx: common.Ptr(0), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + slices, mockMonitor, resetKey := tt.setupMocks(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockLogger := testlogger.New(t) + mockMetricsScope := metrics.NoopScope + mockTimeSource := clock.NewMockedTimeSource() + mockRateLimiter := quotas.NewMockLimiter(ctrl) + + queue := NewVirtualQueue( + mockProcessor, + mockRescheduler, + mockLogger, + mockMetricsScope, + mockTimeSource, + mockRateLimiter, + mockMonitor, + slices, + &VirtualQueueOptions{ + PageSize: dynamicproperties.GetIntPropertyFn(10), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0), + }, + ) + + queueImpl := queue.(*virtualQueueImpl) + + if len(slices) > 0 && tt.name != "Reset progress with no sliceToRead set" && tt.name != "Reset progress with empty slice list" { + if tt.name == "Reset progress stops at first slice when it's sliceToRead" { + queueImpl.sliceToRead = queueImpl.virtualSlices.Front() + } else { + queueImpl.sliceToRead = queueImpl.virtualSlices.Front().Next() + } + } else if tt.name == "Reset progress with no sliceToRead set" { + queueImpl.sliceToRead = nil + } + + queue.ResetProgress(resetKey) + + if tt.expectedSliceToReadIdx == nil { + assert.Nil(t, queueImpl.sliceToRead, "sliceToRead should be nil") + } else { + assert.NotNil(t, queueImpl.sliceToRead, "sliceToRead should not be nil") + if queueImpl.sliceToRead != nil { + expectedSlice := slices[*tt.expectedSliceToReadIdx] + assert.Equal(t, expectedSlice, queueImpl.sliceToRead.Value.(VirtualSlice)) + } + } + }) + } +} + +func 
TestVirtualQueue_SubmitOrRescheduleTask(t *testing.T) { + tests := []struct { + name string + setupMocks func(ctrl *gomock.Controller) (task.Task, task.Processor, task.Rescheduler, metrics.Scope, clock.TimeSource, time.Time) + expectErr bool + }{ + { + name: "Submit corrupted task - should ack and return nil", + setupMocks: func(ctrl *gomock.Controller) (task.Task, task.Processor, task.Rescheduler, metrics.Scope, clock.TimeSource, time.Time) { + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMetricsScope := metrics.NoopScope + mockTimeSource := clock.NewMockedTimeSource() + now := mockTimeSource.Now() + + mockTask.EXPECT().GetDomainID().Return("").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("").AnyTimes() + mockTask.EXPECT().GetRunID().Return("").AnyTimes() + mockTask.EXPECT().Ack() + + return mockTask, mockProcessor, mockRescheduler, mockMetricsScope, mockTimeSource, now + }, + expectErr: false, + }, + { + name: "Reschedule future task", + setupMocks: func(ctrl *gomock.Controller) (task.Task, task.Processor, task.Rescheduler, metrics.Scope, clock.TimeSource, time.Time) { + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMetricsScope := metrics.NoopScope + mockTimeSource := clock.NewMockedTimeSource() + now := mockTimeSource.Now() + futureTime := now.Add(time.Hour) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(futureTime, 1)) + mockRescheduler.EXPECT().RescheduleTask(mockTask, futureTime) + + return mockTask, mockProcessor, mockRescheduler, mockMetricsScope, mockTimeSource, now + }, + expectErr: false, + }, + { + name: "Successfully submit task", + setupMocks: func(ctrl *gomock.Controller) (task.Task, task.Processor, task.Rescheduler, metrics.Scope, clock.TimeSource, time.Time) { + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMetricsScope := metrics.NoopScope + mockTimeSource := clock.NewMockedTimeSource() + now := mockTimeSource.Now() + pastTime := now.Add(-time.Hour) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(pastTime, 1)) + mockTask.EXPECT().GetVisibilityTimestamp().Return(pastTime) + mockTask.EXPECT().SetInitialSubmitTime(now) + mockProcessor.EXPECT().TrySubmit(mockTask).Return(true, nil) + + return mockTask, mockProcessor, mockRescheduler, mockMetricsScope, mockTimeSource, now + }, + expectErr: false, + }, + { + name: "Submit task with processor error", + setupMocks: func(ctrl *gomock.Controller) (task.Task, task.Processor, task.Rescheduler, metrics.Scope, clock.TimeSource, time.Time) { + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMetricsScope := metrics.NoopScope + mockTimeSource := clock.NewMockedTimeSource() + now := mockTimeSource.Now() + pastTime := now.Add(-time.Hour) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + 
mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(pastTime, 1)) + mockTask.EXPECT().GetVisibilityTimestamp().Return(pastTime) + mockTask.EXPECT().SetInitialSubmitTime(now) + mockProcessor.EXPECT().TrySubmit(mockTask).Return(true, assert.AnError) + + return mockTask, mockProcessor, mockRescheduler, mockMetricsScope, mockTimeSource, now + }, + expectErr: true, + }, + { + name: "Task submission throttled - should reschedule", + setupMocks: func(ctrl *gomock.Controller) (task.Task, task.Processor, task.Rescheduler, metrics.Scope, clock.TimeSource, time.Time) { + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMetricsScope := metrics.NoopScope + mockTimeSource := clock.NewMockedTimeSource() + now := mockTimeSource.Now() + pastTime := now.Add(-time.Hour) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(pastTime, 1)) + mockTask.EXPECT().GetVisibilityTimestamp().Return(pastTime) + mockTask.EXPECT().SetInitialSubmitTime(now) + mockProcessor.EXPECT().TrySubmit(mockTask).Return(false, nil) + mockRescheduler.EXPECT().RescheduleTask(mockTask, now.Add(taskSchedulerThrottleBackoffInterval)) + + return mockTask, mockProcessor, mockRescheduler, mockMetricsScope, mockTimeSource, now + }, + expectErr: false, + }, + { + name: "Task submission throttled with error - should reschedule and return error", + setupMocks: func(ctrl *gomock.Controller) (task.Task, task.Processor, task.Rescheduler, metrics.Scope, clock.TimeSource, time.Time) { + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMetricsScope := metrics.NoopScope + mockTimeSource := clock.NewMockedTimeSource() + now := mockTimeSource.Now() + pastTime := now.Add(-time.Hour) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(pastTime, 1)) + mockTask.EXPECT().GetVisibilityTimestamp().Return(pastTime) + mockTask.EXPECT().SetInitialSubmitTime(now) + mockProcessor.EXPECT().TrySubmit(mockTask).Return(false, assert.AnError) + mockRescheduler.EXPECT().RescheduleTask(mockTask, now.Add(taskSchedulerThrottleBackoffInterval)) + + return mockTask, mockProcessor, mockRescheduler, mockMetricsScope, mockTimeSource, now + }, + expectErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockTask, mockProcessor, mockRescheduler, mockMetricsScope, mockTimeSource, now := tt.setupMocks(ctrl) + mockLogger := testlogger.New(t) + mockRateLimiter := quotas.NewMockLimiter(ctrl) + mockMonitor := NewMockMonitor(ctrl) + + queue := NewVirtualQueue( + mockProcessor, + mockRescheduler, + mockLogger, + mockMetricsScope, + mockTimeSource, + mockRateLimiter, + mockMonitor, + []VirtualSlice{}, + &VirtualQueueOptions{ + PageSize: dynamicproperties.GetIntPropertyFn(10), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + 
PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0), + }, + ) + + queueImpl := queue.(*virtualQueueImpl) + err := queueImpl.submitOrRescheduleTask(now, mockTask) + + if tt.expectErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/service/history/queuev2/virtual_slice.go b/service/history/queuev2/virtual_slice.go index 4638a017560..405c91c036e 100644 --- a/service/history/queuev2/virtual_slice.go +++ b/service/history/queuev2/virtual_slice.go @@ -37,11 +37,13 @@ type ( GetState() VirtualSliceState IsEmpty() bool GetTasks(context.Context, int) ([]task.Task, error) + InsertTask(task.Task) bool HasMoreTasks() bool UpdateAndGetState() VirtualSliceState GetPendingTaskCount() int Clear() PendingTaskStats() PendingTaskStats + ResetProgress(key persistence.HistoryTaskKey) TrySplitByTaskKey(persistence.HistoryTaskKey) (VirtualSlice, VirtualSlice, bool) TrySplitByPredicate(Predicate) (VirtualSlice, VirtualSlice, bool) @@ -115,6 +117,15 @@ func (s *virtualSliceImpl) Clear() { } } +func (s *virtualSliceImpl) InsertTask(task task.Task) bool { + if s.state.Contains(task) { + s.pendingTaskTracker.AddTask(task) + return true + } + + return false +} + func (s *virtualSliceImpl) GetTasks(ctx context.Context, pageSize int) ([]task.Task, error) { if len(s.progress) == 0 { return nil, nil @@ -191,6 +202,58 @@ func (s *virtualSliceImpl) UpdateAndGetState() VirtualSliceState { return s.state } +// ResetProgress is used when we are not sure whether our in-memory state at or after the given key is correct. +// It cancels all the pending tasks at or after the given key and resets the read progress to the given key, +// so that on the next poll we read the tasks from the DB starting from the given key. +func (s *virtualSliceImpl) ResetProgress(key persistence.HistoryTaskKey) { + + // the given key is after the current slice, no need to reset + if key.Compare(s.state.Range.ExclusiveMaxTaskKey) >= 0 { + return + } + + taskMap := s.pendingTaskTracker.GetTasks() + for _, task := range taskMap { + if task.GetTaskKey().Compare(key) >= 0 { + task.Cancel() + } + } + + if len(s.progress) == 0 { + s.progress = []*GetTaskProgress{ + { + Range: Range{ + InclusiveMinTaskKey: key, + ExclusiveMaxTaskKey: s.state.Range.ExclusiveMaxTaskKey, + }, + NextPageToken: nil, + NextTaskKey: key, + }, + } + return + } + + for i, progress := range s.progress { + // progress contains sorted non-overlapping ranges. If we find a range that contains the given key, we can reset + // this range's progress to the given key and merge the remaining ranges into it.
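+ // For example (mirroring the "Reset key with multiple progress ranges" case in TestVirtualSliceImpl_ResetProgress below):
+ // given progress ranges [1, 10) with next task key 5 and [10, 20) with next task key 15, ResetProgress(7) cancels the
+ // pending in-memory tasks at or after key 7 and collapses the progress into a single range [1, 20) with a nil page token
+ // and next task key min(7, 5) = 5, so the next DB read starts at key 5 and covers both the not-yet-read keys in [5, 7)
+ // and the cancelled keys at or after 7.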
+ if progress.ExclusiveMaxTaskKey.Compare(key) > 0 { + maxTaskKey := s.progress[len(s.progress)-1].Range.ExclusiveMaxTaskKey + s.progress[i] = &GetTaskProgress{ + Range: Range{ + InclusiveMinTaskKey: persistence.MinHistoryTaskKey(key, progress.InclusiveMinTaskKey), + ExclusiveMaxTaskKey: maxTaskKey, + }, + NextPageToken: nil, + NextTaskKey: persistence.MinHistoryTaskKey(key, progress.NextTaskKey), + } + + s.state.Range.InclusiveMinTaskKey = persistence.MinHistoryTaskKey(key, s.state.Range.InclusiveMinTaskKey) + s.progress = s.progress[:i+1] + break + } + } +} + func (s *virtualSliceImpl) TrySplitByTaskKey(taskKey persistence.HistoryTaskKey) (VirtualSlice, VirtualSlice, bool) { leftState, rightState, ok := s.state.TrySplitByTaskKey(taskKey) if !ok { diff --git a/service/history/queuev2/virtual_slice_mock.go b/service/history/queuev2/virtual_slice_mock.go index ea6e7085cf5..0fdd296f54a 100644 --- a/service/history/queuev2/virtual_slice_mock.go +++ b/service/history/queuev2/virtual_slice_mock.go @@ -112,6 +112,20 @@ func (mr *MockVirtualSliceMockRecorder) HasMoreTasks() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasMoreTasks", reflect.TypeOf((*MockVirtualSlice)(nil).HasMoreTasks)) } +// InsertTask mocks base method. +func (m *MockVirtualSlice) InsertTask(arg0 task.Task) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InsertTask", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// InsertTask indicates an expected call of InsertTask. +func (mr *MockVirtualSliceMockRecorder) InsertTask(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertTask", reflect.TypeOf((*MockVirtualSlice)(nil).InsertTask), arg0) +} + // IsEmpty mocks base method. func (m *MockVirtualSlice) IsEmpty() bool { m.ctrl.T.Helper() @@ -140,6 +154,18 @@ func (mr *MockVirtualSliceMockRecorder) PendingTaskStats() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PendingTaskStats", reflect.TypeOf((*MockVirtualSlice)(nil).PendingTaskStats)) } +// ResetProgress mocks base method. +func (m *MockVirtualSlice) ResetProgress(key persistence.HistoryTaskKey) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ResetProgress", key) +} + +// ResetProgress indicates an expected call of ResetProgress. +func (mr *MockVirtualSliceMockRecorder) ResetProgress(key any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetProgress", reflect.TypeOf((*MockVirtualSlice)(nil).ResetProgress), key) +} + // TryMergeWithVirtualSlice mocks base method. 
func (m *MockVirtualSlice) TryMergeWithVirtualSlice(arg0 VirtualSlice) ([]VirtualSlice, bool) { m.ctrl.T.Helper() diff --git a/service/history/queuev2/virtual_slice_test.go b/service/history/queuev2/virtual_slice_test.go index 6f0b7953639..8b21242396b 100644 --- a/service/history/queuev2/virtual_slice_test.go +++ b/service/history/queuev2/virtual_slice_test.go @@ -3472,3 +3472,367 @@ func TestMergeVirtualSlicesWithDifferentPredicate(t *testing.T) { }) } } + +func TestVirtualSliceImpl_InsertTask(t *testing.T) { + tests := []struct { + name string + setupSlice func(ctrl *gomock.Controller) *virtualSliceImpl + setupTask func(ctrl *gomock.Controller) task.Task + expectedResult bool + expectAddTask bool + }{ + { + name: "Task within range and predicate matches - should insert successfully", + setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl { + mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl) + mockPredicate := NewMockPredicate(ctrl) + mockPredicate.EXPECT().Check(gomock.Any()).Return(true) + + return &virtualSliceImpl{ + state: VirtualSliceState{ + Range: Range{ + InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1), + ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10), + }, + Predicate: mockPredicate, + }, + pendingTaskTracker: mockPendingTaskTracker, + } + }, + setupTask: func(ctrl *gomock.Controller) task.Task { + mockTask := task.NewMockTask(ctrl) + mockTask.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(5)).AnyTimes() + return mockTask + }, + expectedResult: true, + expectAddTask: true, + }, + { + name: "Task outside range - should not insert", + setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl { + mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl) + mockPredicate := NewMockPredicate(ctrl) + + return &virtualSliceImpl{ + state: VirtualSliceState{ + Range: Range{ + InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1), + ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10), + }, + Predicate: mockPredicate, + }, + pendingTaskTracker: mockPendingTaskTracker, + } + }, + setupTask: func(ctrl *gomock.Controller) task.Task { + mockTask := task.NewMockTask(ctrl) + mockTask.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(15)).AnyTimes() + return mockTask + }, + expectedResult: false, + expectAddTask: false, + }, + { + name: "Task within range but predicate doesn't match - should not insert", + setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl { + mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl) + mockPredicate := NewMockPredicate(ctrl) + mockPredicate.EXPECT().Check(gomock.Any()).Return(false) + + return &virtualSliceImpl{ + state: VirtualSliceState{ + Range: Range{ + InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1), + ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10), + }, + Predicate: mockPredicate, + }, + pendingTaskTracker: mockPendingTaskTracker, + } + }, + setupTask: func(ctrl *gomock.Controller) task.Task { + mockTask := task.NewMockTask(ctrl) + mockTask.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(5)).AnyTimes() + return mockTask + }, + expectedResult: false, + expectAddTask: false, + }, + { + name: "Task at inclusive min boundary - should insert successfully", + setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl { + mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl) + mockPredicate := NewMockPredicate(ctrl) + mockPredicate.EXPECT().Check(gomock.Any()).Return(true) + + return &virtualSliceImpl{ + state: VirtualSliceState{ + Range: Range{ + 
InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1), + ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10), + }, + Predicate: mockPredicate, + }, + pendingTaskTracker: mockPendingTaskTracker, + } + }, + setupTask: func(ctrl *gomock.Controller) task.Task { + mockTask := task.NewMockTask(ctrl) + mockTask.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(1)).AnyTimes() + return mockTask + }, + expectedResult: true, + expectAddTask: true, + }, + { + name: "Task at exclusive max boundary - should not insert", + setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl { + mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl) + mockPredicate := NewMockPredicate(ctrl) + + return &virtualSliceImpl{ + state: VirtualSliceState{ + Range: Range{ + InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1), + ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10), + }, + Predicate: mockPredicate, + }, + pendingTaskTracker: mockPendingTaskTracker, + } + }, + setupTask: func(ctrl *gomock.Controller) task.Task { + mockTask := task.NewMockTask(ctrl) + mockTask.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(10)).AnyTimes() + return mockTask + }, + expectedResult: false, + expectAddTask: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + slice := tt.setupSlice(ctrl) + task := tt.setupTask(ctrl) + + if tt.expectAddTask { + slice.pendingTaskTracker.(*MockPendingTaskTracker).EXPECT().AddTask(task).Times(1) + } + + result := slice.InsertTask(task) + + assert.Equal(t, tt.expectedResult, result) + }) + } +} + +func TestVirtualSliceImpl_ResetProgress(t *testing.T) { + tests := []struct { + name string + setupSlice func(ctrl *gomock.Controller) *virtualSliceImpl + resetKey persistence.HistoryTaskKey + expectedProgressLen int + expectedCancelCalls int + validateProgress func(t *testing.T, progress []*GetTaskProgress) + }{ + { + name: "Reset key after slice range - should not reset", + setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl { + mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl) + mockPendingTaskTracker.EXPECT().GetTasks().Return(map[persistence.HistoryTaskKey]task.Task{}).AnyTimes() + + return &virtualSliceImpl{ + state: VirtualSliceState{ + Range: Range{ + InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1), + ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10), + }, + }, + pendingTaskTracker: mockPendingTaskTracker, + progress: []*GetTaskProgress{ + { + Range: Range{ + InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1), + ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10), + }, + NextTaskKey: persistence.NewImmediateTaskKey(5), + }, + }, + } + }, + resetKey: persistence.NewImmediateTaskKey(15), + expectedProgressLen: 1, + expectedCancelCalls: 0, + validateProgress: func(t *testing.T, progress []*GetTaskProgress) { + assert.Equal(t, persistence.NewImmediateTaskKey(5), progress[0].NextTaskKey) + }, + }, + { + name: "Reset key within range with no progress - should create new progress", + setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl { + mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl) + mockPendingTaskTracker.EXPECT().GetTasks().Return(map[persistence.HistoryTaskKey]task.Task{}).AnyTimes() + + return &virtualSliceImpl{ + state: VirtualSliceState{ + Range: Range{ + InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1), + ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10), + }, + }, + pendingTaskTracker: mockPendingTaskTracker, + 
progress: []*GetTaskProgress{}, + } + }, + resetKey: persistence.NewImmediateTaskKey(5), + expectedProgressLen: 1, + expectedCancelCalls: 0, + validateProgress: func(t *testing.T, progress []*GetTaskProgress) { + assert.Equal(t, persistence.NewImmediateTaskKey(5), progress[0].NextTaskKey) + assert.Equal(t, persistence.NewImmediateTaskKey(5), progress[0].InclusiveMinTaskKey) + assert.Equal(t, persistence.NewImmediateTaskKey(10), progress[0].ExclusiveMaxTaskKey) + }, + }, + { + name: "Reset key within range with existing progress - should reset and cancel tasks", + setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl { + mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl) + mockTask1 := task.NewMockTask(ctrl) + mockTask2 := task.NewMockTask(ctrl) + mockTask3 := task.NewMockTask(ctrl) + + mockTask1.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(3)).AnyTimes() + + mockTask2.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(5)).AnyTimes() + mockTask2.EXPECT().Cancel().Times(1) + + mockTask3.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(7)).AnyTimes() + mockTask3.EXPECT().Cancel().Times(1) + + mockPendingTaskTracker.EXPECT().GetTasks().Return(map[persistence.HistoryTaskKey]task.Task{ + persistence.NewImmediateTaskKey(3): mockTask1, + persistence.NewImmediateTaskKey(5): mockTask2, + persistence.NewImmediateTaskKey(7): mockTask3, + }) + + return &virtualSliceImpl{ + state: VirtualSliceState{ + Range: Range{ + InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1), + ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10), + }, + }, + pendingTaskTracker: mockPendingTaskTracker, + progress: []*GetTaskProgress{ + { + Range: Range{ + InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1), + ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10), + }, + NextTaskKey: persistence.NewImmediateTaskKey(8), + }, + }, + } + }, + resetKey: persistence.NewImmediateTaskKey(5), + expectedProgressLen: 1, + expectedCancelCalls: 2, + validateProgress: func(t *testing.T, progress []*GetTaskProgress) { + assert.Equal(t, persistence.NewImmediateTaskKey(5), progress[0].NextTaskKey) + assert.Equal(t, persistence.NewImmediateTaskKey(1), progress[0].InclusiveMinTaskKey) + assert.Equal(t, persistence.NewImmediateTaskKey(10), progress[0].ExclusiveMaxTaskKey) + }, + }, + { + name: "Reset key with multiple progress ranges - should merge remaining ranges", + setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl { + mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl) + mockPendingTaskTracker.EXPECT().GetTasks().Return(map[persistence.HistoryTaskKey]task.Task{}).AnyTimes() + + return &virtualSliceImpl{ + state: VirtualSliceState{ + Range: Range{ + InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1), + ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(20), + }, + }, + pendingTaskTracker: mockPendingTaskTracker, + progress: []*GetTaskProgress{ + { + Range: Range{ + InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1), + ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10), + }, + NextTaskKey: persistence.NewImmediateTaskKey(5), + }, + { + Range: Range{ + InclusiveMinTaskKey: persistence.NewImmediateTaskKey(10), + ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(20), + }, + NextTaskKey: persistence.NewImmediateTaskKey(15), + }, + }, + } + }, + resetKey: persistence.NewImmediateTaskKey(7), + expectedProgressLen: 1, + expectedCancelCalls: 0, + validateProgress: func(t *testing.T, progress []*GetTaskProgress) { + assert.Equal(t, 
persistence.NewImmediateTaskKey(5), progress[0].NextTaskKey) + assert.Equal(t, persistence.NewImmediateTaskKey(1), progress[0].InclusiveMinTaskKey) + assert.Equal(t, persistence.NewImmediateTaskKey(20), progress[0].ExclusiveMaxTaskKey) + }, + }, + { + name: "Reset key before current progress next task key - should use reset key as next task key", + setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl { + mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl) + mockPendingTaskTracker.EXPECT().GetTasks().Return(map[persistence.HistoryTaskKey]task.Task{}).AnyTimes() + + return &virtualSliceImpl{ + state: VirtualSliceState{ + Range: Range{ + InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1), + ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10), + }, + }, + pendingTaskTracker: mockPendingTaskTracker, + progress: []*GetTaskProgress{ + { + Range: Range{ + InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1), + ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10), + }, + NextTaskKey: persistence.NewImmediateTaskKey(8), + }, + }, + } + }, + resetKey: persistence.NewImmediateTaskKey(3), + expectedProgressLen: 1, + expectedCancelCalls: 0, + validateProgress: func(t *testing.T, progress []*GetTaskProgress) { + assert.Equal(t, persistence.NewImmediateTaskKey(3), progress[0].NextTaskKey) + assert.Equal(t, persistence.NewImmediateTaskKey(1), progress[0].InclusiveMinTaskKey) + assert.Equal(t, persistence.NewImmediateTaskKey(10), progress[0].ExclusiveMaxTaskKey) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + slice := tt.setupSlice(ctrl) + + slice.ResetProgress(tt.resetKey) + + assert.Equal(t, tt.expectedProgressLen, len(slice.progress)) + if tt.validateProgress != nil { + tt.validateProgress(t, slice.progress) + } + }) + } +} diff --git a/service/history/shard/context.go b/service/history/shard/context.go index a1e9724f383..2435025a3d1 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -246,7 +246,12 @@ func (s *contextImpl) updateScheduledTaskMaxReadLevel(cluster string) persistenc currentTime = s.remoteClusterCurrentTime[cluster] } - newMaxReadLevel := currentTime.Add(s.config.TimerProcessorMaxTimeShift()).Truncate(persistence.DBTimestampMinPrecision) + maxTimeShift := s.config.TimerProcessorMaxTimeShift() + if s.config.TimerProcessorInMemoryQueueMaxTimeShift(s.shardID) > 0 { + maxTimeShift = s.config.TimerProcessorInMemoryQueueMaxTimeShift(s.shardID) + } + + newMaxReadLevel := currentTime.Add(maxTimeShift).Truncate(persistence.DBTimestampMinPrecision) if newMaxReadLevel.After(s.scheduledTaskMaxReadLevelMap[cluster]) { s.scheduledTaskMaxReadLevelMap[cluster] = newMaxReadLevel } @@ -1327,23 +1332,29 @@ func (s *contextImpl) allocateTimerIDsLocked( } readCursorTS := s.scheduledTaskMaxReadLevelMap[cluster] + // make sure scheduled task timestamp is higher than // 1. max read level, so that queue processor can read the task back. // 2. current time. Otherwise the task timestamp is in the past and causes artificial load latency in queue processor metrics. // Above cases can happen if the shard moves and the new host has a time skew, // or there is a db write delay, or we are simply (re-)generating tasks for an old workflow. - if ts.Before(readCursorTS) { - // This can happen if shard move and new host have a time SKU, or there is db write delay. - // We generate a new timer ID using timerMaxReadLevel.
- s.logger.Warn("New timer generated is less than read level", - tag.WorkflowDomainID(domainEntry.GetInfo().ID), - tag.WorkflowID(workflowID), - tag.Timestamp(ts), - tag.CursorTimestamp(readCursorTS), - tag.ClusterName(cluster), - tag.ValueShardAllocateTimerBeforeRead) - ts = readCursorTS.Add(persistence.DBTimestampMinPrecision) + + if s.config.TimerProcessorInMemoryQueueMaxTimeShift(s.shardID) == 0 { + // this check is only required when the in-memory queue is disabled. If it's enabled, we expect timers below the read level to be enqueued in memory + if ts.Before(readCursorTS) { + // This can happen if the shard moves and the new host has a time skew, or there is a db write delay. + // We generate a new timer ID using timerMaxReadLevel. + s.logger.Warn("New timer generated is less than read level", + tag.WorkflowDomainID(domainEntry.GetInfo().ID), + tag.WorkflowID(workflowID), + tag.Timestamp(ts), + tag.CursorTimestamp(readCursorTS), + tag.ClusterName(cluster), + tag.ValueShardAllocateTimerBeforeRead) + ts = readCursorTS.Add(persistence.DBTimestampMinPrecision) + } } + if ts.Before(now) { s.logger.Warn("New timer generated is in the past", tag.WorkflowDomainID(domainEntry.GetInfo().ID), @@ -1352,6 +1363,15 @@ func (s *contextImpl) allocateTimerIDsLocked( tag.ValueShardAllocateTimerBeforeRead) ts = now.Add(persistence.DBTimestampMinPrecision) } + + if ts.Before(s.shardInfo.TimerAckLevel) { + s.logger.Warn("New timer generated is less than ack level", + tag.WorkflowDomainID(domainEntry.GetInfo().ID), + tag.WorkflowID(workflowID), + tag.Timestamp(ts)) + ts = s.shardInfo.TimerAckLevel.Add(persistence.DBTimestampMinPrecision) + } + task.SetVisibilityTimestamp(ts) seqNum, err := s.generateTaskIDLocked() diff --git a/simulation/history/dynamicconfig/queuev2_in_memory.yaml b/simulation/history/dynamicconfig/queuev2_in_memory.yaml new file mode 100644 index 00000000000..261b97a780c --- /dev/null +++ b/simulation/history/dynamicconfig/queuev2_in_memory.yaml @@ -0,0 +1,39 @@ +system.workflowDeletionJitterRange: +- value: 0 + constraints: {} +history.enableTimerQueueV2: +- value: true + constraints: {} +history.enableTransferQueueV2: +- value: true + constraints: {} +history.queueMaxPendingTaskCount: +- value: 10000 + constraints: {} +history.timerTaskBatchSize: +- value: 100 + constraints: {} +history.timerProcessorUpdateAckInterval: +- value: 30s + constraints: {} +history.timerProcessorUpdateAckIntervalJitterCoefficient: +- value: 0 + constraints: {} +history.timerProcessorMaxPollRPS: +- value: 20 + constraints: {} +history.transferTaskBatchSize: +- value: 100 + constraints: {} +history.transferProcessorUpdateAckInterval: +- value: 30s + constraints: {} +history.transferProcessorUpdateAckIntervalJitterCoefficient: +- value: 0 + constraints: {} +history.transferProcessorMaxPollRPS: +- value: 20 + constraints: {} +history.timerProcessorInMemoryQueueMaxTimeShift: +- value: 30s + constraints: {} \ No newline at end of file diff --git a/simulation/history/testdata/history_simulation_queuev2_in_memory.yaml b/simulation/history/testdata/history_simulation_queuev2_in_memory.yaml new file mode 100644 index 00000000000..02060f60f55 --- /dev/null +++ b/simulation/history/testdata/history_simulation_queuev2_in_memory.yaml @@ -0,0 +1,17 @@ +enablearchival: false +clusterno: 0 +messagingclientconfig: + usemock: true +historyconfig: + numhistoryshards: 4 + numhistoryhosts: 1 +matchingconfig: + nummatchinghosts: 1 +workerconfig: + enableasyncwfconsumer: false + enablearchiver: false + enablereplicator: false + enableindexer:
false +dynamicclientconfig: + filepath: "dynamicconfig/queuev2_in_memory.yaml" + pollInterval: "10s"
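Rollout note: the simulation dynamic config above enables the in-memory timer queue globally (empty constraints, 30s shift). Since the shard context reads history.timerProcessorInMemoryQueueMaxTimeShift per shard ID, the setting can also be scoped to individual shards first. A minimal sketch of such an override, assuming the file-based dynamic config client accepts a shardID constraint for this key (that constraint name is an assumption, not part of this change):

history.timerProcessorInMemoryQueueMaxTimeShift:
- value: 30s
  constraints:
    shardID: 0
- value: 0
  constraints: {}

Under that assumption, shard 0 would pick up the 30s in-memory shift while the remaining shards keep the zero value, which leaves the in-memory queue disabled per the TimerProcessorInMemoryQueueMaxTimeShift == 0 check in context.go above.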