diff --git a/scripts/gh_copilot_chat.py b/scripts/gh_copilot_chat.py index a938cc6..94996cf 100755 --- a/scripts/gh_copilot_chat.py +++ b/scripts/gh_copilot_chat.py @@ -438,12 +438,93 @@ async def _ensure_copilot_token(self) -> bool: return False return True - async def send_message(self, message: str) -> str: + async def _handle_tool_calls( + self, tool_calls: List[Dict], available_tools: List[Dict] + ) -> List[Dict]: + """ + Handle tool calls from the model + + Args: + tool_calls: List of tool calls from the model + available_tools: List of available tool definitions + + Returns: + List of tool response messages + """ + tool_responses = [] + + for tool_call in tool_calls: + try: + function_name = tool_call["function"]["name"] + + # Handle potentially incomplete arguments + args_str = tool_call["function"]["arguments"] + if not args_str or args_str.strip() == "": + function_args = {} + else: + try: + function_args = json.loads(args_str) + except json.JSONDecodeError: + print(f"Warning: Failed to parse arguments for {function_name}: {args_str}") + function_args = {} + + # Execute the echo tool + if function_name == "echo": + result = self._execute_echo_tool(function_args) + tool_responses.append( + { + "role": "tool", + "content": result, + "tool_call_id": tool_call.get("id"), + } + ) + else: + # Unknown tool + tool_responses.append( + { + "role": "tool", + "content": f"Error: Unknown tool '{function_name}'", + "tool_call_id": tool_call.get("id"), + } + ) + + except Exception as e: + print(f"Error handling tool call: {e}") + tool_responses.append( + { + "role": "tool", + "content": f"Error executing tool: {str(e)}", + "tool_call_id": tool_call.get("id", "unknown"), + } + ) + + return tool_responses + + def _execute_echo_tool(self, args: Dict) -> str: + """ + Simple echo tool that wraps input in tags + + Args: + args: Dictionary with 'input' and optional 'tag' fields + + Returns: + Input wrapped in specified tags + """ + input_text = args.get("input", "") + tag = args.get("tag", "echo") + + result = f"<{tag}>{input_text}" + print(f"šŸ”§ Echo tool executed: input='{input_text}', tag='{tag}' -> '{result}'") + + return result + + async def send_message(self, message: str, tools: Optional[List[Dict]] = None) -> str: """ Send a message to GitHub Copilot and get response Args: message: User message to send to the chat + tools: Optional list of tools to make available to the model Returns: Assistant's response as a string @@ -458,55 +539,82 @@ async def send_message(self, message: str) -> str: # Build headers using token manager's config editor_info = self.token_manager.get_editor_info() + # Build request payload + request_data = { + "intent": False, + "model": self.model, + "temperature": 0, + "top_p": 1, + "n": 1, + "stream": False, # Use non-streaming for simpler JSON parsing + "messages": self.chat_messages, + } + + # Add tools if provided + if tools: + request_data["tools"] = tools + request_data["tool_choice"] = "auto" + resp = requests.post( "https://api.githubcopilot.com/chat/completions", headers={ "authorization": f"Bearer {self._current_copilot_token}", "Editor-Version": editor_info.format(), "Content-Type": "application/json", - "Accept": "text/event-stream", - }, - json={ - "intent": False, - "model": self.model, - "temperature": 0, - "top_p": 1, - "n": 1, - "stream": True, - "messages": self.chat_messages, + "Accept": "application/json", # Request JSON response }, + json=request_data, ) except requests.exceptions.ConnectionError: return "Error: Connection failed" - result = "" - - # Parse the response text, splitting it by newlines - resp_text = resp.text.split("\n") - for line in resp_text: - # If the line contains a completion, process it - if line.startswith("data: {"): - try: - # Parse the completion from the line as json - json_completion = json.loads(line[6:]) - choices = json_completion.get("choices", []) - if choices: - delta = choices[0].get("delta", {}) - completion = delta.get("content") - if completion: - result += completion - else: - result += "\n" - except (json.JSONDecodeError, KeyError, IndexError): - pass - - self.chat_messages.append({"content": result, "role": "assistant"}) - - if result == "": + # Check for HTTP errors + if not resp.ok: print(f"Chat error - Status code: {resp.status_code}") print(f"Response: {resp.text}") - return f"Error: No response received (Status: {resp.status_code})" + return f"Error: HTTP {resp.status_code}" + + # Parse JSON response + try: + response_json = resp.json() + except json.JSONDecodeError: + print(f"Error: Failed to parse JSON response") + print(f"Response: {resp.text}") + return "Error: Invalid JSON response" + + # Extract the response data + choices = response_json.get("choices", []) + if not choices: + return "Error: No choices in response" + + choice = choices[0] + message = choice.get("message", {}) + result = message.get("content", "") + tool_calls = message.get("tool_calls", []) + + # If tool calls were made, handle them + if tool_calls and tools: + print(f"\nšŸ”§ Tool calls detected: {len(tool_calls)} tool(s)") + + # Add the assistant message with tool calls first + self.chat_messages.append({ + "content": result or "", + "role": "assistant", + "tool_calls": tool_calls + }) + + tool_results = await self._handle_tool_calls(tool_calls, tools) + print(f"šŸ”§ Tool execution completed, sending results back to LLM...") + + # Add tool response to messages + for tool_result in tool_results: + self.chat_messages.append(tool_result) + + # Send another request with tool results + return await self.send_message("", tools) + # Add regular assistant message + self.chat_messages.append({"content": result, "role": "assistant"}) return result def clear_history(self): @@ -525,15 +633,27 @@ def show_history(self): for i, msg in enumerate(self.chat_messages): role = msg["role"].capitalize() content = msg["content"] - print(f"{i+1}. {role}: {content}") + + # Show tool calls if present + if "tool_calls" in msg: + tool_calls = msg["tool_calls"] + print(f"{i+1}. {role}: {content}") + for tc in tool_calls: + func_name = tc["function"]["name"] + func_args = tc["function"]["arguments"] + print(f" šŸ”§ Tool Call: {func_name}({func_args})") + else: + print(f"{i+1}. {role}: {content}") print("-" * 50) - async def start_interactive_session(self): + async def start_interactive_session(self, tools: Optional[List[Dict]] = None): """Start an interactive chat session""" print(f"Starting interactive chat with GitHub Copilot ({self.model})") print("Type 'quit', 'exit', or 'bye' to end the session") print("Type 'clear' to clear chat history") print("Type 'history' to show chat history") + if tools: + print(f"Available tools: {', '.join([t['function']['name'] for t in tools])}") print("-" * 50) while True: @@ -553,7 +673,7 @@ async def start_interactive_session(self): continue print("Copilot: ", end="", flush=True) - response = await self.send_message(user_input) + response = await self.send_message(user_input, tools) print(response) except KeyboardInterrupt: @@ -717,6 +837,87 @@ def interactive_chat(): GitHubTokenClient = GitHubTokenManager +def get_echo_tool_definition() -> Dict: + """ + Get the echo tool definition in OpenAI format + + Returns: + Tool definition dictionary + """ + return { + "type": "function", + "function": { + "name": "echo", + "description": "Echo the input text wrapped in custom XML tags", + "parameters": { + "type": "object", + "properties": { + "input": { + "type": "string", + "description": "The text to echo back", + }, + "tag": { + "type": "string", + "description": "The XML tag name to wrap the text in (default: 'echo')", + }, + }, + "required": ["input"], + }, + }, + } + + +async def demo_tool_usage(): + """ + Demonstrate tool usage with the echo tool + + This example shows how to: + 1. Define a tool (echo tool that wraps text in tags) + 2. Send a message that triggers tool usage + 3. Handle the tool call and response + """ + print("=" * 60) + print("Tool Usage Demo: Echo Tool") + print("=" * 60) + + # Get GitHub token + github_token = GitHubTokenManager.get_github_token(use_device_flow=True) + if not github_token: + print("Failed to get GitHub token") + return + + # Create config and managers + config = GitHubConfig(token=github_token) + token_manager = GitHubTokenManager(config) + chat_orchestrator = CopilotChatOrchestrator(token_manager) + + # Define the echo tool + tools = [get_echo_tool_definition()] + + print("\nTool Definition:") + print(json.dumps(tools[0], indent=2)) + + # Example messages that would trigger tool usage + test_messages = [ + "Please use the echo tool to wrap 'Hello World' in a 'greeting' tag", + "Echo the text 'Testing 123' using the echo tool", + "Use the echo tool with input 'MCP Tool Example' and tag 'example'", + ] + + print("\n" + "=" * 60) + print("Testing tool usage with sample messages:") + print("=" * 60) + + for message in test_messages: + print(f"\nUser: {message}") + response = await chat_orchestrator.send_message(message, tools) + print(f"Assistant: {response}") + + print("\n" + "=" * 60) + print("Demo completed!") + print("=" * 60) + + async def main(): """Main function""" parser = argparse.ArgumentParser( @@ -755,6 +956,11 @@ async def main(): parser.add_argument( "--model", default="gpt-4o", help="Model to use for chat (default: gpt-4o)" ) + parser.add_argument( + "--tool-demo", + action="store_true", + help="Run the tool usage demo with echo tool example", + ) parser.add_argument("--editor-name", default="vscode", help="Editor name") parser.add_argument("--editor-version", default="1.63.2", help="Editor version") parser.add_argument("--plugin-name", default="copilot-chat", help="Plugin name") @@ -766,6 +972,11 @@ async def main(): global MODEL MODEL = args.model + # Handle tool demo + if args.tool_demo: + await demo_tool_usage() + return + # Determine device flow usage if args.device_flow: use_device_flow = True @@ -805,13 +1016,17 @@ async def main(): # Handle chat functionality if args.chat: chat_orchestrator = CopilotChatOrchestrator(token_manager, args.model) - await chat_orchestrator.start_interactive_session() + # Include echo tool by default for interactive chat + tools = [get_echo_tool_definition()] + await chat_orchestrator.start_interactive_session(tools) return if args.message: print(f"Sending message to {args.model}...") chat_orchestrator = CopilotChatOrchestrator(token_manager, args.model) - response = await chat_orchestrator.send_message(args.message) + # Include echo tool by default for single messages + tools = [get_echo_tool_definition()] + response = await chat_orchestrator.send_message(args.message, tools) print(f"Response: {response}") return