diff --git a/universal/bash_server.py b/universal/bash_server.py index e1877b186..545ca9bd8 100644 --- a/universal/bash_server.py +++ b/universal/bash_server.py @@ -136,6 +136,13 @@ def __init__(self, message): self.message = message +def validate_command_length(command: str) -> None: + """Validate command length to prevent issues with very long commands.""" + MAX_COMMAND_LENGTH = 100000 # 100KB limit + if len(command.encode('utf-8')) > MAX_COMMAND_LENGTH: + raise ToolError(f"Command too long ({len(command.encode('utf-8'))} bytes). Maximum allowed: {MAX_COMMAND_LENGTH} bytes") + + class BaseAnthropicTool(metaclass=ABCMeta): """Abstract base class for Anthropic-defined tools.""" @@ -246,7 +253,7 @@ async def start(self): stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - limit=1024 * 1024, + limit=10 * 1024 * 1024, # Increased from 1MB to 10MB to handle large outputs ) self._started = True @@ -371,15 +378,61 @@ async def run(self, command: str, timeout: float | None = None): try: command_timeout = timeout if timeout is not None else self._timeout - data = await asyncio.wait_for( - self._process.stdout.readuntil(self._sentinel.encode()), - timeout=command_timeout - ) - output = data.decode(errors="replace").replace(self._sentinel, "") - self._partial_output = output - error = self._process.stderr._buffer.decode(errors="replace") - self._partial_error = error - self._is_running_command = False + + # Add timeout protection for the readuntil operation + try: + data = await asyncio.wait_for( + self._process.stdout.readuntil(self._sentinel.encode()), + timeout=command_timeout + ) + output = data.decode(errors="replace").replace(self._sentinel, "") + self._partial_output = output + error = self._process.stderr._buffer.decode(errors="replace") + self._partial_error = error + self._is_running_command = False + + except (asyncio.IncompleteReadError, ConnectionResetError) as e: + # Handle the "0 bytes read" error and similar stream issues + # This is the main fix for the reported error + output = self._partial_output + error = self._partial_error + + # Try to get any remaining data from buffers + if hasattr(self._process.stdout, '_buffer') and self._process.stdout._buffer: + try: + remaining = self._process.stdout._buffer.decode(errors="replace") + output += remaining + except Exception: + pass + + if hasattr(self._process.stderr, '_buffer') and self._process.stderr._buffer: + try: + remaining = self._process.stderr._buffer.decode(errors="replace") + error += remaining + except Exception: + pass + + # Check if the command actually completed successfully despite the stream error + if self._sentinel in output: + # Command completed, just had a stream reading issue + output = output.replace(self._sentinel, "") + self._is_running_command = False + filtered_error = self._filter_error_output(error) + return CLIResult( + output=output.rstrip('\n'), + error=filtered_error, + system=f"Command completed despite stream reading issue. Session ID: {self._session_id}" + ) + else: + # Command didn't complete normally + self._is_running_command = False + filtered_error = self._filter_error_output(error) + return ToolResult( + output=output, + error=filtered_error, + system=f"Stream reading error: {str(e)}. Command may have failed or produced output exceeding buffer limits." + ) + except asyncio.TimeoutError: output = self._partial_output error = self._partial_error @@ -392,17 +445,29 @@ async def run(self, command: str, timeout: float | None = None): system=f"Process timed out after {command_timeout} seconds. This process will continue to run in session {self._session_id}." ) except asyncio.LimitOverrunError: + # Enhanced limit overrun handling with timeout protection output_chunks = [] + total_timeout = command_timeout if 'command_timeout' in locals() else (timeout if timeout is not None else self._timeout) + start_time = asyncio.get_event_loop().time() + try: while True: + current_time = asyncio.get_event_loop().time() + remaining_time = total_timeout - (current_time - start_time) + + if remaining_time <= 0: + break + chunk = await asyncio.wait_for( self._process.stdout.read(8192), - timeout=0.1 + timeout=min(remaining_time, 30.0) # Max 30s per read ) if not chunk: break + output_chunks.append(chunk.decode(errors="replace")) accumulated = ''.join(output_chunks) + if self._sentinel in accumulated: output = accumulated.replace(self._sentinel, "") self._partial_output = output @@ -410,16 +475,18 @@ async def run(self, command: str, timeout: float | None = None): self._partial_error = error self._is_running_command = False break + except asyncio.TimeoutError: output = ''.join(output_chunks) self._partial_output = output error = self._process.stderr._buffer.decode(errors="replace") self._partial_error = error except Exception as e: + # Catch any other unexpected errors self._is_running_command = False return ToolResult( - error=f"Error executing command: {str(e)}", - system="An unexpected error occurred" + error=f"Unexpected error executing command: {str(e)}", + system="Session may need to be restarted" ) processed_output = output.rstrip('\n') @@ -469,34 +536,50 @@ async def stream_command(self, command: str): queue: asyncio.Queue[str] = asyncio.Queue() async def _reader(stream: asyncio.StreamReader, is_stdout: bool): - """Continuously read chunks (not lines) from *stream* and enqueue them.""" + """Continuously read chunks with timeout protection from *stream* and enqueue them.""" buffer = "" chunk_size = 256 # bytes + read_timeout = 30.0 # 30 second timeout for individual reads + last_data_time = asyncio.get_event_loop().time() while True: - data = await stream.read(chunk_size) - if not data: - break # EOF - - text = data.decode(errors="replace") - - if is_stdout: - buffer += text - - if sentinel in buffer: - before, _sent, _after = buffer.partition(sentinel) - if before: - await queue.put(before) - await queue.put(None) + try: + # Add timeout to individual read operations + data = await asyncio.wait_for(stream.read(chunk_size), timeout=read_timeout) + if not data: + break # EOF + + last_data_time = asyncio.get_event_loop().time() + text = data.decode(errors="replace") + + if is_stdout: + buffer += text + + if sentinel in buffer: + before, _sent, _after = buffer.partition(sentinel) + if before: + await queue.put(before) + await queue.put(None) + break + + if buffer: + await queue.put(buffer) + buffer = "" + else: + filtered = self._filter_error_output(text) + if filtered: + await queue.put(filtered) + + except asyncio.TimeoutError: + # Check if we've been waiting too long for data + current_time = asyncio.get_event_loop().time() + if current_time - last_data_time > read_timeout * 2: # No data for 2x timeout break - - if buffer: - await queue.put(buffer) - buffer = "" - else: - filtered = self._filter_error_output(text) - if filtered: - await queue.put(filtered) + continue + + except Exception: + # Handle any stream reading errors gracefully + break stdout_task = asyncio.create_task(_reader(self._process.stdout, True)) stderr_task = asyncio.create_task(_reader(self._process.stderr, False)) @@ -613,11 +696,15 @@ async def __call__( return ToolResult(system=busy_message) try: + # Validate command length to prevent issues with very long commands + validate_command_length(command) result = await current_session.run(command, timeout) if isinstance(result, ToolResult) and ( (result.system and "must be restarted" in result.system) or - (result.error and "0 bytes read on a total of undefined expected bytes" in result.error) + (result.error and "0 bytes read on a total of undefined expected bytes" in result.error) or + (result.error and "Stream reading error" in result.error) or + (result.system and "stream reading issue" in result.system) ): try: async with self._sessions_lock: