Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions run_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,51 @@
)


class _SafeWriter:
"""Transparent stdout wrapper that catches OSError from broken pipes.

When hermes-agent runs as a systemd service, Docker container, or headless
daemon, the stdout pipe can become unavailable (idle timeout, buffer
exhaustion, socket reset). Any print() call then raises
``OSError: [Errno 5] Input/output error``, which can crash
run_conversation() — especially via double-fault when the except handler
also tries to print.

This wrapper delegates all writes to the underlying stream and silently
catches OSError. It is installed once at the start of run_conversation()
and is transparent when stdout is healthy (zero overhead on the happy path).
"""

__slots__ = ("_inner",)

def __init__(self, inner):
object.__setattr__(self, "_inner", inner)

def write(self, data):
try:
return self._inner.write(data)
except OSError:
return len(data) if isinstance(data, str) else 0

def flush(self):
try:
self._inner.flush()
except OSError:
pass

def fileno(self):
return self._inner.fileno()

def isatty(self):
try:
return self._inner.isatty()
except OSError:
return False

def __getattr__(self, name):
return getattr(self._inner, name)


class IterationBudget:
"""Thread-safe shared iteration counter for parent and child agents.

Expand Down Expand Up @@ -3157,6 +3202,11 @@ def run_conversation(
Returns:
Dict: Complete conversation result with final response and message history
"""
# Guard stdout against OSError from broken pipes (systemd/headless/daemon).
# Installed once, transparent when stdout is healthy, prevents crash on write.
if not isinstance(sys.stdout, _SafeWriter):
sys.stdout = _SafeWriter(sys.stdout)

# Generate unique task_id if not provided to isolate VMs between concurrent tasks
effective_task_id = task_id or str(uuid.uuid4())

Expand Down
80 changes: 80 additions & 0 deletions tests/test_run_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1283,3 +1283,83 @@ def test_appends_to_non_json_tool_result(self, agent):
messages[-1]["content"] = last_content + f"\n\n{warning}"
assert "plain text result" in messages[-1]["content"]
assert "BUDGET WARNING" in messages[-1]["content"]


class TestSafeWriter:
"""Verify _SafeWriter guards stdout against OSError (broken pipes)."""

def test_write_delegates_normally(self):
"""When stdout is healthy, _SafeWriter is transparent."""
from run_agent import _SafeWriter
from io import StringIO
inner = StringIO()
writer = _SafeWriter(inner)
writer.write("hello")
assert inner.getvalue() == "hello"

def test_write_catches_oserror(self):
"""OSError on write is silently caught, returns len(data)."""
from run_agent import _SafeWriter
from unittest.mock import MagicMock
inner = MagicMock()
inner.write.side_effect = OSError(5, "Input/output error")
writer = _SafeWriter(inner)
result = writer.write("hello")
assert result == 5 # len("hello")

def test_flush_catches_oserror(self):
"""OSError on flush is silently caught."""
from run_agent import _SafeWriter
from unittest.mock import MagicMock
inner = MagicMock()
inner.flush.side_effect = OSError(5, "Input/output error")
writer = _SafeWriter(inner)
writer.flush() # should not raise

def test_print_survives_broken_stdout(self, monkeypatch):
"""print() through _SafeWriter doesn't crash on broken pipe."""
import sys
from run_agent import _SafeWriter
from unittest.mock import MagicMock
broken = MagicMock()
broken.write.side_effect = OSError(5, "Input/output error")
original = sys.stdout
sys.stdout = _SafeWriter(broken)
try:
print("this should not crash") # would raise without _SafeWriter
finally:
sys.stdout = original

def test_installed_in_run_conversation(self, agent):
"""run_conversation installs _SafeWriter on sys.stdout."""
import sys
from run_agent import _SafeWriter
resp = _mock_response(content="Done", finish_reason="stop")
agent.client.chat.completions.create.return_value = resp
original = sys.stdout
try:
with (
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
agent.run_conversation("test")
assert isinstance(sys.stdout, _SafeWriter)
finally:
sys.stdout = original

def test_double_wrap_prevented(self):
"""Wrapping an already-wrapped stream doesn't add layers."""
import sys
from run_agent import _SafeWriter
from io import StringIO
inner = StringIO()
wrapped = _SafeWriter(inner)
# isinstance check should prevent double-wrapping
assert isinstance(wrapped, _SafeWriter)
# The guard in run_conversation checks isinstance before wrapping
if not isinstance(wrapped, _SafeWriter):
wrapped = _SafeWriter(wrapped)
# Still just one layer
wrapped.write("test")
assert inner.getvalue() == "test"
Loading