Skip to content

Fail executions when worker restarts due to function timeout #10979

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: dev
Choose a base branch
from
29 changes: 15 additions & 14 deletions release_notes.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
### Release notes

<!-- Please add your release notes in the following format:
- My change description (#PR)
-->
- Improved memory metrics reporting using CGroup data for Linux consumption (#10968)
- Memory allocation optimizations in `RpcWorkerConfigFactory.AddProviders` (#10959)
- Fixing GrpcWorkerChannel concurrency bug (#10998)
- Avoid circular dependency when resolving LinuxContainerLegionMetricsPublisher. (#10991)
- Add 'unix' to the list of runtimes kept when importing PowerShell worker for Linux builds
- Update PowerShell 7.4 worker to 4.0.4206
- Update Python Worker Version to [4.37.0](https://github.com/Azure/azure-functions-python-worker/releases/tag/4.37.0)
- Add runtime and process metrics. (#11034)
- Add `win-arm64` and `linux-arm64` to the list of PowerShell runtimes; added filter for `osx` RIDs (includes `osx-x64` and `osx-arm64`) (#11013)
### Release notes

<!-- Please add your release notes in the following format:
- My change description (#PR)
-->
- Improved memory metrics reporting using CGroup data for Linux consumption (#10968)
- Memory allocation optimizations in `RpcWorkerConfigFactory.AddProviders` (#10959)
- Fixing GrpcWorkerChannel concurrency bug (#10998)
- Avoid circular dependency when resolving LinuxContainerLegionMetricsPublisher. (#10991)
- Add 'unix' to the list of runtimes kept when importing PowerShell worker for Linux builds
- Update PowerShell 7.4 worker to 4.0.4206
- Update Python Worker Version to [4.37.0](https://github.com/Azure/azure-functions-python-worker/releases/tag/4.37.0)
- Add runtime and process metrics. (#11034)
- Add `win-arm64` and `linux-arm64` to the list of PowerShell runtimes; added filter for `osx` RIDs (includes `osx-x64` and `osx-arm64`) (#11013)
- Bug fix that fails current executions when a worker channel restarts (#10979)
16 changes: 10 additions & 6 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1556,16 +1556,20 @@ public bool IsExecutingInvocation(string invocationId)

public bool TryFailExecutions(Exception workerException)
{
if (workerException == null)
{
return false;
}

foreach (var invocation in _executingInvocations?.Values)
{
string invocationId = invocation.Context?.ExecutionContext?.InvocationId.ToString();
_workerChannelLogger.LogDebug("Worker '{workerId}' encountered a fatal error. Failing invocation: '{invocationId}'", _workerId, invocationId);
invocation.Context?.ResultSource?.TrySetException(workerException);

if (workerException is not null)
{
invocation.Context?.ResultSource?.TrySetException(workerException);
}
else
{
invocation.Context?.ResultSource?.TrySetCanceled();
}

RemoveExecutingInvocation(invocationId);
}
return true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will this function ever return false? Is there is no scenario for it, why is it a bool?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looked into this - the bool returned also isn't used anywhere (in all references, this method is called with no action taken based on the result). Will refactor.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public async Task OnTimeoutExceptionAsync(ExceptionDispatchInfo exceptionInfo, T
{
_logger.LogWarning($"A function timeout has occurred. Restarting worker process executing invocationId '{timeoutException.InstanceId}'.", exceptionInfo.SourceException);
// If invocation id is not found in any of the workers => worker is already disposed. No action needed.
await functionInvocationDispatcher.RestartWorkerWithInvocationIdAsync(timeoutException.InstanceId.ToString());
await functionInvocationDispatcher.RestartWorkerWithInvocationIdAsync(timeoutException.InstanceId.ToString(), timeoutException);
_logger.LogWarning("Restart of language worker process(es) completed.", exceptionInfo.SourceException);
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public Task ShutdownAsync()
return Task.CompletedTask;
}

public Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
public Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId, Exception exception = default)
{
// Since there's only one channel for httpworker
DisposeAndRestartWorkerChannel(_httpWorkerChannel.Id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public interface IFunctionInvocationDispatcher : IDisposable

Task ShutdownAsync();

Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId);
Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId, Exception exception = default);

Task StartWorkerChannel();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ public void Dispose()
Dispose(true);
}

public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId, Exception exception = default)
{
// Dispose and restart errored channel with the particular invocation id
var channels = await GetInitializedWorkerChannelsAsync();
Expand All @@ -685,7 +685,7 @@ public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
if (channel.IsExecutingInvocation(invocationId))
{
_logger.LogDebug($"Restarting channel with workerId: '{channel.Id}' that is executing invocation: '{invocationId}' and timed out.");
await DisposeAndRestartWorkerChannel(_workerRuntime, channel.Id);
await DisposeAndRestartWorkerChannel(_workerRuntime, channel.Id, exception);
return true;
}
}
Expand Down
18 changes: 18 additions & 0 deletions test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,24 @@ public async Task InFlight_Functions_FailedWithException()
Assert.Equal(workerException, resultSource.Task.Exception.InnerException);
}

[Fact]
public async Task InFlight_Functions_Cancelled_WhenNoException()
{
await CreateDefaultWorkerChannel();

var resultSource = new TaskCompletionSource<ScriptInvocationResult>();
ScriptInvocationContext scriptInvocationContext = GetTestScriptInvocationContext(Guid.NewGuid(), resultSource);

await _workerChannel.SendInvocationRequest(scriptInvocationContext);

Assert.True(_workerChannel.IsExecutingInvocation(scriptInvocationContext.ExecutionContext.InvocationId.ToString()));

_workerChannel.TryFailExecutions(null);

Assert.False(_workerChannel.IsExecutingInvocation(scriptInvocationContext.ExecutionContext.InvocationId.ToString()));
Assert.Equal(TaskStatus.Canceled, resultSource.Task.Status);
}

[Fact]
public async Task SendLoadRequests_PublishesOutboundEvents()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Executors.Internal;
using Microsoft.Azure.WebJobs.Script.Config;
using Microsoft.Azure.WebJobs.Script.Description;
Expand Down Expand Up @@ -549,6 +550,61 @@ public async Task FunctionDispatcher_Restart_ErroredChannels_OnWorkerRestart_Not
}
}

[Fact]
public async Task FunctionDispatcher_RestartOfTimedOutChannels_WebHostFailsCurrentExecutions()
{
var invocationId = Guid.NewGuid().ToString();
var mockChannel = new Mock<IRpcWorkerChannel>(MockBehavior.Strict);
var mockJobHostChannelManager = new Mock<IJobHostRpcWorkerChannelManager>(MockBehavior.Strict);
var mockWebHostChannelManager = new Mock<IWebHostRpcWorkerChannelManager>(MockBehavior.Strict);

SetUpMocksForTimeoutTests(mockWebHostChannelManager, mockJobHostChannelManager, mockChannel, invocationId, true);

var workerConfig = new RpcWorkerConfig
{
Description = new RpcWorkerDescription { Language = "test" },
CountOptions = new WorkerProcessCountOptions { ProcessCount = 1 }
};

var dispatcher = GetTestFunctionDispatcher(mockwebHostLanguageWorkerChannelManager: mockWebHostChannelManager, mockJobHostLanguageWorkerChannelManager: mockJobHostChannelManager);

var result = await dispatcher.RestartWorkerWithInvocationIdAsync(invocationId, new FunctionTimeoutException());

Assert.True(result);
mockWebHostChannelManager.Verify(m => m.ShutdownChannelIfExistsAsync(
It.IsAny<string>(),
It.Is<string>(id => id == "testChannelId"),
It.Is<Exception>(ex => ex is FunctionTimeoutException)),
Times.Once);
}

[Fact]
public async Task FunctionDispatcher_RestartOfTimedOutChannels_JobHostFailsCurrentExecutions()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The WebHost will try to shut down the channel first and fail executions, but in the case that returns false we want to make sure the JobHost attempts to. See here.

{
var invocationId = Guid.NewGuid().ToString();
var mockChannel = new Mock<IRpcWorkerChannel>(MockBehavior.Strict);
var mockJobHostChannelManager = new Mock<IJobHostRpcWorkerChannelManager>(MockBehavior.Strict);
var mockWebHostChannelManager = new Mock<IWebHostRpcWorkerChannelManager>(MockBehavior.Strict);

SetUpMocksForTimeoutTests(mockWebHostChannelManager, mockJobHostChannelManager, mockChannel, invocationId, false);

var workerConfig = new RpcWorkerConfig
{
Description = new RpcWorkerDescription { Language = "test" },
CountOptions = new WorkerProcessCountOptions { ProcessCount = 1 }
};

var dispatcher = GetTestFunctionDispatcher(mockwebHostLanguageWorkerChannelManager: mockWebHostChannelManager, mockJobHostLanguageWorkerChannelManager: mockJobHostChannelManager);

var result = await dispatcher.RestartWorkerWithInvocationIdAsync(invocationId, new FunctionTimeoutException());

Assert.True(result);
mockJobHostChannelManager.Verify(m => m.ShutdownChannelIfExistsAsync(
It.Is<string>(id => id == "testChannelId"),
It.Is<Exception>(ex => ex is FunctionTimeoutException)),
Times.Once);
}

[Fact]
public async Task FunctionDispatcher_Error_WithinThreshold_BucketFills()
{
Expand Down Expand Up @@ -713,6 +769,7 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(
int maxProcessCountValue = 1,
bool addWebhostChannel = false,
Mock<IWebHostRpcWorkerChannelManager> mockwebHostLanguageWorkerChannelManager = null,
Mock<IJobHostRpcWorkerChannelManager> mockJobHostLanguageWorkerChannelManager = null,
bool throwOnProcessStartUp = false,
TimeSpan? startupIntervals = null,
string runtime = null,
Expand Down Expand Up @@ -767,6 +824,10 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(
{
testWebHostLanguageWorkerChannelManager = mockwebHostLanguageWorkerChannelManager.Object;
}
if (mockJobHostLanguageWorkerChannelManager != null)
{
jobHostLanguageWorkerChannelManager = mockJobHostLanguageWorkerChannelManager.Object;
}

var mockFunctionDispatcherLoadBalancer = new Mock<IRpcFunctionInvocationDispatcherLoadBalancer>();
mockFunctionDispatcherLoadBalancer.Setup(m => m.GetLanguageWorkerChannel(It.IsAny<IEnumerable<IRpcWorkerChannel>>()))
Expand Down Expand Up @@ -867,5 +928,35 @@ private void VerifyStartIntervals(TimeSpan from, TimeSpan to, bool ignoreFirstSt
Assert.True(diff > from && diff < to, $"Expected startup intervals between {from.TotalMilliseconds}ms and {to.TotalMilliseconds}ms. Actual: {diff.TotalMilliseconds}ms.");
}
}

private void SetUpMocksForTimeoutTests(Mock<IWebHostRpcWorkerChannelManager> mockWebHostChannelManager, Mock<IJobHostRpcWorkerChannelManager> mockJobHostChannelManager,
Mock<IRpcWorkerChannel> mockChannel, string invocationId, bool webHostShutdownSucceeds)
{
// Setup the channel managers to return our mock channel
mockWebHostChannelManager.Setup(mockWebHostChannelManager => mockWebHostChannelManager.GetChannels(It.IsAny<string>()))
.Returns(new Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>>());
mockJobHostChannelManager.Setup(m => m.GetChannels(It.IsAny<string>()))
.Returns(new List<IRpcWorkerChannel> { mockChannel.Object });
mockJobHostChannelManager.Setup(m => m.GetChannels())
.Returns(new List<IRpcWorkerChannel> { mockChannel.Object });

// Setup the mock channel to indicate it's executing the specified invocation
mockChannel.Setup(c => c.Id).Returns("testChannelId");
mockChannel.Setup(c => c.IsExecutingInvocation(invocationId)).Returns(true);
mockChannel.Setup(c => c.IsChannelReadyForInvocations()).Returns(true);

// Set up WebHost ShutdownChannelIfExistsAsync to return false, forcing the JobHost to attempt to shutdown channel
mockWebHostChannelManager.Setup(m => m.ShutdownChannelIfExistsAsync(
It.IsAny<string>(),
It.Is<string>(id => id == "testChannelId"),
It.Is<Exception>(ex => ex is FunctionTimeoutException)))
.ReturnsAsync(webHostShutdownSucceeds);

// Set up JobHost ShutdownChannelIfExistsAsync to be called with the right exception type
mockJobHostChannelManager.Setup(m => m.ShutdownChannelIfExistsAsync(
It.Is<string>(id => id == "testChannelId"),
It.Is<Exception>(ex => ex is FunctionTimeoutException)))
.ReturnsAsync(!webHostShutdownSucceeds);
}
}
}
Loading