Skip to content

Features/init reactive x #1084

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<PackageVersion Include="System.IdentityModel.Tokens.Jwt" Version="8.0.0" />
<PackageVersion Include="System.Memory.Data" Version="8.0.0" />
<PackageVersion Include="System.Text.Json" Version="8.0.5" />
<PackageVersion Include="System.Reactive" Version="6.0.1" />
<PackageVersion Include="Serilog.Sinks.Console" Version="6.0.0" />
<PackageVersion Include="Serilog.Extensions.Logging" Version="9.0.0" />
<PackageVersion Include="Serilog.Sinks.File" Version="6.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<PackageReference Include="System.IdentityModel.Tokens.Jwt" />
<PackageReference Include="System.Memory.Data" />
<PackageReference Include="System.Text.Json" />
<PackageReference Include="System.Reactive" />
<PackageReference Include="Serilog.Sinks.Console" />
<PackageReference Include="Serilog.Sinks.File" />
<PackageReference Include="Rougamo.Fody" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class ChatResponseDto : InstructResult
[JsonPropertyName("has_message_files")]
public bool HasMessageFiles { get; set; }

[JsonPropertyName("is_streaming")]
public bool IsStreaming { get; set; }

[JsonPropertyName("created_at")]
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface IConversationService
/// <param name="onResponseReceived">Received the response from AI Agent</param>
/// <returns></returns>
Task<bool> SendMessage(string agentId,
RoleDialogModel lastDialog,
RoleDialogModel message,
PostbackMessageModel? replyMessage,
Func<RoleDialogModel, Task> onResponseReceived);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ public class RoleDialogModel : ITrackableMessage
[JsonIgnore(Condition = JsonIgnoreCondition.Always)]
public string RenderedInstruction { get; set; } = string.Empty;

[JsonIgnore(Condition = JsonIgnoreCondition.Always)]
public bool IsStreaming { get; set; }

private RoleDialogModel()
{
}
Expand Down Expand Up @@ -159,7 +162,8 @@ public static RoleDialogModel From(RoleDialogModel source,
Payload = source.Payload,
StopCompletion = source.StopCompletion,
Instruction = source.Instruction,
Data = source.Data
Data = source.Data,
IsStreaming = source.IsStreaming
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ Task<bool> GetChatCompletionsAsync(Agent agent,
Func<RoleDialogModel, Task> onMessageReceived,
Func<RoleDialogModel, Task> onFunctionExecuting);

Task<bool> GetChatCompletionsStreamingAsync(Agent agent,
List<RoleDialogModel> conversations,
Func<RoleDialogModel, Task> onMessageReceived);
Task<RoleDialogModel> GetChatCompletionsStreamingAsync(Agent agent,
List<RoleDialogModel> conversations);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace BotSharp.Abstraction.Observables.Models;

public class HubObserveData : ObserveDataBase
{
public string EventName { get; set; } = null!;
public RoleDialogModel Data { get; set; } = null!;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace BotSharp.Abstraction.Observables.Models;

public abstract class ObserveDataBase
{
public IServiceProvider ServiceProvider { get; set; } = null!;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ public interface IRoutingService
/// <returns></returns>
RoutingRule[] GetRulesByAgentId(string id);

//void ResetRecursiveCounter();
//int GetRecursiveCounter();
//void SetRecursiveCounter(int counter);

Task<bool> InvokeAgent(string agentId, List<RoleDialogModel> dialogs);
Task<bool> InvokeAgent(string agentId, List<RoleDialogModel> dialogs, bool useStream = false);
Task<bool> InvokeFunction(string name, RoleDialogModel messages);
Task<RoleDialogModel> InstructLoop(Agent agent, RoleDialogModel message, List<RoleDialogModel> dialogs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
using BotSharp.Core.Routing.Reasoning;
using BotSharp.Core.Templating;
using BotSharp.Core.Translation;
using BotSharp.Core.Observables.Queues;
using Microsoft.Extensions.Configuration;
using BotSharp.Abstraction.Observables.Models;

namespace BotSharp.Core.Conversations;

Expand Down Expand Up @@ -41,6 +43,8 @@ public void RegisterDI(IServiceCollection services, IConfiguration config)
return settingService.Bind<GoogleApiSettings>("GoogleApi");
});

services.AddSingleton<MessageHub<HubObserveData>>();

services.AddScoped<IConversationStorage, ConversationStorage>();
services.AddScoped<IConversationService, ConversationService>();
services.AddScoped<IConversationProgressService, ConversationProgressService>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public GetWeatherFn(IServiceProvider services)
public async Task<bool> Execute(RoleDialogModel message)
{
message.Content = $"It is a sunny day!";
//message.StopCompletion = true;
message.StopCompletion = false;
return true;
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using System.IO;

namespace BotSharp.Plugin.GoogleAI.Models.Realtime;
namespace BotSharp.Core.Infrastructures.Streams;

internal class RealtimeTranscriptionResponse : IDisposable
public class RealtimeTextStream : IDisposable
{
public RealtimeTranscriptionResponse()
public RealtimeTextStream()
{

}

private bool _disposed = false;
Expand All @@ -20,6 +20,13 @@ public Stream? ContentStream
}
}

public long Length => _contentStream.Length;

public bool IsNullOrEmpty()
{
return _contentStream == null || Length == 0;
}

public void Collect(string text)
{
if (_disposed) return;
Expand Down
43 changes: 43 additions & 0 deletions src/Infrastructure/BotSharp.Core/Observables/Queues/MessageHub.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System.Reactive.Subjects;

namespace BotSharp.Core.Observables.Queues;

public class MessageHub<T> where T : class
{
private readonly ILogger<MessageHub<T>> _logger;
private readonly ISubject<T> _observable = new Subject<T>();
public IObservable<T> Events => _observable;

public MessageHub(ILogger<MessageHub<T>> logger)
{
_logger = logger;
}

/// <summary>
/// Push an item to the observers.
/// </summary>
/// <param name="item"></param>
public void Push(T item)
{
_observable.OnNext(item);
}

/// <summary>
/// Send a complete notification to the observers.
/// This will stop the observers from receiving data.
/// </summary>
public void Complete()
{
_observable.OnCompleted();
}

/// <summary>
/// Send an error notification to the observers.
/// This will stop the observers from receiving data.
/// </summary>
/// <param name="error"></param>
public void Error(Exception error)
{
_observable.OnError(error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ await HookEmitter.Emit<IRoutingHook>(_services, async hook => await hook.OnRouti
}
else
{
var ret = await routing.InvokeAgent(agentId, dialogs);
var state = _services.GetRequiredService<IConversationStateService>();
var useStreamMsg = state.GetState("use_stream_message");
var ret = await routing.InvokeAgent(agentId, dialogs, bool.TryParse(useStreamMsg, out var useStream) && useStream);
}

var response = dialogs.Last();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace BotSharp.Core.Routing;

public partial class RoutingService
{
public async Task<bool> InvokeAgent(string agentId, List<RoleDialogModel> dialogs)
public async Task<bool> InvokeAgent(string agentId, List<RoleDialogModel> dialogs, bool useStream = false)
{
var agentService = _services.GetRequiredService<IAgentService>();
var agent = await agentService.LoadAgent(agentId);
Expand All @@ -30,8 +30,16 @@ public async Task<bool> InvokeAgent(string agentId, List<RoleDialogModel> dialog
provider: provider,
model: model);

RoleDialogModel response;
var message = dialogs.Last();
var response = await chatCompletion.GetChatCompletions(agent, dialogs);
if (useStream)
{
response = await chatCompletion.GetChatCompletionsStreamingAsync(agent, dialogs);
}
else
{
response = await chatCompletion.GetChatCompletions(agent, dialogs);
}

if (response.Role == AgentRole.Function)
{
Expand All @@ -45,8 +53,9 @@ public async Task<bool> InvokeAgent(string agentId, List<RoleDialogModel> dialog
message.FunctionArgs = response.FunctionArgs;
message.Indication = response.Indication;
message.CurrentAgentId = agent.Id;
message.IsStreaming = response.IsStreaming;

await InvokeFunction(message, dialogs);
await InvokeFunction(message, dialogs, useStream);
}
else
{
Expand All @@ -59,14 +68,15 @@ public async Task<bool> InvokeAgent(string agentId, List<RoleDialogModel> dialog

message = RoleDialogModel.From(message, role: AgentRole.Assistant, content: response.Content);
message.CurrentAgentId = agent.Id;
message.IsStreaming = response.IsStreaming;
dialogs.Add(message);
Context.SetDialogs(dialogs);
}

return true;
}

private async Task<bool> InvokeFunction(RoleDialogModel message, List<RoleDialogModel> dialogs)
private async Task<bool> InvokeFunction(RoleDialogModel message, List<RoleDialogModel> dialogs, bool useStream)
{
// execute function
// Save states
Expand Down Expand Up @@ -102,7 +112,7 @@ private async Task<bool> InvokeFunction(RoleDialogModel message, List<RoleDialog

// Send to Next LLM
var curAgentId = routing.Context.GetCurrentAgentId();
await InvokeAgent(curAgentId, dialogs);
await InvokeAgent(curAgentId, dialogs, useStream);
}
}
else
Expand Down
4 changes: 3 additions & 1 deletion src/Infrastructure/BotSharp.Core/Routing/RoutingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ public async Task<RoleDialogModel> InstructDirect(Agent agent, RoleDialogModel m
}
else
{
var ret = await routing.InvokeAgent(agentId, dialogs);
var state = _services.GetRequiredService<IConversationStateService>();
var useStreamMsg = state.GetState("use_stream_message");
var ret = await routing.InvokeAgent(agentId, dialogs, bool.TryParse(useStreamMsg, out var useStream) && useStream);
}

var response = dialogs.Last();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ await conv.SendMessage(agentId, inputMsg,
return response;
}


[HttpPost("/conversation/{agentId}/{conversationId}/sse")]
public async Task SendMessageSse([FromRoute] string agentId, [FromRoute] string conversationId, [FromBody] NewMessageModel input)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ public Task<bool> GetChatCompletionsAsync(Agent agent, List<RoleDialogModel> con
throw new NotImplementedException();
}

public Task<bool> GetChatCompletionsStreamingAsync(Agent agent, List<RoleDialogModel> conversations,
Func<RoleDialogModel, Task> onMessageReceived)
public Task<RoleDialogModel> GetChatCompletionsStreamingAsync(Agent agent, List<RoleDialogModel> conversations)
{
throw new NotImplementedException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Infrastructure\BotSharp.Abstraction\BotSharp.Abstraction.csproj" />
<ProjectReference Include="..\..\Infrastructure\BotSharp.Core\BotSharp.Core.csproj" />
</ItemGroup>

</Project>
Loading
Loading