From 962d792cbe9fdff29be7b3695ea9488c3cb3cb06 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Fri, 30 May 2025 14:11:21 -0700 Subject: [PATCH 1/3] Added fix and test --- internal/internal_workflow_testsuite.go | 22 +++++++--- internal/workflow_testsuite_test.go | 58 +++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 6 deletions(-) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index f239186e2..da4c60d63 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -2281,13 +2281,23 @@ func (env *testWorkflowEnvironmentImpl) RequestCancelChildWorkflow(_, workflowID func (env *testWorkflowEnvironmentImpl) RequestCancelExternalWorkflow(namespace, workflowID, runID string, callback ResultHandler) { if env.workflowInfo.WorkflowExecution.ID == workflowID { - // cancel current workflow - env.workflowCancelHandler() - // check if current workflow is a child workflow - if env.isChildWorkflow() && env.onChildWorkflowCanceledListener != nil { + // cancel current workflow from within workflow context + if sd, ok := env.workflowDef.(*syncWorkflowDefinition); ok { env.postCallback(func() { - env.onChildWorkflowCanceledListener(env.workflowInfo) - }, false) + sd.dispatcher.NewCoroutine(sd.rootCtx, "cancel-self", true, func(ctx Context) { + env.workflowCancelHandler() + }) + if env.isChildWorkflow() && env.onChildWorkflowCanceledListener != nil { + env.onChildWorkflowCanceledListener(env.workflowInfo) + } + }, true) + } else { + env.workflowCancelHandler() + if env.isChildWorkflow() && env.onChildWorkflowCanceledListener != nil { + env.postCallback(func() { + env.onChildWorkflowCanceledListener(env.workflowInfo) + }, false) + } } return } else if childHandle, ok := env.runningWorkflows[workflowID]; ok && !childHandle.handled { diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index 2326e7c55..da3f7aef3 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -1232,3 +1232,61 @@ func TestDynamicWorkflows(t *testing.T) { require.NoError(t, err) require.Equal(t, "dynamic-activity - grape - cherry", result) } + +func SleepHour(ctx Context) error { + Sleep(ctx, time.Hour) + return nil +} + +func SleepThenCancel(ctx Context) error { + selector := NewSelector(ctx) + var activationWorkflow *WorkflowExecution + selector.AddReceive(GetSignalChannel(ctx, "activate"), func(c ReceiveChannel, more bool) { + c.Receive(ctx, nil) + GetLogger(ctx).Info("Received activation signal") + if activationWorkflow != nil { + RequestCancelExternalWorkflow(ctx, activationWorkflow.ID, activationWorkflow.RunID) + } + + cwf := ExecuteChildWorkflow( + ctx, + SleepHour, + ) + + var res WorkflowExecution + if err := cwf.GetChildWorkflowExecution().Get(ctx, &res); err != nil { + GetLogger(ctx).Error("Failed to start child workflow", "error", err) + return + } + activationWorkflow = &res + + selector.AddFuture(cwf, func(f Future) { + if err := f.Get(ctx, nil); err != nil { + GetLogger(ctx).Error("Child workflow failed", "error", err) + } else { + GetLogger(ctx).Info("Child workflow completed successfully") + } + activationWorkflow = nil + }) + }) + + for selector.HasPending() || activationWorkflow != nil { + selector.Select(ctx) + } + return nil +} + +func TestRequestCancelExternalWorkflowInSelector(t *testing.T) { + testSuite := &WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + env.RegisterWorkflow(SleepHour) + env.RegisterDelayedCallback(func() { + env.SignalWorkflow("activate", nil) + }, 0) + env.RegisterDelayedCallback(func() { + env.SignalWorkflow("activate", nil) + }, time.Second) + env.ExecuteWorkflow(SleepThenCancel) + require.NoError(t, env.GetWorkflowError()) + env.IsWorkflowCompleted() +} From 572386f67d6d2bfebafc7e1ee1566ba543a0925c Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 9 Jun 2025 11:02:46 -0500 Subject: [PATCH 2/3] Fix go run check --- internal/workflow_testsuite_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index da3f7aef3..dfa2438eb 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -1234,8 +1234,7 @@ func TestDynamicWorkflows(t *testing.T) { } func SleepHour(ctx Context) error { - Sleep(ctx, time.Hour) - return nil + return Sleep(ctx, time.Hour) } func SleepThenCancel(ctx Context) error { From ae6d39a662ae613c32168b2de57e36f12860c913 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 14 Jul 2025 15:17:15 -0700 Subject: [PATCH 3/3] Simplify test to min-repro, add comment explaining solution --- internal/internal_workflow_testsuite.go | 27 ++++++------ internal/workflow_testsuite_test.go | 55 +++++++------------------ 2 files changed, 28 insertions(+), 54 deletions(-) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index da4c60d63..3e473899f 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -2281,23 +2281,20 @@ func (env *testWorkflowEnvironmentImpl) RequestCancelChildWorkflow(_, workflowID func (env *testWorkflowEnvironmentImpl) RequestCancelExternalWorkflow(namespace, workflowID, runID string, callback ResultHandler) { if env.workflowInfo.WorkflowExecution.ID == workflowID { - // cancel current workflow from within workflow context - if sd, ok := env.workflowDef.(*syncWorkflowDefinition); ok { - env.postCallback(func() { - sd.dispatcher.NewCoroutine(sd.rootCtx, "cancel-self", true, func(ctx Context) { - env.workflowCancelHandler() - }) - if env.isChildWorkflow() && env.onChildWorkflowCanceledListener != nil { - env.onChildWorkflowCanceledListener(env.workflowInfo) - } - }, true) + // The way testWorkflowEnvironment is setup today, we close the child workflow dispatcher before calling + // the workflowCancelHandler. A larger refactor would be needed to handle this similar to non-test code. + // Maybe worth doing when https://github.com/temporalio/go-sdk/issues/50 is tackled. + if sd, ok := env.workflowDef.(*syncWorkflowDefinition); ok && env.isChildWorkflow() { + sd.dispatcher.NewCoroutine(sd.rootCtx, "cancel-self", true, func(ctx Context) { + env.workflowCancelHandler() + }) } else { env.workflowCancelHandler() - if env.isChildWorkflow() && env.onChildWorkflowCanceledListener != nil { - env.postCallback(func() { - env.onChildWorkflowCanceledListener(env.workflowInfo) - }, false) - } + } + if env.isChildWorkflow() && env.onChildWorkflowCanceledListener != nil { + env.postCallback(func() { + env.onChildWorkflowCanceledListener(env.workflowInfo) + }, false) } return } else if childHandle, ok := env.runningWorkflows[workflowID]; ok && !childHandle.handled { diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index dfa2438eb..9b9220f83 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -1238,53 +1238,30 @@ func SleepHour(ctx Context) error { } func SleepThenCancel(ctx Context) error { - selector := NewSelector(ctx) - var activationWorkflow *WorkflowExecution - selector.AddReceive(GetSignalChannel(ctx, "activate"), func(c ReceiveChannel, more bool) { - c.Receive(ctx, nil) - GetLogger(ctx).Info("Received activation signal") - if activationWorkflow != nil { - RequestCancelExternalWorkflow(ctx, activationWorkflow.ID, activationWorkflow.RunID) - } - - cwf := ExecuteChildWorkflow( - ctx, - SleepHour, - ) - - var res WorkflowExecution - if err := cwf.GetChildWorkflowExecution().Get(ctx, &res); err != nil { - GetLogger(ctx).Error("Failed to start child workflow", "error", err) - return - } - activationWorkflow = &res - - selector.AddFuture(cwf, func(f Future) { - if err := f.Get(ctx, nil); err != nil { - GetLogger(ctx).Error("Child workflow failed", "error", err) - } else { - GetLogger(ctx).Info("Child workflow completed successfully") - } - activationWorkflow = nil - }) - }) + cwf := ExecuteChildWorkflow( + ctx, + SleepHour, + ) + var res WorkflowExecution + if err := cwf.GetChildWorkflowExecution().Get(ctx, &res); err != nil { + return err + } - for selector.HasPending() || activationWorkflow != nil { - selector.Select(ctx) + // Canceling an external workflow that causes a timer to cancel used to fail due to + // "illegal access from outside of workflow context" + err := RequestCancelExternalWorkflow(ctx, res.ID, res.RunID).Get(ctx, nil) + if err != nil { + return err } - return nil + + // Give the workflow time to finish canceling the child workflow + return Sleep(ctx, 1*time.Second) } func TestRequestCancelExternalWorkflowInSelector(t *testing.T) { testSuite := &WorkflowTestSuite{} env := testSuite.NewTestWorkflowEnvironment() env.RegisterWorkflow(SleepHour) - env.RegisterDelayedCallback(func() { - env.SignalWorkflow("activate", nil) - }, 0) - env.RegisterDelayedCallback(func() { - env.SignalWorkflow("activate", nil) - }, time.Second) env.ExecuteWorkflow(SleepThenCancel) require.NoError(t, env.GetWorkflowError()) env.IsWorkflowCompleted()