Skip to content

Commit

Permalink
IWF-512: Add start timestamp field to all events
Browse files Browse the repository at this point in the history
  • Loading branch information
lwolczynski committed Feb 5, 2025
1 parent 5b9db14 commit 2175081
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 77 deletions.
11 changes: 7 additions & 4 deletions service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,12 +633,15 @@ func (s *serviceImpl) ApiV1WorkflowRpcPost(
return nil, s.handleError(err, WorkflowRpcApiPath, req.GetWorkflowId())
}

stateApiExecuteStartTime := time.Now().UnixMilli()

defer func() {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.RPC_EXECUTION_EVENT,
RpcName: &req.RpcName,
WorkflowType: rpcPrep.IwfWorkflowType,
WorkflowId: req.GetWorkflowId(),
EventType: iwfidl.RPC_EXECUTION_EVENT,
RpcName: &req.RpcName,
WorkflowType: rpcPrep.IwfWorkflowType,
WorkflowId: req.GetWorkflowId(),
StartTimestampInMs: ptr.Any(stateApiExecuteStartTime),
// search attributes are not available at this time
})
}()
Expand Down
60 changes: 32 additions & 28 deletions service/interpreter/activityImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,14 @@ func StateApiWaitUntil(
printDebugMsg(logger, err, iwfWorkerBaseUrl)
if checkHttpError(err, httpResp) {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT,
WorkflowType: input.Request.WorkflowType,
WorkflowId: activityInfo.WorkflowExecution.ID,
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
StateId: ptr.Any(input.Request.WorkflowStateId),
StateExecutionId: ptr.Any(input.Request.Context.GetStateExecutionId()),
SearchAttributes: searchAttributes,
EventType: iwfidl.STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT,
WorkflowType: input.Request.WorkflowType,
WorkflowId: activityInfo.WorkflowExecution.ID,
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
StateId: ptr.Any(input.Request.WorkflowStateId),
StateExecutionId: ptr.Any(input.Request.Context.GetStateExecutionId()),
StartTimestampInMs: ptr.Any(stateApiWaitUntilStartTime),
SearchAttributes: searchAttributes,
})
return nil, composeHttpError(
activityInfo.IsLocalActivity,
Expand All @@ -72,13 +73,14 @@ func StateApiWaitUntil(

if err := checkCommandRequestFromWaitUntilResponse(resp); err != nil {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT,
WorkflowType: input.Request.WorkflowType,
WorkflowId: activityInfo.WorkflowExecution.ID,
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
StateId: ptr.Any(input.Request.WorkflowStateId),
StateExecutionId: ptr.Any(input.Request.Context.GetStateExecutionId()),
SearchAttributes: searchAttributes,
EventType: iwfidl.STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT,
WorkflowType: input.Request.WorkflowType,
WorkflowId: activityInfo.WorkflowExecution.ID,
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
StateId: ptr.Any(input.Request.WorkflowStateId),
StateExecutionId: ptr.Any(input.Request.Context.GetStateExecutionId()),
StartTimestampInMs: ptr.Any(stateApiWaitUntilStartTime),
SearchAttributes: searchAttributes,
})
return nil, composeStartApiRespError(provider, err, resp)
}
Expand Down Expand Up @@ -147,13 +149,14 @@ func StateApiExecute(
printDebugMsg(logger, err, iwfWorkerBaseUrl)
if checkHttpError(err, httpResp) {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.STATE_EXECUTE_ATTEMPT_FAIL_EVENT,
WorkflowType: input.Request.WorkflowType,
WorkflowId: activityInfo.WorkflowExecution.ID,
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
StateId: ptr.Any(input.Request.WorkflowStateId),
StateExecutionId: input.Request.Context.StateExecutionId,
SearchAttributes: searchAttributes,
EventType: iwfidl.STATE_EXECUTE_ATTEMPT_FAIL_EVENT,
WorkflowType: input.Request.WorkflowType,
WorkflowId: activityInfo.WorkflowExecution.ID,
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
StateId: ptr.Any(input.Request.WorkflowStateId),
StateExecutionId: input.Request.Context.StateExecutionId,
StartTimestampInMs: ptr.Any(stateApiExecuteStartTime),
SearchAttributes: searchAttributes,
})
return nil, composeHttpError(
activityInfo.IsLocalActivity,
Expand All @@ -162,13 +165,14 @@ func StateApiExecute(

if err = checkStateDecisionFromResponse(resp); err != nil {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.STATE_EXECUTE_ATTEMPT_FAIL_EVENT,
WorkflowType: input.Request.WorkflowType,
WorkflowId: activityInfo.WorkflowExecution.ID,
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
StateId: ptr.Any(input.Request.WorkflowStateId),
StateExecutionId: input.Request.Context.StateExecutionId,
SearchAttributes: searchAttributes,
EventType: iwfidl.STATE_EXECUTE_ATTEMPT_FAIL_EVENT,
WorkflowType: input.Request.WorkflowType,
WorkflowId: activityInfo.WorkflowExecution.ID,
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
StateId: ptr.Any(input.Request.WorkflowStateId),
StateExecutionId: input.Request.Context.StateExecutionId,
StartTimestampInMs: ptr.Any(stateApiExecuteStartTime),
SearchAttributes: searchAttributes,

Check warning on line 175 in service/interpreter/activityImpl.go

View check run for this annotation

Codecov / codecov/patch

service/interpreter/activityImpl.go#L168-L175

Added lines #L168 - L175 were not covered by tests
})
return nil, composeExecuteApiRespError(provider, err, resp)
}
Expand Down
86 changes: 46 additions & 40 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ func InterpreterImpl(
})
} else if provider.IsApplicationError(retErr) {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.WORKFLOW_FAIL_EVENT,
WorkflowType: input.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
SearchAttributes: sas,
EventType: iwfidl.WORKFLOW_FAIL_EVENT,
WorkflowType: input.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
SearchAttributes: sas,
StartTimestampInMs: ptr.Any(provider.GetWorkflowInfo(ctx).WorkflowStartTime.UnixMilli()),
})
}
}
Expand Down Expand Up @@ -165,11 +166,12 @@ func InterpreterImpl(
if !input.IsResumeFromContinueAsNew {
if !provider.IsReplaying(ctx) {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.WORKFLOW_START_EVENT,
WorkflowType: input.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
EventType: iwfidl.WORKFLOW_START_EVENT,
WorkflowType: input.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
StartTimestampInMs: ptr.Any(provider.GetWorkflowInfo(ctx).WorkflowStartTime.UnixMilli()),
})
}
// it's possible that a workflow is started without any starting state
Expand Down Expand Up @@ -596,18 +598,19 @@ func processStateExecution(
saLoadingPolicy := compatibility.GetWaitUntilApiSearchAttributesLoadingPolicy(state.StateOptions)
doLoadingPolicy := compatibility.GetWaitUntilApiDataObjectsLoadingPolicy(state.StateOptions)

stateWaitUntilApiStartTime := provider.Now(ctx).UnixMilli()
if !provider.IsReplaying(ctx) {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.STATE_WAIT_UNTIL_EE_START_EVENT,
WorkflowType: basicInfo.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
StateId: ptr.Any(state.StateId),
StateExecutionId: ptr.Any(stateExeId),
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
EventType: iwfidl.STATE_WAIT_UNTIL_EE_START_EVENT,
WorkflowType: basicInfo.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
StateId: ptr.Any(state.StateId),
StateExecutionId: ptr.Any(stateExeId),
StartTimestampInMs: ptr.Any(stateWaitUntilApiStartTime),
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
})
}
stateWaitUntilApiStartTime := provider.Now(ctx).UnixMilli()
errStartApi = provider.ExecuteActivity(&startResponse, configer.ShouldOptimizeActivity(), ctx,
waitUntilApi, provider.GetBackendType(), service.StateStartActivityInput{
IwfWorkerUrl: basicInfo.IwfWorkerUrl,
Expand Down Expand Up @@ -636,13 +639,14 @@ func processStateExecution(
})
} else {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.STATE_WAIT_UNTIL_EE_FAIL_EVENT,
WorkflowType: basicInfo.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
StateId: ptr.Any(state.StateId),
StateExecutionId: ptr.Any(stateExeId),
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
EventType: iwfidl.STATE_WAIT_UNTIL_EE_FAIL_EVENT,
WorkflowType: basicInfo.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
StateId: ptr.Any(state.StateId),
StateExecutionId: ptr.Any(stateExeId),
StartTimestampInMs: ptr.Any(stateWaitUntilApiStartTime),
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
})
}
}
Expand Down Expand Up @@ -872,18 +876,19 @@ func invokeStateExecute(
ctx = provider.WithActivityOptions(ctx, activityOptions)
var decideResponse *iwfidl.WorkflowStateDecideResponse

stateExecuteApiStartTime := provider.Now(ctx).UnixMilli()
if !provider.IsReplaying(ctx) {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.STATE_EXECUTE_EE_START_EVENT,
WorkflowType: basicInfo.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
StateId: ptr.Any(state.StateId),
StateExecutionId: ptr.Any(stateExeId),
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
EventType: iwfidl.STATE_EXECUTE_EE_START_EVENT,
WorkflowType: basicInfo.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
StateId: ptr.Any(state.StateId),
StateExecutionId: ptr.Any(stateExeId),
StartTimestampInMs: ptr.Any(stateExecuteApiStartTime),
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
})
}
stateExecuteApiStartTime := provider.Now(ctx).UnixMilli()
err = provider.ExecuteActivity(&decideResponse, configer.ShouldOptimizeActivity(), ctx,
executeApi, provider.GetBackendType(), service.StateDecideActivityInput{
IwfWorkerUrl: basicInfo.IwfWorkerUrl,
Expand Down Expand Up @@ -913,13 +918,14 @@ func invokeStateExecute(
})
} else {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.STATE_EXECUTE_EE_FAIL_EVENT,
WorkflowType: basicInfo.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
StateId: ptr.Any(state.StateId),
StateExecutionId: ptr.Any(stateExeId),
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
EventType: iwfidl.STATE_EXECUTE_EE_FAIL_EVENT,
WorkflowType: basicInfo.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
StateId: ptr.Any(state.StateId),
StartTimestampInMs: ptr.Any(stateExecuteApiStartTime),
StateExecutionId: ptr.Any(stateExeId),
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
})
}
}
Expand Down
14 changes: 9 additions & 5 deletions service/interpreter/workflowUpdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/common/event"
"github.com/indeedeng/iwf/service/common/ptr"
"github.com/indeedeng/iwf/service/interpreter/config"
"github.com/indeedeng/iwf/service/interpreter/cont"
"github.com/indeedeng/iwf/service/interpreter/interfaces"
Expand Down Expand Up @@ -61,14 +62,17 @@ func (u *WorkflowUpdater) handler(

info := u.provider.GetWorkflowInfo(ctx)

rpcExecutionStartTime := u.provider.Now(ctx).UnixMilli()

defer func() {
if !u.provider.IsReplaying(ctx) {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.RPC_EXECUTION_EVENT,
RpcName: &input.RpcName,
WorkflowType: u.basicInfo.IwfWorkflowType,
WorkflowId: info.WorkflowExecution.ID,
SearchAttributes: u.persistenceManager.GetAllSearchAttributes(),
EventType: iwfidl.RPC_EXECUTION_EVENT,
RpcName: &input.RpcName,
WorkflowType: u.basicInfo.IwfWorkflowType,
WorkflowId: info.WorkflowExecution.ID,
StartTimestampInMs: ptr.Any(rpcExecutionStartTime),
SearchAttributes: u.persistenceManager.GetAllSearchAttributes(),
})
}
}()
Expand Down

0 comments on commit 2175081

Please sign in to comment.