diff --git a/service/api/service.go b/service/api/service.go index e786d152..c44dd7e4 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -633,12 +633,15 @@ func (s *serviceImpl) ApiV1WorkflowRpcPost( return nil, s.handleError(err, WorkflowRpcApiPath, req.GetWorkflowId()) } + stateApiExecuteStartTime := time.Now().UnixMilli() + defer func() { event.Handle(iwfidl.IwfEvent{ - EventType: iwfidl.RPC_EXECUTION_EVENT, - RpcName: &req.RpcName, - WorkflowType: rpcPrep.IwfWorkflowType, - WorkflowId: req.GetWorkflowId(), + EventType: iwfidl.RPC_EXECUTION_EVENT, + RpcName: &req.RpcName, + WorkflowType: rpcPrep.IwfWorkflowType, + WorkflowId: req.GetWorkflowId(), + StartTimestampInMs: ptr.Any(stateApiExecuteStartTime), // search attributes are not available at this time }) }() diff --git a/service/interpreter/activityImpl.go b/service/interpreter/activityImpl.go index a91b76e0..fc0afbec 100644 --- a/service/interpreter/activityImpl.go +++ b/service/interpreter/activityImpl.go @@ -57,13 +57,14 @@ func StateApiWaitUntil( printDebugMsg(logger, err, iwfWorkerBaseUrl) if checkHttpError(err, httpResp) { event.Handle(iwfidl.IwfEvent{ - EventType: iwfidl.STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT, - WorkflowType: input.Request.WorkflowType, - WorkflowId: activityInfo.WorkflowExecution.ID, - WorkflowRunId: activityInfo.WorkflowExecution.RunID, - StateId: ptr.Any(input.Request.WorkflowStateId), - StateExecutionId: ptr.Any(input.Request.Context.GetStateExecutionId()), - SearchAttributes: searchAttributes, + EventType: iwfidl.STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT, + WorkflowType: input.Request.WorkflowType, + WorkflowId: activityInfo.WorkflowExecution.ID, + WorkflowRunId: activityInfo.WorkflowExecution.RunID, + StateId: ptr.Any(input.Request.WorkflowStateId), + StateExecutionId: ptr.Any(input.Request.Context.GetStateExecutionId()), + StartTimestampInMs: ptr.Any(stateApiWaitUntilStartTime), + SearchAttributes: searchAttributes, }) return nil, composeHttpError( activityInfo.IsLocalActivity, @@ -72,13 +73,14 @@ func StateApiWaitUntil( if err := checkCommandRequestFromWaitUntilResponse(resp); err != nil { event.Handle(iwfidl.IwfEvent{ - EventType: iwfidl.STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT, - WorkflowType: input.Request.WorkflowType, - WorkflowId: activityInfo.WorkflowExecution.ID, - WorkflowRunId: activityInfo.WorkflowExecution.RunID, - StateId: ptr.Any(input.Request.WorkflowStateId), - StateExecutionId: ptr.Any(input.Request.Context.GetStateExecutionId()), - SearchAttributes: searchAttributes, + EventType: iwfidl.STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT, + WorkflowType: input.Request.WorkflowType, + WorkflowId: activityInfo.WorkflowExecution.ID, + WorkflowRunId: activityInfo.WorkflowExecution.RunID, + StateId: ptr.Any(input.Request.WorkflowStateId), + StateExecutionId: ptr.Any(input.Request.Context.GetStateExecutionId()), + StartTimestampInMs: ptr.Any(stateApiWaitUntilStartTime), + SearchAttributes: searchAttributes, }) return nil, composeStartApiRespError(provider, err, resp) } @@ -147,13 +149,14 @@ func StateApiExecute( printDebugMsg(logger, err, iwfWorkerBaseUrl) if checkHttpError(err, httpResp) { event.Handle(iwfidl.IwfEvent{ - EventType: iwfidl.STATE_EXECUTE_ATTEMPT_FAIL_EVENT, - WorkflowType: input.Request.WorkflowType, - WorkflowId: activityInfo.WorkflowExecution.ID, - WorkflowRunId: activityInfo.WorkflowExecution.RunID, - StateId: ptr.Any(input.Request.WorkflowStateId), - StateExecutionId: input.Request.Context.StateExecutionId, - SearchAttributes: searchAttributes, + EventType: iwfidl.STATE_EXECUTE_ATTEMPT_FAIL_EVENT, + WorkflowType: input.Request.WorkflowType, + WorkflowId: activityInfo.WorkflowExecution.ID, + WorkflowRunId: activityInfo.WorkflowExecution.RunID, + StateId: ptr.Any(input.Request.WorkflowStateId), + StateExecutionId: input.Request.Context.StateExecutionId, + StartTimestampInMs: ptr.Any(stateApiExecuteStartTime), + SearchAttributes: searchAttributes, }) return nil, composeHttpError( activityInfo.IsLocalActivity, @@ -162,13 +165,14 @@ func StateApiExecute( if err = checkStateDecisionFromResponse(resp); err != nil { event.Handle(iwfidl.IwfEvent{ - EventType: iwfidl.STATE_EXECUTE_ATTEMPT_FAIL_EVENT, - WorkflowType: input.Request.WorkflowType, - WorkflowId: activityInfo.WorkflowExecution.ID, - WorkflowRunId: activityInfo.WorkflowExecution.RunID, - StateId: ptr.Any(input.Request.WorkflowStateId), - StateExecutionId: input.Request.Context.StateExecutionId, - SearchAttributes: searchAttributes, + EventType: iwfidl.STATE_EXECUTE_ATTEMPT_FAIL_EVENT, + WorkflowType: input.Request.WorkflowType, + WorkflowId: activityInfo.WorkflowExecution.ID, + WorkflowRunId: activityInfo.WorkflowExecution.RunID, + StateId: ptr.Any(input.Request.WorkflowStateId), + StateExecutionId: input.Request.Context.StateExecutionId, + StartTimestampInMs: ptr.Any(stateApiExecuteStartTime), + SearchAttributes: searchAttributes, }) return nil, composeExecuteApiRespError(provider, err, resp) } diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index 033c3ba3..54c668bf 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -45,11 +45,12 @@ func InterpreterImpl( }) } else if provider.IsApplicationError(retErr) { event.Handle(iwfidl.IwfEvent{ - EventType: iwfidl.WORKFLOW_FAIL_EVENT, - WorkflowType: input.IwfWorkflowType, - WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, - WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, - SearchAttributes: sas, + EventType: iwfidl.WORKFLOW_FAIL_EVENT, + WorkflowType: input.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + SearchAttributes: sas, + StartTimestampInMs: ptr.Any(provider.GetWorkflowInfo(ctx).WorkflowStartTime.UnixMilli()), }) } } @@ -165,11 +166,12 @@ func InterpreterImpl( if !input.IsResumeFromContinueAsNew { if !provider.IsReplaying(ctx) { event.Handle(iwfidl.IwfEvent{ - EventType: iwfidl.WORKFLOW_START_EVENT, - WorkflowType: input.IwfWorkflowType, - WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, - WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, - SearchAttributes: persistenceManager.GetAllSearchAttributes(), + EventType: iwfidl.WORKFLOW_START_EVENT, + WorkflowType: input.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + SearchAttributes: persistenceManager.GetAllSearchAttributes(), + StartTimestampInMs: ptr.Any(provider.GetWorkflowInfo(ctx).WorkflowStartTime.UnixMilli()), }) } // it's possible that a workflow is started without any starting state @@ -596,18 +598,19 @@ func processStateExecution( saLoadingPolicy := compatibility.GetWaitUntilApiSearchAttributesLoadingPolicy(state.StateOptions) doLoadingPolicy := compatibility.GetWaitUntilApiDataObjectsLoadingPolicy(state.StateOptions) + stateWaitUntilApiStartTime := provider.Now(ctx).UnixMilli() if !provider.IsReplaying(ctx) { event.Handle(iwfidl.IwfEvent{ - EventType: iwfidl.STATE_WAIT_UNTIL_EE_START_EVENT, - WorkflowType: basicInfo.IwfWorkflowType, - WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, - WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, - StateId: ptr.Any(state.StateId), - StateExecutionId: ptr.Any(stateExeId), - SearchAttributes: persistenceManager.GetAllSearchAttributes(), + EventType: iwfidl.STATE_WAIT_UNTIL_EE_START_EVENT, + WorkflowType: basicInfo.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + StateId: ptr.Any(state.StateId), + StateExecutionId: ptr.Any(stateExeId), + StartTimestampInMs: ptr.Any(stateWaitUntilApiStartTime), + SearchAttributes: persistenceManager.GetAllSearchAttributes(), }) } - stateWaitUntilApiStartTime := provider.Now(ctx).UnixMilli() errStartApi = provider.ExecuteActivity(&startResponse, configer.ShouldOptimizeActivity(), ctx, waitUntilApi, provider.GetBackendType(), service.StateStartActivityInput{ IwfWorkerUrl: basicInfo.IwfWorkerUrl, @@ -636,13 +639,14 @@ func processStateExecution( }) } else { event.Handle(iwfidl.IwfEvent{ - EventType: iwfidl.STATE_WAIT_UNTIL_EE_FAIL_EVENT, - WorkflowType: basicInfo.IwfWorkflowType, - WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, - WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, - StateId: ptr.Any(state.StateId), - StateExecutionId: ptr.Any(stateExeId), - SearchAttributes: persistenceManager.GetAllSearchAttributes(), + EventType: iwfidl.STATE_WAIT_UNTIL_EE_FAIL_EVENT, + WorkflowType: basicInfo.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + StateId: ptr.Any(state.StateId), + StateExecutionId: ptr.Any(stateExeId), + StartTimestampInMs: ptr.Any(stateWaitUntilApiStartTime), + SearchAttributes: persistenceManager.GetAllSearchAttributes(), }) } } @@ -872,18 +876,19 @@ func invokeStateExecute( ctx = provider.WithActivityOptions(ctx, activityOptions) var decideResponse *iwfidl.WorkflowStateDecideResponse + stateExecuteApiStartTime := provider.Now(ctx).UnixMilli() if !provider.IsReplaying(ctx) { event.Handle(iwfidl.IwfEvent{ - EventType: iwfidl.STATE_EXECUTE_EE_START_EVENT, - WorkflowType: basicInfo.IwfWorkflowType, - WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, - WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, - StateId: ptr.Any(state.StateId), - StateExecutionId: ptr.Any(stateExeId), - SearchAttributes: persistenceManager.GetAllSearchAttributes(), + EventType: iwfidl.STATE_EXECUTE_EE_START_EVENT, + WorkflowType: basicInfo.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + StateId: ptr.Any(state.StateId), + StateExecutionId: ptr.Any(stateExeId), + StartTimestampInMs: ptr.Any(stateExecuteApiStartTime), + SearchAttributes: persistenceManager.GetAllSearchAttributes(), }) } - stateExecuteApiStartTime := provider.Now(ctx).UnixMilli() err = provider.ExecuteActivity(&decideResponse, configer.ShouldOptimizeActivity(), ctx, executeApi, provider.GetBackendType(), service.StateDecideActivityInput{ IwfWorkerUrl: basicInfo.IwfWorkerUrl, @@ -913,13 +918,14 @@ func invokeStateExecute( }) } else { event.Handle(iwfidl.IwfEvent{ - EventType: iwfidl.STATE_EXECUTE_EE_FAIL_EVENT, - WorkflowType: basicInfo.IwfWorkflowType, - WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, - WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, - StateId: ptr.Any(state.StateId), - StateExecutionId: ptr.Any(stateExeId), - SearchAttributes: persistenceManager.GetAllSearchAttributes(), + EventType: iwfidl.STATE_EXECUTE_EE_FAIL_EVENT, + WorkflowType: basicInfo.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + StateId: ptr.Any(state.StateId), + StartTimestampInMs: ptr.Any(stateExecuteApiStartTime), + StateExecutionId: ptr.Any(stateExeId), + SearchAttributes: persistenceManager.GetAllSearchAttributes(), }) } } diff --git a/service/interpreter/workflowUpdater.go b/service/interpreter/workflowUpdater.go index 02e8dfa1..62c7d7a2 100644 --- a/service/interpreter/workflowUpdater.go +++ b/service/interpreter/workflowUpdater.go @@ -4,6 +4,7 @@ import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/event" + "github.com/indeedeng/iwf/service/common/ptr" "github.com/indeedeng/iwf/service/interpreter/config" "github.com/indeedeng/iwf/service/interpreter/cont" "github.com/indeedeng/iwf/service/interpreter/interfaces" @@ -61,14 +62,17 @@ func (u *WorkflowUpdater) handler( info := u.provider.GetWorkflowInfo(ctx) + rpcExecutionStartTime := u.provider.Now(ctx).UnixMilli() + defer func() { if !u.provider.IsReplaying(ctx) { event.Handle(iwfidl.IwfEvent{ - EventType: iwfidl.RPC_EXECUTION_EVENT, - RpcName: &input.RpcName, - WorkflowType: u.basicInfo.IwfWorkflowType, - WorkflowId: info.WorkflowExecution.ID, - SearchAttributes: u.persistenceManager.GetAllSearchAttributes(), + EventType: iwfidl.RPC_EXECUTION_EVENT, + RpcName: &input.RpcName, + WorkflowType: u.basicInfo.IwfWorkflowType, + WorkflowId: info.WorkflowExecution.ID, + StartTimestampInMs: ptr.Any(rpcExecutionStartTime), + SearchAttributes: u.persistenceManager.GetAllSearchAttributes(), }) } }()