Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Microsoft.Teams.Common.Extensions;

public static class TaskExtensions
{
public static async Task<T> Retry<T>(Func<Task<T>> taskFactory, int max = 3, int delay = 200)
public static async Task<T> Retry<T>(Func<Task<T>> taskFactory, int max = 3, int delay = 500)
{
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@

namespace Microsoft.Teams.Plugins.AspNetCore;

/// <summary>
/// Streaming implementation for Microsoft Teams activities.
///
/// Queues message and typing activities and flushes them in chunks
/// to avoid rate limits and preserve message order.
///
/// Flow:
/// 1. <see cref="Emit(IActivity)"/> adds activities to the queue.
/// 2. <see cref="Flush"/> processes up to 10 queued items under a lock.
/// 3. Informative typing updates are sent immediately if no message started.
/// 4. Message text are combined into a typing chunk.
/// 5. Another flush is scheduled if more items remain.
/// 6. <see cref="Close"/> waits for the queue to empty and sends the final message.
/// </summary>
public partial class AspNetCorePlugin
{
public class Stream : IStreamer
Expand All @@ -35,43 +49,45 @@ public class Stream : IStreamer
private int _count = 0;
private MessageActivity? _result;
private readonly SemaphoreSlim _lock = new(1, 1);

private Timer? _timeout;
private const int _timeoutMs = 5000;

/// <summary>
/// Enqueues a message activity for streaming.
/// </summary>
public void Emit(MessageActivity activity)
{
if (_timeout != null)
{
_timeout.Dispose();
_timeout = null;
}

_queue.Enqueue(activity);
_timeout = new Timer(_ =>
if (_timeout == null)
{
_ = Flush();
}, null, 500, Timeout.Infinite);
}
}

/// <summary>
/// Enqueues a typing activity for streaming.
/// </summary>
public void Emit(TypingActivity activity)
{
if (_timeout != null)
{
_timeout.Dispose();
_timeout = null;
}

_queue.Enqueue(activity);
_timeout = new Timer(_ =>
if (_timeout == null)
{
_ = Flush();
}, null, 500, Timeout.Infinite);
}
}

/// <summary>
/// Emits plain text as a message activity.
/// </summary>
public void Emit(string text)
{
Emit(new MessageActivity(text));
}

/// <summary>
/// Sends an informative typing update (e.g., "Thinking...").
/// </summary>
public void Update(string text)
{
Emit(new TypingActivity(text)
Expand All @@ -83,24 +99,50 @@ public void Update(string text)
});
}

public async Task<bool> WaitForIdAndQueueAsync()
{
var start = DateTime.UtcNow;

while (_id == null || _queue.Count > 0)
{
if ((DateTime.UtcNow - start).TotalMilliseconds > _timeoutMs)
{
return false; // timed out
}

await Task.Delay(50);
}

return true; // success
}

/// <summary>
/// Closes the stream after all queued activities have been sent.
/// Returns the final message activity.
/// </summary>
public async Task<MessageActivity?> Close()
{
if (_index == 1 && _queue.Count == 0 && _lock.CurrentCount > 0) return null;

if (_result is not null) return _result;
while (_id is null || _queue.Count > 0)
bool ready = await WaitForIdAndQueueAsync();
if (!ready)
{
await Task.Delay(50);
return null; // timed out waiting for ID and queue to empty
}

if (_text == string.Empty && _attachments.Count == 0) // when only informative updates are present
{
_text = "Streaming closed with no content";
return null;
}

var activity = new MessageActivity(_text)
.AddAttachment(_attachments.ToArray());

activity.WithId(_id);
if (_id is not null)
{
activity.WithId(_id);
}
activity.WithData(_channelData);
activity.AddEntity(_entities.ToArray());
activity.AddStreamFinal();
Expand All @@ -120,11 +162,20 @@ public void Update(string text)
return (MessageActivity)res;
}

/// <summary>
/// Flushes up to 10 queued activities.
/// Combines message chunks and sends informative updates.
/// Reschedules itself if more items remain.
/// </summary>
protected async Task Flush()
{
if (_queue.Count == 0) return;

await _lock.WaitAsync();

if (!await _lock.WaitAsync(0))
{
return; // another flush is running, exit
}

try
{
Expand All @@ -138,7 +189,7 @@ protected async Task Flush()

Queue<TypingActivity> informativeUpdates = new();

while (i <= 10 && _queue.TryDequeue(out var activity))
while (i < 10 && _queue.TryDequeue(out var activity))
{
if (activity is MessageActivity message)
{
Expand Down Expand Up @@ -195,8 +246,9 @@ async Task SendActivity(TypingActivity toSend)
{
toSend.WithId(_id);
}

toSend.AddStreamUpdate(_index);

var res = await Retry(() => Send(toSend)).ConfigureAwait(false);
OnChunk(res);
_id ??= res.Id;
Expand All @@ -209,4 +261,4 @@ async Task SendActivity(TypingActivity toSend)
}
}
}
}
}

This file was deleted.