diff --git a/promptshell/ai_terminal_assistant.py b/promptshell/ai_terminal_assistant.py index ee381be..810a983 100644 --- a/promptshell/ai_terminal_assistant.py +++ b/promptshell/ai_terminal_assistant.py @@ -5,12 +5,16 @@ import sys import platform import questionary +import re from typing import Tuple from .node import Node from .data_gatherer import DataGatherer -from .format_utils import format_text, reset_format, get_current_os, get_os_specific_examples +from .format_utils import format_text, reset_format from .system_info import get_system_info +def get_current_os(): + return platform.system() + class AITerminalAssistant: def __init__(self, model_name: str, max_tokens: int = 8000, config: dict = None): self.username = getpass.getuser() @@ -202,62 +206,84 @@ def execute_interactive_command(self, command: str) -> Tuple[str, str, int]: def execute_command(self, user_input: str) -> str: try: + command = None self.current_directory = os.getcwd() + if user_input.strip() == "": return "Please provide a valid input." + + # Handle special commands (?, clear, !) if user_input.startswith('?') or user_input.endswith('?'): return self.answer_question(user_input) - if user_input.lower().strip() == 'clear' or user_input.lower().strip() == 'cls': - if get_current_os() == 'windows': - os.system('cls') - else: - os.system('clear') + if user_input.lower().strip() in ['clear', 'cls']: + os.system('cls' if get_current_os() == 'windows' else 'clear') return "" if user_input.startswith('!'): return self.run_direct_command(user_input[1:]) - additional_data = self.gather_additional_data(user_input) + + # Get command from executor command = self.command_executor(f""" User Input: {user_input} - Current OS: {get_current_os()} - Current OS specific examples: {get_os_specific_examples()} + Current OS: {platform.system()} Current Directory: {self.current_directory} - Translate the user input into a SINGLE shell command according to the operating system. + Translate the user input into a SINGLE shell command. Return ONLY the command, nothing else. - If the input is already a valid shell command, return it as is. - Do not provide any explanations or comments. - Use the actual filenames and content provided in the additional data. - """, additional_data=additional_data).strip() + """, additional_data=self.gather_additional_data(user_input)).strip() + + # Two-step verification for dangerous commands + if self.is_dangerous_command(command): + print(format_text('yellow', bold=True) + + f"\n⚠️ Warning: The command '{command}' may be destructive." + + reset_format()) + + # First confirmation + if not questionary.confirm("Do you want to proceed?").ask(): + return format_text('red', bold=True) + "\nCommand aborted by user." + reset_format() + + # Second confirmation - manual reentry + confirm_input = questionary.text("For safety, please re-type the exact command:").ask() + if confirm_input.strip() != command.strip(): + return format_text('red', bold=True) + "\n❌ Command mismatch. Operation aborted!" + reset_format() + + return self._execute_validated_command(command) - choice = questionary.confirm(f"Do you want to run the command '{command}'?").ask() - if choice: - if command.startswith("CONFIRM:"): - confirmation = questionary.confirm(f"Warning: This command may be destructive. Are you sure you want to run '{command[8:]}'?").ask() - if not confirmation: - return format_text('red') + "Command execution aborted." + reset_format() - command = command[8:] - formatted_command = format_text('cyan') + f"Command: {command}" + reset_format() - print(formatted_command) - self.command_history.append(command) - if len(self.command_history) > 10: - self.command_history.pop(0) - if command.startswith("cd "): - path = command.split(" ", 1)[1] - os.chdir(os.path.expanduser(path)) - result = f"Changed directory to {os.getcwd()}" - exit_code = 0 - else: - stdout, stderr, exit_code = self.execute_command_with_live_output(command) - result = "" - if exit_code != 0: - debug_suggestion = self.debug_error(command, stderr, exit_code) - result += format_text('yellow') + f"\n\nDebugging Suggestion:\n{debug_suggestion}" + reset_format() - return result.strip() - else: - print(format_text('red') + "Command cancelled!" + reset_format()) - return "" except Exception as e: - print(format_text('red') + "Error in execute command" + reset_format()) - return self.handle_error(str(e), user_input, command) + print(format_text('red') + f"Error: {str(e)}" + reset_format()) + return format_text('red') + "Command execution failed." + reset_format() + + def handle_error(self, error: str, user_input: str, command: str) -> str: + try: + error_analysis = self.error_handler(f""" + Error: {error} + User Input: {user_input} + Interpreted Command: {command} + Current Directory: {self.current_directory} + Provide ONLY a single, simple corrected command. No explanations. + """) + + print(format_text('red') + f"Error occurred: {error}" + reset_format()) + print(format_text('yellow') + f"Suggested command: {error_analysis}" + reset_format()) + + if questionary.confirm("Would you like to execute the suggested command?").ask(): + return self._execute_validated_command(error_analysis) + return format_text('red') + "Command execution aborted." + reset_format() + + except Exception as e: + return format_text('red') + f"Error handler failed: {str(e)}" + reset_format() + + def _execute_validated_command(self, command: str) -> str: + """Helper method to execute validated commands""" + if command.startswith("cd "): + path = command.split(" ", 1)[1] + os.chdir(os.path.expanduser(path)) + return f"Changed directory to {os.getcwd()}" + + stdout, stderr, exit_code = self.execute_command_with_live_output(command) + if exit_code != 0 and stderr: + debug_suggestion = self.debug_error(command, stderr, exit_code) + return format_text('yellow') + f"\nDebugging Suggestion:\n{debug_suggestion}" + reset_format() + + return stdout.strip() if stdout else "" def run_direct_command(self, command: str) -> str: try: @@ -334,19 +360,22 @@ def debug_error(self, command: str, error_output: str, exit_code: int) -> str: """ return self.debugger(debug_input) - def handle_error(self, error: str, user_input: str, command: str) -> str: - error_analysis = self.error_handler(f""" - Error: {error} - User Input: {user_input} - Interpreted Command: {command} - Current Directory: {self.current_directory} - Provide ONLY a single, simple corrected command. No explanations. - """) - error_msg = format_text('red') + f"Error occurred: {error}" + reset_format() - suggestion_msg = format_text('yellow') + f"Suggested command: {error_analysis}" + reset_format() - print(error_msg) - print(suggestion_msg) - confirmation = questionary.confirm("Would you like to execute the suggested command?").ask() - if confirmation: - return self.execute_command(error_analysis) - return format_text('red') + "Command execution aborted." + reset_format() \ No newline at end of file + def is_dangerous_command(self, command: str) -> bool: + dangerous_patterns = [ + (r"(?:^|\s)(rm|del|delete)\s+-[rf]*\s+", "Recursive/forced deletion"), + (r"(?:^|\s)(rm|del|delete)\s+.*[\*\?]", "Wildcard deletion"), + (r"(?:^|\s)(chmod|chown)\s+-R\s+", "Recursive permission change"), + (r"(?:^|\s)(dd|format|mkfs)\s+", "Disk operations"), + (r"(?:^|\s)(shutdown|reboot|init\s+[06])\s*", "System control"), + (r"(?:^|\s)>(>?)\s+", "Output redirection"), + ] + + cmd_lower = command.lower().strip() + return any( + cmd_lower.startswith(kw) for kw in [ + "rm ", "del ", "delete ", "chmod ", "chown ", + "mkfs.", "dd ", "reboot", "shutdown", "init 0", "init 6" + ] + ) or any( + bool(re.search(pattern, cmd_lower)) for pattern, _ in dangerous_patterns + ) \ No newline at end of file diff --git a/promptshell/main.py b/promptshell/main.py index a8dfb90..d85dbe7 100644 --- a/promptshell/main.py +++ b/promptshell/main.py @@ -53,6 +53,13 @@ def main(): - Type 'clear' to clear the terminal.{reset_format()}""") continue + # ✅ Handle alias command + if user_input.lower().startswith("alias "): + result = handle_alias_command(user_input, assistant.alias_manager) + print(result) + continue + + # All other commands (including dangerous ones) are handled by the assistant result = assistant.execute_command(user_input) print(result)