From 5b9db14cb8a6d21cc48536d57646c179ed1e5602 Mon Sep 17 00:00:00 2001 From: John Bowers <4923055+bowersj27@users.noreply.github.com> Date: Wed, 5 Feb 2025 14:25:39 -0600 Subject: [PATCH] IWF-274: Optimize Timer creation (#529) Co-authored-by: jbowers --- integ/any_timer_signal_test.go | 40 ++++ integ/greedy_timer_test.go | 188 ++++++++++++++++++ integ/timer_test.go | 41 ++++ integ/util.go | 17 ++ integ/workflow/greedy_timer/routers.go | 154 ++++++++++++++ service/api/service.go | 12 +- service/interfaces.go | 9 +- service/interpreter/activityImpl.go | 21 +- service/interpreter/activityImpl_test.go | 5 +- .../interpreter/cadence/activityProvider.go | 12 +- service/interpreter/cadence/workflow.go | 5 +- .../interpreter/cadence/workflowProvider.go | 68 +++---- .../{ => config}/workflowConfiger.go | 2 +- .../{ => cont}/continueAsNewCounter.go | 15 +- service/interpreter/continueAsNewer.go | 19 +- service/interpreter/globalVersioner.go | 7 +- .../{ => interfaces}/interfaces.go | 15 +- .../{ => interfaces}/interfaces_mock.go | 2 +- service/interpreter/persistence.go | 17 +- service/interpreter/queryHandler.go | 29 ++- service/interpreter/signalReceiver.go | 31 +-- service/interpreter/stateExecutionCounter.go | 19 +- .../interpreter/temporal/activityProvider.go | 12 +- service/interpreter/temporal/workflow.go | 5 +- .../interpreter/temporal/workflowProvider.go | 74 +++---- .../timers/greedyTimerProcessor.go | 166 ++++++++++++++++ .../timers/greedyTimerScheduler.go | 128 ++++++++++++ .../simpleTimerProcessor.go} | 80 +++----- service/interpreter/timers/utils.go | 31 +++ service/interpreter/workflowImpl.go | 70 ++++--- service/interpreter/workflowUpdater.go | 27 +-- 31 files changed, 1071 insertions(+), 250 deletions(-) create mode 100644 integ/greedy_timer_test.go create mode 100644 integ/workflow/greedy_timer/routers.go rename service/interpreter/{ => config}/workflowConfiger.go (98%) rename service/interpreter/{ => cont}/continueAsNewCounter.go (75%) rename service/interpreter/{ => interfaces}/interfaces.go (88%) rename service/interpreter/{ => interfaces}/interfaces_mock.go (99%) create mode 100644 service/interpreter/timers/greedyTimerProcessor.go create mode 100644 service/interpreter/timers/greedyTimerScheduler.go rename service/interpreter/{timerProcessor.go => timers/simpleTimerProcessor.go} (66%) create mode 100644 service/interpreter/timers/utils.go diff --git a/integ/any_timer_signal_test.go b/integ/any_timer_signal_test.go index 6f931d9a..c65e6677 100644 --- a/integ/any_timer_signal_test.go +++ b/integ/any_timer_signal_test.go @@ -22,6 +22,16 @@ func TestAnyTimerSignalWorkflowTemporal(t *testing.T) { } } +func TestGreedyAnyTimerSignalWorkflowTemporal(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestAnyTimerSignalWorkflow(t, service.BackendTypeTemporal, minimumGreedyTimerConfig()) + smallWaitForFastTest() + } +} + func TestAnyTimerSignalWorkflowCadence(t *testing.T) { if !*cadenceIntegTest { t.Skip() @@ -32,6 +42,16 @@ func TestAnyTimerSignalWorkflowCadence(t *testing.T) { } } +func TestGreedyAnyTimerSignalWorkflowCadence(t *testing.T) { + if !*cadenceIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestAnyTimerSignalWorkflow(t, service.BackendTypeCadence, minimumGreedyTimerConfig()) + smallWaitForFastTest() + } +} + func TestAnyTimerSignalWorkflowTemporalContinueAsNew(t *testing.T) { if !*temporalIntegTest { t.Skip() @@ -42,6 +62,16 @@ func TestAnyTimerSignalWorkflowTemporalContinueAsNew(t *testing.T) { } } +func TestGreedyAnyTimerSignalWorkflowTemporalContinueAsNew(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestAnyTimerSignalWorkflow(t, service.BackendTypeTemporal, greedyTimerConfig(true)) + smallWaitForFastTest() + } +} + func TestAnyTimerSignalWorkflowCadenceContinueAsNew(t *testing.T) { if !*cadenceIntegTest { t.Skip() @@ -52,6 +82,16 @@ func TestAnyTimerSignalWorkflowCadenceContinueAsNew(t *testing.T) { } } +func TestGreedyAnyTimerSignalWorkflowCadenceContinueAsNew(t *testing.T) { + if !*cadenceIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestAnyTimerSignalWorkflow(t, service.BackendTypeCadence, greedyTimerConfig(true)) + smallWaitForFastTest() + } +} + func doTestAnyTimerSignalWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { // start test workflow server wfHandler := anytimersignal.NewHandler() diff --git a/integ/greedy_timer_test.go b/integ/greedy_timer_test.go new file mode 100644 index 00000000..b3218105 --- /dev/null +++ b/integ/greedy_timer_test.go @@ -0,0 +1,188 @@ +package integ + +import ( + "context" + "encoding/json" + "github.com/indeedeng/iwf/integ/workflow/greedy_timer" + uclient "github.com/indeedeng/iwf/service/client" + "github.com/stretchr/testify/assert" + "log" + "strconv" + "testing" + "time" + + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/common/ptr" +) + +func TestGreedyTimerWorkflowBaseTemporal(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestGreedyTimerWorkflow(t, service.BackendTypeTemporal) + smallWaitForFastTest() + } +} + +func TestGreedyTimerWorkflowBaseCadence(t *testing.T) { + if !*cadenceIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestGreedyTimerWorkflow(t, service.BackendTypeCadence) + smallWaitForFastTest() + } +} + +func TestGreedyTimerWorkflowBaseTemporalContinueAsNew(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestGreedyTimerWorkflowCustomConfig(t, service.BackendTypeTemporal, greedyTimerConfig(true)) + smallWaitForFastTest() + } +} + +func TestGreedyTimerWorkflowBaseCadenceContinueAsNew(t *testing.T) { + if !*cadenceIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestGreedyTimerWorkflowCustomConfig(t, service.BackendTypeCadence, greedyTimerConfig(true)) + smallWaitForFastTest() + } +} + +func doTestGreedyTimerWorkflow(t *testing.T, backendType service.BackendType) { + doTestGreedyTimerWorkflowCustomConfig(t, backendType, minimumGreedyTimerConfig()) +} + +func doTestGreedyTimerWorkflowCustomConfig(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { + assertions := assert.New(t) + // start test workflow server + wfHandler := greedy_timer.NewHandler() + closeFunc1 := startWorkflowWorkerWithRpc(wfHandler, t) + defer closeFunc1() + + uClient, closeFunc2 := startIwfServiceWithClient(backendType) + defer closeFunc2() + + apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{ + Servers: []iwfidl.ServerConfiguration{ + { + URL: "http://localhost:" + testIwfServerPort, + }, + }, + }) + + // start a workflow + durations := []int64{15, 30} + input := greedy_timer.Input{Durations: durations} + + wfId := greedy_timer.WorkflowType + strconv.Itoa(int(time.Now().UnixNano())) + req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background()) + inputData, _ := json.Marshal(input) + + //schedule-1 + _, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{ + WorkflowId: wfId, + IwfWorkflowType: greedy_timer.WorkflowType, + WorkflowTimeoutSeconds: 30, + IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort, + StartStateId: ptr.Any(greedy_timer.ScheduleTimerState), + StateInput: &iwfidl.EncodedObject{ + Encoding: iwfidl.PtrString("json"), + Data: iwfidl.PtrString(string(inputData)), + }, + WorkflowStartOptions: &iwfidl.WorkflowStartOptions{ + WorkflowConfigOverride: config, + }, + }).Execute() + failTestAtHttpError(err, httpResp, t) + + time.Sleep(time.Second * 1) + + // assertions + debug := service.DebugDumpResponse{} + err = uClient.QueryWorkflow(context.Background(), &debug, wfId, "", service.DebugDumpQueryType) + if err != nil { + log.Fatalf("Fail to invoke query %v", err) + } + assertions.Equal(1, len(debug.FiringTimersUnixTimestamps)) + singleTimerScheduled := debug.FiringTimersUnixTimestamps[0] + + scheduleTimerAndAssertExpectedScheduled(t, apiClient, uClient, wfId, 20, 1) + + // skip next timer for state: schedule-1 + skipReq := apiClient.DefaultApi.ApiV1WorkflowTimerSkipPost(context.Background()) + httpResp, err = skipReq.WorkflowSkipTimerRequest(iwfidl.WorkflowSkipTimerRequest{ + WorkflowId: wfId, + WorkflowStateExecutionId: greedy_timer.ScheduleTimerState + "-1", + TimerCommandId: iwfidl.PtrString("duration-15"), + }).Execute() + failTestAtHttpError(err, httpResp, t) + + time.Sleep(time.Second * 1) + + err = uClient.QueryWorkflow(context.Background(), &debug, wfId, "", service.DebugDumpQueryType) + if err != nil { + log.Fatalf("Fail to invoke query %v", err) + } + + // no second timer started + assertions.Equal(1, len(debug.FiringTimersUnixTimestamps)) + // LessOrEqual due to continue as new workflow scheduling the next, not skipped timer + assertions.LessOrEqual(singleTimerScheduled, debug.FiringTimersUnixTimestamps[0]) + scheduleTimerAndAssertExpectedScheduled(t, apiClient, uClient, wfId, 5, 2) + + // wait for the workflow + req2 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background()) + _, httpResp, err = req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ + WorkflowId: wfId, + }).Execute() + failTestAtHttpError(err, httpResp, t) + + history, _ := wfHandler.GetTestResult() + assertions.Equalf(map[string]int64{ + "schedule_start": 3, + "schedule_decide": 1, + }, history, "history does not match expected") +} + +func scheduleTimerAndAssertExpectedScheduled( + t *testing.T, + apiClient *iwfidl.APIClient, + uClient uclient.UnifiedClient, + wfId string, + duration int64, + noMoreThan int) { + + assertions := assert.New(t) + input := greedy_timer.Input{Durations: []int64{duration}} + inputData, _ := json.Marshal(input) + + reqRpc := apiClient.DefaultApi.ApiV1WorkflowRpcPost(context.Background()) + _, httpResp, err := reqRpc.WorkflowRpcRequest(iwfidl.WorkflowRpcRequest{ + WorkflowId: wfId, + RpcName: greedy_timer.SubmitDurationsRPC, + Input: &iwfidl.EncodedObject{ + Encoding: iwfidl.PtrString("json"), + Data: iwfidl.PtrString(string(inputData)), + }, + TimeoutSeconds: iwfidl.PtrInt32(2), + }).Execute() + failTestAtHttpError(err, httpResp, t) + + time.Sleep(time.Second * 1) + + debug := service.DebugDumpResponse{} + err = uClient.QueryWorkflow(context.Background(), &debug, wfId, "", service.DebugDumpQueryType) + if err != nil { + log.Fatalf("Fail to invoke query %v", err) + } + + assertions.LessOrEqual(len(debug.FiringTimersUnixTimestamps), noMoreThan) +} diff --git a/integ/timer_test.go b/integ/timer_test.go index 91da84fa..3628ae1a 100644 --- a/integ/timer_test.go +++ b/integ/timer_test.go @@ -55,6 +55,47 @@ func TestTimerWorkflowCadenceContinueAsNew(t *testing.T) { } } +// NOTE: greedy timers should have the same result in these timer tests +func TestGreedyTimerWorkflowTemporal(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestTimerWorkflow(t, service.BackendTypeTemporal, minimumGreedyTimerConfig()) + smallWaitForFastTest() + } +} + +func TestGreedyTimerWorkflowCadence(t *testing.T) { + if !*cadenceIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestTimerWorkflow(t, service.BackendTypeCadence, minimumGreedyTimerConfig()) + smallWaitForFastTest() + } +} + +func TestGreedyTimerWorkflowTemporalContinueAsNew(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestTimerWorkflow(t, service.BackendTypeTemporal, greedyTimerConfig(true)) + smallWaitForFastTest() + } +} + +func TestGreedyTimerWorkflowCadenceContinueAsNew(t *testing.T) { + if !*cadenceIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestTimerWorkflow(t, service.BackendTypeCadence, greedyTimerConfig(true)) + smallWaitForFastTest() + } +} + func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { // start test workflow server wfHandler := timer.NewHandler() diff --git a/integ/util.go b/integ/util.go index 0614f618..edc3214a 100644 --- a/integ/util.go +++ b/integ/util.go @@ -229,6 +229,23 @@ func minimumContinueAsNewConfig(optimizeActivity bool) *iwfidl.WorkflowConfig { } } +func minimumGreedyTimerConfig() *iwfidl.WorkflowConfig { + return greedyTimerConfig(false) +} + +func greedyTimerConfig(continueAsNew bool) *iwfidl.WorkflowConfig { + if continueAsNew { + return &iwfidl.WorkflowConfig{ + ContinueAsNewThreshold: iwfidl.PtrInt32(1), + OptimizeTimer: iwfidl.PtrBool(true), + } + } + + return &iwfidl.WorkflowConfig{ + OptimizeTimer: iwfidl.PtrBool(true), + } +} + func minimumContinueAsNewConfigV0() *iwfidl.WorkflowConfig { return minimumContinueAsNewConfig(false) } diff --git a/integ/workflow/greedy_timer/routers.go b/integ/workflow/greedy_timer/routers.go new file mode 100644 index 00000000..830217ce --- /dev/null +++ b/integ/workflow/greedy_timer/routers.go @@ -0,0 +1,154 @@ +package greedy_timer + +import ( + "encoding/json" + "fmt" + "github.com/gin-gonic/gin" + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/integ/helpers" + "github.com/indeedeng/iwf/integ/workflow/common" + "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/common/ptr" + "log" + "net/http" + "strconv" + "testing" +) + +/* +* +This workflow will accept an array of integers representing durations and execute a state that waits on a timer corresponding to each duration provided +*/ +const ( + WorkflowType = "greedy_timer" + ScheduleTimerState = "schedule" + SubmitDurationsRPC = "submitDurationsRPC" +) + +type handler struct { + invokeHistory map[string]int64 + invokeData map[string]interface{} +} + +type Input struct { + Durations []int64 `json:"durations"` +} + +func NewHandler() common.WorkflowHandlerWithRpc { + return &handler{ + invokeHistory: make(map[string]int64), + invokeData: make(map[string]interface{}), + } +} + +func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context, t *testing.T) { + var req iwfidl.WorkflowWorkerRpcRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + log.Println("received workflow worker rpc request, ", req) + + wfCtx := req.Context + if wfCtx.WorkflowId == "" || wfCtx.WorkflowRunId == "" { + helpers.FailTestWithErrorMessage("invalid context in the request", t) + } + if req.WorkflowType != WorkflowType { + panic("invalid WorkflowType:" + req.WorkflowType) + } + + if req.RpcName == SubmitDurationsRPC { + + c.JSON(http.StatusOK, iwfidl.WorkflowWorkerRpcResponse{ + StateDecision: &iwfidl.StateDecision{NextStates: []iwfidl.StateMovement{ + { + StateId: ScheduleTimerState, + StateInput: req.Input, + }, + }}, + }) + return + } + + helpers.FailTestWithErrorMessage(fmt.Sprintf("invalid rpc name: %s", req.RpcName), t) +} + +// ApiV1WorkflowStartPost - for a workflow +func (h *handler) ApiV1WorkflowStateStart(c *gin.Context, t *testing.T) { + var req iwfidl.WorkflowStateStartRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + log.Println("received state start request, ", req) + + if req.GetWorkflowType() == WorkflowType { + + h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if req.GetWorkflowStateId() == ScheduleTimerState { + + var input Input + err := json.Unmarshal([]byte(req.StateInput.GetData()), &input) + if err != nil { + panic(err) + } + + timers := make([]iwfidl.TimerCommand, len(input.Durations)) + for i, duration := range input.Durations { + timers[i] = iwfidl.TimerCommand{ + CommandId: ptr.Any("duration-" + strconv.FormatInt(duration, 10)), + DurationSeconds: iwfidl.PtrInt64(duration), + } + } + + c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ + CommandRequest: &iwfidl.CommandRequest{ + TimerCommands: timers, + DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(), + }, + }) + + return + } + } + + c.JSON(http.StatusBadRequest, struct{}{}) +} + +func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context, t *testing.T) { + var req iwfidl.WorkflowStateDecideRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + log.Println("received state decide request, ", req) + + if req.GetWorkflowType() == WorkflowType { + + h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if req.GetWorkflowStateId() == ScheduleTimerState { + h.invokeData["completed_state_id"] = req.GetContext().StateExecutionId + results := req.GetCommandResults() + timerResults := results.GetTimerResults() + h.invokeData["completed_timer_id"] = timerResults[0].CommandId + + c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ + StateDecision: &iwfidl.StateDecision{ + NextStates: []iwfidl.StateMovement{ + { + StateId: service.ForceCompletingWorkflowStateId, + }, + }, + }, + }) + + return + } + } + + c.JSON(http.StatusBadRequest, struct{}{}) +} + +func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { + return h.invokeHistory, h.invokeData +} diff --git a/service/api/service.go b/service/api/service.go index 94ca28b4..e786d152 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -6,6 +6,7 @@ import ( "github.com/indeedeng/iwf/config" "github.com/indeedeng/iwf/service/common/event" "github.com/indeedeng/iwf/service/interpreter/env" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "net/http" "os" "strings" @@ -13,15 +14,13 @@ import ( uclient "github.com/indeedeng/iwf/service/client" "github.com/indeedeng/iwf/service/common/compatibility" - "github.com/indeedeng/iwf/service/common/rpc" - "github.com/indeedeng/iwf/service/common/utils" - "github.com/indeedeng/iwf/service/interpreter" - "github.com/indeedeng/iwf/service/common/errors" "github.com/indeedeng/iwf/service/common/log" "github.com/indeedeng/iwf/service/common/log/tag" "github.com/indeedeng/iwf/service/common/mapper" "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/common/rpc" + "github.com/indeedeng/iwf/service/common/utils" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" @@ -197,6 +196,9 @@ func overrideWorkflowConfig(configOverride iwfidl.WorkflowConfig, workflowConfig if configOverride.OptimizeActivity != nil { workflowConfig.OptimizeActivity = configOverride.OptimizeActivity } + if configOverride.OptimizeTimer != nil { + workflowConfig.OptimizeTimer = configOverride.OptimizeTimer + } } func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion( @@ -802,7 +804,7 @@ func (s *serviceImpl) handleRpcBySynchronousUpdate( ctx context.Context, req iwfidl.WorkflowRpcRequest, ) (resp *iwfidl.WorkflowRpcResponse, retError *errors.ErrorAndStatus) { req.TimeoutSeconds = ptr.Any(utils.TrimRpcTimeoutSeconds(ctx, req)) - var output interpreter.HandlerOutput + var output interfaces.HandlerOutput err := s.client.SynchronousUpdateWorkflow(ctx, &output, req.GetWorkflowId(), req.GetWorkflowRunId(), service.ExecuteOptimisticLockingRpcUpdateType, req) if err != nil { errType := s.client.GetApplicationErrorTypeIfIsApplicationError(err) diff --git a/service/interfaces.go b/service/interfaces.go index 5d05f634..7fc15548 100644 --- a/service/interfaces.go +++ b/service/interfaces.go @@ -101,6 +101,10 @@ type ( StateExecutionCurrentTimerInfos map[string][]*TimerInfo // key is stateExecutionId } + GetScheduledGreedyTimerTimesQueryResponse struct { + PendingScheduled []*TimerInfo + } + TimerInfo struct { CommandId *string FiringUnixTimestampSeconds int64 @@ -133,8 +137,9 @@ type ( } DebugDumpResponse struct { - Config iwfidl.WorkflowConfig - Snapshot ContinueAsNewDumpResponse + Config iwfidl.WorkflowConfig + Snapshot ContinueAsNewDumpResponse + FiringTimersUnixTimestamps []int64 } StateExecutionCounterInfo struct { diff --git a/service/interpreter/activityImpl.go b/service/interpreter/activityImpl.go index dc6c3675..a91b76e0 100644 --- a/service/interpreter/activityImpl.go +++ b/service/interpreter/activityImpl.go @@ -12,6 +12,7 @@ import ( "github.com/indeedeng/iwf/service/common/rpc" "github.com/indeedeng/iwf/service/common/urlautofix" "github.com/indeedeng/iwf/service/interpreter/env" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "io" "net/http" "os" @@ -30,7 +31,7 @@ func StateApiWaitUntil( ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput, searchAttributes []iwfidl.SearchAttribute, ) (*iwfidl.WorkflowStateStartResponse, error) { stateApiWaitUntilStartTime := time.Now().UnixMilli() - provider := getActivityProviderByType(backendType) + provider := interfaces.GetActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("StateWaitUntilActivity", "input", log.ToJsonAndTruncateForLogging(input)) iwfWorkerBaseUrl := urlautofix.FixWorkerUrl(input.IwfWorkerUrl) @@ -120,7 +121,7 @@ func StateApiExecute( searchAttributes []iwfidl.SearchAttribute, ) (*iwfidl.WorkflowStateDecideResponse, error) { stateApiExecuteStartTime := time.Now().UnixMilli() - provider := getActivityProviderByType(backendType) + provider := interfaces.GetActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("StateExecuteActivity", "input", log.ToJsonAndTruncateForLogging(input)) @@ -205,20 +206,20 @@ func checkStateDecisionFromResponse(resp *iwfidl.WorkflowStateDecideResponse) er return nil } -func printDebugMsg(logger UnifiedLogger, err error, url string) { +func printDebugMsg(logger interfaces.UnifiedLogger, err error, url string) { debugMode := os.Getenv(service.EnvNameDebugMode) if debugMode != "" { logger.Info("check error at http request", err, url) } } -func composeStartApiRespError(provider ActivityProvider, err error, resp *iwfidl.WorkflowStateStartResponse) error { +func composeStartApiRespError(provider interfaces.ActivityProvider, err error, resp *iwfidl.WorkflowStateStartResponse) error { respStr, _ := resp.MarshalJSON() return provider.NewApplicationError(string(iwfidl.STATE_API_FAIL_ERROR_TYPE), fmt.Sprintf("err msg: %v, response: %v", err, string(respStr))) } -func composeExecuteApiRespError(provider ActivityProvider, err error, resp *iwfidl.WorkflowStateDecideResponse) error { +func composeExecuteApiRespError(provider interfaces.ActivityProvider, err error, resp *iwfidl.WorkflowStateDecideResponse) error { respStr, _ := resp.MarshalJSON() return provider.NewApplicationError(string(iwfidl.STATE_API_FAIL_ERROR_TYPE), fmt.Sprintf("err msg: %v, response: %v", err, string(respStr))) @@ -232,7 +233,7 @@ func checkHttpError(err error, httpResp *http.Response) bool { } func composeHttpError( - isLocalActivity bool, provider ActivityProvider, err error, httpResp *http.Response, errType string, + isLocalActivity bool, provider interfaces.ActivityProvider, err error, httpResp *http.Response, errType string, ) error { responseBody := "None" var statusCode int @@ -337,7 +338,7 @@ func listTimerSignalInternalChannelCommandIds(commandReq *iwfidl.CommandRequest) func DumpWorkflowInternal( ctx context.Context, backendType service.BackendType, req iwfidl.WorkflowDumpRequest, ) (*iwfidl.WorkflowDumpResponse, error) { - provider := getActivityProviderByType(backendType) + provider := interfaces.GetActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("DumpWorkflowInternalActivity", "input", log.ToJsonAndTruncateForLogging(req)) @@ -365,15 +366,15 @@ func DumpWorkflowInternal( func InvokeWorkerRpc( ctx context.Context, backendType service.BackendType, rpcPrep *service.PrepareRpcQueryResponse, req iwfidl.WorkflowRpcRequest, -) (*InvokeRpcActivityOutput, error) { - provider := getActivityProviderByType(backendType) +) (*interfaces.InvokeRpcActivityOutput, error) { + provider := interfaces.GetActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("InvokeWorkerRpcActivity", "input", log.ToJsonAndTruncateForLogging(req)) apiMaxSeconds := env.GetSharedConfig().Api.MaxWaitSeconds resp, statusErr := rpc.InvokeWorkerRpc(ctx, rpcPrep, req, apiMaxSeconds) - return &InvokeRpcActivityOutput{ + return &interfaces.InvokeRpcActivityOutput{ RpcOutput: resp, StatusError: statusErr, }, nil diff --git a/service/interpreter/activityImpl_test.go b/service/interpreter/activityImpl_test.go index 4a4e2885..428085ba 100644 --- a/service/interpreter/activityImpl_test.go +++ b/service/interpreter/activityImpl_test.go @@ -6,6 +6,7 @@ import ( "github.com/golang/mock/gomock" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "github.com/stretchr/testify/assert" "io" "net/http" @@ -195,9 +196,9 @@ func TestComposeHttpError_RegularActivity_NilResponse(t *testing.T) { assert.Equal(t, returnedError, err) } -func createTestComposeHttpErrorInitialState(t *testing.T, httpError string, initialError string) (*MockActivityProvider, *http.Response, error) { +func createTestComposeHttpErrorInitialState(t *testing.T, httpError string, initialError string) (*interfaces.MockActivityProvider, *http.Response, error) { ctrl := gomock.NewController(t) - mockActivityProvider := NewMockActivityProvider(ctrl) + mockActivityProvider := interfaces.NewMockActivityProvider(ctrl) var httpResp *http.Response = nil if httpError != "" { diff --git a/service/interpreter/cadence/activityProvider.go b/service/interpreter/cadence/activityProvider.go index b8a4496c..6fb7fb3b 100644 --- a/service/interpreter/cadence/activityProvider.go +++ b/service/interpreter/cadence/activityProvider.go @@ -3,7 +3,7 @@ package cadence import ( "context" "github.com/indeedeng/iwf/service" - "github.com/indeedeng/iwf/service/interpreter" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "go.uber.org/cadence" "go.uber.org/cadence/activity" ) @@ -11,27 +11,27 @@ import ( type activityProvider struct{} func init() { - interpreter.RegisterActivityProvider(service.BackendTypeCadence, &activityProvider{}) + interfaces.RegisterActivityProvider(service.BackendTypeCadence, &activityProvider{}) } func (a *activityProvider) NewApplicationError(errType string, details interface{}) error { return cadence.NewCustomError(errType, details) } -func (a *activityProvider) GetLogger(ctx context.Context) interpreter.UnifiedLogger { +func (a *activityProvider) GetLogger(ctx context.Context) interfaces.UnifiedLogger { zLogger := activity.GetLogger(ctx) return &loggerImpl{ zlogger: zLogger, } } -func (a *activityProvider) GetActivityInfo(ctx context.Context) interpreter.ActivityInfo { +func (a *activityProvider) GetActivityInfo(ctx context.Context) interfaces.ActivityInfo { info := activity.GetInfo(ctx) - return interpreter.ActivityInfo{ + return interfaces.ActivityInfo{ ScheduledTime: info.ScheduledTimestamp, Attempt: info.Attempt + 1, // NOTE increase by one to match Temporal IsLocalActivity: false, // TODO cadence doesn't support this yet - WorkflowExecution: interpreter.WorkflowExecution{ + WorkflowExecution: interfaces.WorkflowExecution{ ID: info.WorkflowExecution.ID, RunID: info.WorkflowExecution.RunID, }, diff --git a/service/interpreter/cadence/workflow.go b/service/interpreter/cadence/workflow.go index 107d76e0..9c9512e1 100644 --- a/service/interpreter/cadence/workflow.go +++ b/service/interpreter/cadence/workflow.go @@ -3,13 +3,14 @@ package cadence import ( "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/interpreter" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "go.uber.org/cadence/workflow" ) func Interpreter(ctx workflow.Context, input service.InterpreterWorkflowInput) (*service.InterpreterWorkflowOutput, error) { - return interpreter.InterpreterImpl(interpreter.NewUnifiedContext(ctx), newCadenceWorkflowProvider(), input) + return interpreter.InterpreterImpl(interfaces.NewUnifiedContext(ctx), newCadenceWorkflowProvider(), input) } func WaitforStateCompletionWorkflow(ctx workflow.Context) (*service.WaitForStateCompletionWorkflowOutput, error) { - return interpreter.WaitForStateCompletionWorkflowImpl(interpreter.NewUnifiedContext(ctx), newCadenceWorkflowProvider()) + return interpreter.WaitForStateCompletionWorkflowImpl(interfaces.NewUnifiedContext(ctx), newCadenceWorkflowProvider()) } diff --git a/service/interpreter/cadence/workflowProvider.go b/service/interpreter/cadence/workflowProvider.go index 996f4e89..925b4f8d 100644 --- a/service/interpreter/cadence/workflowProvider.go +++ b/service/interpreter/cadence/workflowProvider.go @@ -3,12 +3,12 @@ package cadence import ( "fmt" "github.com/indeedeng/iwf/service/common/mapper" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "time" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/retry" - "github.com/indeedeng/iwf/service/interpreter" "go.uber.org/cadence" "go.uber.org/cadence/workflow" ) @@ -18,7 +18,7 @@ type workflowProvider struct { pendingThreadNames map[string]int } -func newCadenceWorkflowProvider() interpreter.WorkflowProvider { +func newCadenceWorkflowProvider() interfaces.WorkflowProvider { return &workflowProvider{ pendingThreadNames: map[string]int{}, } @@ -38,7 +38,7 @@ func (w *workflowProvider) IsApplicationError(err error) bool { } func (w *workflowProvider) NewInterpreterContinueAsNewError( - ctx interpreter.UnifiedContext, input service.InterpreterWorkflowInput, + ctx interfaces.UnifiedContext, input service.InterpreterWorkflowInput, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -48,7 +48,7 @@ func (w *workflowProvider) NewInterpreterContinueAsNewError( } func (w *workflowProvider) UpsertSearchAttributes( - ctx interpreter.UnifiedContext, attributes map[string]interface{}, + ctx interfaces.UnifiedContext, attributes map[string]interface{}, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -57,11 +57,11 @@ func (w *workflowProvider) UpsertSearchAttributes( return workflow.UpsertSearchAttributes(wfCtx, attributes) } -func (w *workflowProvider) UpsertMemo(ctx interpreter.UnifiedContext, memo map[string]iwfidl.EncodedObject) error { +func (w *workflowProvider) UpsertMemo(ctx interfaces.UnifiedContext, memo map[string]iwfidl.EncodedObject) error { return fmt.Errorf("upsert memo is not supported in Cadence") } -func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Duration) interpreter.Future { +func (w *workflowProvider) NewTimer(ctx interfaces.UnifiedContext, d time.Duration) interfaces.Future { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -72,14 +72,14 @@ func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Durat } } -func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) interpreter.WorkflowInfo { +func (w *workflowProvider) GetWorkflowInfo(ctx interfaces.UnifiedContext) interfaces.WorkflowInfo { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") } info := workflow.GetInfo(wfCtx) - return interpreter.WorkflowInfo{ - WorkflowExecution: interpreter.WorkflowExecution{ + return interfaces.WorkflowInfo{ + WorkflowExecution: interfaces.WorkflowExecution{ ID: info.WorkflowExecution.ID, RunID: info.WorkflowExecution.RunID, }, @@ -91,7 +91,7 @@ func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) inter } func (w *workflowProvider) GetSearchAttributes( - ctx interpreter.UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType, + ctx interfaces.UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType, ) (map[string]iwfidl.SearchAttribute, error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -103,7 +103,7 @@ func (w *workflowProvider) GetSearchAttributes( } func (w *workflowProvider) SetQueryHandler( - ctx interpreter.UnifiedContext, queryType string, handler interface{}, + ctx interfaces.UnifiedContext, queryType string, handler interface{}, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -113,32 +113,32 @@ func (w *workflowProvider) SetQueryHandler( } func (w *workflowProvider) SetRpcUpdateHandler( - ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, - handler interpreter.UnifiedRpcHandler, + ctx interfaces.UnifiedContext, updateType string, validator interfaces.UnifiedRpcValidator, + handler interfaces.UnifiedRpcHandler, ) error { // NOTE: this feature is not available in Cadence return nil } func (w *workflowProvider) ExtendContextWithValue( - parent interpreter.UnifiedContext, key string, val interface{}, -) interpreter.UnifiedContext { + parent interfaces.UnifiedContext, key string, val interface{}, +) interfaces.UnifiedContext { wfCtx, ok := parent.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") } - return interpreter.NewUnifiedContext(workflow.WithValue(wfCtx, key, val)) + return interfaces.NewUnifiedContext(workflow.WithValue(wfCtx, key, val)) } func (w *workflowProvider) GoNamed( - ctx interpreter.UnifiedContext, name string, f func(ctx interpreter.UnifiedContext), + ctx interfaces.UnifiedContext, name string, f func(ctx interfaces.UnifiedContext), ) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") } f2 := func(ctx workflow.Context) { - ctx2 := interpreter.NewUnifiedContext(ctx) + ctx2 := interfaces.NewUnifiedContext(ctx) w.pendingThreadNames[name]++ w.threadCount++ f(ctx2) @@ -159,7 +159,7 @@ func (w *workflowProvider) GetThreadCount() int { return w.threadCount } -func (w *workflowProvider) Await(ctx interpreter.UnifiedContext, condition func() bool) error { +func (w *workflowProvider) Await(ctx interfaces.UnifiedContext, condition func() bool) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -168,8 +168,8 @@ func (w *workflowProvider) Await(ctx interpreter.UnifiedContext, condition func( } func (w *workflowProvider) WithActivityOptions( - ctx interpreter.UnifiedContext, options interpreter.ActivityOptions, -) interpreter.UnifiedContext { + ctx interfaces.UnifiedContext, options interfaces.ActivityOptions, +) interfaces.UnifiedContext { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -190,7 +190,7 @@ func (w *workflowProvider) WithActivityOptions( ScheduleToCloseTimeout: time.Second * 7, RetryPolicy: retry.ConvertCadenceActivityRetryPolicy(options.RetryPolicy), }) - return interpreter.NewUnifiedContext(wfCtx3) + return interfaces.NewUnifiedContext(wfCtx3) } type futureImpl struct { @@ -201,7 +201,7 @@ func (t *futureImpl) IsReady() bool { return t.future.IsReady() } -func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) error { +func (t *futureImpl) Get(ctx interfaces.UnifiedContext, valuePtr interface{}) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -212,7 +212,7 @@ func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) e func (w *workflowProvider) ExecuteActivity( valuePtr interface{}, optimizeByLocalActivity bool, - ctx interpreter.UnifiedContext, activity interface{}, args ...interface{}, + ctx interfaces.UnifiedContext, activity interface{}, args ...interface{}, ) (err error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -232,7 +232,7 @@ func (w *workflowProvider) ExecuteActivity( return f.Get(wfCtx, valuePtr) } -func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time { +func (w *workflowProvider) Now(ctx interfaces.UnifiedContext) time.Time { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -240,7 +240,7 @@ func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time { return workflow.Now(wfCtx) } -func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool { +func (w *workflowProvider) IsReplaying(ctx interfaces.UnifiedContext) bool { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -248,7 +248,7 @@ func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool { return workflow.IsReplaying(wfCtx) } -func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration) (err error) { +func (w *workflowProvider) Sleep(ctx interfaces.UnifiedContext, d time.Duration) (err error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -257,7 +257,7 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration } func (w *workflowProvider) GetVersion( - ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int, + ctx interfaces.UnifiedContext, changeID string, minSupported, maxSupported int, ) int { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -276,7 +276,7 @@ func (t *cadenceReceiveChannel) ReceiveAsync(valuePtr interface{}) (ok bool) { return t.channel.ReceiveAsync(valuePtr) } -func (t *cadenceReceiveChannel) ReceiveBlocking(ctx interpreter.UnifiedContext, valuePtr interface{}) (ok bool) { +func (t *cadenceReceiveChannel) ReceiveBlocking(ctx interfaces.UnifiedContext, valuePtr interface{}) (ok bool) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -286,8 +286,8 @@ func (t *cadenceReceiveChannel) ReceiveBlocking(ctx interpreter.UnifiedContext, } func (w *workflowProvider) GetSignalChannel( - ctx interpreter.UnifiedContext, signalName string, -) interpreter.ReceiveChannel { + ctx interfaces.UnifiedContext, signalName string, +) interfaces.ReceiveChannel { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -298,7 +298,7 @@ func (w *workflowProvider) GetSignalChannel( } } -func (w *workflowProvider) GetContextValue(ctx interpreter.UnifiedContext, key string) interface{} { +func (w *workflowProvider) GetContextValue(ctx interfaces.UnifiedContext, key string) interface{} { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -306,7 +306,7 @@ func (w *workflowProvider) GetContextValue(ctx interpreter.UnifiedContext, key s return wfCtx.Value(key) } -func (w *workflowProvider) GetLogger(ctx interpreter.UnifiedContext) interpreter.UnifiedLogger { +func (w *workflowProvider) GetLogger(ctx interfaces.UnifiedContext) interfaces.UnifiedLogger { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -318,7 +318,7 @@ func (w *workflowProvider) GetLogger(ctx interpreter.UnifiedContext) interpreter } } -func (w *workflowProvider) GetUnhandledSignalNames(ctx interpreter.UnifiedContext) []string { +func (w *workflowProvider) GetUnhandledSignalNames(ctx interfaces.UnifiedContext) []string { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") diff --git a/service/interpreter/workflowConfiger.go b/service/interpreter/config/workflowConfiger.go similarity index 98% rename from service/interpreter/workflowConfiger.go rename to service/interpreter/config/workflowConfiger.go index 81fedc02..dfa53506 100644 --- a/service/interpreter/workflowConfiger.go +++ b/service/interpreter/config/workflowConfiger.go @@ -1,4 +1,4 @@ -package interpreter +package config import ( "github.com/indeedeng/iwf/gen/iwfidl" diff --git a/service/interpreter/continueAsNewCounter.go b/service/interpreter/cont/continueAsNewCounter.go similarity index 75% rename from service/interpreter/continueAsNewCounter.go rename to service/interpreter/cont/continueAsNewCounter.go index 279c7515..c98bb20d 100644 --- a/service/interpreter/continueAsNewCounter.go +++ b/service/interpreter/cont/continueAsNewCounter.go @@ -1,4 +1,9 @@ -package interpreter +package cont + +import ( + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/interfaces" +) type ContinueAsNewCounter struct { executedStateApis int32 @@ -6,13 +11,13 @@ type ContinueAsNewCounter struct { syncUpdateReceived int32 triggeredByAPI bool - configer *WorkflowConfiger - rootCtx UnifiedContext - provider WorkflowProvider + configer *config.WorkflowConfiger + rootCtx interfaces.UnifiedContext + provider interfaces.WorkflowProvider } func NewContinueAsCounter( - configer *WorkflowConfiger, rootCtx UnifiedContext, provider WorkflowProvider, + configer *config.WorkflowConfiger, rootCtx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, ) *ContinueAsNewCounter { return &ContinueAsNewCounter{ configer: configer, diff --git a/service/interpreter/continueAsNewer.go b/service/interpreter/continueAsNewer.go index 4ec787cb..3264af6e 100644 --- a/service/interpreter/continueAsNewer.go +++ b/service/interpreter/continueAsNewer.go @@ -7,13 +7,14 @@ import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/interpreter/env" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "math" "strings" "time" ) type ContinueAsNewer struct { - provider WorkflowProvider + provider interfaces.WorkflowProvider StateExecutionToResumeMap map[string]service.StateExecutionResumeInfo // stateExeId to StateExecutionResumeInfo inflightUpdateOperations int @@ -24,14 +25,14 @@ type ContinueAsNewer struct { persistenceManager *PersistenceManager signalReceiver *SignalReceiver outputCollector *OutputCollector - timerProcessor *TimerProcessor + timerProcessor interfaces.TimerProcessor } func NewContinueAsNewer( - provider WorkflowProvider, + provider interfaces.WorkflowProvider, interStateChannel *InternalChannel, signalReceiver *SignalReceiver, stateExecutionCounter *StateExecutionCounter, persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, collector *OutputCollector, - timerProcessor *TimerProcessor, + timerProcessor interfaces.TimerProcessor, ) *ContinueAsNewer { return &ContinueAsNewer{ provider: provider, @@ -49,9 +50,9 @@ func NewContinueAsNewer( } func LoadInternalsFromPreviousRun( - ctx UnifiedContext, provider WorkflowProvider, previousRunId string, continueAsNewPageSizeInBytes int32, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, previousRunId string, continueAsNewPageSizeInBytes int32, ) (*service.ContinueAsNewDumpResponse, error) { - activityOptions := ActivityOptions{ + activityOptions := interfaces.ActivityOptions{ StartToCloseTimeout: 5 * time.Second, RetryPolicy: &iwfidl.RetryPolicy{ MaximumIntervalSeconds: iwfidl.PtrInt32(5), @@ -134,7 +135,7 @@ func (c *ContinueAsNewer) GetSnapshot() service.ContinueAsNewDumpResponse { } } -func (c *ContinueAsNewer) SetQueryHandlersForContinueAsNew(ctx UnifiedContext) error { +func (c *ContinueAsNewer) SetQueryHandlersForContinueAsNew(ctx interfaces.UnifiedContext) error { return c.provider.SetQueryHandler(ctx, service.ContinueAsNewDumpByPageQueryType, // return the current page of the whole snapshot func(request iwfidl.WorkflowDumpRequest) (*iwfidl.WorkflowDumpResponse, error) { @@ -192,7 +193,7 @@ func (c *ContinueAsNewer) RemoveStateExecutionToResume(stateExecutionId string) delete(c.StateExecutionToResumeMap, stateExecutionId) } -func (c *ContinueAsNewer) DrainThreads(ctx UnifiedContext) error { +func (c *ContinueAsNewer) DrainThreads(ctx interfaces.UnifiedContext) error { // TODO: add metric for before and after Await to monitor stuck // NOTE: consider using AwaitWithTimeout to get an alert when workflow stuck due to a bug in the draining logic for continueAsNew @@ -222,7 +223,7 @@ var inMemoryContinueAsNewMonitor = make(map[string]time.Time) const warnThreshold = time.Second * 5 const errThreshold = time.Second * 15 -func (c *ContinueAsNewer) allThreadsDrained(ctx UnifiedContext) bool { +func (c *ContinueAsNewer) allThreadsDrained(ctx interfaces.UnifiedContext) bool { runId := c.provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID remainingThreadCount := c.provider.GetThreadCount() diff --git a/service/interpreter/globalVersioner.go b/service/interpreter/globalVersioner.go index 4c7ab5ca..f4d00d1e 100644 --- a/service/interpreter/globalVersioner.go +++ b/service/interpreter/globalVersioner.go @@ -2,6 +2,7 @@ package interpreter import ( "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/interpreter/interfaces" ) const globalChangeId = "global" @@ -35,13 +36,13 @@ const MaxOfAllVersions = StartingVersionYieldOnConditionalComplete // GlobalVersioner see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api type GlobalVersioner struct { - workflowProvider WorkflowProvider - ctx UnifiedContext + workflowProvider interfaces.WorkflowProvider + ctx interfaces.UnifiedContext version int } func NewGlobalVersioner( - workflowProvider WorkflowProvider, ctx UnifiedContext, + workflowProvider interfaces.WorkflowProvider, ctx interfaces.UnifiedContext, ) (*GlobalVersioner, error) { version := workflowProvider.GetVersion(ctx, globalChangeId, 0, MaxOfAllVersions) diff --git a/service/interpreter/interfaces.go b/service/interpreter/interfaces/interfaces.go similarity index 88% rename from service/interpreter/interfaces.go rename to service/interpreter/interfaces/interfaces.go index 7a6f733f..3fe0178e 100644 --- a/service/interpreter/interfaces.go +++ b/service/interpreter/interfaces/interfaces.go @@ -1,4 +1,4 @@ -package interpreter +package interfaces import ( "context" @@ -31,7 +31,7 @@ func RegisterActivityProvider(backendType service.BackendType, provider Activity activityProviderRegistry[backendType] = provider } -func getActivityProviderByType(backendType service.BackendType) ActivityProvider { +func GetActivityProviderByType(backendType service.BackendType) ActivityProvider { provider := activityProviderRegistry[backendType] if provider == nil { panic("not supported yet: " + backendType) @@ -84,6 +84,17 @@ func NewUnifiedContext(ctx interface{}) UnifiedContext { } } +type TimerProcessor interface { + Dump() []service.StaleSkipTimerSignal + SkipTimer(stateExeId string, timerId string, timerIdx int) bool + RetryStaleSkipTimer() bool + WaitForTimerFiredOrSkipped(ctx UnifiedContext, stateExeId string, timerIdx int, cancelWaiting *bool) service.InternalTimerStatus + RemovePendingTimersOfState(stateExeId string) + AddTimers(stateExeId string, commands []iwfidl.TimerCommand, completedTimerCmds map[int]service.InternalTimerStatus) + GetTimerInfos() map[string][]*service.TimerInfo + GetTimerStartedUnixTimestamps() []int64 +} + type WorkflowProvider interface { NewApplicationError(errType string, details interface{}) error IsApplicationError(err error) bool diff --git a/service/interpreter/interfaces_mock.go b/service/interpreter/interfaces/interfaces_mock.go similarity index 99% rename from service/interpreter/interfaces_mock.go rename to service/interpreter/interfaces/interfaces_mock.go index 3b2799d0..55058d08 100644 --- a/service/interpreter/interfaces_mock.go +++ b/service/interpreter/interfaces/interfaces_mock.go @@ -2,7 +2,7 @@ // Source: /Users/lwolczynski/indeedeng/iwf-server/service/interpreter/interfaces.go // Package interpreter is a generated GoMock package. -package interpreter +package interfaces import ( context "context" diff --git a/service/interpreter/persistence.go b/service/interpreter/persistence.go index c9c34a30..45e1d284 100644 --- a/service/interpreter/persistence.go +++ b/service/interpreter/persistence.go @@ -5,12 +5,13 @@ import ( "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/mapper" "github.com/indeedeng/iwf/service/common/utils" + "github.com/indeedeng/iwf/service/interpreter/interfaces" ) type PersistenceManager struct { dataObjects map[string]iwfidl.KeyValue searchAttributes map[string]iwfidl.SearchAttribute - provider WorkflowProvider + provider interfaces.WorkflowProvider lockedDataObjectKeys map[string]bool lockedSearchAttributeKeys map[string]bool @@ -19,7 +20,7 @@ type PersistenceManager struct { } func NewPersistenceManager( - provider WorkflowProvider, initDataAttributes []iwfidl.KeyValue, initSearchAttributes []iwfidl.SearchAttribute, + provider interfaces.WorkflowProvider, initDataAttributes []iwfidl.KeyValue, initSearchAttributes []iwfidl.SearchAttribute, useMemo bool, ) *PersistenceManager { searchAttributes := make(map[string]iwfidl.SearchAttribute) @@ -43,7 +44,7 @@ func NewPersistenceManager( } func RebuildPersistenceManager( - provider WorkflowProvider, + provider interfaces.WorkflowProvider, dolist []iwfidl.KeyValue, salist []iwfidl.SearchAttribute, useMemo bool, ) *PersistenceManager { @@ -89,7 +90,7 @@ func (am *PersistenceManager) GetDataObjectsByKey(request service.GetDataAttribu } func (am *PersistenceManager) LoadSearchAttributes( - ctx UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy, + ctx interfaces.UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy, ) []iwfidl.SearchAttribute { var loadingType iwfidl.PersistenceLoadingType var partialLoadingKeys []string @@ -127,7 +128,7 @@ func (am *PersistenceManager) LoadSearchAttributes( } func (am *PersistenceManager) LoadDataObjects( - ctx UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy, + ctx interfaces.UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy, ) []iwfidl.KeyValue { var loadingType iwfidl.PersistenceLoadingType var partialLoadingKeys []string @@ -181,7 +182,7 @@ func (am *PersistenceManager) GetAllDataObjects() []iwfidl.KeyValue { } func (am *PersistenceManager) ProcessUpsertSearchAttribute( - ctx UnifiedContext, attributes []iwfidl.SearchAttribute, + ctx interfaces.UnifiedContext, attributes []iwfidl.SearchAttribute, ) error { if len(attributes) == 0 { return nil @@ -197,7 +198,7 @@ func (am *PersistenceManager) ProcessUpsertSearchAttribute( return am.provider.UpsertSearchAttributes(ctx, attrsToUpsert) } -func (am *PersistenceManager) ProcessUpsertDataObject(ctx UnifiedContext, attributes []iwfidl.KeyValue) error { +func (am *PersistenceManager) ProcessUpsertDataObject(ctx interfaces.UnifiedContext, attributes []iwfidl.KeyValue) error { if len(attributes) == 0 { return nil } @@ -228,7 +229,7 @@ func (am *PersistenceManager) checkKeysAreUnlocked(lockedKeys map[string]bool, k return true } -func (am *PersistenceManager) awaitAndLockForKeys(ctx UnifiedContext, lockedKeys map[string]bool, keysToLock []string) { +func (am *PersistenceManager) awaitAndLockForKeys(ctx interfaces.UnifiedContext, lockedKeys map[string]bool, keysToLock []string) { // wait until all keys are not locked err := am.provider.Await(ctx, func() bool { for _, k := range keysToLock { diff --git a/service/interpreter/queryHandler.go b/service/interpreter/queryHandler.go index 7b2d7660..91e8a397 100644 --- a/service/interpreter/queryHandler.go +++ b/service/interpreter/queryHandler.go @@ -3,13 +3,20 @@ package interpreter import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/interfaces" ) func SetQueryHandlers( - ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager, - internalChannel *InternalChannel, signalReceiver *SignalReceiver, + ctx interfaces.UnifiedContext, + provider interfaces.WorkflowProvider, + timerProcessor interfaces.TimerProcessor, + persistenceManager *PersistenceManager, + internalChannel *InternalChannel, + signalReceiver *SignalReceiver, continueAsNewer *ContinueAsNewer, - workflowConfiger *WorkflowConfiger, basicInfo service.BasicInfo, + workflowConfiger *config.WorkflowConfiger, + basicInfo service.BasicInfo, ) error { err := provider.SetQueryHandler(ctx, service.GetDataAttributesWorkflowQueryType, func(req service.GetDataAttributesQueryRequest) (service.GetDataAttributesQueryResponse, error) { dos := persistenceManager.GetDataObjectsByKey(req) @@ -30,8 +37,9 @@ func SetQueryHandlers( } err = provider.SetQueryHandler(ctx, service.DebugDumpQueryType, func() (*service.DebugDumpResponse, error) { return &service.DebugDumpResponse{ - Config: workflowConfiger.Get(), - Snapshot: continueAsNewer.GetSnapshot(), + Config: workflowConfiger.Get(), + Snapshot: continueAsNewer.GetSnapshot(), + FiringTimersUnixTimestamps: timerProcessor.GetTimerStartedUnixTimestamps(), }, nil }) if err != nil { @@ -54,5 +62,16 @@ func SetQueryHandlers( if err != nil { return err } + + err = provider.SetQueryHandler(ctx, service.GetCurrentTimerInfosQueryType, func() (service.GetCurrentTimerInfosQueryResponse, error) { + return service.GetCurrentTimerInfosQueryResponse{ + StateExecutionCurrentTimerInfos: timerProcessor.GetTimerInfos(), + }, nil + }) + + if err != nil { + return err + } + return nil } diff --git a/service/interpreter/signalReceiver.go b/service/interpreter/signalReceiver.go index 9dc5573d..8cc14aca 100644 --- a/service/interpreter/signalReceiver.go +++ b/service/interpreter/signalReceiver.go @@ -2,6 +2,9 @@ package interpreter import ( "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/cont" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "strings" "github.com/indeedeng/iwf/gen/iwfidl" @@ -13,19 +16,19 @@ type SignalReceiver struct { receivedSignals map[string][]*iwfidl.EncodedObject failWorkflowByClient bool reasonFailWorkflowByClient *string - provider WorkflowProvider - timerProcessor *TimerProcessor - workflowConfiger *WorkflowConfiger + provider interfaces.WorkflowProvider + timerProcessor interfaces.TimerProcessor + workflowConfiger *config.WorkflowConfiger interStateChannel *InternalChannel stateRequestQueue *StateRequestQueue persistenceManager *PersistenceManager } func NewSignalReceiver( - ctx UnifiedContext, provider WorkflowProvider, interStateChannel *InternalChannel, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, interStateChannel *InternalChannel, stateRequestQueue *StateRequestQueue, - persistenceManager *PersistenceManager, tp *TimerProcessor, continueAsNewCounter *ContinueAsNewCounter, - workflowConfiger *WorkflowConfiger, + persistenceManager *PersistenceManager, tp interfaces.TimerProcessor, continueAsNewCounter *cont.ContinueAsNewCounter, + workflowConfiger *config.WorkflowConfiger, initReceivedSignals map[string][]*iwfidl.EncodedObject, ) *SignalReceiver { if initReceivedSignals == nil { @@ -46,7 +49,7 @@ func NewSignalReceiver( //received or a continueAsNew run is triggered. When a signal has been received it sets //SignalReceiver.failWorkflowByClient to true and sets SignalReceiver.reasonFailWorkflowByClient to the reason //given in the signal's value. If continueIsNew is triggered, the thread completes after all signals have been processed. - provider.GoNamed(ctx, "fail-workflow-system-signal-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "fail-workflow-system-signal-handler", func(ctx interfaces.UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.FailWorkflowSignalChannelName) @@ -74,7 +77,7 @@ func NewSignalReceiver( //The thread waits until a SkipTimerSignalChannelName signal has been //received or a continueAsNew run is triggered. When a signal has been received it skips the specific timer //described in the signal's value. If continueIsNew is triggered, the thread completes after all signals have been processed. - provider.GoNamed(ctx, "skip-timer-system-signal-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "skip-timer-system-signal-handler", func(ctx interfaces.UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.SkipTimerSignalChannelName) val := service.SkipTimerSignalRequest{} @@ -101,7 +104,7 @@ func NewSignalReceiver( //The thread waits until a UpdateConfigSignalChannelName signal has been //received or a continueAsNew run is triggered. When a signal has been received it updates the workflow config //defined in the signal's value. If continueIsNew is triggered, the thread completes after all signals have been processed. - provider.GoNamed(ctx, "update-config-system-signal-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "update-config-system-signal-handler", func(ctx interfaces.UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.UpdateConfigSignalChannelName) val := iwfidl.WorkflowConfigUpdateRequest{} @@ -128,7 +131,7 @@ func NewSignalReceiver( //The thread waits until a TriggerContinueAsNewSignalChannelName signal has //been received or a continueAsNew run is triggered. When a signal has been received it triggers a continueAsNew run. //Since this thread is triggering a continueAsNew run it doesn't need to wait for signals to drain from the channel. - provider.GoNamed(ctx, "trigger-continue-as-new-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "trigger-continue-as-new-handler", func(ctx interfaces.UnifiedContext) { // NOTE: unlike other signal channels, this one doesn't need to drain during continueAsNew // because if there is a continueAsNew, this signal is not needed anymore ch := provider.GetSignalChannel(ctx, service.TriggerContinueAsNewSignalChannelName) @@ -153,7 +156,7 @@ func NewSignalReceiver( //(if they exist in the signal value), upserts search attributes (if they exist in the signal value), //and/or publishes a message to an internal channel (if InterStateChannelPublishing is set in the signal value). //If continueIsNew is triggered, the thread completes after all signals have been processed. - provider.GoNamed(ctx, "execute-rpc-signal-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "execute-rpc-signal-handler", func(ctx interfaces.UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.ExecuteRpcSignalChannelName) var val service.ExecuteRpcSignalRequest @@ -185,7 +188,7 @@ func NewSignalReceiver( //The thread waits until a signal has been received that is not an IWF //system signal name or a continueAsNew run is triggered. When a signal has been received it processes the //external signal. If continueIsNew is triggered, the thread completes after all signals have been processed. - provider.GoNamed(ctx, "user-signal-receiver-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "user-signal-receiver-handler", func(ctx interfaces.UnifiedContext) { for { var toProcess []string err := provider.Await(ctx, func() bool { @@ -221,7 +224,7 @@ func NewSignalReceiver( return sr } -func (sr *SignalReceiver) receiveSignal(ctx UnifiedContext, sigName string) { +func (sr *SignalReceiver) receiveSignal(ctx interfaces.UnifiedContext, sigName string) { ch := sr.provider.GetSignalChannel(ctx, sigName) for { var sigVal iwfidl.EncodedObject @@ -278,7 +281,7 @@ func (sr *SignalReceiver) GetInfos() map[string]iwfidl.ChannelInfo { // This includes both regular user signals and system signals // 2. Conditional close/complete workflow on signal/internal channel: // retrieve all signal/internal channel messages before checking the signal/internal channels -func (sr *SignalReceiver) DrainAllReceivedButUnprocessedSignals(ctx UnifiedContext) { +func (sr *SignalReceiver) DrainAllReceivedButUnprocessedSignals(ctx interfaces.UnifiedContext) { unhandledSigs := sr.provider.GetUnhandledSignalNames(ctx) if len(unhandledSigs) == 0 { return diff --git a/service/interpreter/stateExecutionCounter.go b/service/interpreter/stateExecutionCounter.go index 1b21ea11..58d3d888 100644 --- a/service/interpreter/stateExecutionCounter.go +++ b/service/interpreter/stateExecutionCounter.go @@ -6,16 +6,19 @@ import ( "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/compatibility" "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/cont" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "reflect" "slices" ) type StateExecutionCounter struct { - ctx UnifiedContext - provider WorkflowProvider - configer *WorkflowConfiger + ctx interfaces.UnifiedContext + provider interfaces.WorkflowProvider + configer *config.WorkflowConfiger globalVersioner *GlobalVersioner - continueAsNewCounter *ContinueAsNewCounter + continueAsNewCounter *cont.ContinueAsNewCounter stateIdCompletedCounts map[string]int stateIdStartedCounts map[string]int // For creating stateExecutionId: count the stateId for how many times that have been executed @@ -24,8 +27,8 @@ type StateExecutionCounter struct { } func NewStateExecutionCounter( - ctx UnifiedContext, provider WorkflowProvider, globalVersioner *GlobalVersioner, - configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, globalVersioner *GlobalVersioner, + configer *config.WorkflowConfiger, continueAsNewCounter *cont.ContinueAsNewCounter, ) *StateExecutionCounter { return &StateExecutionCounter{ ctx: ctx, @@ -40,10 +43,10 @@ func NewStateExecutionCounter( } func RebuildStateExecutionCounter( - ctx UnifiedContext, provider WorkflowProvider, globalVersioner *GlobalVersioner, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, globalVersioner *GlobalVersioner, stateIdStartedCounts map[string]int, stateIdCurrentlyExecutingCounts map[string]int, totalCurrentlyExecutingCount int, - configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter, + configer *config.WorkflowConfiger, continueAsNewCounter *cont.ContinueAsNewCounter, ) *StateExecutionCounter { return &StateExecutionCounter{ ctx: ctx, diff --git a/service/interpreter/temporal/activityProvider.go b/service/interpreter/temporal/activityProvider.go index d8169791..5941427f 100644 --- a/service/interpreter/temporal/activityProvider.go +++ b/service/interpreter/temporal/activityProvider.go @@ -3,7 +3,7 @@ package temporal import ( "context" "github.com/indeedeng/iwf/service" - "github.com/indeedeng/iwf/service/interpreter" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/temporal" ) @@ -11,10 +11,10 @@ import ( type activityProvider struct{} func init() { - interpreter.RegisterActivityProvider(service.BackendTypeTemporal, &activityProvider{}) + interfaces.RegisterActivityProvider(service.BackendTypeTemporal, &activityProvider{}) } -func (a *activityProvider) GetLogger(ctx context.Context) interpreter.UnifiedLogger { +func (a *activityProvider) GetLogger(ctx context.Context) interfaces.UnifiedLogger { return activity.GetLogger(ctx) } @@ -22,13 +22,13 @@ func (a *activityProvider) NewApplicationError(errType string, details interface return temporal.NewApplicationError("", errType, details) } -func (a *activityProvider) GetActivityInfo(ctx context.Context) interpreter.ActivityInfo { +func (a *activityProvider) GetActivityInfo(ctx context.Context) interfaces.ActivityInfo { info := activity.GetInfo(ctx) - return interpreter.ActivityInfo{ + return interfaces.ActivityInfo{ ScheduledTime: info.ScheduledTime, Attempt: info.Attempt, IsLocalActivity: info.IsLocalActivity, - WorkflowExecution: interpreter.WorkflowExecution{ + WorkflowExecution: interfaces.WorkflowExecution{ ID: info.WorkflowExecution.ID, RunID: info.WorkflowExecution.RunID, }, diff --git a/service/interpreter/temporal/workflow.go b/service/interpreter/temporal/workflow.go index aa1e792c..59bec2ec 100644 --- a/service/interpreter/temporal/workflow.go +++ b/service/interpreter/temporal/workflow.go @@ -3,6 +3,7 @@ package temporal import ( "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/interpreter" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "go.temporal.io/sdk/workflow" // TODO(cretz): Remove when tagged @@ -10,9 +11,9 @@ import ( ) func Interpreter(ctx workflow.Context, input service.InterpreterWorkflowInput) (*service.InterpreterWorkflowOutput, error) { - return interpreter.InterpreterImpl(interpreter.NewUnifiedContext(ctx), newTemporalWorkflowProvider(), input) + return interpreter.InterpreterImpl(interfaces.NewUnifiedContext(ctx), newTemporalWorkflowProvider(), input) } func WaitforStateCompletionWorkflow(ctx workflow.Context) (*service.WaitForStateCompletionWorkflowOutput, error) { - return interpreter.WaitForStateCompletionWorkflowImpl(interpreter.NewUnifiedContext(ctx), newTemporalWorkflowProvider()) + return interpreter.WaitForStateCompletionWorkflowImpl(interfaces.NewUnifiedContext(ctx), newTemporalWorkflowProvider()) } diff --git a/service/interpreter/temporal/workflowProvider.go b/service/interpreter/temporal/workflowProvider.go index a4aeb57a..0a730606 100644 --- a/service/interpreter/temporal/workflowProvider.go +++ b/service/interpreter/temporal/workflowProvider.go @@ -3,12 +3,12 @@ package temporal import ( "errors" "github.com/indeedeng/iwf/service/common/mapper" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "time" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/retry" - "github.com/indeedeng/iwf/service/interpreter" "github.com/indeedeng/iwf/service/interpreter/env" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" @@ -19,7 +19,7 @@ type workflowProvider struct { pendingThreadNames map[string]int } -func newTemporalWorkflowProvider() interpreter.WorkflowProvider { +func newTemporalWorkflowProvider() interfaces.WorkflowProvider { return &workflowProvider{ pendingThreadNames: map[string]int{}, } @@ -39,7 +39,7 @@ func (w *workflowProvider) IsApplicationError(err error) bool { } func (w *workflowProvider) NewInterpreterContinueAsNewError( - ctx interpreter.UnifiedContext, input service.InterpreterWorkflowInput, + ctx interfaces.UnifiedContext, input service.InterpreterWorkflowInput, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -49,7 +49,7 @@ func (w *workflowProvider) NewInterpreterContinueAsNewError( } func (w *workflowProvider) UpsertSearchAttributes( - ctx interpreter.UnifiedContext, attributes map[string]interface{}, + ctx interfaces.UnifiedContext, attributes map[string]interface{}, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -58,7 +58,7 @@ func (w *workflowProvider) UpsertSearchAttributes( return workflow.UpsertSearchAttributes(wfCtx, attributes) } -func (w *workflowProvider) UpsertMemo(ctx interpreter.UnifiedContext, rawMemo map[string]iwfidl.EncodedObject) error { +func (w *workflowProvider) UpsertMemo(ctx interfaces.UnifiedContext, rawMemo map[string]iwfidl.EncodedObject) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -83,7 +83,7 @@ func (w *workflowProvider) UpsertMemo(ctx interpreter.UnifiedContext, rawMemo ma return workflow.UpsertMemo(wfCtx, memo) } -func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Duration) interpreter.Future { +func (w *workflowProvider) NewTimer(ctx interfaces.UnifiedContext, d time.Duration) interfaces.Future { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -94,14 +94,14 @@ func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Durat } } -func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) interpreter.WorkflowInfo { +func (w *workflowProvider) GetWorkflowInfo(ctx interfaces.UnifiedContext) interfaces.WorkflowInfo { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") } info := workflow.GetInfo(wfCtx) - return interpreter.WorkflowInfo{ - WorkflowExecution: interpreter.WorkflowExecution{ + return interfaces.WorkflowInfo{ + WorkflowExecution: interfaces.WorkflowExecution{ ID: info.WorkflowExecution.ID, RunID: info.WorkflowExecution.RunID, }, @@ -113,7 +113,7 @@ func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) inter } func (w *workflowProvider) GetSearchAttributes( - ctx interpreter.UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType, + ctx interfaces.UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType, ) (map[string]iwfidl.SearchAttribute, error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -125,7 +125,7 @@ func (w *workflowProvider) GetSearchAttributes( } func (w *workflowProvider) SetQueryHandler( - ctx interpreter.UnifiedContext, queryType string, handler interface{}, + ctx interfaces.UnifiedContext, queryType string, handler interface{}, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -135,19 +135,19 @@ func (w *workflowProvider) SetQueryHandler( } func (w *workflowProvider) SetRpcUpdateHandler( - ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, - handler interpreter.UnifiedRpcHandler, + ctx interfaces.UnifiedContext, updateType string, validator interfaces.UnifiedRpcValidator, + handler interfaces.UnifiedRpcHandler, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") } v2 := func(ctx workflow.Context, input iwfidl.WorkflowRpcRequest) error { - ctx2 := interpreter.NewUnifiedContext(ctx) + ctx2 := interfaces.NewUnifiedContext(ctx) return validator(ctx2, input) } - h2 := func(ctx workflow.Context, input iwfidl.WorkflowRpcRequest) (*interpreter.HandlerOutput, error) { - ctx2 := interpreter.NewUnifiedContext(ctx) + h2 := func(ctx workflow.Context, input iwfidl.WorkflowRpcRequest) (*interfaces.HandlerOutput, error) { + ctx2 := interfaces.NewUnifiedContext(ctx) return handler(ctx2, input) } return workflow.SetUpdateHandlerWithOptions( @@ -159,24 +159,24 @@ func (w *workflowProvider) SetRpcUpdateHandler( } func (w *workflowProvider) ExtendContextWithValue( - parent interpreter.UnifiedContext, key string, val interface{}, -) interpreter.UnifiedContext { + parent interfaces.UnifiedContext, key string, val interface{}, +) interfaces.UnifiedContext { wfCtx, ok := parent.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") } - return interpreter.NewUnifiedContext(workflow.WithValue(wfCtx, key, val)) + return interfaces.NewUnifiedContext(workflow.WithValue(wfCtx, key, val)) } func (w *workflowProvider) GoNamed( - ctx interpreter.UnifiedContext, name string, f func(ctx interpreter.UnifiedContext), + ctx interfaces.UnifiedContext, name string, f func(ctx interfaces.UnifiedContext), ) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") } f2 := func(ctx workflow.Context) { - ctx2 := interpreter.NewUnifiedContext(ctx) + ctx2 := interfaces.NewUnifiedContext(ctx) w.pendingThreadNames[name]++ w.threadCount++ f(ctx2) @@ -197,7 +197,7 @@ func (w *workflowProvider) GetThreadCount() int { return w.threadCount } -func (w *workflowProvider) Await(ctx interpreter.UnifiedContext, condition func() bool) error { +func (w *workflowProvider) Await(ctx interfaces.UnifiedContext, condition func() bool) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -206,8 +206,8 @@ func (w *workflowProvider) Await(ctx interpreter.UnifiedContext, condition func( } func (w *workflowProvider) WithActivityOptions( - ctx interpreter.UnifiedContext, options interpreter.ActivityOptions, -) interpreter.UnifiedContext { + ctx interfaces.UnifiedContext, options interfaces.ActivityOptions, +) interfaces.UnifiedContext { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -231,7 +231,7 @@ func (w *workflowProvider) WithActivityOptions( ScheduleToCloseTimeout: time.Second * 7, RetryPolicy: retry.ConvertTemporalActivityRetryPolicy(options.RetryPolicy), }) - return interpreter.NewUnifiedContext(wfCtx3) + return interfaces.NewUnifiedContext(wfCtx3) } type futureImpl struct { @@ -242,7 +242,7 @@ func (t *futureImpl) IsReady() bool { return t.future.IsReady() } -func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) error { +func (t *futureImpl) Get(ctx interfaces.UnifiedContext, valuePtr interface{}) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -253,7 +253,7 @@ func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) e func (w *workflowProvider) ExecuteActivity( valuePtr interface{}, optimizeByLocalActivity bool, - ctx interpreter.UnifiedContext, activity interface{}, args ...interface{}, + ctx interfaces.UnifiedContext, activity interface{}, args ...interface{}, ) (err error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -272,7 +272,7 @@ func (w *workflowProvider) ExecuteActivity( return f.Get(wfCtx, valuePtr) } -func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time { +func (w *workflowProvider) Now(ctx interfaces.UnifiedContext) time.Time { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -280,7 +280,7 @@ func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time { return workflow.Now(wfCtx) } -func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration) (err error) { +func (w *workflowProvider) Sleep(ctx interfaces.UnifiedContext, d time.Duration) (err error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -288,7 +288,7 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration return workflow.Sleep(wfCtx, d) } -func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool { +func (w *workflowProvider) IsReplaying(ctx interfaces.UnifiedContext) bool { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -297,7 +297,7 @@ func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool { } func (w *workflowProvider) GetVersion( - ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int, + ctx interfaces.UnifiedContext, changeID string, minSupported, maxSupported int, ) int { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -316,7 +316,7 @@ func (t *temporalReceiveChannel) ReceiveAsync(valuePtr interface{}) (ok bool) { return t.channel.ReceiveAsync(valuePtr) } -func (t *temporalReceiveChannel) ReceiveBlocking(ctx interpreter.UnifiedContext, valuePtr interface{}) (ok bool) { +func (t *temporalReceiveChannel) ReceiveBlocking(ctx interfaces.UnifiedContext, valuePtr interface{}) (ok bool) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -326,8 +326,8 @@ func (t *temporalReceiveChannel) ReceiveBlocking(ctx interpreter.UnifiedContext, } func (w *workflowProvider) GetSignalChannel( - ctx interpreter.UnifiedContext, signalName string, -) interpreter.ReceiveChannel { + ctx interfaces.UnifiedContext, signalName string, +) interfaces.ReceiveChannel { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -338,7 +338,7 @@ func (w *workflowProvider) GetSignalChannel( } } -func (w *workflowProvider) GetContextValue(ctx interpreter.UnifiedContext, key string) interface{} { +func (w *workflowProvider) GetContextValue(ctx interfaces.UnifiedContext, key string) interface{} { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -346,7 +346,7 @@ func (w *workflowProvider) GetContextValue(ctx interpreter.UnifiedContext, key s return wfCtx.Value(key) } -func (w *workflowProvider) GetLogger(ctx interpreter.UnifiedContext) interpreter.UnifiedLogger { +func (w *workflowProvider) GetLogger(ctx interfaces.UnifiedContext) interfaces.UnifiedLogger { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -354,7 +354,7 @@ func (w *workflowProvider) GetLogger(ctx interpreter.UnifiedContext) interpreter return workflow.GetLogger(wfCtx) } -func (w *workflowProvider) GetUnhandledSignalNames(ctx interpreter.UnifiedContext) []string { +func (w *workflowProvider) GetUnhandledSignalNames(ctx interfaces.UnifiedContext) []string { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") diff --git a/service/interpreter/timers/greedyTimerProcessor.go b/service/interpreter/timers/greedyTimerProcessor.go new file mode 100644 index 00000000..1ccc1f28 --- /dev/null +++ b/service/interpreter/timers/greedyTimerProcessor.go @@ -0,0 +1,166 @@ +package timers + +import ( + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/interpreter/cont" + "github.com/indeedeng/iwf/service/interpreter/interfaces" +) + +type GreedyTimerProcessor struct { + timerManager *timerScheduler + stateExecutionCurrentTimerInfos map[string][]*service.TimerInfo + staleSkipTimerSignals []service.StaleSkipTimerSignal + provider interfaces.WorkflowProvider + logger interfaces.UnifiedLogger +} + +func NewGreedyTimerProcessor( + ctx interfaces.UnifiedContext, + provider interfaces.WorkflowProvider, + continueAsNewCounter *cont.ContinueAsNewCounter, + staleSkipTimerSignals []service.StaleSkipTimerSignal, +) *GreedyTimerProcessor { + + // start some single thread that manages pendingScheduling + scheduler := startGreedyTimerScheduler(ctx, provider, continueAsNewCounter) + + tp := &GreedyTimerProcessor{ + provider: provider, + timerManager: scheduler, + stateExecutionCurrentTimerInfos: map[string][]*service.TimerInfo{}, + logger: provider.GetLogger(ctx), + staleSkipTimerSignals: staleSkipTimerSignals, + } + + return tp +} + +func (t *GreedyTimerProcessor) Dump() []service.StaleSkipTimerSignal { + return t.staleSkipTimerSignals +} + +func (t *GreedyTimerProcessor) GetTimerInfos() map[string][]*service.TimerInfo { + return t.stateExecutionCurrentTimerInfos +} + +func (t *GreedyTimerProcessor) GetTimerStartedUnixTimestamps() []int64 { + return t.timerManager.providerScheduledTimerUnixTs +} + +// SkipTimer will attempt to skip a timer, return false if no valid timer found +func (t *GreedyTimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) bool { + timer, valid := service.ValidateTimerSkipRequest(t.stateExecutionCurrentTimerInfos, stateExeId, timerId, timerIdx) + if !valid { + // since we have checked it before sending signals, this should only happen in some vary rare cases for racing condition + t.logger.Warn("cannot process timer skip request, maybe state is already closed...putting into a stale skip timer queue", stateExeId, timerId, timerIdx) + + t.staleSkipTimerSignals = append(t.staleSkipTimerSignals, service.StaleSkipTimerSignal{ + StateExecutionId: stateExeId, + TimerCommandId: timerId, + TimerCommandIndex: timerIdx, + }) + return false + } + timer.Status = service.TimerSkipped + return true +} + +func (t *GreedyTimerProcessor) RetryStaleSkipTimer() bool { + for i, staleSkip := range t.staleSkipTimerSignals { + found := t.SkipTimer(staleSkip.StateExecutionId, staleSkip.TimerCommandId, staleSkip.TimerCommandIndex) + if found { + newList := removeElement(t.staleSkipTimerSignals, i) + t.staleSkipTimerSignals = newList + return true + } + } + return false +} + +// WaitForTimerFiredOrSkipped waits for timer completed(fired or skipped), +// return true when the timer is fired or skipped +// return false if the waitingCommands is canceled by cancelWaiting bool pointer(when the trigger type is completed, or continueAsNew) +func (t *GreedyTimerProcessor) WaitForTimerFiredOrSkipped( + ctx interfaces.UnifiedContext, stateExeId string, timerIdx int, cancelWaiting *bool, +) service.InternalTimerStatus { + timerInfos := t.stateExecutionCurrentTimerInfos[stateExeId] + if len(timerInfos) == 0 { + if *cancelWaiting { + // The waiting thread is later than the timer execState thread + // The execState thread got completed early and call RemovePendingTimersOfState to remove the timerInfos + // returning pending here + return service.TimerPending + } else { + panic("bug: this shouldn't happen") + } + } + timer := timerInfos[timerIdx] + if timer.Status == service.TimerFired || timer.Status == service.TimerSkipped { + return timer.Status + } + skippedByStaleSkip := t.RetryStaleSkipTimer() + if skippedByStaleSkip { + t.logger.Warn("timer skipped by stale skip signal", stateExeId, timerIdx) + timer.Status = service.TimerSkipped + return service.TimerSkipped + } + + _ = t.provider.Await(ctx, func() bool { + // This is trigger when one of the timers scheduled by the timerScheduler fires, scheduling a + // new workflow task that evaluates the workflow's goroutines + return timer.Status == service.TimerFired || timer.Status == service.TimerSkipped || timer.FiringUnixTimestampSeconds <= t.provider.Now(ctx).Unix() || *cancelWaiting + }) + + if timer.Status == service.TimerSkipped { + return service.TimerSkipped + } + + if timer.FiringUnixTimestampSeconds <= t.provider.Now(ctx).Unix() { + timer.Status = service.TimerFired + return service.TimerFired + } + + // otherwise *cancelWaiting should return false to indicate that this timer isn't completed(fired or skipped) + t.timerManager.removeTimer(timer) + return service.TimerPending +} + +// RemovePendingTimersOfState is for when a state is completed, remove all its pending pendingScheduling +func (t *GreedyTimerProcessor) RemovePendingTimersOfState(stateExeId string) { + + timers := t.stateExecutionCurrentTimerInfos[stateExeId] + + for _, timer := range timers { + t.timerManager.removeTimer(timer) + } + + delete(t.stateExecutionCurrentTimerInfos, stateExeId) +} + +func (t *GreedyTimerProcessor) AddTimers( + stateExeId string, commands []iwfidl.TimerCommand, completedTimerCmds map[int]service.InternalTimerStatus, +) { + timers := make([]*service.TimerInfo, len(commands)) + for idx, cmd := range commands { + var timer service.TimerInfo + if status, ok := completedTimerCmds[idx]; ok { + timer = service.TimerInfo{ + CommandId: cmd.CommandId, + FiringUnixTimestampSeconds: cmd.GetFiringUnixTimestampSeconds(), + Status: status, + } + } else { + timer = service.TimerInfo{ + CommandId: cmd.CommandId, + FiringUnixTimestampSeconds: cmd.GetFiringUnixTimestampSeconds(), + Status: service.TimerPending, + } + } + if timer.Status == service.TimerPending { + t.timerManager.addTimer(&timer) + } + timers[idx] = &timer + } + t.stateExecutionCurrentTimerInfos[stateExeId] = timers +} diff --git a/service/interpreter/timers/greedyTimerScheduler.go b/service/interpreter/timers/greedyTimerScheduler.go new file mode 100644 index 00000000..9df6faaf --- /dev/null +++ b/service/interpreter/timers/greedyTimerScheduler.go @@ -0,0 +1,128 @@ +package timers + +import ( + "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/interpreter/cont" + "github.com/indeedeng/iwf/service/interpreter/interfaces" + "time" +) + +type timerScheduler struct { + // Timers requested by the workflow in desc order + pendingScheduling []*service.TimerInfo + // timers created through the workflow provider that are going to fire in desc order + providerScheduledTimerUnixTs []int64 +} + +func (t *timerScheduler) addTimer(toAdd *service.TimerInfo) { + + if toAdd == nil || toAdd.Status != service.TimerPending { + panic("invalid timer added") + } + + insertIndex := 0 + for i, timer := range t.pendingScheduling { + if toAdd.FiringUnixTimestampSeconds >= timer.FiringUnixTimestampSeconds { + // don't want dupes. Makes remove simpler + if toAdd == timer { + return + } + insertIndex = i + break + } + insertIndex = i + 1 + } + + front := t.pendingScheduling[:insertIndex] + var back []*service.TimerInfo + if insertIndex >= len(t.pendingScheduling) { + back = []*service.TimerInfo{toAdd} + } else { + back = append([]*service.TimerInfo{toAdd}, t.pendingScheduling[insertIndex:]...) + } + t.pendingScheduling = append(front, back...) +} + +func (t *timerScheduler) removeTimer(toRemove *service.TimerInfo) { + for i, timer := range t.pendingScheduling { + if toRemove == timer { + t.pendingScheduling = append(t.pendingScheduling[:i], t.pendingScheduling[i+1:]...) + return + } + } +} + +func (t *timerScheduler) pruneToNextTimer(pruneTo int64) *service.TimerInfo { + index := len(t.providerScheduledTimerUnixTs) + for i := len(t.providerScheduledTimerUnixTs) - 1; i >= 0; i-- { + timerTime := t.providerScheduledTimerUnixTs[i] + if timerTime > pruneTo { + break + } + index = i + } + // If index is 0, it means all times are in the past + if index == 0 { + t.providerScheduledTimerUnixTs = nil + } else { + t.providerScheduledTimerUnixTs = t.providerScheduledTimerUnixTs[:index] + } + + if len(t.pendingScheduling) == 0 { + return nil + } + + index = len(t.pendingScheduling) + + for i := len(t.pendingScheduling) - 1; i >= 0; i-- { + timer := t.pendingScheduling[i] + if timer.FiringUnixTimestampSeconds > pruneTo && timer.Status == service.TimerPending { + break + } + index = i + } + + // If index is 0, it means all timers are pruned + if index == 0 { + t.pendingScheduling = nil + return nil + } + + prunedTimer := t.pendingScheduling[index-1] + t.pendingScheduling = t.pendingScheduling[:index] + return prunedTimer +} + +func startGreedyTimerScheduler( + ctx interfaces.UnifiedContext, + provider interfaces.WorkflowProvider, + continueAsNewCounter *cont.ContinueAsNewCounter) *timerScheduler { + + t := timerScheduler{} + provider.GoNamed(ctx, "greedy-timer-scheduler", func(ctx interfaces.UnifiedContext) { + for { + _ = provider.Await(ctx, func() bool { + now := provider.Now(ctx).Unix() + next := t.pruneToNextTimer(now) + return (next != nil && (len(t.providerScheduledTimerUnixTs) == 0 || next.FiringUnixTimestampSeconds < t.providerScheduledTimerUnixTs[len(t.providerScheduledTimerUnixTs)-1])) || continueAsNewCounter.IsThresholdMet() + }) + + if continueAsNewCounter.IsThresholdMet() { + break + } + + now := provider.Now(ctx).Unix() + next := t.pruneToNextTimer(now) + fireAt := next.FiringUnixTimestampSeconds + duration := time.Duration(fireAt-now) * time.Second + // This will create a new timer but not yield the goroutines awaiting the timer firing. + // This works since when a timer fires, a new workflow task is created with the expectation that + // there is a goroutines awaiting some condition(some time has past) to continue, + // see WaitForTimerFiredOrSkipped. + provider.NewTimer(ctx, duration) + t.providerScheduledTimerUnixTs = append(t.providerScheduledTimerUnixTs, fireAt) + } + }) + + return &t +} diff --git a/service/interpreter/timerProcessor.go b/service/interpreter/timers/simpleTimerProcessor.go similarity index 66% rename from service/interpreter/timerProcessor.go rename to service/interpreter/timers/simpleTimerProcessor.go index ebd7fcf3..0a81f55a 100644 --- a/service/interpreter/timerProcessor.go +++ b/service/interpreter/timers/simpleTimerProcessor.go @@ -1,50 +1,48 @@ -package interpreter +package timers import ( + "github.com/indeedeng/iwf/service/interpreter/interfaces" "time" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" ) -type TimerProcessor struct { +type SimpleTimerProcessor struct { stateExecutionCurrentTimerInfos map[string][]*service.TimerInfo + awaitingTimers []int64 staleSkipTimerSignals []service.StaleSkipTimerSignal - provider WorkflowProvider - logger UnifiedLogger + provider interfaces.WorkflowProvider + logger interfaces.UnifiedLogger } -func NewTimerProcessor( - ctx UnifiedContext, provider WorkflowProvider, staleSkipTimerSignals []service.StaleSkipTimerSignal, -) *TimerProcessor { - tp := &TimerProcessor{ +func NewSimpleTimerProcessor( + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, staleSkipTimerSignals []service.StaleSkipTimerSignal, +) *SimpleTimerProcessor { + tp := &SimpleTimerProcessor{ provider: provider, stateExecutionCurrentTimerInfos: map[string][]*service.TimerInfo{}, logger: provider.GetLogger(ctx), staleSkipTimerSignals: staleSkipTimerSignals, } - err := provider.SetQueryHandler(ctx, service.GetCurrentTimerInfosQueryType, func() (service.GetCurrentTimerInfosQueryResponse, error) { - return service.GetCurrentTimerInfosQueryResponse{ - StateExecutionCurrentTimerInfos: tp.stateExecutionCurrentTimerInfos, - }, nil - }) - if err != nil { - panic("cannot set query handler") - } return tp } -func (t *TimerProcessor) Dump() []service.StaleSkipTimerSignal { +func (t *SimpleTimerProcessor) Dump() []service.StaleSkipTimerSignal { return t.staleSkipTimerSignals } -func (t *TimerProcessor) GetCurrentTimerInfos() map[string][]*service.TimerInfo { +func (t *SimpleTimerProcessor) GetTimerInfos() map[string][]*service.TimerInfo { return t.stateExecutionCurrentTimerInfos } +func (t *SimpleTimerProcessor) GetTimerStartedUnixTimestamps() []int64 { + return t.awaitingTimers +} + // SkipTimer will attempt to skip a timer, return false if no valid timer found -func (t *TimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) bool { +func (t *SimpleTimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) bool { timer, valid := service.ValidateTimerSkipRequest(t.stateExecutionCurrentTimerInfos, stateExeId, timerId, timerIdx) if !valid { // since we have checked it before sending signals, this should only happen in some vary rare cases for racing condition @@ -61,7 +59,7 @@ func (t *TimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) boo return true } -func (t *TimerProcessor) RetryStaleSkipTimer() bool { +func (t *SimpleTimerProcessor) RetryStaleSkipTimer() bool { for i, staleSkip := range t.staleSkipTimerSignals { found := t.SkipTimer(staleSkip.StateExecutionId, staleSkip.TimerCommandId, staleSkip.TimerCommandIndex) if found { @@ -73,16 +71,11 @@ func (t *TimerProcessor) RetryStaleSkipTimer() bool { return false } -func removeElement(s []service.StaleSkipTimerSignal, i int) []service.StaleSkipTimerSignal { - s[i] = s[len(s)-1] - return s[:len(s)-1] -} - // WaitForTimerFiredOrSkipped waits for timer completed(fired or skipped), // return true when the timer is fired or skipped // return false if the waitingCommands is canceled by cancelWaiting bool pointer(when the trigger type is completed, or continueAsNew) -func (t *TimerProcessor) WaitForTimerFiredOrSkipped( - ctx UnifiedContext, stateExeId string, timerIdx int, cancelWaiting *bool, +func (t *SimpleTimerProcessor) WaitForTimerFiredOrSkipped( + ctx interfaces.UnifiedContext, stateExeId string, timerIdx int, cancelWaiting *bool, ) service.InternalTimerStatus { timerInfos := t.stateExecutionCurrentTimerInfos[stateExeId] if len(timerInfos) == 0 { @@ -108,9 +101,11 @@ func (t *TimerProcessor) WaitForTimerFiredOrSkipped( fireAt := timer.FiringUnixTimestampSeconds duration := time.Duration(fireAt-now) * time.Second future := t.provider.NewTimer(ctx, duration) + t.awaitingTimers = append(t.awaitingTimers, fireAt) _ = t.provider.Await(ctx, func() bool { return future.IsReady() || timer.Status == service.TimerSkipped || *cancelWaiting }) + t.awaitingTimers = removeSingleTime(t.awaitingTimers, fireAt) if timer.Status == service.TimerSkipped { return service.TimerSkipped } @@ -121,12 +116,21 @@ func (t *TimerProcessor) WaitForTimerFiredOrSkipped( return service.TimerPending } +func removeSingleTime(timers []int64, at int64) []int64 { + for i, t := range timers { + if t == at { + return append(timers[:i], timers[i+1:]...) + } + } + return timers +} + // RemovePendingTimersOfState is for when a state is completed, remove all its pending timers -func (t *TimerProcessor) RemovePendingTimersOfState(stateExeId string) { +func (t *SimpleTimerProcessor) RemovePendingTimersOfState(stateExeId string) { delete(t.stateExecutionCurrentTimerInfos, stateExeId) } -func (t *TimerProcessor) AddTimers( +func (t *SimpleTimerProcessor) AddTimers( stateExeId string, commands []iwfidl.TimerCommand, completedTimerCmds map[int]service.InternalTimerStatus, ) { timers := make([]*service.TimerInfo, len(commands)) @@ -150,21 +154,3 @@ func (t *TimerProcessor) AddTimers( } t.stateExecutionCurrentTimerInfos[stateExeId] = timers } - -// FixTimerCommandFromActivityOutput converts the durationSeconds to firingUnixTimestampSeconds -// doing it right after the activity output so that we don't need to worry about the time drift after continueAsNew -func FixTimerCommandFromActivityOutput(now time.Time, request iwfidl.CommandRequest) iwfidl.CommandRequest { - var timerCommands []iwfidl.TimerCommand - for _, cmd := range request.GetTimerCommands() { - if cmd.HasDurationSeconds() { - timerCommands = append(timerCommands, iwfidl.TimerCommand{ - CommandId: cmd.CommandId, - FiringUnixTimestampSeconds: iwfidl.PtrInt64(now.Unix() + int64(cmd.GetDurationSeconds())), - }) - } else { - timerCommands = append(timerCommands, cmd) - } - } - request.TimerCommands = timerCommands - return request -} diff --git a/service/interpreter/timers/utils.go b/service/interpreter/timers/utils.go new file mode 100644 index 00000000..9e39a9b9 --- /dev/null +++ b/service/interpreter/timers/utils.go @@ -0,0 +1,31 @@ +package timers + +import ( + "time" + + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/service" +) + +func removeElement(s []service.StaleSkipTimerSignal, i int) []service.StaleSkipTimerSignal { + s[i] = s[len(s)-1] + return s[:len(s)-1] +} + +// FixTimerCommandFromActivityOutput converts the durationSeconds to firingUnixTimestampSeconds +// doing it right after the activity output so that we don't need to worry about the time drift after continueAsNew +func FixTimerCommandFromActivityOutput(now time.Time, request iwfidl.CommandRequest) iwfidl.CommandRequest { + var timerCommands []iwfidl.TimerCommand + for _, cmd := range request.GetTimerCommands() { + if cmd.HasDurationSeconds() { + timerCommands = append(timerCommands, iwfidl.TimerCommand{ + CommandId: cmd.CommandId, + FiringUnixTimestampSeconds: iwfidl.PtrInt64(now.Unix() + int64(cmd.GetDurationSeconds())), + }) + } else { + timerCommands = append(timerCommands, cmd) + } + } + request.TimerCommands = timerCommands + return request +} diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index c747b9f8..033c3ba3 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -7,7 +7,11 @@ import ( "github.com/indeedeng/iwf/service/common/event" "github.com/indeedeng/iwf/service/common/ptr" "github.com/indeedeng/iwf/service/common/utils" + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/cont" "github.com/indeedeng/iwf/service/interpreter/env" + "github.com/indeedeng/iwf/service/interpreter/interfaces" + "github.com/indeedeng/iwf/service/interpreter/timers" "time" "github.com/indeedeng/iwf/service/common/compatibility" @@ -18,7 +22,7 @@ import ( ) func InterpreterImpl( - ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, input service.InterpreterWorkflowInput, ) (output *service.InterpreterWorkflowOutput, retErr error) { var persistenceManager *PersistenceManager @@ -78,7 +82,7 @@ func InterpreterImpl( } } - workflowConfiger := NewWorkflowConfiger(input.Config) + workflowConfiger := config.NewWorkflowConfiger(input.Config) basicInfo := service.BasicInfo{ IwfWorkflowType: input.IwfWorkflowType, IwfWorkerUrl: input.IwfWorkerUrl, @@ -86,8 +90,8 @@ func InterpreterImpl( var internalChannel *InternalChannel var stateRequestQueue *StateRequestQueue - var timerProcessor *TimerProcessor - var continueAsNewCounter *ContinueAsNewCounter + var timerProcessor interfaces.TimerProcessor + var continueAsNewCounter *cont.ContinueAsNewCounter var signalReceiver *SignalReceiver var stateExecutionCounter *StateExecutionCounter var outputCollector *OutputCollector @@ -106,8 +110,12 @@ func InterpreterImpl( internalChannel = RebuildInternalChannel(previous.InterStateChannelReceived) stateRequestQueue = NewStateRequestQueueWithResumeRequests(previous.StatesToStartFromBeginning, previous.StateExecutionsToResume) persistenceManager = RebuildPersistenceManager(provider, previous.DataObjects, previous.SearchAttributes, input.UseMemoForDataAttributes) - timerProcessor = NewTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals) - continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) + continueAsNewCounter = cont.NewContinueAsCounter(workflowConfiger, ctx, provider) + if input.Config.GetOptimizeTimer() { + timerProcessor = timers.NewGreedyTimerProcessor(ctx, provider, continueAsNewCounter, previous.StaleSkipTimerSignals) + } else { + timerProcessor = timers.NewSimpleTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals) + } signalReceiver = NewSignalReceiver(ctx, provider, internalChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, previous.SignalsReceived) counterInfo := previous.StateExecutionCounterInfo stateExecutionCounter = RebuildStateExecutionCounter(ctx, provider, globalVersioner, @@ -119,8 +127,12 @@ func InterpreterImpl( internalChannel = NewInternalChannel() stateRequestQueue = NewStateRequestQueue() persistenceManager = NewPersistenceManager(provider, input.InitDataAttributes, input.InitSearchAttributes, input.UseMemoForDataAttributes) - timerProcessor = NewTimerProcessor(ctx, provider, nil) - continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) + continueAsNewCounter = cont.NewContinueAsCounter(workflowConfiger, ctx, provider) + if input.Config.GetOptimizeTimer() { + timerProcessor = timers.NewGreedyTimerProcessor(ctx, provider, continueAsNewCounter, nil) + } else { + timerProcessor = timers.NewSimpleTimerProcessor(ctx, provider, nil) + } signalReceiver = NewSignalReceiver(ctx, provider, internalChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, nil) stateExecutionCounter = NewStateExecutionCounter(ctx, provider, globalVersioner, workflowConfiger, continueAsNewCounter) outputCollector = NewOutputCollector(nil) @@ -136,7 +148,7 @@ func InterpreterImpl( // This is to ensure the correctness. If we set the query handler before that, // the query handler could return empty data (since the loading hasn't completed), which will be incorrect response. // We would rather return server errors and let the client retry later. - err = SetQueryHandlers(ctx, provider, persistenceManager, internalChannel, signalReceiver, continueAsNewer, workflowConfiger, basicInfo) + err = SetQueryHandlers(ctx, provider, timerProcessor, persistenceManager, internalChannel, signalReceiver, continueAsNewer, workflowConfiger, basicInfo) if err != nil { retErr = err return @@ -211,7 +223,7 @@ func InterpreterImpl( // execute in another thread for parallelism // state must be passed via parameter https://stackoverflow.com/questions/67263092 stateCtx := provider.ExtendContextWithValue(ctx, "stateReq", stateReqForLoopingOnly) - provider.GoNamed(stateCtx, "state-execution-thread:"+stateReqForLoopingOnly.GetStateId(), func(ctx UnifiedContext) { + provider.GoNamed(stateCtx, "state-execution-thread:"+stateReqForLoopingOnly.GetStateId(), func(ctx interfaces.UnifiedContext) { stateReq, ok := provider.GetContextValue(ctx, "stateReq").(StateRequest) if !ok { errToFailWf = provider.NewApplicationError( @@ -388,7 +400,7 @@ func InterpreterImpl( } func checkClosingWorkflow( - ctx UnifiedContext, provider WorkflowProvider, versioner *GlobalVersioner, decision *iwfidl.StateDecision, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, versioner *GlobalVersioner, decision *iwfidl.StateDecision, currentStateId, currentStateExeId string, internalChannel *InternalChannel, signalReceiver *SignalReceiver, ) (canGoNext, gracefulComplete, forceComplete, forceFail bool, completeOutput *iwfidl.StateCompletionOutput, err error) { @@ -495,7 +507,7 @@ func checkClosingWorkflow( } func DrainReceivedButUnprocessedInternalChannelsFromStateApis( - ctx UnifiedContext, provider WorkflowProvider, versioner *GlobalVersioner, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, versioner *GlobalVersioner, ) error { if versioner.IsAfterVersionOfYieldOnConditionalComplete() { // Just yield, by waiting on an empty lambda, nothing else. @@ -512,8 +524,8 @@ func DrainReceivedButUnprocessedInternalChannelsFromStateApis( } func processStateExecution( - ctx UnifiedContext, - provider WorkflowProvider, + ctx interfaces.UnifiedContext, + provider interfaces.WorkflowProvider, globalVersioner *GlobalVersioner, basicInfo service.BasicInfo, stateReq StateRequest, @@ -521,10 +533,10 @@ func processStateExecution( persistenceManager *PersistenceManager, interStateChannel *InternalChannel, signalReceiver *SignalReceiver, - timerProcessor *TimerProcessor, + timerProcessor interfaces.TimerProcessor, continueAsNewer *ContinueAsNewer, - continueAsNewCounter *ContinueAsNewCounter, - configer *WorkflowConfiger, + continueAsNewCounter *cont.ContinueAsNewCounter, + configer *config.WorkflowConfiger, shouldSendSignalOnCompletion bool, ) (*iwfidl.StateDecision, service.StateExecutionStatus, error) { waitUntilApi := StateStart @@ -541,7 +553,7 @@ func processStateExecution( WorkflowStartedTimestamp: info.WorkflowStartTime.Unix(), StateExecutionId: &stateExeId, } - activityOptions := ActivityOptions{ + activityOptions := interfaces.ActivityOptions{ StartToCloseTimeout: 30 * time.Second, } @@ -650,7 +662,7 @@ func processStateExecution( } interStateChannel.ProcessPublishing(startResponse.GetPublishToInterStateChannel()) - commandReq = FixTimerCommandFromActivityOutput(provider.Now(ctx), startResponse.GetCommandRequest()) + commandReq = timers.FixTimerCommandFromActivityOutput(provider.Now(ctx), startResponse.GetCommandRequest()) stateExecutionLocal = startResponse.GetUpsertStateLocals() } @@ -663,7 +675,7 @@ func processStateExecution( } cmdCtx := provider.ExtendContextWithValue(ctx, "idx", idx) //Start timer in a new thread - provider.GoNamed(cmdCtx, getCommandThreadName("timer", stateExeId, cmd.GetCommandId(), idx), func(ctx UnifiedContext) { + provider.GoNamed(cmdCtx, getCommandThreadName("timer", stateExeId, cmd.GetCommandId(), idx), func(ctx interfaces.UnifiedContext) { idx, ok := provider.GetContextValue(ctx, "idx").(int) if !ok { panic("critical code bug") @@ -689,7 +701,7 @@ func processStateExecution( cmdCtx := provider.ExtendContextWithValue(ctx, "cmd", cmd) cmdCtx = provider.ExtendContextWithValue(cmdCtx, "idx", idx) //Process signal in new thread - provider.GoNamed(cmdCtx, getCommandThreadName("signal", stateExeId, cmd.GetCommandId(), idx), func(ctx UnifiedContext) { + provider.GoNamed(cmdCtx, getCommandThreadName("signal", stateExeId, cmd.GetCommandId(), idx), func(ctx interfaces.UnifiedContext) { cmd, ok := provider.GetContextValue(ctx, "cmd").(iwfidl.SignalCommand) if !ok { panic("critical code bug") @@ -722,7 +734,7 @@ func processStateExecution( cmdCtx := provider.ExtendContextWithValue(ctx, "cmd", cmd) cmdCtx = provider.ExtendContextWithValue(cmdCtx, "idx", idx) //Process interstate channel command in a new thread. - provider.GoNamed(cmdCtx, getCommandThreadName("interstate", stateExeId, cmd.GetCommandId(), idx), func(ctx UnifiedContext) { + provider.GoNamed(cmdCtx, getCommandThreadName("interstate", stateExeId, cmd.GetCommandId(), idx), func(ctx interfaces.UnifiedContext) { cmd, ok := provider.GetContextValue(ctx, "cmd").(iwfidl.InterStateChannelCommand) if !ok { panic("critical code bug") @@ -827,8 +839,8 @@ func processStateExecution( } func invokeStateExecute( - ctx UnifiedContext, - provider WorkflowProvider, + ctx interfaces.UnifiedContext, + provider interfaces.WorkflowProvider, basicInfo service.BasicInfo, state iwfidl.StateMovement, stateExeId string, @@ -837,13 +849,13 @@ func invokeStateExecute( executionContext iwfidl.Context, commandRes *iwfidl.CommandResults, continueAsNewer *ContinueAsNewer, - configer *WorkflowConfiger, + configer *config.WorkflowConfiger, executeApi interface{}, stateExecutionLocal []iwfidl.KeyValue, shouldSendSignalOnCompletion bool, ) (*iwfidl.StateDecision, service.StateExecutionStatus, error) { var err error - activityOptions := ActivityOptions{ + activityOptions := interfaces.ActivityOptions{ StartToCloseTimeout: 30 * time.Second, } if state.StateOptions != nil { @@ -1001,7 +1013,7 @@ func shouldProceedOnExecuteApiError(state iwfidl.StateMovement) bool { options.GetExecuteApiFailurePolicy() == iwfidl.PROCEED_TO_CONFIGURED_STATE } -func convertStateApiActivityError(provider WorkflowProvider, err error) error { +func convertStateApiActivityError(provider interfaces.WorkflowProvider, err error) error { if provider.IsApplicationError(err) { return err } @@ -1012,7 +1024,7 @@ func getCommandThreadName(prefix string, stateExecId, cmdId string, idx int) str return fmt.Sprintf("%v-%v-%v-%v", prefix, stateExecId, cmdId, idx) } -func createUserWorkflowError(provider WorkflowProvider, message string) error { +func createUserWorkflowError(provider interfaces.WorkflowProvider, message string) error { return provider.NewApplicationError( string(iwfidl.INVALID_USER_WORKFLOW_CODE_ERROR_TYPE), message, @@ -1020,7 +1032,7 @@ func createUserWorkflowError(provider WorkflowProvider, message string) error { } func WaitForStateCompletionWorkflowImpl( - ctx UnifiedContext, provider WorkflowProvider, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, ) (*service.WaitForStateCompletionWorkflowOutput, error) { signalReceiveChannel := provider.GetSignalChannel(ctx, service.StateCompletionSignalChannelName) var signalValue iwfidl.StateCompletionOutput diff --git a/service/interpreter/workflowUpdater.go b/service/interpreter/workflowUpdater.go index 8577dd40..02e8dfa1 100644 --- a/service/interpreter/workflowUpdater.go +++ b/service/interpreter/workflowUpdater.go @@ -4,27 +4,30 @@ import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/event" + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/cont" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "time" ) type WorkflowUpdater struct { persistenceManager *PersistenceManager - provider WorkflowProvider + provider interfaces.WorkflowProvider continueAsNewer *ContinueAsNewer - continueAsNewCounter *ContinueAsNewCounter + continueAsNewCounter *cont.ContinueAsNewCounter internalChannel *InternalChannel signalReceiver *SignalReceiver stateRequestQueue *StateRequestQueue - configer *WorkflowConfiger - logger UnifiedLogger + configer *config.WorkflowConfiger + logger interfaces.UnifiedLogger basicInfo service.BasicInfo globalVersioner *GlobalVersioner } func NewWorkflowUpdater( - ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, - continueAsNewer *ContinueAsNewer, continueAsNewCounter *ContinueAsNewCounter, configer *WorkflowConfiger, + continueAsNewer *ContinueAsNewer, continueAsNewCounter *cont.ContinueAsNewCounter, configer *config.WorkflowConfiger, internalChannel *InternalChannel, signalReceiver *SignalReceiver, basicInfo service.BasicInfo, globalVersioner *GlobalVersioner, ) (*WorkflowUpdater, error) { @@ -51,8 +54,8 @@ func NewWorkflowUpdater( } func (u *WorkflowUpdater) handler( - ctx UnifiedContext, input iwfidl.WorkflowRpcRequest, -) (output *HandlerOutput, err error) { + ctx interfaces.UnifiedContext, input iwfidl.WorkflowRpcRequest, +) (output *interfaces.HandlerOutput, err error) { u.continueAsNewer.IncreaseInflightOperation() defer u.continueAsNewer.DecreaseInflightOperation() @@ -81,7 +84,7 @@ func (u *WorkflowUpdater) handler( InternalChannelInfo: u.internalChannel.GetInfos(), } - activityOptions := ActivityOptions{ + activityOptions := interfaces.ActivityOptions{ StartToCloseTimeout: 5 * time.Second, RetryPolicy: &iwfidl.RetryPolicy{ MaximumAttemptsDurationSeconds: input.TimeoutSeconds, @@ -89,7 +92,7 @@ func (u *WorkflowUpdater) handler( }, } ctx = u.provider.WithActivityOptions(ctx, activityOptions) - var activityOutput InvokeRpcActivityOutput + var activityOutput interfaces.InvokeRpcActivityOutput err = u.provider.ExecuteActivity(&activityOutput, u.configer.ShouldOptimizeActivity(), ctx, InvokeWorkerRpc, u.provider.GetBackendType(), rpcPrep, input) u.persistenceManager.UnlockPersistence(input.SearchAttributesLoadingPolicy, input.DataAttributesLoadingPolicy) @@ -98,7 +101,7 @@ func (u *WorkflowUpdater) handler( return nil, u.provider.NewApplicationError(string(iwfidl.SERVER_INTERNAL_ERROR_TYPE), "activity invocation failure:"+err.Error()) } - handlerOutput := &HandlerOutput{ + handlerOutput := &interfaces.HandlerOutput{ StatusError: activityOutput.StatusError, } @@ -119,7 +122,7 @@ func (u *WorkflowUpdater) handler( return handlerOutput, nil } -func (u *WorkflowUpdater) validator(_ UnifiedContext, input iwfidl.WorkflowRpcRequest) error { +func (u *WorkflowUpdater) validator(_ interfaces.UnifiedContext, input iwfidl.WorkflowRpcRequest) error { var daKeys, saKeys []string if input.HasDataAttributesLoadingPolicy() { daKeys = input.DataAttributesLoadingPolicy.LockingKeys