-
Notifications
You must be signed in to change notification settings - Fork 21
Feature/improved dangerous command safeguard #47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,12 +5,16 @@ | |
| import sys | ||
| import platform | ||
| import questionary | ||
| import re | ||
| from typing import Tuple | ||
| from .node import Node | ||
| from .data_gatherer import DataGatherer | ||
| from .format_utils import format_text, reset_format, get_current_os, get_os_specific_examples | ||
| from .format_utils import format_text, reset_format | ||
| from .system_info import get_system_info | ||
|
|
||
| def get_current_os(): | ||
| return platform.system() | ||
|
|
||
|
Comment on lines
+15
to
+17
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function is already defined in format_utils.py. Remove this |
||
| class AITerminalAssistant: | ||
| def __init__(self, model_name: str, max_tokens: int = 8000, config: dict = None): | ||
| self.username = getpass.getuser() | ||
|
|
@@ -202,62 +206,84 @@ def execute_interactive_command(self, command: str) -> Tuple[str, str, int]: | |
|
|
||
| def execute_command(self, user_input: str) -> str: | ||
| try: | ||
| command = None | ||
| self.current_directory = os.getcwd() | ||
|
|
||
| if user_input.strip() == "": | ||
| return "Please provide a valid input." | ||
|
|
||
| # Handle special commands (?, clear, !) | ||
| if user_input.startswith('?') or user_input.endswith('?'): | ||
| return self.answer_question(user_input) | ||
| if user_input.lower().strip() == 'clear' or user_input.lower().strip() == 'cls': | ||
| if get_current_os() == 'windows': | ||
| os.system('cls') | ||
| else: | ||
| os.system('clear') | ||
| if user_input.lower().strip() in ['clear', 'cls']: | ||
| os.system('cls' if get_current_os() == 'windows' else 'clear') | ||
| return "" | ||
| if user_input.startswith('!'): | ||
| return self.run_direct_command(user_input[1:]) | ||
| additional_data = self.gather_additional_data(user_input) | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keep this line, it's a required arguement in command_executor |
||
|
|
||
| # Get command from executor | ||
| command = self.command_executor(f""" | ||
| User Input: {user_input} | ||
| Current OS: {get_current_os()} | ||
| Current OS specific examples: {get_os_specific_examples()} | ||
| Current OS: {platform.system()} | ||
|
Comment on lines
-221
to
+227
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't change these, the functions are well-defined in format_utils and are required. |
||
| Current Directory: {self.current_directory} | ||
| Translate the user input into a SINGLE shell command according to the operating system. | ||
| Translate the user input into a SINGLE shell command. | ||
|
Comment on lines
-224
to
+229
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No changes required here as well. keep the original line. |
||
| Return ONLY the command, nothing else. | ||
| If the input is already a valid shell command, return it as is. | ||
| Do not provide any explanations or comments. | ||
| Use the actual filenames and content provided in the additional data. | ||
| """, additional_data=additional_data).strip() | ||
| """, additional_data=self.gather_additional_data(user_input)).strip() | ||
|
Comment on lines
-229
to
+231
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use the additional data variable defined above only |
||
|
|
||
| # Two-step verification for dangerous commands | ||
| if self.is_dangerous_command(command): | ||
| print(format_text('yellow', bold=True) + | ||
| f"\n⚠️ Warning: The command '{command}' may be destructive." + | ||
| reset_format()) | ||
|
Comment on lines
+234
to
+237
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please don't change the identification method of dangerous commads. |
||
|
|
||
| # First confirmation | ||
| if not questionary.confirm("Do you want to proceed?").ask(): | ||
| return format_text('red', bold=True) + "\nCommand aborted by user." + reset_format() | ||
|
|
||
| # Second confirmation - manual reentry | ||
| confirm_input = questionary.text("For safety, please re-type the exact command:").ask() | ||
| if confirm_input.strip() != command.strip(): | ||
| return format_text('red', bold=True) + "\n❌ Command mismatch. Operation aborted!" + reset_format() | ||
|
|
||
| return self._execute_validated_command(command) | ||
|
|
||
| choice = questionary.confirm(f"Do you want to run the command '{command}'?").ask() | ||
| if choice: | ||
| if command.startswith("CONFIRM:"): | ||
| confirmation = questionary.confirm(f"Warning: This command may be destructive. Are you sure you want to run '{command[8:]}'?").ask() | ||
| if not confirmation: | ||
| return format_text('red') + "Command execution aborted." + reset_format() | ||
| command = command[8:] | ||
| formatted_command = format_text('cyan') + f"Command: {command}" + reset_format() | ||
| print(formatted_command) | ||
| self.command_history.append(command) | ||
| if len(self.command_history) > 10: | ||
| self.command_history.pop(0) | ||
| if command.startswith("cd "): | ||
| path = command.split(" ", 1)[1] | ||
| os.chdir(os.path.expanduser(path)) | ||
| result = f"Changed directory to {os.getcwd()}" | ||
| exit_code = 0 | ||
| else: | ||
| stdout, stderr, exit_code = self.execute_command_with_live_output(command) | ||
| result = "" | ||
| if exit_code != 0: | ||
| debug_suggestion = self.debug_error(command, stderr, exit_code) | ||
| result += format_text('yellow') + f"\n\nDebugging Suggestion:\n{debug_suggestion}" + reset_format() | ||
| return result.strip() | ||
| else: | ||
| print(format_text('red') + "Command cancelled!" + reset_format()) | ||
| return "" | ||
|
Comment on lines
-231
to
-257
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please don't change the logic here
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you so much for the clarification. I’ll get back to you as soon as possible with the necessary corrections. @Kirti-Rathi
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Feel free to reach out in case you've doubts |
||
| except Exception as e: | ||
| print(format_text('red') + "Error in execute command" + reset_format()) | ||
| return self.handle_error(str(e), user_input, command) | ||
| print(format_text('red') + f"Error: {str(e)}" + reset_format()) | ||
| return format_text('red') + "Command execution failed." + reset_format() | ||
|
|
||
| def handle_error(self, error: str, user_input: str, command: str) -> str: | ||
| try: | ||
| error_analysis = self.error_handler(f""" | ||
| Error: {error} | ||
| User Input: {user_input} | ||
| Interpreted Command: {command} | ||
| Current Directory: {self.current_directory} | ||
| Provide ONLY a single, simple corrected command. No explanations. | ||
| """) | ||
|
|
||
| print(format_text('red') + f"Error occurred: {error}" + reset_format()) | ||
| print(format_text('yellow') + f"Suggested command: {error_analysis}" + reset_format()) | ||
|
|
||
| if questionary.confirm("Would you like to execute the suggested command?").ask(): | ||
| return self._execute_validated_command(error_analysis) | ||
| return format_text('red') + "Command execution aborted." + reset_format() | ||
|
|
||
| except Exception as e: | ||
| return format_text('red') + f"Error handler failed: {str(e)}" + reset_format() | ||
|
|
||
| def _execute_validated_command(self, command: str) -> str: | ||
| """Helper method to execute validated commands""" | ||
| if command.startswith("cd "): | ||
| path = command.split(" ", 1)[1] | ||
| os.chdir(os.path.expanduser(path)) | ||
| return f"Changed directory to {os.getcwd()}" | ||
|
|
||
| stdout, stderr, exit_code = self.execute_command_with_live_output(command) | ||
| if exit_code != 0 and stderr: | ||
| debug_suggestion = self.debug_error(command, stderr, exit_code) | ||
| return format_text('yellow') + f"\nDebugging Suggestion:\n{debug_suggestion}" + reset_format() | ||
|
|
||
| return stdout.strip() if stdout else "" | ||
|
|
||
| def run_direct_command(self, command: str) -> str: | ||
| try: | ||
|
|
@@ -334,19 +360,22 @@ def debug_error(self, command: str, error_output: str, exit_code: int) -> str: | |
| """ | ||
| return self.debugger(debug_input) | ||
|
|
||
| def handle_error(self, error: str, user_input: str, command: str) -> str: | ||
| error_analysis = self.error_handler(f""" | ||
| Error: {error} | ||
| User Input: {user_input} | ||
| Interpreted Command: {command} | ||
| Current Directory: {self.current_directory} | ||
| Provide ONLY a single, simple corrected command. No explanations. | ||
| """) | ||
| error_msg = format_text('red') + f"Error occurred: {error}" + reset_format() | ||
| suggestion_msg = format_text('yellow') + f"Suggested command: {error_analysis}" + reset_format() | ||
| print(error_msg) | ||
| print(suggestion_msg) | ||
| confirmation = questionary.confirm("Would you like to execute the suggested command?").ask() | ||
| if confirmation: | ||
| return self.execute_command(error_analysis) | ||
| return format_text('red') + "Command execution aborted." + reset_format() | ||
| def is_dangerous_command(self, command: str) -> bool: | ||
| dangerous_patterns = [ | ||
| (r"(?:^|\s)(rm|del|delete)\s+-[rf]*\s+", "Recursive/forced deletion"), | ||
| (r"(?:^|\s)(rm|del|delete)\s+.*[\*\?]", "Wildcard deletion"), | ||
| (r"(?:^|\s)(chmod|chown)\s+-R\s+", "Recursive permission change"), | ||
| (r"(?:^|\s)(dd|format|mkfs)\s+", "Disk operations"), | ||
| (r"(?:^|\s)(shutdown|reboot|init\s+[06])\s*", "System control"), | ||
| (r"(?:^|\s)>(>?)\s+", "Output redirection"), | ||
| ] | ||
|
|
||
| cmd_lower = command.lower().strip() | ||
| return any( | ||
| cmd_lower.startswith(kw) for kw in [ | ||
| "rm ", "del ", "delete ", "chmod ", "chown ", | ||
| "mkfs.", "dd ", "reboot", "shutdown", "init 0", "init 6" | ||
| ] | ||
| ) or any( | ||
| bool(re.search(pattern, cmd_lower)) for pattern, _ in dangerous_patterns | ||
| ) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep it as it was, importing functions from format_utils only