fix: controller: ensure the workflow reconciles task results properly when failing to receive timely updates from the api server

error scenario: a pod for a step in a workflow has completed, and its task
result has been properly created and finalized by its wait container (judging
from the wait container's exit status); however, the task result informer in
the controller leader has not received any updates about it (due to an
overloaded api server or etcd).

currently, the argo workflow controller doesn't handle the above scenario
properly: it marks the workflow node succeeded but shows no artifact outputs
(even though they have already been uploaded to the repository).

we ran into this situation in our production instance (running v3.5.8).

it's not easy to reproduce this problem, but we can inject a manual fault in
`workflow/controller/taskresult.go:func (woc *wfOperationCtx) taskResultReconciliation()`
to simulate the situation; I reproduced the issue this way on release v3.6.2:

```diff
+++ workflow/controller/taskresult.go
@@ -1,7 +1,9 @@
 package controller

 import (
+       "os"
        "reflect"
+       "strings"
        "time"

        log "github.com/sirupsen/logrus"
@@ -62,6 +64,12 @@ func (woc *wfOperationCtx) taskResultReconciliation() {
        objs, _ := woc.controller.taskResultInformer.GetIndexer().ByIndex(indexes.WorkflowIndex, woc.wf.Namespace+"/"+woc.wf.Name)
        woc.log.WithField("numObjs", len(objs)).Info("Task-result reconciliation")

+       if strings.Contains(woc.wf.Name, "-xhu-debug-") {
+               if _, err := os.Stat("/tmp/xhu-debug-control"); err != nil {
+                       return
+               }
+       }
```
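
with this injection in place (judging from the patch above), any workflow whose
name contains `-xhu-debug-` skips task-result reconciliation for as long as the
file `/tmp/xhu-debug-control` does not exist; creating that file lets
reconciliation proceed again, so the informer lag can be reproduced
deterministically.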

the fix is to forcefully mark the workflow as having an incomplete TaskResult
in assessNodeStatus.

this fix doesn't handle the case where a pod failed: there are too many
potential failure scenarios (e.g. the wait container might not be able to
insert a task result at all), and a retry is probably needed when there are
failures. the loss in that case is probably not as great as for a successful
pod.

Signed-off-by: Xiaofan Hu <[email protected]>
bom-d-van committed Dec 23, 2024
1 parent 6699ab3 commit 747f1e6
Showing 3 changed files with 46 additions and 3 deletions.
9 changes: 9 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
```diff
@@ -1986,6 +1986,15 @@ func (ws *WorkflowStatus) IsTaskResultIncomplete(name string) bool {
 	return false // workflows from older versions do not have this status, so assume completed if this is missing
 }
 
+func (ws *WorkflowStatus) IsTaskResultInited(name string) bool {
+	if ws.TaskResultsCompletionStatus == nil {
+		return false
+	}
+
+	_, found := ws.TaskResultsCompletionStatus[name]
+	return found
+}
+
 func (ws *WorkflowStatus) IsOffloadNodeStatus() bool {
 	return ws.OffloadNodeStatusVersion != ""
 }
```
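
For clarity, here is a minimal sketch (not part of the commit) of how the new `IsTaskResultInited` helper differs from the existing `IsTaskResultIncomplete`. It assumes `TaskResultsCompletionStatus` is the `map[string]bool` keyed by node ID that the surrounding code implies, and that the standard `wfv1` import path is used:

```go
package main

import (
	"fmt"

	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

func main() {
	// Assumption: absent entry = the controller never saw a task result for the
	// node; false = seen but not finalized; true = finalized.
	ws := wfv1.WorkflowStatus{
		TaskResultsCompletionStatus: map[string]bool{
			"node-finalized": true,
			"node-pending":   false,
			// "node-missing" deliberately has no entry.
		},
	}

	fmt.Println(ws.IsTaskResultInited("node-finalized"))   // true
	fmt.Println(ws.IsTaskResultInited("node-missing"))     // false: the new signal used by the fix
	fmt.Println(ws.IsTaskResultIncomplete("node-pending")) // true
	fmt.Println(ws.IsTaskResultIncomplete("node-missing")) // false: old workflows lack the map
}
```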
18 changes: 18 additions & 0 deletions workflow/controller/operator.go
```diff
@@ -1438,6 +1438,24 @@ func (woc *wfOperationCtx) assessNodeStatus(ctx context.Context, pod *apiv1.Pod,
 		new.PodIP = pod.Status.PodIP
 	}
 
+	if resultName := woc.nodeID(pod); new.Phase == wfv1.NodeSucceeded &&
+		tmpl.HasOutputs() && !woc.wf.Status.IsTaskResultInited(resultName) {
+		// error scenario: a pod for a step in a workflow has completed, and its task
+		// result are properly created and finalized by its wait container (judging from
+		// the exit status of the wait container), however, the task result informer in
+		// the controller leader has not received any updates about it (due to overloaded
+		// api server or etcd).
+		//
+		// the change is to forcefully mark the workflow having incomplete TaskResult in
+		// assessNodeStatus.
+		//
+		// this fix doesn't handle the case when a pod failed, there are too many
+		// potentially failure scenarios (like the wait container might not be able to
+		// insert a task result). plus, a retry is probably needed when there are
+		// failures. the loss is probably not as great as a successful one.
+		woc.wf.Status.MarkTaskResultIncomplete(resultName)
+	}
+
 	new.HostNodeName = pod.Spec.NodeName
 
 	if !new.Progress.IsValid() {
```
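
The effect of `MarkTaskResultIncomplete` here is presumably that the node's entry in `TaskResultsCompletionStatus` is set to `false`, so completion logic that consults `IsTaskResultIncomplete` keeps the workflow open until the informer finally delivers the task result. A hypothetical sketch of such a guard follows; the helper name and loop are illustrative only, not the controller's actual code:

```go
package sketch

import (
	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

// taskResultsBlockCompletion is a hypothetical helper (not part of this commit):
// it reports whether any node is still in the state that MarkTaskResultIncomplete
// puts it into above, i.e. a task result that is tracked but not yet finalized.
func taskResultsBlockCompletion(ws *wfv1.WorkflowStatus) bool {
	for nodeID := range ws.TaskResultsCompletionStatus {
		if ws.IsTaskResultIncomplete(nodeID) {
			return true
		}
	}
	return false
}
```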
22 changes: 19 additions & 3 deletions workflow/controller/operator_test.go
```diff
@@ -10855,8 +10855,20 @@ spec:
 i = random.randint(1, 100)
 print(i)`
 
-// TestWorkflowNeedReconcile test whether a workflow need reconcile taskresults.
+// testWorkflowNeedReconcile test whether a workflow need reconcile taskresults.
 func TestWorkflowNeedReconcile(t *testing.T) {
+	t.Run("hasIncompleteTaskResult", func(t *testing.T) {
+		hasIncompleteTaskResult := true
+		testWorkflowNeedReconcileHelper(t, hasIncompleteTaskResult)
+	})
+
+	t.Run("hasNoTaskResult", func(t *testing.T) {
+		hasIncompleteTaskResult := false
+		testWorkflowNeedReconcileHelper(t, hasIncompleteTaskResult)
+	})
+}
+
+func testWorkflowNeedReconcileHelper(t *testing.T, hasIncompleteTaskResult bool) {
 	cancel, controller := newController()
 	defer cancel()
 	ctx := context.Background()
@@ -10877,9 +10889,13 @@ func TestWorkflowNeedReconcile(t *testing.T) {
 	wf, err = wfcset.Get(ctx, wf.ObjectMeta.Name, metav1.GetOptions{})
 	require.NoError(t, err)
 	woc = newWorkflowOperationCtx(wf, controller)
-	for _, node := range woc.wf.Status.Nodes {
-		woc.wf.Status.MarkTaskResultIncomplete(node.ID)
+
+	if hasIncompleteTaskResult {
+		for _, node := range woc.wf.Status.Nodes {
+			woc.wf.Status.MarkTaskResultIncomplete(node.ID)
+		}
 	}
+
 	err, podReconciliationCompleted := woc.podReconciliation(ctx)
 	require.NoError(t, err)
 	assert.False(t, podReconciliationCompleted)
```
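
The two subtests appear to cover both sides of the new behavior: `hasIncompleteTaskResult` marks every node's task result incomplete explicitly, while `hasNoTaskResult` leaves the completion map untouched so that only the new `assessNodeStatus` guard can account for the missing results; in both cases the helper expects `podReconciliation` to report that reconciliation is not yet complete. (Part of the helper is elided in this view, so this reading is inferred from the visible hunks.)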
