Skip to content

Commit

Permalink
clear lp cache on workflow node finalize
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt committed Sep 18, 2024
1 parent 40b60b4 commit d15920e
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 3 deletions.
10 changes: 7 additions & 3 deletions flytepropeller/pkg/controller/nodes/subworkflow/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newMetrics(scope promutils.Scope) metrics {
}

func (w *workflowNodeHandler) FinalizeRequired() bool {
return false
return true
}

func (w *workflowNodeHandler) Setup(_ context.Context, _ interfaces.SetupContext) error {
Expand Down Expand Up @@ -120,8 +120,12 @@ func (w *workflowNodeHandler) Abort(ctx context.Context, nCtx interfaces.NodeExe
return nil
}

func (w *workflowNodeHandler) Finalize(ctx context.Context, _ interfaces.NodeExecutionContext) error {
logger.Warnf(ctx, "Subworkflow finalize invoked. Nothing to be done")
func (w *workflowNodeHandler) Finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext) error {
logger.Warnf(ctx, "Subworkflow finalize invoked. Clearing up the cache")
wfNode := nCtx.Node().GetWorkflowNode()
if wfNode.GetLaunchPlanRefID() != nil {
return w.lpHandler.Finalize(ctx, nCtx)
}
return nil
}

Expand Down
17 changes: 17 additions & 0 deletions flytepropeller/pkg/controller/nodes/subworkflow/launchplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,20 @@ func (l *launchPlanHandler) HandleAbort(ctx context.Context, nCtx interfaces.Nod
}
return l.launchPlan.Kill(ctx, childID, fmt.Sprintf("cascading abort as parent execution id [%s] aborted, reason [%s]", nCtx.ExecutionContext().GetName(), reason))
}

func (l *launchPlanHandler) Finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext) error {
parentNodeExecutionID, err := getParentNodeExecutionID(nCtx)
if err != nil {
return err
}
childID, err := GetChildWorkflowExecutionIDForExecution(
parentNodeExecutionID,
nCtx,
)
if err != nil {
// THIS SHOULD NEVER HAPPEN
return err
}

return l.launchPlan.Finalize(ctx, childID)
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ func (a *adminLaunchPlanExecutor) Initialize(ctx context.Context) error {
return a.cache.Start(ctx)
}

func (a *adminLaunchPlanExecutor) Finalize(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) error {
return a.cache.DeleteDelayed(executionID.String())
}

func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batch) (
resp []cache.ItemSyncResponse, err error) {
resp = make([]cache.ItemSyncResponse, 0, len(batch))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type Executor interface {

// Initialize initializes Executor.
Initialize(ctx context.Context) error

// Finalize clears the cache of the LaunchPlan execution
Finalize(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) error
}

type Reader interface {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d15920e

Please sign in to comment.