Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 124 additions & 37 deletions universal/bash_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ def __init__(self, message):
self.message = message


def validate_command_length(command: str) -> None:
"""Validate command length to prevent issues with very long commands."""
MAX_COMMAND_LENGTH = 100000 # 100KB limit
if len(command.encode('utf-8')) > MAX_COMMAND_LENGTH:
raise ToolError(f"Command too long ({len(command.encode('utf-8'))} bytes). Maximum allowed: {MAX_COMMAND_LENGTH} bytes")


class BaseAnthropicTool(metaclass=ABCMeta):
"""Abstract base class for Anthropic-defined tools."""

Expand Down Expand Up @@ -246,7 +253,7 @@ async def start(self):
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
limit=1024 * 1024,
limit=10 * 1024 * 1024, # Increased from 1MB to 10MB to handle large outputs
)

self._started = True
Expand Down Expand Up @@ -371,15 +378,61 @@ async def run(self, command: str, timeout: float | None = None):

try:
command_timeout = timeout if timeout is not None else self._timeout
data = await asyncio.wait_for(
self._process.stdout.readuntil(self._sentinel.encode()),
timeout=command_timeout
)
output = data.decode(errors="replace").replace(self._sentinel, "")
self._partial_output = output
error = self._process.stderr._buffer.decode(errors="replace")
self._partial_error = error
self._is_running_command = False

# Add timeout protection for the readuntil operation
try:
data = await asyncio.wait_for(
self._process.stdout.readuntil(self._sentinel.encode()),
timeout=command_timeout
)
output = data.decode(errors="replace").replace(self._sentinel, "")
self._partial_output = output
error = self._process.stderr._buffer.decode(errors="replace")
self._partial_error = error
self._is_running_command = False

except (asyncio.IncompleteReadError, ConnectionResetError) as e:
# Handle the "0 bytes read" error and similar stream issues
# This is the main fix for the reported error
output = self._partial_output
error = self._partial_error

# Try to get any remaining data from buffers
if hasattr(self._process.stdout, '_buffer') and self._process.stdout._buffer:
try:
remaining = self._process.stdout._buffer.decode(errors="replace")
output += remaining
except Exception:
pass

if hasattr(self._process.stderr, '_buffer') and self._process.stderr._buffer:
try:
remaining = self._process.stderr._buffer.decode(errors="replace")
error += remaining
except Exception:
pass

# Check if the command actually completed successfully despite the stream error
if self._sentinel in output:
# Command completed, just had a stream reading issue
output = output.replace(self._sentinel, "")
self._is_running_command = False
filtered_error = self._filter_error_output(error)
return CLIResult(
output=output.rstrip('\n'),
error=filtered_error,
system=f"Command completed despite stream reading issue. Session ID: {self._session_id}"
)
else:
# Command didn't complete normally
self._is_running_command = False
filtered_error = self._filter_error_output(error)
return ToolResult(
output=output,
error=filtered_error,
system=f"Stream reading error: {str(e)}. Command may have failed or produced output exceeding buffer limits."
)

except asyncio.TimeoutError:
output = self._partial_output
error = self._partial_error
Expand All @@ -392,34 +445,48 @@ async def run(self, command: str, timeout: float | None = None):
system=f"Process timed out after {command_timeout} seconds. This process will continue to run in session {self._session_id}."
)
except asyncio.LimitOverrunError:
# Enhanced limit overrun handling with timeout protection
output_chunks = []
total_timeout = command_timeout if 'command_timeout' in locals() else (timeout if timeout is not None else self._timeout)
start_time = asyncio.get_event_loop().time()

try:
while True:
current_time = asyncio.get_event_loop().time()
remaining_time = total_timeout - (current_time - start_time)

if remaining_time <= 0:
break

chunk = await asyncio.wait_for(
self._process.stdout.read(8192),
timeout=0.1
timeout=min(remaining_time, 30.0) # Max 30s per read
)
if not chunk:
break

output_chunks.append(chunk.decode(errors="replace"))
accumulated = ''.join(output_chunks)

if self._sentinel in accumulated:
output = accumulated.replace(self._sentinel, "")
self._partial_output = output
error = self._process.stderr._buffer.decode(errors="replace")
self._partial_error = error
self._is_running_command = False
break

except asyncio.TimeoutError:
output = ''.join(output_chunks)
self._partial_output = output
error = self._process.stderr._buffer.decode(errors="replace")
self._partial_error = error
except Exception as e:
# Catch any other unexpected errors
self._is_running_command = False
return ToolResult(
error=f"Error executing command: {str(e)}",
system="An unexpected error occurred"
error=f"Unexpected error executing command: {str(e)}",
system="Session may need to be restarted"
)

processed_output = output.rstrip('\n')
Expand Down Expand Up @@ -469,34 +536,50 @@ async def stream_command(self, command: str):
queue: asyncio.Queue[str] = asyncio.Queue()

async def _reader(stream: asyncio.StreamReader, is_stdout: bool):
"""Continuously read chunks (not lines) from *stream* and enqueue them."""
"""Continuously read chunks with timeout protection from *stream* and enqueue them."""
buffer = ""
chunk_size = 256 # bytes
read_timeout = 30.0 # 30 second timeout for individual reads
last_data_time = asyncio.get_event_loop().time()

while True:
data = await stream.read(chunk_size)
if not data:
break # EOF

text = data.decode(errors="replace")

if is_stdout:
buffer += text

if sentinel in buffer:
before, _sent, _after = buffer.partition(sentinel)
if before:
await queue.put(before)
await queue.put(None)
try:
# Add timeout to individual read operations
data = await asyncio.wait_for(stream.read(chunk_size), timeout=read_timeout)
if not data:
break # EOF

last_data_time = asyncio.get_event_loop().time()
text = data.decode(errors="replace")

if is_stdout:
buffer += text

if sentinel in buffer:
before, _sent, _after = buffer.partition(sentinel)
if before:
await queue.put(before)
await queue.put(None)
break

if buffer:
await queue.put(buffer)
buffer = ""
else:
filtered = self._filter_error_output(text)
if filtered:
await queue.put(filtered)

except asyncio.TimeoutError:
# Check if we've been waiting too long for data
current_time = asyncio.get_event_loop().time()
if current_time - last_data_time > read_timeout * 2: # No data for 2x timeout
break

if buffer:
await queue.put(buffer)
buffer = ""
else:
filtered = self._filter_error_output(text)
if filtered:
await queue.put(filtered)
continue

except Exception:
# Handle any stream reading errors gracefully
break

stdout_task = asyncio.create_task(_reader(self._process.stdout, True))
stderr_task = asyncio.create_task(_reader(self._process.stderr, False))
Expand Down Expand Up @@ -613,11 +696,15 @@ async def __call__(
return ToolResult(system=busy_message)

try:
# Validate command length to prevent issues with very long commands
validate_command_length(command)
result = await current_session.run(command, timeout)

if isinstance(result, ToolResult) and (
(result.system and "must be restarted" in result.system) or
(result.error and "0 bytes read on a total of undefined expected bytes" in result.error)
(result.error and "0 bytes read on a total of undefined expected bytes" in result.error) or
(result.error and "Stream reading error" in result.error) or
(result.system and "stream reading issue" in result.system)
):
try:
async with self._sessions_lock:
Expand Down