Skip to content

Commit

Permalink
IWF-274: consolidate debug query methods
Browse files Browse the repository at this point in the history
  • Loading branch information
jbowers committed Feb 4, 2025
1 parent 083322b commit 5a35a77
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 55 deletions.
20 changes: 10 additions & 10 deletions integ/greedy_timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ func doTestGreedyTimerWorkflowCustomConfig(t *testing.T, backendType service.Bac
time.Sleep(time.Second * 1)

// assertions
timers := service.GetScheduledGreedyTimerTimesQueryResponse{}
err = uClient.QueryWorkflow(context.Background(), &timers, wfId, "", service.GetScheduledGreedyTimerTimesQueryType)
debug := service.DebugDumpResponse{}
err = uClient.QueryWorkflow(context.Background(), &debug, wfId, "", service.DebugDumpQueryType)
if err != nil {
log.Fatalf("Fail to invoke query %v", err)
}
assertions.Equal(1, len(timers.ScheduledGreedyTimerTimes))
singleTimerScheduled := timers.ScheduledGreedyTimerTimes[0]
assertions.Equal(1, len(debug.FiringTimersUnixTimestamps))
singleTimerScheduled := debug.FiringTimersUnixTimestamps[0]

scheduleTimerAndAssertExpectedScheduled(t, apiClient, uClient, wfId, 20, 1)

Expand All @@ -127,15 +127,15 @@ func doTestGreedyTimerWorkflowCustomConfig(t *testing.T, backendType service.Bac

time.Sleep(time.Second * 1)

err = uClient.QueryWorkflow(context.Background(), &timers, wfId, "", service.GetScheduledGreedyTimerTimesQueryType)
err = uClient.QueryWorkflow(context.Background(), &debug, wfId, "", service.DebugDumpQueryType)
if err != nil {
log.Fatalf("Fail to invoke query %v", err)
}

// no second timer started
assertions.Equal(1, len(timers.ScheduledGreedyTimerTimes))
assertions.Equal(1, len(debug.FiringTimersUnixTimestamps))
// LessOrEqual due to continue as new workflow scheduling the next, not skipped timer
assertions.LessOrEqual(singleTimerScheduled, timers.ScheduledGreedyTimerTimes[0])
assertions.LessOrEqual(singleTimerScheduled, debug.FiringTimersUnixTimestamps[0])
scheduleTimerAndAssertExpectedScheduled(t, apiClient, uClient, wfId, 5, 2)

// wait for the workflow
Expand Down Expand Up @@ -178,11 +178,11 @@ func scheduleTimerAndAssertExpectedScheduled(

time.Sleep(time.Second * 1)

timers := service.GetScheduledGreedyTimerTimesQueryResponse{}
err = uClient.QueryWorkflow(context.Background(), &timers, wfId, "", service.GetScheduledGreedyTimerTimesQueryType)
debug := service.DebugDumpResponse{}
err = uClient.QueryWorkflow(context.Background(), &debug, wfId, "", service.DebugDumpQueryType)
if err != nil {
log.Fatalf("Fail to invoke query %v", err)
}

assertions.LessOrEqual(len(timers.ScheduledGreedyTimerTimes), noMoreThan)
assertions.LessOrEqual(len(debug.FiringTimersUnixTimestamps), noMoreThan)
}
2 changes: 1 addition & 1 deletion iwf-idl
Submodule iwf-idl updated 1 files
+4 −0 iwf.yaml
13 changes: 6 additions & 7 deletions service/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ const (
StateDecideApi = "/api/v1/workflowState/decide"
WorkflowWorkerRpcApi = "/api/v1/workflowWorker/rpc"

GetDataAttributesWorkflowQueryType = "GetDataAttributes"
GetSearchAttributesWorkflowQueryType = "GetSearchAttributes"
GetCurrentTimerInfosQueryType = "GetCurrentTimerInfos"
GetScheduledGreedyTimerTimesQueryType = "GetScheduledGreedyTimerTimes"
ContinueAsNewDumpByPageQueryType = "ContinueAsNewDumpByPage"
DebugDumpQueryType = "DebugNewDump"
PrepareRpcQueryType = "PrepareRpcQueryType"
GetDataAttributesWorkflowQueryType = "GetDataAttributes"
GetSearchAttributesWorkflowQueryType = "GetSearchAttributes"
GetCurrentTimerInfosQueryType = "GetCurrentTimerInfos"
ContinueAsNewDumpByPageQueryType = "ContinueAsNewDumpByPage"
DebugDumpQueryType = "DebugNewDump"
PrepareRpcQueryType = "PrepareRpcQueryType"

ExecuteOptimisticLockingRpcUpdateType = "ExecuteOptimisticLockingRpcUpdate"

Expand Down
8 changes: 4 additions & 4 deletions service/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ type (
}

GetScheduledGreedyTimerTimesQueryResponse struct {
ScheduledGreedyTimerTimes []int64
PendingScheduled []*TimerInfo
PendingScheduled []*TimerInfo
}

TimerInfo struct {
Expand Down Expand Up @@ -138,8 +137,9 @@ type (
}

DebugDumpResponse struct {
Config iwfidl.WorkflowConfig
Snapshot ContinueAsNewDumpResponse
Config iwfidl.WorkflowConfig
Snapshot ContinueAsNewDumpResponse
FiringTimersUnixTimestamps []int64
}

StateExecutionCounterInfo struct {
Expand Down
2 changes: 2 additions & 0 deletions service/interpreter/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ type TimerProcessor interface {
WaitForTimerFiredOrSkipped(ctx UnifiedContext, stateExeId string, timerIdx int, cancelWaiting *bool) service.InternalTimerStatus
RemovePendingTimersOfState(stateExeId string)
AddTimers(stateExeId string, commands []iwfidl.TimerCommand, completedTimerCmds map[int]service.InternalTimerStatus)
GetTimerInfos() map[string][]*service.TimerInfo
GetTimerStartedUnixTimestamps() []int64
}

type WorkflowProvider interface {
Expand Down
27 changes: 22 additions & 5 deletions service/interpreter/queryHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@ import (
)

func SetQueryHandlers(
ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, persistenceManager *PersistenceManager,
internalChannel *InternalChannel, signalReceiver *SignalReceiver,
ctx interfaces.UnifiedContext,
provider interfaces.WorkflowProvider,
timerProcessor interfaces.TimerProcessor,
persistenceManager *PersistenceManager,
internalChannel *InternalChannel,
signalReceiver *SignalReceiver,
continueAsNewer *ContinueAsNewer,
workflowConfiger *config.WorkflowConfiger, basicInfo service.BasicInfo,
workflowConfiger *config.WorkflowConfiger,
basicInfo service.BasicInfo,
) error {
err := provider.SetQueryHandler(ctx, service.GetDataAttributesWorkflowQueryType, func(req service.GetDataAttributesQueryRequest) (service.GetDataAttributesQueryResponse, error) {
dos := persistenceManager.GetDataObjectsByKey(req)
Expand All @@ -32,8 +37,9 @@ func SetQueryHandlers(
}
err = provider.SetQueryHandler(ctx, service.DebugDumpQueryType, func() (*service.DebugDumpResponse, error) {
return &service.DebugDumpResponse{
Config: workflowConfiger.Get(),
Snapshot: continueAsNewer.GetSnapshot(),
Config: workflowConfiger.Get(),
Snapshot: continueAsNewer.GetSnapshot(),
FiringTimersUnixTimestamps: timerProcessor.GetTimerStartedUnixTimestamps(),
}, nil
})
if err != nil {
Expand All @@ -56,5 +62,16 @@ func SetQueryHandlers(
if err != nil {
return err
}

err = provider.SetQueryHandler(ctx, service.GetCurrentTimerInfosQueryType, func() (service.GetCurrentTimerInfosQueryResponse, error) {
return service.GetCurrentTimerInfosQueryResponse{
StateExecutionCurrentTimerInfos: timerProcessor.GetTimerInfos(),
}, nil
})

if err != nil {
return err
}

return nil
}
27 changes: 8 additions & 19 deletions service/interpreter/timers/greedyTimerProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,21 @@ func NewGreedyTimerProcessor(
staleSkipTimerSignals: staleSkipTimerSignals,
}

err := provider.SetQueryHandler(ctx, service.GetCurrentTimerInfosQueryType, func() (service.GetCurrentTimerInfosQueryResponse, error) {
return service.GetCurrentTimerInfosQueryResponse{
StateExecutionCurrentTimerInfos: tp.stateExecutionCurrentTimerInfos,
}, nil
})
if err != nil {
panic("cannot set query handler")
}

err = provider.SetQueryHandler(ctx, service.GetScheduledGreedyTimerTimesQueryType, func() (service.GetScheduledGreedyTimerTimesQueryResponse, error) {
return service.GetScheduledGreedyTimerTimesQueryResponse{
ScheduledGreedyTimerTimes: tp.timerManager.providerScheduledTimerUnixTs,
PendingScheduled: tp.timerManager.pendingScheduling,
}, nil
})
if err != nil {
panic("cannot set query handler")
}

return tp
}

func (t *GreedyTimerProcessor) Dump() []service.StaleSkipTimerSignal {
return t.staleSkipTimerSignals
}

func (t *GreedyTimerProcessor) GetTimerInfos() map[string][]*service.TimerInfo {
return t.stateExecutionCurrentTimerInfos
}

func (t *GreedyTimerProcessor) GetTimerStartedUnixTimestamps() []int64 {
return t.timerManager.providerScheduledTimerUnixTs
}

// SkipTimer will attempt to skip a timer, return false if no valid timer found
func (t *GreedyTimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) bool {
timer, valid := service.ValidateTimerSkipRequest(t.stateExecutionCurrentTimerInfos, stateExeId, timerId, timerIdx)
Expand Down
28 changes: 20 additions & 8 deletions service/interpreter/timers/simpleTimerProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

type SimpleTimerProcessor struct {
stateExecutionCurrentTimerInfos map[string][]*service.TimerInfo
awaitingTimers []int64
staleSkipTimerSignals []service.StaleSkipTimerSignal
provider interfaces.WorkflowProvider
logger interfaces.UnifiedLogger
Expand All @@ -25,21 +26,21 @@ func NewSimpleTimerProcessor(
staleSkipTimerSignals: staleSkipTimerSignals,
}

err := provider.SetQueryHandler(ctx, service.GetCurrentTimerInfosQueryType, func() (service.GetCurrentTimerInfosQueryResponse, error) {
return service.GetCurrentTimerInfosQueryResponse{
StateExecutionCurrentTimerInfos: tp.stateExecutionCurrentTimerInfos,
}, nil
})
if err != nil {
panic("cannot set query handler")
}
return tp
}

func (t *SimpleTimerProcessor) Dump() []service.StaleSkipTimerSignal {
return t.staleSkipTimerSignals
}

func (t *SimpleTimerProcessor) GetTimerInfos() map[string][]*service.TimerInfo {
return t.stateExecutionCurrentTimerInfos
}

func (t *SimpleTimerProcessor) GetTimerStartedUnixTimestamps() []int64 {
return t.awaitingTimers
}

// SkipTimer will attempt to skip a timer, return false if no valid timer found
func (t *SimpleTimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) bool {
timer, valid := service.ValidateTimerSkipRequest(t.stateExecutionCurrentTimerInfos, stateExeId, timerId, timerIdx)
Expand Down Expand Up @@ -100,9 +101,11 @@ func (t *SimpleTimerProcessor) WaitForTimerFiredOrSkipped(
fireAt := timer.FiringUnixTimestampSeconds
duration := time.Duration(fireAt-now) * time.Second
future := t.provider.NewTimer(ctx, duration)
t.awaitingTimers = append(t.awaitingTimers, fireAt)
_ = t.provider.Await(ctx, func() bool {
return future.IsReady() || timer.Status == service.TimerSkipped || *cancelWaiting
})
t.awaitingTimers = removeSingleTime(t.awaitingTimers, fireAt)
if timer.Status == service.TimerSkipped {
return service.TimerSkipped
}
Expand All @@ -113,6 +116,15 @@ func (t *SimpleTimerProcessor) WaitForTimerFiredOrSkipped(
return service.TimerPending
}

func removeSingleTime(timers []int64, at int64) []int64 {
for i, t := range timers {
if t == at {
return append(timers[:i], timers[i+1:]...)
}
}
return timers
}

// RemovePendingTimersOfState is for when a state is completed, remove all its pending timers
func (t *SimpleTimerProcessor) RemovePendingTimersOfState(stateExeId string) {
delete(t.stateExecutionCurrentTimerInfos, stateExeId)
Expand Down
2 changes: 1 addition & 1 deletion service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func InterpreterImpl(
// This is to ensure the correctness. If we set the query handler before that,
// the query handler could return empty data (since the loading hasn't completed), which will be incorrect response.
// We would rather return server errors and let the client retry later.
err = SetQueryHandlers(ctx, provider, persistenceManager, internalChannel, signalReceiver, continueAsNewer, workflowConfiger, basicInfo)
err = SetQueryHandlers(ctx, provider, timerProcessor, persistenceManager, internalChannel, signalReceiver, continueAsNewer, workflowConfiger, basicInfo)
if err != nil {
retErr = err
return
Expand Down

0 comments on commit 5a35a77

Please sign in to comment.