Skip to content

Commit 9dd6fe5

Browse files
ashwin-antclaude
andcommitted
Fix Windows subprocess stdin buffering issue causing ClaudeSDKClient hangs (#208)
## Problem ClaudeSDKClient initialization would hang indefinitely on Windows, timing out after 60 seconds. The SDK successfully spawned the Claude CLI subprocess but control requests sent via stdin never reached the subprocess due to Windows subprocess stdin buffering behavior with Python's asyncio. ## Root Cause On Windows, when using asyncio subprocess streams, data written to stdin can remain buffered and not immediately sent to the child process. The CLI subprocess waits for the initialization request that's stuck in Python's buffer, causing the 60-second timeout. ## Solution 1. Added `flush_stdin()` method to Transport base class (non-abstract, default no-op) 2. Implemented Windows-specific flush in SubprocessCLITransport that calls `drain()` on the asyncio StreamWriter when available 3. Call `flush_stdin()` after all control protocol writes in Query class: - After sending control requests (_send_control_request) - After responding to incoming requests (_handle_control_request) ## Tests Added - test_flush_stdin_on_windows: Verifies drain() called on Windows - test_flush_stdin_on_non_windows: Verifies no-op on other platforms - test_flush_stdin_without_process: Tests graceful handling of missing process - test_flush_stdin_fallback_to_inner_stream: Tests wrapped stream fallback - test_flush_stdin_called_after_control_requests: Integration test for outgoing requests - test_flush_stdin_called_after_control_responses: Integration test for incoming requests All tests pass on macOS, and the fix is platform-specific to Windows only. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 6793e40 commit 9dd6fe5

File tree

5 files changed

+291
-0
lines changed

5 files changed

+291
-0
lines changed

src/claude_agent_sdk/_internal/query.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ async def _handle_control_request(self, request: SDKControlRequest) -> None:
274274
},
275275
}
276276
await self.transport.write(json.dumps(success_response) + "\n")
277+
await self.transport.flush_stdin()
277278

278279
except Exception as e:
279280
# Send error response
@@ -286,6 +287,7 @@ async def _handle_control_request(self, request: SDKControlRequest) -> None:
286287
},
287288
}
288289
await self.transport.write(json.dumps(error_response) + "\n")
290+
await self.transport.flush_stdin()
289291

290292
async def _send_control_request(self, request: dict[str, Any]) -> dict[str, Any]:
291293
"""Send control request to CLI and wait for response."""
@@ -309,6 +311,11 @@ async def _send_control_request(self, request: dict[str, Any]) -> dict[str, Any]
309311

310312
await self.transport.write(json.dumps(control_request) + "\n")
311313

314+
# Flush stdin to ensure the request is sent immediately
315+
# This is critical on Windows where buffering can prevent the subprocess
316+
# from receiving the data
317+
await self.transport.flush_stdin()
318+
312319
# Wait for response
313320
try:
314321
with anyio.fail_after(60.0):

src/claude_agent_sdk/_internal/transport/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,5 +64,17 @@ async def end_input(self) -> None:
6464
"""End the input stream (close stdin for process transports)."""
6565
pass
6666

67+
async def flush_stdin(self) -> None:
68+
"""Flush the stdin stream to ensure data is sent immediately.
69+
70+
This is primarily needed on Windows where subprocess stdin buffering
71+
can prevent data from being sent to the child process immediately.
72+
73+
Default implementation does nothing. Transports that support stdin
74+
flushing should override this method.
75+
"""
76+
# Default implementation - subclasses can override for platform-specific flushing
77+
return None
78+
6779

6880
__all__ = ["Transport"]

src/claude_agent_sdk/_internal/transport/subprocess_cli.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
import logging
55
import os
6+
import platform
67
import re
78
import shutil
89
import sys
@@ -496,3 +497,46 @@ async def _check_claude_version(self) -> None:
496497
def is_ready(self) -> bool:
497498
"""Check if transport is ready for communication."""
498499
return self._ready
500+
501+
async def flush_stdin(self) -> None:
502+
"""Flush stdin to ensure data is sent immediately to the subprocess.
503+
504+
This is particularly important on Windows where subprocess stdin buffering
505+
can prevent data from reaching the child process immediately.
506+
507+
This method attempts to drain the stdin stream if using asyncio backend,
508+
which is the primary fix for Windows subprocess communication issues.
509+
"""
510+
# Only flush if we have a process and stdin stream
511+
if not self._process or not self._process.stdin:
512+
return
513+
514+
# On Windows, we need to explicitly flush/drain the stdin stream
515+
# to ensure data reaches the subprocess immediately
516+
if platform.system() == "Windows":
517+
try:
518+
# anyio wraps subprocess stdin in a ByteSendStream
519+
# When using asyncio backend, the underlying stream is a StreamWriter
520+
# which has a drain() method that we need to call
521+
stdin_stream = self._process.stdin
522+
523+
# Check if this is an asyncio StreamWriter (has drain method)
524+
if hasattr(stdin_stream, "drain") and callable(stdin_stream.drain):
525+
await stdin_stream.drain()
526+
logger.debug("Flushed stdin stream on Windows")
527+
else:
528+
# If not a StreamWriter, try to access wrapped/inner stream
529+
# anyio may wrap the stream in various ways depending on backend
530+
for attr in ["_stream", "_transport_stream", "transport_stream"]:
531+
if hasattr(stdin_stream, attr):
532+
inner = getattr(stdin_stream, attr)
533+
if hasattr(inner, "drain") and callable(inner.drain):
534+
await inner.drain()
535+
logger.debug(
536+
f"Flushed stdin inner stream via {attr} on Windows"
537+
)
538+
break
539+
except Exception as e:
540+
# Log but don't fail - flushing is a best-effort optimization
541+
logger.debug(f"Could not flush stdin on Windows: {e}")
542+
pass

tests/test_streaming_client.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
ClaudeAgentOptions,
1616
ClaudeSDKClient,
1717
CLIConnectionError,
18+
PermissionResultAllow,
1819
ResultMessage,
1920
TextBlock,
2021
UserMessage,
2122
query,
2223
)
24+
from claude_agent_sdk._internal.query import Query
25+
from claude_agent_sdk._internal.transport import Transport
2326
from claude_agent_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
2427

2528

@@ -34,6 +37,7 @@ def create_mock_transport(with_init_response=True):
3437
mock_transport.close = AsyncMock()
3538
mock_transport.end_input = AsyncMock()
3639
mock_transport.write = AsyncMock()
40+
mock_transport.flush_stdin = AsyncMock()
3741
mock_transport.is_ready = Mock(return_value=True)
3842

3943
# Track written messages to simulate control protocol responses
@@ -571,6 +575,82 @@ async def get_next_message():
571575

572576
anyio.run(_test)
573577

578+
def test_flush_stdin_called_after_control_responses(self):
579+
"""Test that flush_stdin is called after responding to control requests (issue #208)."""
580+
581+
async def _test():
582+
# Create a mock transport
583+
mock_transport = AsyncMock(spec=Transport)
584+
mock_transport.is_ready = Mock(return_value=True)
585+
586+
# Track write and flush calls
587+
write_calls = []
588+
flush_calls = []
589+
590+
async def mock_write(data):
591+
write_calls.append(data)
592+
593+
async def mock_flush():
594+
flush_calls.append(True)
595+
596+
mock_transport.write = AsyncMock(side_effect=mock_write)
597+
mock_transport.flush_stdin = AsyncMock(side_effect=mock_flush)
598+
599+
# Create mock read_messages that doesn't yield anything
600+
async def mock_read_messages():
601+
# Just wait forever (test will complete before this matters)
602+
await asyncio.sleep(1000)
603+
yield {}
604+
605+
mock_transport.read_messages = mock_read_messages
606+
607+
# Create Query with streaming mode
608+
query = Query(transport=mock_transport, is_streaming_mode=True)
609+
await query.start()
610+
611+
# Simulate an incoming tool permission request
612+
permission_request = {
613+
"type": "control_request",
614+
"request_id": "test_req_123",
615+
"request": {
616+
"subtype": "can_use_tool",
617+
"tool_name": "Read",
618+
"input": {"file_path": "/test.txt"},
619+
"permission_suggestions": [],
620+
},
621+
}
622+
623+
# Set up a permission callback that allows the tool
624+
async def mock_can_use_tool(tool_name, input_data, context):
625+
return PermissionResultAllow()
626+
627+
query.can_use_tool = mock_can_use_tool
628+
629+
# Clear previous calls
630+
write_calls.clear()
631+
flush_calls.clear()
632+
633+
# Handle the control request
634+
await query._handle_control_request(permission_request)
635+
636+
# Give it a moment to complete
637+
await asyncio.sleep(0.01)
638+
639+
# Verify that flush_stdin was called after writing the response
640+
assert len(write_calls) == 1, "Should have written one control response"
641+
assert len(flush_calls) == 1, (
642+
"flush_stdin should be called after writing response"
643+
)
644+
645+
# Verify the response was a success
646+
response_data = json.loads(write_calls[0])
647+
assert response_data["type"] == "control_response"
648+
assert response_data["response"]["subtype"] == "success"
649+
650+
await query.close()
651+
652+
anyio.run(_test)
653+
574654

575655
class TestQueryWithAsyncIterable:
576656
"""Test query() function with async iterable inputs."""
@@ -833,3 +913,44 @@ async def mock_receive():
833913
assert isinstance(messages[-1], ResultMessage)
834914

835915
anyio.run(_test)
916+
917+
def test_flush_stdin_called_after_control_requests(self):
918+
"""Test that flush_stdin is called after sending control requests (issue #208)."""
919+
920+
async def _test():
921+
with patch(
922+
"claude_agent_sdk._internal.transport.subprocess_cli.SubprocessCLITransport"
923+
) as mock_transport_class:
924+
mock_transport = create_mock_transport()
925+
926+
# Add flush_stdin mock and tracking
927+
flush_calls = []
928+
929+
async def mock_flush():
930+
flush_calls.append(True)
931+
932+
mock_transport.flush_stdin = AsyncMock(side_effect=mock_flush)
933+
934+
mock_transport_class.return_value = mock_transport
935+
936+
async with ClaudeSDKClient() as client:
937+
# Initialization should call flush_stdin
938+
# Wait a bit for initialization to complete
939+
await asyncio.sleep(0.05)
940+
941+
# Verify flush_stdin was called at least once (for initialization)
942+
assert len(flush_calls) >= 1, (
943+
"flush_stdin should be called during initialization"
944+
)
945+
initial_flush_count = len(flush_calls)
946+
947+
# Send interrupt control request
948+
await client.interrupt()
949+
await asyncio.sleep(0.05)
950+
951+
# Verify flush_stdin was called again (for interrupt request)
952+
assert len(flush_calls) > initial_flush_count, (
953+
"flush_stdin should be called after interrupt"
954+
)
955+
956+
anyio.run(_test)

tests/test_transport.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,3 +486,110 @@ async def _test():
486486
assert user_passed == "claude"
487487

488488
anyio.run(_test)
489+
490+
def test_flush_stdin_on_windows(self):
491+
"""Test that flush_stdin calls drain() on Windows (issue #208)."""
492+
493+
async def _test():
494+
# Mock platform.system to return Windows
495+
with patch("platform.system", return_value="Windows"):
496+
transport = SubprocessCLITransport(
497+
prompt="test",
498+
options=ClaudeAgentOptions(),
499+
cli_path="/usr/bin/claude",
500+
)
501+
502+
# Create a mock process with stdin that has drain method
503+
mock_process = MagicMock()
504+
mock_stdin = AsyncMock()
505+
mock_stdin.drain = AsyncMock()
506+
mock_process.stdin = mock_stdin
507+
transport._process = mock_process
508+
509+
# Call flush_stdin
510+
await transport.flush_stdin()
511+
512+
# Verify drain was called on Windows
513+
mock_stdin.drain.assert_called_once()
514+
515+
anyio.run(_test)
516+
517+
def test_flush_stdin_on_non_windows(self):
518+
"""Test that flush_stdin does nothing on non-Windows platforms."""
519+
520+
async def _test():
521+
# Mock platform.system to return Linux
522+
with patch("platform.system", return_value="Linux"):
523+
transport = SubprocessCLITransport(
524+
prompt="test",
525+
options=ClaudeAgentOptions(),
526+
cli_path="/usr/bin/claude",
527+
)
528+
529+
# Create a mock process with stdin
530+
mock_process = MagicMock()
531+
mock_stdin = AsyncMock()
532+
mock_stdin.drain = AsyncMock()
533+
mock_process.stdin = mock_stdin
534+
transport._process = mock_process
535+
536+
# Call flush_stdin
537+
await transport.flush_stdin()
538+
539+
# Verify drain was NOT called on non-Windows
540+
mock_stdin.drain.assert_not_called()
541+
542+
anyio.run(_test)
543+
544+
def test_flush_stdin_without_process(self):
545+
"""Test that flush_stdin handles missing process gracefully."""
546+
547+
async def _test():
548+
transport = SubprocessCLITransport(
549+
prompt="test",
550+
options=ClaudeAgentOptions(),
551+
cli_path="/usr/bin/claude",
552+
)
553+
554+
# Don't set up a process
555+
transport._process = None
556+
557+
# Should not raise an error
558+
await transport.flush_stdin()
559+
560+
anyio.run(_test)
561+
562+
def test_flush_stdin_fallback_to_inner_stream(self):
563+
"""Test that flush_stdin tries to find drain() in wrapped streams."""
564+
565+
async def _test():
566+
# Mock platform.system to return Windows
567+
with patch("platform.system", return_value="Windows"):
568+
transport = SubprocessCLITransport(
569+
prompt="test",
570+
options=ClaudeAgentOptions(),
571+
cli_path="/usr/bin/claude",
572+
)
573+
574+
# Create a mock process with stdin that doesn't have drain,
575+
# but has an inner _stream that does
576+
mock_process = MagicMock()
577+
mock_stdin = MagicMock()
578+
# Remove drain from stdin itself
579+
del mock_stdin.drain
580+
581+
# Add inner stream with drain
582+
mock_inner_stream = AsyncMock()
583+
mock_inner_stream.drain = AsyncMock()
584+
mock_stdin._stream = mock_inner_stream
585+
586+
mock_process.stdin = mock_stdin
587+
transport._process = mock_process
588+
589+
# Call flush_stdin
590+
await transport.flush_stdin()
591+
592+
# Verify drain was called on the inner stream
593+
mock_inner_stream.drain.assert_called_once()
594+
595+
anyio.run(_test)

0 commit comments

Comments
 (0)