diff --git a/Libraries/Microsoft.Teams.Common/Extensions/TaskExtensions.cs b/Libraries/Microsoft.Teams.Common/Extensions/TaskExtensions.cs index 1ce1db45..06314592 100644 --- a/Libraries/Microsoft.Teams.Common/Extensions/TaskExtensions.cs +++ b/Libraries/Microsoft.Teams.Common/Extensions/TaskExtensions.cs @@ -5,7 +5,7 @@ namespace Microsoft.Teams.Common.Extensions; public static class TaskExtensions { - public static async Task Retry(Func> taskFactory, int max = 3, int delay = 200) + public static async Task Retry(Func> taskFactory, int max = 3, int delay = 500) { try { diff --git a/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.AspNetCore/AspNetCorePlugin.Stream.cs b/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.AspNetCore/AspNetCorePlugin.Stream.cs index b1b0d25e..0811315d 100644 --- a/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.AspNetCore/AspNetCorePlugin.Stream.cs +++ b/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.AspNetCore/AspNetCorePlugin.Stream.cs @@ -12,6 +12,20 @@ namespace Microsoft.Teams.Plugins.AspNetCore; +/// +/// Streaming implementation for Microsoft Teams activities. +/// +/// Queues message and typing activities and flushes them in chunks +/// to avoid rate limits and preserve message order. +/// +/// Flow: +/// 1. adds activities to the queue. +/// 2. processes up to 10 queued items under a lock. +/// 3. Informative typing updates are sent immediately if no message started. +/// 4. Message text are combined into a typing chunk. +/// 5. Another flush is scheduled if more items remain. +/// 6. waits for the queue to empty and sends the final message. +/// public partial class AspNetCorePlugin { public class Stream : IStreamer @@ -35,43 +49,45 @@ public class Stream : IStreamer private int _count = 0; private MessageActivity? _result; private readonly SemaphoreSlim _lock = new(1, 1); + private Timer? _timeout; + private const int _timeoutMs = 5000; + /// + /// Enqueues a message activity for streaming. + /// public void Emit(MessageActivity activity) { - if (_timeout != null) - { - _timeout.Dispose(); - _timeout = null; - } - _queue.Enqueue(activity); - _timeout = new Timer(_ => + if (_timeout == null) { _ = Flush(); - }, null, 500, Timeout.Infinite); + } } + /// + /// Enqueues a typing activity for streaming. + /// public void Emit(TypingActivity activity) { - if (_timeout != null) - { - _timeout.Dispose(); - _timeout = null; - } - _queue.Enqueue(activity); - _timeout = new Timer(_ => + if (_timeout == null) { _ = Flush(); - }, null, 500, Timeout.Infinite); + } } + /// + /// Emits plain text as a message activity. + /// public void Emit(string text) { Emit(new MessageActivity(text)); } + /// + /// Sends an informative typing update (e.g., "Thinking..."). + /// public void Update(string text) { Emit(new TypingActivity(text) @@ -83,24 +99,50 @@ public void Update(string text) }); } + public async Task WaitForIdAndQueueAsync() + { + var start = DateTime.UtcNow; + + while (_id == null || _queue.Count > 0) + { + if ((DateTime.UtcNow - start).TotalMilliseconds > _timeoutMs) + { + return false; // timed out + } + + await Task.Delay(50); + } + + return true; // success + } + + /// + /// Closes the stream after all queued activities have been sent. + /// Returns the final message activity. + /// public async Task Close() { if (_index == 1 && _queue.Count == 0 && _lock.CurrentCount > 0) return null; + if (_result is not null) return _result; - while (_id is null || _queue.Count > 0) + bool ready = await WaitForIdAndQueueAsync(); + if (!ready) { - await Task.Delay(50); + return null; // timed out waiting for ID and queue to empty } if (_text == string.Empty && _attachments.Count == 0) // when only informative updates are present { - _text = "Streaming closed with no content"; + return null; } var activity = new MessageActivity(_text) .AddAttachment(_attachments.ToArray()); - activity.WithId(_id); + if (_id is not null) + { + activity.WithId(_id); + } activity.WithData(_channelData); activity.AddEntity(_entities.ToArray()); activity.AddStreamFinal(); @@ -120,11 +162,20 @@ public void Update(string text) return (MessageActivity)res; } + /// + /// Flushes up to 10 queued activities. + /// Combines message chunks and sends informative updates. + /// Reschedules itself if more items remain. + /// protected async Task Flush() { if (_queue.Count == 0) return; - await _lock.WaitAsync(); + + if (!await _lock.WaitAsync(0)) + { + return; // another flush is running, exit + } try { @@ -138,7 +189,7 @@ protected async Task Flush() Queue informativeUpdates = new(); - while (i <= 10 && _queue.TryDequeue(out var activity)) + while (i < 10 && _queue.TryDequeue(out var activity)) { if (activity is MessageActivity message) { @@ -195,8 +246,9 @@ async Task SendActivity(TypingActivity toSend) { toSend.WithId(_id); } - + toSend.AddStreamUpdate(_index); + var res = await Retry(() => Send(toSend)).ConfigureAwait(false); OnChunk(res); _id ??= res.Id; @@ -209,4 +261,4 @@ async Task SendActivity(TypingActivity toSend) } } } -} \ No newline at end of file +} diff --git a/Tests/Microsoft.Teams.Plugins.AspNetCore.Tests/AspNetCorePluginStreamTests.cs b/Tests/Microsoft.Teams.Plugins.AspNetCore.Tests/AspNetCorePluginStreamTests.cs deleted file mode 100644 index 7320121b..00000000 --- a/Tests/Microsoft.Teams.Plugins.AspNetCore.Tests/AspNetCorePluginStreamTests.cs +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -using Microsoft.Teams.Api.Activities; -using Microsoft.Teams.Api.Entities; - -namespace Microsoft.Teams.Plugins.AspNetCore.Tests; - -public class AspNetCorePluginStreamTests -{ - [Fact] - public async Task Stream_EmitMessage_FlushesAfter500ms() - { - var sendCallCount = 0; - var sendTimes = new List(); - var stream = new AspNetCorePlugin.Stream - { - Send = activity => - { - sendCallCount++; - sendTimes.Add(DateTime.Now); - activity.Id = $"test-id-{sendCallCount}"; - return Task.FromResult(activity); - } - }; - - var startTime = DateTime.Now; - - stream.Emit("Test message"); - await Task.Delay(600); // Wait longer than 500ms timeout - - Assert.True(sendCallCount > 0, "Should have sent at least one message"); - Assert.True(sendTimes.Any(t => t >= startTime.AddMilliseconds(450)), - "Should have waited approximately 500ms before sending"); - } - - [Fact] - public async Task Stream_MultipleEmits_RestartsTimer() - { - var sendCallCount = 0; - var stream = new AspNetCorePlugin.Stream - { - Send = activity => - { - sendCallCount++; - activity.Id = $"test-id-{sendCallCount}"; - return Task.FromResult(activity); - } - }; - - stream.Emit("First message"); - await Task.Delay(300); // Wait less than 500ms - - stream.Emit("Second message"); // This should reset the timer - await Task.Delay(300); // Still less than 500ms from second emit - - Assert.Equal(0, sendCallCount); // Should not have sent yet - - await Task.Delay(300); // Now over 500ms from second emit - - Assert.True(sendCallCount > 0, "Should have sent messages after timer expired"); - } - - [Fact] - public async Task Stream_SendTimeout_HandledGracefully() - { - var callCount = 0; - var stream = new AspNetCorePlugin.Stream - { - Send = activity => - { - callCount++; - if (callCount == 1) // Fail first attempt - { - throw new TimeoutException("Operation timed out"); - } - - // Succeed on second attempt - activity.Id = $"success-after-timeout-{callCount}"; - return Task.FromResult(activity); - } - }; - - stream.Emit("Test message with timeout"); - await Task.Delay(600); // Wait for flush and retries - - var result = await stream.Close(); - - Assert.True(callCount > 1, "Should have retried after timeout"); - Assert.NotNull(result); - Assert.Contains("Test message with timeout", result.Text); - } - - [Fact] - public async Task Stream_UpdateStatus_SendsTypingActivity() - { - var sentActivities = new List(); - var stream = new AspNetCorePlugin.Stream - { - Send = activity => - { - sentActivities.Add(activity); - return Task.FromResult(activity); - } - }; - - stream.Update("Thinking..."); - await Task.Delay(600); // Wait for the flush task to complete - - Assert.True(stream.Count > 0, "Should have processed the update"); - Assert.Equal(2, stream.Sequence); // Should increment sequence after sending - - Assert.True(sentActivities.Count > 0, "Should have sent at least one activity"); - var sentActivity = sentActivities.First(); - Assert.IsType(sentActivity); - Assert.Equal("Thinking...", ((TypingActivity)sentActivity).Text); - Assert.Equal(StreamType.Informative, ((TypingActivity)sentActivity).ChannelData?.StreamType); - } - -} \ No newline at end of file