diff --git a/release_notes.md b/release_notes.md index cb92971115..a1e0b496a6 100644 --- a/release_notes.md +++ b/release_notes.md @@ -1,14 +1,15 @@ -### Release notes - - -- Improved memory metrics reporting using CGroup data for Linux consumption (#10968) -- Memory allocation optimizations in `RpcWorkerConfigFactory.AddProviders` (#10959) -- Fixing GrpcWorkerChannel concurrency bug (#10998) -- Avoid circular dependency when resolving LinuxContainerLegionMetricsPublisher. (#10991) -- Add 'unix' to the list of runtimes kept when importing PowerShell worker for Linux builds -- Update PowerShell 7.4 worker to 4.0.4206 -- Update Python Worker Version to [4.37.0](https://github.com/Azure/azure-functions-python-worker/releases/tag/4.37.0) -- Add runtime and process metrics. (#11034) -- Add `win-arm64` and `linux-arm64` to the list of PowerShell runtimes; added filter for `osx` RIDs (includes `osx-x64` and `osx-arm64`) (#11013) +### Release notes + + +- Improved memory metrics reporting using CGroup data for Linux consumption (#10968) +- Memory allocation optimizations in `RpcWorkerConfigFactory.AddProviders` (#10959) +- Fixing GrpcWorkerChannel concurrency bug (#10998) +- Avoid circular dependency when resolving LinuxContainerLegionMetricsPublisher. (#10991) +- Add 'unix' to the list of runtimes kept when importing PowerShell worker for Linux builds +- Update PowerShell 7.4 worker to 4.0.4206 +- Update Python Worker Version to [4.37.0](https://github.com/Azure/azure-functions-python-worker/releases/tag/4.37.0) +- Add runtime and process metrics. 
(#11034) +- Add `win-arm64` and `linux-arm64` to the list of PowerShell runtimes; added filter for `osx` RIDs (includes `osx-x64` and `osx-arm64`) (#11013) +- Fix a bug so that in-flight executions are failed (or canceled) when a worker channel restarts (#10979) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index 88fcda9b91..86d90afa93 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -1556,16 +1556,20 @@ public bool IsExecutingInvocation(string invocationId) public bool TryFailExecutions(Exception workerException) { - if (workerException == null) - { - return false; - } - foreach (var invocation in _executingInvocations?.Values) { string invocationId = invocation.Context?.ExecutionContext?.InvocationId.ToString(); _workerChannelLogger.LogDebug("Worker '{workerId}' encountered a fatal error. Failing invocation: '{invocationId}'", _workerId, invocationId); - invocation.Context?.ResultSource?.TrySetException(workerException); + + if (workerException is not null) + { + invocation.Context?.ResultSource?.TrySetException(workerException); + } + else + { + invocation.Context?.ResultSource?.TrySetCanceled(); + } + RemoveExecutingInvocation(invocationId); } return true; diff --git a/src/WebJobs.Script.WebHost/WebScriptHostExceptionHandler.cs b/src/WebJobs.Script.WebHost/WebScriptHostExceptionHandler.cs index cd8b364caf..b846e16389 100644 --- a/src/WebJobs.Script.WebHost/WebScriptHostExceptionHandler.cs +++ b/src/WebJobs.Script.WebHost/WebScriptHostExceptionHandler.cs @@ -54,7 +54,7 @@ public async Task OnTimeoutExceptionAsync(ExceptionDispatchInfo exceptionInfo, T { _logger.LogWarning($"A function timeout has occurred. Restarting worker process executing invocationId '{timeoutException.InstanceId}'.", exceptionInfo.SourceException); // If invocation id is not found in any of the workers => worker is already disposed. No action needed.
- await functionInvocationDispatcher.RestartWorkerWithInvocationIdAsync(timeoutException.InstanceId.ToString()); + await functionInvocationDispatcher.RestartWorkerWithInvocationIdAsync(timeoutException.InstanceId.ToString(), timeoutException); _logger.LogWarning("Restart of language worker process(es) completed.", exceptionInfo.SourceException); } else diff --git a/src/WebJobs.Script/Workers/Http/HttpFunctionInvocationDispatcher.cs b/src/WebJobs.Script/Workers/Http/HttpFunctionInvocationDispatcher.cs index f76eed4f5a..82af8852e6 100644 --- a/src/WebJobs.Script/Workers/Http/HttpFunctionInvocationDispatcher.cs +++ b/src/WebJobs.Script/Workers/Http/HttpFunctionInvocationDispatcher.cs @@ -206,7 +206,7 @@ public Task ShutdownAsync() return Task.CompletedTask; } - public Task RestartWorkerWithInvocationIdAsync(string invocationId) + public Task RestartWorkerWithInvocationIdAsync(string invocationId, Exception exception = default) { // Since there's only one channel for httpworker DisposeAndRestartWorkerChannel(_httpWorkerChannel.Id); diff --git a/src/WebJobs.Script/Workers/IFunctionInvocationDispatcher.cs b/src/WebJobs.Script/Workers/IFunctionInvocationDispatcher.cs index ee40498bf4..c1e8726749 100644 --- a/src/WebJobs.Script/Workers/IFunctionInvocationDispatcher.cs +++ b/src/WebJobs.Script/Workers/IFunctionInvocationDispatcher.cs @@ -23,7 +23,7 @@ public interface IFunctionInvocationDispatcher : IDisposable Task ShutdownAsync(); - Task RestartWorkerWithInvocationIdAsync(string invocationId); + Task RestartWorkerWithInvocationIdAsync(string invocationId, Exception exception = default); Task StartWorkerChannel(); diff --git a/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs b/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs index 5025c58591..d0c954cd59 100644 --- a/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs +++ 
b/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs @@ -676,7 +676,7 @@ public void Dispose() Dispose(true); } - public async Task RestartWorkerWithInvocationIdAsync(string invocationId) + public async Task RestartWorkerWithInvocationIdAsync(string invocationId, Exception exception = default) { // Dispose and restart errored channel with the particular invocation id var channels = await GetInitializedWorkerChannelsAsync(); @@ -685,7 +685,7 @@ public async Task RestartWorkerWithInvocationIdAsync(string invocationId) if (channel.IsExecutingInvocation(invocationId)) { _logger.LogDebug($"Restarting channel with workerId: '{channel.Id}' that is executing invocation: '{invocationId}' and timed out."); - await DisposeAndRestartWorkerChannel(_workerRuntime, channel.Id); + await DisposeAndRestartWorkerChannel(_workerRuntime, channel.Id, exception); return true; } } diff --git a/test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs b/test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs index 6489f6c756..c796bcb430 100644 --- a/test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs +++ b/test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs @@ -586,6 +586,24 @@ public async Task InFlight_Functions_FailedWithException() Assert.Equal(workerException, resultSource.Task.Exception.InnerException); } + [Fact] + public async Task InFlight_Functions_Cancelled_WhenNoException() + { + await CreateDefaultWorkerChannel(); + + var resultSource = new TaskCompletionSource(); + ScriptInvocationContext scriptInvocationContext = GetTestScriptInvocationContext(Guid.NewGuid(), resultSource); + + await _workerChannel.SendInvocationRequest(scriptInvocationContext); + + Assert.True(_workerChannel.IsExecutingInvocation(scriptInvocationContext.ExecutionContext.InvocationId.ToString())); + + _workerChannel.TryFailExecutions(null); + + 
Assert.False(_workerChannel.IsExecutingInvocation(scriptInvocationContext.ExecutionContext.InvocationId.ToString())); + Assert.Equal(TaskStatus.Canceled, resultSource.Task.Status); + } + [Fact] public async Task SendLoadRequests_PublishesOutboundEvents() { diff --git a/test/WebJobs.Script.Tests/Workers/Rpc/RpcFunctionInvocationDispatcherTests.cs b/test/WebJobs.Script.Tests/Workers/Rpc/RpcFunctionInvocationDispatcherTests.cs index 95aec57d2b..12a26c903b 100644 --- a/test/WebJobs.Script.Tests/Workers/Rpc/RpcFunctionInvocationDispatcherTests.cs +++ b/test/WebJobs.Script.Tests/Workers/Rpc/RpcFunctionInvocationDispatcherTests.cs @@ -9,6 +9,7 @@ using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using Microsoft.AspNetCore.Hosting; +using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Executors.Internal; using Microsoft.Azure.WebJobs.Script.Config; using Microsoft.Azure.WebJobs.Script.Description; @@ -549,6 +550,61 @@ public async Task FunctionDispatcher_Restart_ErroredChannels_OnWorkerRestart_Not } } + [Fact] + public async Task FunctionDispatcher_RestartOfTimedOutChannels_WebHostFailsCurrentExecutions() + { + var invocationId = Guid.NewGuid().ToString(); + var mockChannel = new Mock(MockBehavior.Strict); + var mockJobHostChannelManager = new Mock(MockBehavior.Strict); + var mockWebHostChannelManager = new Mock(MockBehavior.Strict); + + SetUpMocksForTimeoutTests(mockWebHostChannelManager, mockJobHostChannelManager, mockChannel, invocationId, true); + + var workerConfig = new RpcWorkerConfig + { + Description = new RpcWorkerDescription { Language = "test" }, + CountOptions = new WorkerProcessCountOptions { ProcessCount = 1 } + }; + + var dispatcher = GetTestFunctionDispatcher(mockwebHostLanguageWorkerChannelManager: mockWebHostChannelManager, mockJobHostLanguageWorkerChannelManager: mockJobHostChannelManager); + + var result = await dispatcher.RestartWorkerWithInvocationIdAsync(invocationId, new FunctionTimeoutException()); + + 
Assert.True(result); + mockWebHostChannelManager.Verify(m => m.ShutdownChannelIfExistsAsync( + It.IsAny(), + It.Is(id => id == "testChannelId"), + It.Is(ex => ex is FunctionTimeoutException)), + Times.Once); + } + + [Fact] + public async Task FunctionDispatcher_RestartOfTimedOutChannels_JobHostFailsCurrentExecutions() + { + var invocationId = Guid.NewGuid().ToString(); + var mockChannel = new Mock(MockBehavior.Strict); + var mockJobHostChannelManager = new Mock(MockBehavior.Strict); + var mockWebHostChannelManager = new Mock(MockBehavior.Strict); + + SetUpMocksForTimeoutTests(mockWebHostChannelManager, mockJobHostChannelManager, mockChannel, invocationId, false); + + var workerConfig = new RpcWorkerConfig + { + Description = new RpcWorkerDescription { Language = "test" }, + CountOptions = new WorkerProcessCountOptions { ProcessCount = 1 } + }; + + var dispatcher = GetTestFunctionDispatcher(mockwebHostLanguageWorkerChannelManager: mockWebHostChannelManager, mockJobHostLanguageWorkerChannelManager: mockJobHostChannelManager); + + var result = await dispatcher.RestartWorkerWithInvocationIdAsync(invocationId, new FunctionTimeoutException()); + + Assert.True(result); + mockJobHostChannelManager.Verify(m => m.ShutdownChannelIfExistsAsync( + It.Is(id => id == "testChannelId"), + It.Is(ex => ex is FunctionTimeoutException)), + Times.Once); + } + [Fact] public async Task FunctionDispatcher_Error_WithinThreshold_BucketFills() { @@ -713,6 +769,7 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher( int maxProcessCountValue = 1, bool addWebhostChannel = false, Mock mockwebHostLanguageWorkerChannelManager = null, + Mock mockJobHostLanguageWorkerChannelManager = null, bool throwOnProcessStartUp = false, TimeSpan? 
startupIntervals = null, string runtime = null, @@ -767,6 +824,10 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher( { testWebHostLanguageWorkerChannelManager = mockwebHostLanguageWorkerChannelManager.Object; } + if (mockJobHostLanguageWorkerChannelManager != null) + { + jobHostLanguageWorkerChannelManager = mockJobHostLanguageWorkerChannelManager.Object; + } var mockFunctionDispatcherLoadBalancer = new Mock(); mockFunctionDispatcherLoadBalancer.Setup(m => m.GetLanguageWorkerChannel(It.IsAny>())) @@ -867,5 +928,35 @@ private void VerifyStartIntervals(TimeSpan from, TimeSpan to, bool ignoreFirstSt Assert.True(diff > from && diff < to, $"Expected startup intervals between {from.TotalMilliseconds}ms and {to.TotalMilliseconds}ms. Actual: {diff.TotalMilliseconds}ms."); } } + + private void SetUpMocksForTimeoutTests(Mock mockWebHostChannelManager, Mock mockJobHostChannelManager, + Mock mockChannel, string invocationId, bool webHostShutdownSucceeds) + { + // Set up the JobHost channel manager to return our mock channel; the WebHost channel manager returns no channels + mockWebHostChannelManager.Setup(mockWebHostChannelManager => mockWebHostChannelManager.GetChannels(It.IsAny())) + .Returns(new Dictionary>()); + mockJobHostChannelManager.Setup(m => m.GetChannels(It.IsAny())) + .Returns(new List { mockChannel.Object }); + mockJobHostChannelManager.Setup(m => m.GetChannels()) + .Returns(new List { mockChannel.Object }); + + // Set up the mock channel to indicate it's executing the specified invocation + mockChannel.Setup(c => c.Id).Returns("testChannelId"); + mockChannel.Setup(c => c.IsExecutingInvocation(invocationId)).Returns(true); + mockChannel.Setup(c => c.IsChannelReadyForInvocations()).Returns(true); + + // Set up WebHost ShutdownChannelIfExistsAsync to return webHostShutdownSucceeds; when that is false, the JobHost is forced to attempt to shut down the channel + mockWebHostChannelManager.Setup(m => m.ShutdownChannelIfExistsAsync( + It.IsAny(), + It.Is(id => id == "testChannelId"), + It.Is(ex => ex is FunctionTimeoutException))) +
.ReturnsAsync(webHostShutdownSucceeds); + + // Set up JobHost ShutdownChannelIfExistsAsync to be called with the right exception type + mockJobHostChannelManager.Setup(m => m.ShutdownChannelIfExistsAsync( + It.Is(id => id == "testChannelId"), + It.Is(ex => ex is FunctionTimeoutException))) + .ReturnsAsync(!webHostShutdownSucceeds); + } } } \ No newline at end of file