Skip to content

Commit 4c4f7de

Browse files
committed
Clean up
Signed-off-by: Ignat Tubylov <[email protected]>
1 parent 0b7f79b commit 4c4f7de

File tree

7 files changed

+50
-14
lines changed

7 files changed

+50
-14
lines changed

common/dynamicconfig/dynamicproperties/constants.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2113,7 +2113,6 @@ const (
21132113
EnableTimerQueueV2
21142114
EnableTransferQueueV2PendingTaskCountAlert
21152115
EnableTimerQueueV2PendingTaskCountAlert
2116-
EnableTimerProcessorInMemoryQueue
21172116

21182117
// LastBoolKey must be the last one in this const group
21192118
LastBoolKey
@@ -2673,6 +2672,12 @@ const (
26732672
// Default value: 1s (1*time.Second)
26742673
// Allowed filters: N/A
26752674
TimerProcessorMaxTimeShift
2675+
// TimerProcessorInMemoryQueueMaxTimeShift is the max shift timer processor in memory queue can have. If set to 0, in memory queue is disabled.
2676+
// KeyName: history.timerProcessorInMemoryQueueMaxTimeShift
2677+
// Value type: Duration
2678+
// Default value: 0
2679+
// Allowed filters: ShardID
2680+
TimerProcessorInMemoryQueueMaxTimeShift
26762681
// TransferProcessorFailoverMaxStartJitterInterval is the max jitter interval for starting transfer
26772682
// failover queue processing. The actual jitter interval used will be a random duration between
26782683
// 0 and the max interval so that timer failover queue across different shards won't start at
@@ -4681,12 +4686,6 @@ var BoolKeys = map[BoolKey]DynamicBool{
46814686
Filters: []Filter{ShardID},
46824687
DefaultValue: false,
46834688
},
4684-
EnableTimerProcessorInMemoryQueue: {
4685-
KeyName: "history.enableTimerProcessorInMemoryQueue",
4686-
Description: "EnableTimerProcessorInMemoryQueue is the flag to enable in-memory queue for timer processor",
4687-
Filters: []Filter{ShardID},
4688-
DefaultValue: false,
4689-
},
46904689
}
46914690

46924691
var FloatKeys = map[FloatKey]DynamicFloat{
@@ -5205,6 +5204,12 @@ var DurationKeys = map[DurationKey]DynamicDuration{
52055204
Description: "TimerProcessorMaxTimeShift is the max shift timer processor can have",
52065205
DefaultValue: time.Second,
52075206
},
5207+
TimerProcessorInMemoryQueueMaxTimeShift: {
5208+
KeyName: "history.timerProcessorInMemoryQueueMaxTimeShift",
5209+
Filters: []Filter{ShardID},
5210+
Description: "TimerProcessorInMemoryQueueMaxTimeShift is the max shift timer processor in memory queue can have. If set to 0, in memory queue is disabled.",
5211+
DefaultValue: 0,
5212+
},
52085213
TransferProcessorFailoverMaxStartJitterInterval: {
52095214
KeyName: "history.transferProcessorFailoverMaxStartJitterInterval",
52105215
Description: "TransferProcessorFailoverMaxStartJitterInterval is the max jitter interval for starting transfer failover queue processing. The actual jitter interval used will be a random duration between 0 and the max interval so that timer failover queue across different shards won't start at the same time",

service/history/config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ type Config struct {
154154
TimerProcessorSplitQueueIntervalJitterCoefficient dynamicproperties.FloatPropertyFn
155155
TimerProcessorMaxRedispatchQueueSize dynamicproperties.IntPropertyFn
156156
TimerProcessorMaxTimeShift dynamicproperties.DurationPropertyFn
157-
EnableTimerProcessorInMemoryQueue dynamicproperties.BoolPropertyFnWithShardIDFilter
157+
TimerProcessorInMemoryQueueMaxTimeShift dynamicproperties.DurationPropertyFnWithShardIDFilter
158158
TimerProcessorHistoryArchivalSizeLimit dynamicproperties.IntPropertyFn
159159
TimerProcessorArchivalTimeLimit dynamicproperties.DurationPropertyFn
160160
DisableTimerFailoverQueue dynamicproperties.BoolPropertyFn
@@ -454,7 +454,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i
454454
TimerProcessorSplitQueueIntervalJitterCoefficient: dc.GetFloat64Property(dynamicproperties.TimerProcessorSplitQueueIntervalJitterCoefficient),
455455
TimerProcessorMaxRedispatchQueueSize: dc.GetIntProperty(dynamicproperties.TimerProcessorMaxRedispatchQueueSize),
456456
TimerProcessorMaxTimeShift: dc.GetDurationProperty(dynamicproperties.TimerProcessorMaxTimeShift),
457-
EnableTimerProcessorInMemoryQueue: dc.GetBoolPropertyFilteredByShardID(dynamicproperties.EnableTimerProcessorInMemoryQueue),
457+
TimerProcessorInMemoryQueueMaxTimeShift: dc.GetDurationPropertyFilteredByShardID(dynamicproperties.TimerProcessorInMemoryQueueMaxTimeShift),
458458
TimerProcessorHistoryArchivalSizeLimit: dc.GetIntProperty(dynamicproperties.TimerProcessorHistoryArchivalSizeLimit),
459459
TimerProcessorArchivalTimeLimit: dc.GetDurationProperty(dynamicproperties.TimerProcessorArchivalTimeLimit),
460460
DisableTimerFailoverQueue: dc.GetBoolProperty(dynamicproperties.DisableTimerFailoverQueue),

service/history/config/config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ func TestNewConfig(t *testing.T) {
139139
"TimerProcessorSplitQueueIntervalJitterCoefficient": {dynamicproperties.TimerProcessorSplitQueueIntervalJitterCoefficient, 4.0},
140140
"TimerProcessorMaxRedispatchQueueSize": {dynamicproperties.TimerProcessorMaxRedispatchQueueSize, 45},
141141
"TimerProcessorMaxTimeShift": {dynamicproperties.TimerProcessorMaxTimeShift, time.Second},
142+
"TimerProcessorInMemoryQueueMaxTimeShift": {dynamicproperties.TimerProcessorInMemoryQueueMaxTimeShift, 0},
142143
"TimerProcessorHistoryArchivalSizeLimit": {dynamicproperties.TimerProcessorHistoryArchivalSizeLimit, 46},
143144
"TimerProcessorArchivalTimeLimit": {dynamicproperties.TimerProcessorArchivalTimeLimit, time.Second},
144145
"TransferTaskBatchSize": {dynamicproperties.TransferTaskBatchSize, 47},
@@ -278,7 +279,6 @@ func TestNewConfig(t *testing.T) {
278279
"QueueMaxVirtualQueueCount": {dynamicproperties.QueueMaxVirtualQueueCount, 101},
279280
"VirtualSliceForceAppendInterval": {dynamicproperties.VirtualSliceForceAppendInterval, time.Second},
280281
"ReplicationTaskProcessorLatencyLogThreshold": {dynamicproperties.ReplicationTaskProcessorLatencyLogThreshold, time.Duration(0)},
281-
"EnableTimerProcessorInMemoryQueue": {dynamicproperties.EnableTimerProcessorInMemoryQueue, false},
282282
}
283283
client := dynamicconfig.NewInMemoryClient()
284284
for fieldName, expected := range fields {

service/history/queuev2/virtual_queue.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,11 @@ func (q *virtualQueueImpl) RemoveScheduledTasksAfter(t time.Time) {
464464
continue
465465
}
466466

467-
// TODO: remove scheduled tasks from virtual slices
467+
s.CancelTasks(func(task task.Task) bool {
468+
taskTime := task.GetTaskKey().GetScheduledTime()
469+
return taskTime.After(t) || taskTime.Equal(t)
470+
})
471+
468472
q.monitor.SetSlicePendingTaskCount(s, s.GetPendingTaskCount())
469473
}
470474
}

service/history/queuev2/virtual_slice.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type (
4343
GetPendingTaskCount() int
4444
Clear()
4545
PendingTaskStats() PendingTaskStats
46+
CancelTasks(predicate func(task.Task) bool)
4647

4748
TrySplitByTaskKey(persistence.HistoryTaskKey) (VirtualSlice, VirtualSlice, bool)
4849
TrySplitByPredicate(Predicate) (VirtualSlice, VirtualSlice, bool)
@@ -196,6 +197,15 @@ func (s *virtualSliceImpl) UpdateAndGetState() VirtualSliceState {
196197
return s.state
197198
}
198199

200+
func (s *virtualSliceImpl) CancelTasks(predicate func(task.Task) bool) {
201+
taskMap := s.pendingTaskTracker.GetTasks()
202+
for _, task := range taskMap {
203+
if predicate(task) {
204+
task.Cancel()
205+
}
206+
}
207+
}
208+
199209
func (s *virtualSliceImpl) TrySplitByTaskKey(taskKey persistence.HistoryTaskKey) (VirtualSlice, VirtualSlice, bool) {
200210
leftState, rightState, ok := s.state.TrySplitByTaskKey(taskKey)
201211
if !ok {

service/history/queuev2/virtual_slice_mock.go

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/shard/context.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,12 @@ func (s *contextImpl) updateScheduledTaskMaxReadLevel(cluster string) persistenc
246246
currentTime = s.remoteClusterCurrentTime[cluster]
247247
}
248248

249-
newMaxReadLevel := currentTime.Add(s.config.TimerProcessorMaxTimeShift()).Truncate(persistence.DBTimestampMinPrecision)
249+
maxTimeShift := s.config.TimerProcessorMaxTimeShift()
250+
if s.config.TimerProcessorInMemoryQueueMaxTimeShift(s.shardID) > 0 {
251+
maxTimeShift = s.config.TimerProcessorInMemoryQueueMaxTimeShift(s.shardID)
252+
}
253+
254+
newMaxReadLevel := currentTime.Add(maxTimeShift).Truncate(persistence.DBTimestampMinPrecision)
250255
if newMaxReadLevel.After(s.scheduledTaskMaxReadLevelMap[cluster]) {
251256
s.scheduledTaskMaxReadLevelMap[cluster] = newMaxReadLevel
252257
}
@@ -1334,8 +1339,8 @@ func (s *contextImpl) allocateTimerIDsLocked(
13341339
// Above cases can happen if shard move and new host have a time SKU,
13351340
// or there is db write delay, or we are simply (re-)generating tasks for an old workflow.
13361341

1337-
// we need to skip this check for in-memory queue because it is going to be populated before the db is read
1338-
if !s.config.EnableTimerProcessorInMemoryQueue(s.shardID) {
1342+
if s.config.TimerProcessorInMemoryQueueMaxTimeShift(s.shardID) == 0 {
1343+
// this check is only required when an in memory queue is disabled. If it's enabled, we expect timers below read level to be enqueued in memory
13391344
if ts.Before(readCursorTS) {
13401345
// This can happen if shard move and new host have a time SKU, or there is db write delay.
13411346
// We generate a new timer ID using timerMaxReadLevel.

0 commit comments

Comments
 (0)