Features/init reactive x #1084

Open
wants to merge 15 commits into base: master

Conversation

@iceljc (Collaborator) commented Jun 27, 2025

PR Type

Enhancement


Description

  • Implement reactive streaming architecture with message hub (see the sketch after this list)

  • Add streaming support for chat completions

  • Integrate observer pattern for real-time message handling

  • Enhance SignalR integration with streaming capabilities

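The message hub named in the first bullet is built on System.Reactive, which this PR adds as a dependency. As orientation only, a minimal Subject-based hub might look like the sketch below; the member names are illustrative assumptions, not copied from MessageHub.cs.

    using System;
    using System.Reactive.Subjects;

    // Minimal sketch of a reactive message hub (illustrative only).
    public class MessageHub : IDisposable
    {
        private readonly Subject<HubObserveData> _subject = new();

        // Observers such as ChatHubObserver subscribe to this stream.
        public IObservable<HubObserveData> Events => _subject;

        // Producers (e.g. the chat completion provider) push events;
        // each active subscriber receives them via OnNext.
        public void Push(HubObserveData data) => _subject.OnNext(data);

        public void Dispose() => _subject.Dispose();
    }

Registering the hub as a singleton (as ConversationPlugin does below) gives producers and observers one shared stream per application.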

Changes walkthrough 📝

Relevant files

Enhancement (14 files)
  • ChatResponseDto.cs: Add streaming indicator property (+3/-0)
  • RoleDialogModel.cs: Add streaming state tracking (+5/-1)
  • IChatCompletion.cs: Refactor streaming completion interface (+2/-3)
  • HubObserveData.cs: Create hub observation data model (+7/-0)
  • ObserveDataBase.cs: Create base observation data class (+6/-0)
  • IRoutingService.cs: Add streaming support to agent invocation (+1/-5)
  • RealtimeTextStream.cs: Refactor and enhance text streaming (+11/-4)
  • MessageHub.cs: Implement reactive message hub (+44/-0)
  • InstructExecutor.cs: Add streaming state handling (+3/-1)
  • RoutingService.InvokeAgent.cs: Implement streaming agent invocation (+15/-5)
  • RoutingService.cs: Add streaming support to routing (+3/-1)
  • ChatHubConversationHook.cs: Add streaming awareness to hooks (+6/-1)
  • ChatHubObserver.cs: Implement SignalR streaming observer (+163/-0)
  • ChatCompletionProvider.cs: Implement streaming chat completion (+122/-14)

Miscellaneous (3 files)
  • IConversationService.cs: Update parameter name for clarity (+1/-1)
  • GetWeatherFn.cs: Update completion stop behavior (+1/-1)
  • RealTimeCompletionProvider.cs: Update stream class references (+4/-3)

Configuration changes (2 files)
  • ConversationPlugin.cs: Register message hub singleton service (+3/-0)
  • ChatHubPlugin.cs: Register observer and configure hub (+12/-1)

Formatting (2 files)
  • ConversationController.cs: Add empty line formatting (+1/-0)
  • StreamingLogHook.cs: Improve JSON formatting in logs (+2/-2)

Tests (1 file)
  • ChatCompletionTests.cs: Update streaming test implementation (+8/-6)

Dependencies (2 files)
  • Directory.Packages.props: Add System.Reactive dependency (+1/-0)
  • BotSharp.Abstraction.csproj: Add System.Reactive package reference (+1/-0)
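
The two plugin entries under Configuration changes above register the hub and attach the observer. A hedged sketch of what that wiring could look like with Microsoft.Extensions.DependencyInjection follows; the helper class, method names, and the ChatHubObserver constructor are assumptions, not code from the PR.

    using System;
    using Microsoft.Extensions.DependencyInjection;

    // Hypothetical wiring helper (not from the PR): register the hub as a
    // singleton, then subscribe the SignalR-aware observer at startup.
    public static class StreamingWiring
    {
        public static void AddStreaming(IServiceCollection services)
        {
            services.AddSingleton<MessageHub>();
        }

        public static IDisposable UseStreaming(IServiceProvider provider)
        {
            var hub = provider.GetRequiredService<MessageHub>();
            // Subscribe returns an IDisposable that detaches the observer.
            return hub.Events.Subscribe(new ChatHubObserver(provider));
        }
    }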

  • @iceljc requested a review from Oceania2018, Jun 27, 2025 16:49

    @GGHansome

    Auto Review Result:

    Code Review Summary

    Change Summary: The changes introduce streaming capabilities across multiple components, including chat completion handling, messaging, and observables. They support asynchronous operations and improve the system's responsiveness by delivering results as they are generated.

    Identified Issues

    Issue 1: Logic Error in Streaming Handling

    • Description: The logic that decides when messages are handled in streaming mode can be tightened. The IsStreaming property is set and checked at several stages, which can cause unintended behavior if those stages fall out of sync.
    • Suggestion: Manage the IsStreaming property coherently with sensible defaults, and consolidate the streaming logic into a single place that owns its state.
      // Before
      message.IsStreaming = response.IsStreaming;
      
      // After
      if (useStream) {
        message.IsStreaming = true;
      }

    Issue 2: Unnecessary Logging

    • Description: Logging calls wrapped in #if DEBUG remain in the production code. They should go through a proper logging strategy so they neither hurt performance nor leak private data.
    • Suggestion: Remove them, or re-implement them with a configurable logger that can be enabled or disabled per environment.
      // Before
      _logger.LogCritical($"Content update: {text}");
      
      // After
      if (_logger.IsEnabled(LogLevel.Debug)) {
        _logger.LogDebug($"Content update: {text}");
      }

    Issue 3: Use of Magic Strings

    • Description: The code repeats string literals for event names and similar constants, which invites typos and makes renames error-prone.
    • Suggestion: Define these strings as constant variables or use an enumeration to manage these strings.
      // Before
      if (value.EventName == "BeforeReceiveLlmStreamMessage")
      
      // After
      private const string BEFORE_RECEIVE_LLM_STREAM_MESSAGE = "BeforeReceiveLlmStreamMessage";
      if (value.EventName == BEFORE_RECEIVE_LLM_STREAM_MESSAGE) { ... }

    Overall Evaluation

    The code effectively incorporates streaming into its architecture, leveraging observable patterns for real-time updates. However, this also introduces complexity that needs careful management, especially around concurrency and state management. Future improvements could focus on modularizing streaming controls and creating stronger testing scenarios for edge cases in asynchronous processing.


    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 PR contains tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Possible Issue

    The streaming method returns a response message before streaming completes, which can cause race conditions. It also uses the ConfigureAwait(false).GetAwaiter().GetResult() pattern, which can deadlock in certain contexts.

        public async Task<RoleDialogModel> GetChatCompletionsStreamingAsync(Agent agent, List<RoleDialogModel> conversations)
        {
            var client = ProviderHelper.GetClient(Provider, _model, _services);
            var chatClient = client.GetChatClient(_model);
            var (prompt, messages, options) = PrepareOptions(agent, conversations);
    
            var hub = _services.GetRequiredService<MessageHub>();
            var messageId = conversations.LastOrDefault()?.MessageId ?? string.Empty;
    
            var contentHooks = _services.GetHooks<IContentGeneratingHook>(agent.Id);
            // Before chat completion hook
            foreach (var hook in contentHooks)
            {
                await hook.BeforeGenerating(agent, conversations);
            }
    
            hub.Push(new()
            {
                ServiceProvider = _services,
                EventName = "BeforeReceiveLlmStreamMessage",
                Data = new RoleDialogModel(AgentRole.Assistant, string.Empty)
                {
                    CurrentAgentId = agent.Id,
                    MessageId = messageId
                }
            });
    
    
            using var textStream = new RealtimeTextStream();
            var toolCalls = new List<StreamingChatToolCallUpdate>();
            ChatTokenUsage? tokenUsage = null;
    
            var responseMessage = new RoleDialogModel(AgentRole.Assistant, string.Empty)
            {
                CurrentAgentId = agent.Id,
                MessageId = messageId
            };
    
            await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options))
            {
                tokenUsage = choice.Usage;
    
                if (!choice.ToolCallUpdates.IsNullOrEmpty())
                {
                    toolCalls.AddRange(choice.ToolCallUpdates);
                }
    
                if (!choice.ContentUpdate.IsNullOrEmpty())
                {
                    var text = choice.ContentUpdate[0]?.Text ?? string.Empty;
                    textStream.Collect(text);
    
    #if DEBUG
                    _logger.LogCritical($"Content update: {text}");
    #endif
    
                    var content = new RoleDialogModel(AgentRole.Assistant, text)
                    {
                        CurrentAgentId = agent.Id,
                        MessageId = messageId
                    };
                    hub.Push(new()
                    {
                        ServiceProvider = _services,
                        EventName = "OnReceiveLlmStreamMessage",
                        Data = content
                    });
                }
    
                if (choice.FinishReason == ChatFinishReason.ToolCalls || choice.FinishReason == ChatFinishReason.FunctionCall)
                {
                    var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName));
                    var functionName = meta?.FunctionName;
                    var toolCallId = meta?.ToolCallId;
                    var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList();
                    var functionArgument = string.Join(string.Empty, args);
    
    #if DEBUG
                    _logger.LogCritical($"Tool Call (id: {toolCallId}) => {functionName}({functionArgument})");
    #endif
    
                    responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty)
                    {
                        CurrentAgentId = agent.Id,
                        MessageId = messageId,
                        ToolCallId = toolCallId,
                        FunctionName = functionName,
                        FunctionArgs = functionArgument
                    };
    
                }
                else if (choice.FinishReason.HasValue)
                {
                    var allText = textStream.GetText();
                    _logger.LogCritical($"Text Content: {allText}");
    
                    responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
                    {
                        CurrentAgentId = agent.Id,
                        MessageId = messageId,
                        IsStreaming = true
                    };
                }
            }
    
            hub.Push(new()
            {
                ServiceProvider = _services,
                EventName = "AfterReceiveLlmStreamMessage",
                Data = responseMessage
            });
    
    
            var inputTokenDetails = tokenUsage?.InputTokenDetails;
            // After chat completion hook
            foreach (var hook in contentHooks)
            {
                await hook.AfterGenerated(responseMessage, new TokenStatsModel
                {
                    Prompt = prompt,
                    Provider = Provider,
                    Model = _model,
                    TextInputTokens = (tokenUsage?.InputTokenCount ?? 0) - (inputTokenDetails?.CachedTokenCount ?? 0),
                    CachedTextInputTokens = inputTokenDetails?.CachedTokenCount ?? 0,
                    TextOutputTokens = tokenUsage?.OutputTokenCount ?? 0
                });
            }
    
            return responseMessage;
        }
    Performance Issue

    The observer makes blocking synchronous calls via the ConfigureAwait(false).GetAwaiter().GetResult() pattern in an async context, which can lead to thread-pool starvation and deadlocks.

        GenerateSenderAction(conv.ConversationId, action).ConfigureAwait(false).GetAwaiter().GetResult();
    }
    else if (value.EventName == AFTER_RECEIVE_LLM_STREAM_MESSAGE && message.IsStreaming)
    {
        var conv = _services.GetRequiredService<IConversationService>();
        model = new ChatResponseDto()
        {
            ConversationId = conv.ConversationId,
            MessageId = message.MessageId,
            Text = message.Content,
            Sender = new()
            {
                FirstName = "AI",
                LastName = "Assistant",
                Role = AgentRole.Assistant
            }
        };
    
        var action = new ConversationSenderActionModel
        {
            ConversationId = conv.ConversationId,
            SenderAction = SenderActionEnum.TypingOff
        };
    
        GenerateSenderAction(conv.ConversationId, action).ConfigureAwait(false).GetAwaiter().GetResult();
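
    Because IObserver<T>.OnNext is synchronous, one common alternative is to enqueue the async work onto a System.Threading.Channels channel and drain it on a background task. The sketch below is an assumption about how that could look here, not code from the PR:

        using System;
        using System.Threading.Channels;
        using System.Threading.Tasks;

        // Sketch: lets a synchronous observer hand async work to a background
        // loop instead of calling .ConfigureAwait(false).GetAwaiter().GetResult().
        public class AsyncWorkQueue
        {
            private readonly Channel<Func<Task>> _channel = Channel.CreateUnbounded<Func<Task>>();

            public AsyncWorkQueue()
            {
                // Drain the queue on a background task so OnNext never blocks.
                _ = Task.Run(DrainAsync);
            }

            // Called from OnNext, e.g.:
            // queue.Enqueue(() => GenerateSenderAction(conv.ConversationId, action));
            public void Enqueue(Func<Task> work) => _channel.Writer.TryWrite(work);

            private async Task DrainAsync()
            {
                await foreach (var work in _channel.Reader.ReadAllAsync())
                {
                    await work();
                }
            }
        }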
    Interface Design

    The new streaming method has a default implementation that returns an empty response, which may not be appropriate for all implementations and could lead to unexpected behavior.

    Task<RoleDialogModel> GetChatCompletionsStreamingAsync(Agent agent, 
        List<RoleDialogModel> conversations) => Task.FromResult(new RoleDialogModel(AgentRole.Assistant, string.Empty));
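
    If a silently empty default is undesirable, one alternative (an assumption about intent, not what the PR does) is to make the default fail fast so providers that never implement streaming are caught immediately:

        // Sketch: surface unsupported streaming explicitly instead of returning
        // an empty assistant message. Note the exception is thrown at call time.
        Task<RoleDialogModel> GetChatCompletionsStreamingAsync(Agent agent, List<RoleDialogModel> conversations)
            => throw new NotSupportedException($"{GetType().Name} does not support streaming chat completions.");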


    qodo-merge-pro bot commented Jun 27, 2025

    PR Code Suggestions ✨

    No code suggestions found for the PR.

    @GGHansome

    Auto Review Result:

    Code Review Summary

    Change Overview: The code introduces observable and logging functionalities in the BotSharp project to handle streaming responses from AI models. This change allows the system to manage and log events more efficiently, especially for AI dialogue management.

    Identified Issues

    Issue 1: [Code Readability]

    • Description: Certain aspects of the code could benefit from improved documentation and clearer naming conventions to enhance readability.
    • Suggestion: Add XML comments and ensure variable names are self-explanatory to make the code more accessible to new developers.
    • Example:
      // Suggested improvement
      /// <summary>
      /// Processes AI streaming response events
      /// </summary>
      public class StreamingObserver

    Issue 2: [Error Handling]

    • Description: The code lacks error handling around observable streams and AI response collection, so failures there can surface as unhandled exceptions.
    • Suggestion: Implement try-catch blocks around critical external API calls and stream processing logic, and log any errors captured.
    • Example:
      try {
          // Process stream
      } catch (Exception ex) {
          _logger.LogError(ex, "Error processing stream");
      }

    Overall Evaluation

    The code changes bring significant improvements to the architecture for managing streaming interactions and logging streaming responses in the BotSharp project. However, stronger documentation and error handling are essential for maintainability and robustness.
