Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
297 changes: 256 additions & 41 deletions scripts/gh_copilot_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,12 +438,93 @@ async def _ensure_copilot_token(self) -> bool:
return False
return True

async def send_message(self, message: str) -> str:
async def _handle_tool_calls(
self, tool_calls: List[Dict], available_tools: List[Dict]
) -> List[Dict]:
"""
Handle tool calls from the model

Args:
tool_calls: List of tool calls from the model
available_tools: List of available tool definitions

Returns:
List of tool response messages
"""
tool_responses = []

for tool_call in tool_calls:
try:
function_name = tool_call["function"]["name"]

# Handle potentially incomplete arguments
args_str = tool_call["function"]["arguments"]
if not args_str or args_str.strip() == "":
function_args = {}
else:
try:
function_args = json.loads(args_str)
except json.JSONDecodeError:
print(f"Warning: Failed to parse arguments for {function_name}: {args_str}")
function_args = {}

# Execute the echo tool
if function_name == "echo":
result = self._execute_echo_tool(function_args)
tool_responses.append(
{
"role": "tool",
"content": result,
"tool_call_id": tool_call.get("id"),
}
)
else:
# Unknown tool
tool_responses.append(
{
"role": "tool",
"content": f"Error: Unknown tool '{function_name}'",
"tool_call_id": tool_call.get("id"),
}
)

except Exception as e:
print(f"Error handling tool call: {e}")
tool_responses.append(
{
"role": "tool",
"content": f"Error executing tool: {str(e)}",
"tool_call_id": tool_call.get("id", "unknown"),
}
)

return tool_responses

def _execute_echo_tool(self, args: Dict) -> str:
"""
Simple echo tool that wraps input in tags

Args:
args: Dictionary with 'input' and optional 'tag' fields

Returns:
Input wrapped in specified tags
"""
input_text = args.get("input", "")
tag = args.get("tag", "echo")

result = f"<{tag}>{input_text}</{tag}>"
print(f"🔧 Echo tool executed: input='{input_text}', tag='{tag}' -> '{result}'")

return result

async def send_message(self, message: str, tools: Optional[List[Dict]] = None) -> str:
"""
Send a message to GitHub Copilot and get response

Args:
message: User message to send to the chat
tools: Optional list of tools to make available to the model

Returns:
Assistant's response as a string
Expand All @@ -458,55 +539,82 @@ async def send_message(self, message: str) -> str:
# Build headers using token manager's config
editor_info = self.token_manager.get_editor_info()

# Build request payload
request_data = {
"intent": False,
"model": self.model,
"temperature": 0,
"top_p": 1,
"n": 1,
"stream": False, # Use non-streaming for simpler JSON parsing
"messages": self.chat_messages,
}

# Add tools if provided
if tools:
request_data["tools"] = tools
request_data["tool_choice"] = "auto"

resp = requests.post(
"https://api.githubcopilot.com/chat/completions",
headers={
"authorization": f"Bearer {self._current_copilot_token}",
"Editor-Version": editor_info.format(),
"Content-Type": "application/json",
"Accept": "text/event-stream",
},
json={
"intent": False,
"model": self.model,
"temperature": 0,
"top_p": 1,
"n": 1,
"stream": True,
"messages": self.chat_messages,
"Accept": "application/json", # Request JSON response
},
json=request_data,
)
except requests.exceptions.ConnectionError:
return "Error: Connection failed"

result = ""

# Parse the response text, splitting it by newlines
resp_text = resp.text.split("\n")
for line in resp_text:
# If the line contains a completion, process it
if line.startswith("data: {"):
try:
# Parse the completion from the line as json
json_completion = json.loads(line[6:])
choices = json_completion.get("choices", [])
if choices:
delta = choices[0].get("delta", {})
completion = delta.get("content")
if completion:
result += completion
else:
result += "\n"
except (json.JSONDecodeError, KeyError, IndexError):
pass

self.chat_messages.append({"content": result, "role": "assistant"})

if result == "":
# Check for HTTP errors
if not resp.ok:
print(f"Chat error - Status code: {resp.status_code}")
print(f"Response: {resp.text}")
return f"Error: No response received (Status: {resp.status_code})"
return f"Error: HTTP {resp.status_code}"

# Parse JSON response
try:
response_json = resp.json()
except json.JSONDecodeError:
print(f"Error: Failed to parse JSON response")
print(f"Response: {resp.text}")
return "Error: Invalid JSON response"

# Extract the response data
choices = response_json.get("choices", [])
if not choices:
return "Error: No choices in response"

choice = choices[0]
message = choice.get("message", {})
result = message.get("content", "")
tool_calls = message.get("tool_calls", [])

# If tool calls were made, handle them
if tool_calls and tools:
print(f"\n🔧 Tool calls detected: {len(tool_calls)} tool(s)")

# Add the assistant message with tool calls first
self.chat_messages.append({
"content": result or "",
"role": "assistant",
"tool_calls": tool_calls
})

tool_results = await self._handle_tool_calls(tool_calls, tools)
print(f"🔧 Tool execution completed, sending results back to LLM...")

# Add tool response to messages
for tool_result in tool_results:
self.chat_messages.append(tool_result)

# Send another request with tool results
return await self.send_message("", tools)

# Add regular assistant message
self.chat_messages.append({"content": result, "role": "assistant"})
return result

def clear_history(self):
Expand All @@ -525,15 +633,27 @@ def show_history(self):
for i, msg in enumerate(self.chat_messages):
role = msg["role"].capitalize()
content = msg["content"]
print(f"{i+1}. {role}: {content}")

# Show tool calls if present
if "tool_calls" in msg:
tool_calls = msg["tool_calls"]
print(f"{i+1}. {role}: {content}")
for tc in tool_calls:
func_name = tc["function"]["name"]
func_args = tc["function"]["arguments"]
print(f" 🔧 Tool Call: {func_name}({func_args})")
else:
print(f"{i+1}. {role}: {content}")
print("-" * 50)

async def start_interactive_session(self):
async def start_interactive_session(self, tools: Optional[List[Dict]] = None):
"""Start an interactive chat session"""
print(f"Starting interactive chat with GitHub Copilot ({self.model})")
print("Type 'quit', 'exit', or 'bye' to end the session")
print("Type 'clear' to clear chat history")
print("Type 'history' to show chat history")
if tools:
print(f"Available tools: {', '.join([t['function']['name'] for t in tools])}")
print("-" * 50)

while True:
Expand All @@ -553,7 +673,7 @@ async def start_interactive_session(self):
continue

print("Copilot: ", end="", flush=True)
response = await self.send_message(user_input)
response = await self.send_message(user_input, tools)
print(response)

except KeyboardInterrupt:
Expand Down Expand Up @@ -717,6 +837,87 @@ def interactive_chat():
GitHubTokenClient = GitHubTokenManager


def get_echo_tool_definition() -> Dict:
"""
Get the echo tool definition in OpenAI format

Returns:
Tool definition dictionary
"""
return {
"type": "function",
"function": {
"name": "echo",
"description": "Echo the input text wrapped in custom XML tags",
"parameters": {
"type": "object",
"properties": {
"input": {
"type": "string",
"description": "The text to echo back",
},
"tag": {
"type": "string",
"description": "The XML tag name to wrap the text in (default: 'echo')",
},
},
"required": ["input"],
},
},
}


async def demo_tool_usage():
"""
Demonstrate tool usage with the echo tool

This example shows how to:
1. Define a tool (echo tool that wraps text in tags)
2. Send a message that triggers tool usage
3. Handle the tool call and response
"""
print("=" * 60)
print("Tool Usage Demo: Echo Tool")
print("=" * 60)

# Get GitHub token
github_token = GitHubTokenManager.get_github_token(use_device_flow=True)
if not github_token:
print("Failed to get GitHub token")
return

# Create config and managers
config = GitHubConfig(token=github_token)
token_manager = GitHubTokenManager(config)
chat_orchestrator = CopilotChatOrchestrator(token_manager)

# Define the echo tool
tools = [get_echo_tool_definition()]

print("\nTool Definition:")
print(json.dumps(tools[0], indent=2))

# Example messages that would trigger tool usage
test_messages = [
"Please use the echo tool to wrap 'Hello World' in a 'greeting' tag",
"Echo the text 'Testing 123' using the echo tool",
"Use the echo tool with input 'MCP Tool Example' and tag 'example'",
]

print("\n" + "=" * 60)
print("Testing tool usage with sample messages:")
print("=" * 60)

for message in test_messages:
print(f"\nUser: {message}")
response = await chat_orchestrator.send_message(message, tools)
print(f"Assistant: {response}")

print("\n" + "=" * 60)
print("Demo completed!")
print("=" * 60)


async def main():
"""Main function"""
parser = argparse.ArgumentParser(
Expand Down Expand Up @@ -755,6 +956,11 @@ async def main():
parser.add_argument(
"--model", default="gpt-4o", help="Model to use for chat (default: gpt-4o)"
)
parser.add_argument(
"--tool-demo",
action="store_true",
help="Run the tool usage demo with echo tool example",
)
parser.add_argument("--editor-name", default="vscode", help="Editor name")
parser.add_argument("--editor-version", default="1.63.2", help="Editor version")
parser.add_argument("--plugin-name", default="copilot-chat", help="Plugin name")
Expand All @@ -766,6 +972,11 @@ async def main():
global MODEL
MODEL = args.model

# Handle tool demo
if args.tool_demo:
await demo_tool_usage()
return

# Determine device flow usage
if args.device_flow:
use_device_flow = True
Expand Down Expand Up @@ -805,13 +1016,17 @@ async def main():
# Handle chat functionality
if args.chat:
chat_orchestrator = CopilotChatOrchestrator(token_manager, args.model)
await chat_orchestrator.start_interactive_session()
# Include echo tool by default for interactive chat
tools = [get_echo_tool_definition()]
await chat_orchestrator.start_interactive_session(tools)
return

if args.message:
print(f"Sending message to {args.model}...")
chat_orchestrator = CopilotChatOrchestrator(token_manager, args.model)
response = await chat_orchestrator.send_message(args.message)
# Include echo tool by default for single messages
tools = [get_echo_tool_definition()]
response = await chat_orchestrator.send_message(args.message, tools)
print(f"Response: {response}")
return

Expand Down
Loading