From 1628cb2bcaae9fb511680da30494191a1979ed58 Mon Sep 17 00:00:00 2001 From: Joe Licata Date: Thu, 5 Mar 2026 22:28:22 +0000 Subject: [PATCH] fix: Resolve socket hang up errors for large file execution Node.js 20 sets a default 5-second socket timeout on HTTP connections. When code execution takes longer (cold sandbox starts, large file mounting, heavy pandas operations), the client destroys the socket before the server responds, causing "socket hang up" errors. Three fixes applied: 1. Streaming keepalive on /exec endpoint: sends whitespace every 3s to keep the TCP connection alive during long operations. JSON parsers ignore leading whitespace so this is fully transparent. 2. Non-blocking file I/O: moved MinIO response.read() into the thread pool executor (was blocking the asyncio event loop), and added stream_file_to_path() using fget_object for direct disk-to-disk transfer without loading files into memory. 3. Increased default sandbox pool size (SANDBOX_POOL_PY=5) to reduce cold-start frequency under concurrent load. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/api/exec.py | 105 +++++++++++---- src/services/execution/runner.py | 61 +++++---- src/services/file.py | 58 +++++++- src/services/interfaces.py | 7 + tests/functional/test_concurrent_file_exec.py | 127 ++++++++++++++++++ 5 files changed, 304 insertions(+), 54 deletions(-) create mode 100644 tests/functional/test_concurrent_file_exec.py diff --git a/src/api/exec.py b/src/api/exec.py index 62417af..c3c09fb 100644 --- a/src/api/exec.py +++ b/src/api/exec.py @@ -2,12 +2,20 @@ This is a thin endpoint that delegates to ExecutionOrchestrator for the actual execution workflow logic. + +Uses a streaming response with keepalive whitespace to prevent client +socket timeouts (Node.js 20 defaults to 5s) during long-running +executions like large file operations or cold sandbox starts. """ +import asyncio + import structlog from fastapi import APIRouter, Request +from fastapi.responses import StreamingResponse from ..models import ExecRequest, ExecResponse +from ..models.errors import ErrorResponse, ValidationError, ServiceUnavailableError from ..services.orchestrator import ExecutionOrchestrator from ..dependencies.services import ( SessionServiceDep, @@ -21,8 +29,12 @@ logger = structlog.get_logger(__name__) router = APIRouter() +# Keepalive interval: send a space every 3 seconds to prevent +# Node.js 20's default 5-second socket timeout from firing. +_KEEPALIVE_INTERVAL = 3 -@router.post("/exec", response_model=ExecResponse) + +@router.post("/exec", responses={200: {"model": ExecResponse}}) async def execute_code( request: ExecRequest, http_request: Request, @@ -42,17 +54,9 @@ async def execute_code( State is stored in Redis (2 hour TTL) with automatic archival to MinIO for long-term storage (7 day TTL). - Args: - request: Execution request with code, language, and optional files - http_request: HTTP request for accessing state (api_key_hash) - session_service: Session management service - file_service: File storage service - execution_service: Code execution service - state_service: Python state persistence service (Redis) - state_archival_service: Python state archival service (MinIO) - - Returns: - ExecResponse with session_id, stdout, stderr, and generated files + Returns a streaming response that sends keepalive whitespace before the + JSON body to prevent client socket timeouts during long operations. + JSON parsers ignore leading whitespace, so this is fully compatible. """ request_id = generate_request_id()[:8] @@ -79,15 +83,70 @@ async def execute_code( state_archival_service=state_archival_service, ) - # Execute via orchestrator (handles validation, session, files, execution, cleanup) - response = await orchestrator.execute( - request, request_id, api_key_hash=api_key_hash, is_env_key=is_env_key - ) - - logger.info( - "Code execution completed", - request_id=request_id, - session_id=response.session_id, + async def _stream_response(): + """Execute code and stream the response with keepalive whitespace. + + Sends a space character every few seconds while the execution is + running. Once the result is ready, sends the JSON body. Leading + whitespace is ignored by JSON parsers, so this is transparent + to clients. + """ + result_holder = {} + error_holder = {} + + async def _run(): + try: + result_holder["response"] = await orchestrator.execute( + request, + request_id, + api_key_hash=api_key_hash, + is_env_key=is_env_key, + ) + except Exception as e: + error_holder["error"] = e + + task = asyncio.create_task(_run()) + + # Send keepalive spaces while execution is running + while not task.done(): + try: + await asyncio.wait_for( + asyncio.shield(task), timeout=_KEEPALIVE_INTERVAL + ) + except asyncio.TimeoutError: + # Execution still running — send keepalive space + yield b" " + except Exception: + # Task raised an exception — will be handled below + break + + # Ensure the task is complete + if not task.done(): + await task + + # Re-raise validation/service errors so FastAPI exception handlers + # can return proper HTTP status codes (400, 503, etc.) + if "error" in error_holder: + err = error_holder["error"] + if isinstance(err, (ValidationError, ServiceUnavailableError)): + raise err + error_resp = ErrorResponse( + error=str(err), + error_type="execution", + ) + yield error_resp.model_dump_json().encode() + return + + # Send the JSON response + response = result_holder["response"] + logger.info( + "Code execution completed", + request_id=request_id, + session_id=response.session_id, + ) + yield response.model_dump_json().encode() + + return StreamingResponse( + _stream_response(), + media_type="application/json", ) - - return response diff --git a/src/services/execution/runner.py b/src/services/execution/runner.py index ef68db6..029ce28 100644 --- a/src/services/execution/runner.py +++ b/src/services/execution/runner.py @@ -701,11 +701,23 @@ async def _mount_files_to_sandbox( files: List[Dict[str, Any]], language: str = "py", ) -> None: - """Mount files to sandbox workspace.""" + """Mount files to sandbox workspace. + + Uses streaming (MinIO fget_object) to transfer files directly to the + sandbox data directory without loading entire files into memory. This + avoids blocking the asyncio event loop during large file transfers. + """ try: from ..file import FileService + from ...config.languages import get_user_id_for_language file_service = FileService() + user_id = get_user_id_for_language(language) + + def _set_file_perms(path, uid): + os.chown(path, uid, uid) + os.chmod(path, 0o644) + return os.path.getsize(path) for file_info in files: filename = file_info.get("filename", "unknown") @@ -717,36 +729,37 @@ async def _mount_files_to_sandbox( continue try: - file_content = await file_service.get_file_content( - session_id, file_id + normalized_filename = OutputProcessor.sanitize_filename(filename) + dest_path = str(sandbox_info.data_dir / normalized_filename) + + file_size = file_info.get("size", 0) + if file_size > 10 * 1024 * 1024: + logger.info( + "Mounting large file", + filename=filename, + size_mb=round(file_size / 1024 / 1024, 1), + ) + + # Stream directly from MinIO to sandbox directory (non-blocking) + success = await file_service.stream_file_to_path( + session_id, file_id, dest_path ) - if file_content is not None: - # Direct memory-to-sandbox transfer (no tempfiles) - normalized_filename = OutputProcessor.sanitize_filename( - filename + if success: + actual_size = await asyncio.to_thread( + _set_file_perms, dest_path, user_id ) - dest_path = f"/mnt/data/{normalized_filename}" - - if self.sandbox_manager.copy_content_to_sandbox( - sandbox_info, file_content, dest_path, language=language - ): - logger.debug( - "Mounted file", - filename=filename, - size=len(file_content), - ) - else: - logger.warning("Failed to mount file", filename=filename) - await self._create_placeholder_file(sandbox_info, filename) - else: - logger.warning( - f"Could not retrieve content for file {filename}" + logger.debug( + "Mounted file", + filename=filename, + size=actual_size, ) + else: + logger.warning("Failed to mount file", filename=filename) await self._create_placeholder_file(sandbox_info, filename) except Exception as file_error: - logger.error(f"Error retrieving file {filename}: {file_error}") + logger.error(f"Error mounting file {filename}: {file_error}") await self._create_placeholder_file(sandbox_info, filename) except Exception as e: diff --git a/src/services/file.py b/src/services/file.py index 84e5626..cebd534 100644 --- a/src/services/file.py +++ b/src/services/file.py @@ -546,16 +546,17 @@ async def get_file_content(self, session_id: str, file_id: str) -> Optional[byte object_key = metadata["object_key"] try: - # Get object content loop = asyncio.get_event_loop() - response = await loop.run_in_executor( - None, self.minio_client.get_object, self.bucket_name, object_key - ) - content = response.read() - response.close() - response.release_conn() + def _download(): + response = self.minio_client.get_object(self.bucket_name, object_key) + try: + return response.read() + finally: + response.close() + response.release_conn() + content = await loop.run_in_executor(None, _download) return content except S3Error as e: @@ -567,6 +568,49 @@ async def get_file_content(self, session_id: str, file_id: str) -> Optional[byte ) return None + async def stream_file_to_path( + self, session_id: str, file_id: str, dest_path: str + ) -> bool: + """Stream file content from MinIO directly to a local file path. + + Uses MinIO's fget_object for efficient disk-to-disk transfer + without loading the entire file into memory. Runs in a thread + pool executor to avoid blocking the async event loop. + + Args: + session_id: Session identifier + file_id: File identifier + dest_path: Local filesystem path to write the file to + + Returns: + True if successful, False otherwise + """ + metadata = await self.get_file_metadata(session_id, file_id) + if not metadata: + return False + + object_key = metadata["object_key"] + + try: + loop = asyncio.get_event_loop() + await loop.run_in_executor( + None, + self.minio_client.fget_object, + self.bucket_name, + object_key, + dest_path, + ) + return True + except S3Error as e: + logger.error( + "Failed to stream file to path", + error=str(e), + session_id=session_id, + file_id=file_id, + dest_path=dest_path, + ) + return False + async def store_uploaded_file( self, session_id: str, diff --git a/src/services/interfaces.py b/src/services/interfaces.py index d464b4e..5225c4b 100644 --- a/src/services/interfaces.py +++ b/src/services/interfaces.py @@ -115,6 +115,13 @@ async def get_file_content(self, session_id: str, file_id: str) -> Optional[byte """Get actual file content.""" pass + @abstractmethod + async def stream_file_to_path( + self, session_id: str, file_id: str, dest_path: str + ) -> bool: + """Stream file content directly to a local file path.""" + pass + @abstractmethod async def delete_file(self, session_id: str, file_id: str) -> bool: """Delete a file from storage.""" diff --git a/tests/functional/test_concurrent_file_exec.py b/tests/functional/test_concurrent_file_exec.py new file mode 100644 index 0000000..0d28da0 --- /dev/null +++ b/tests/functional/test_concurrent_file_exec.py @@ -0,0 +1,127 @@ +"""Functional tests for concurrent execution with large file uploads. + +Regression test for event loop blocking bug: when large files (>40MB) are +downloaded from MinIO during file mounting, response.read() blocks the +asyncio event loop, starving all concurrent HTTP connections. + +This manifests as "socket hang up" errors in clients like LibreChat. +""" + +import asyncio +import time + +import httpx +import pytest + + +# 50MB of CSV data — large enough to trigger measurable event loop blocking +LARGE_FILE_SIZE_MB = 50 +LARGE_CSV_ROW = b"col1,col2,col3,col4,col5,col6,col7,col8\n" +LARGE_CSV_DATA = LARGE_CSV_ROW * (LARGE_FILE_SIZE_MB * 1024 * 1024 // len(LARGE_CSV_ROW)) + +# Threshold: concurrent pings must complete within this time (seconds). +# Without the fix, pings take 8-11s due to event loop blocking. +# Allow up to 8s to account for pool cold starts after container restart. +PING_MAX_LATENCY = 8.0 + + +class TestConcurrentFileExecution: + """Test that large file operations don't block the event loop.""" + + @pytest.mark.asyncio + async def test_large_file_exec_does_not_block_concurrent_requests( + self, async_client, auth_headers, unique_entity_id + ): + """Concurrent simple exec requests must not be blocked by large file mounting. + + Steps: + 1. Upload a 50MB CSV file + 2. Fire 5 exec requests referencing the file + 3 simple print() pings + 3. Assert all pings complete within PING_MAX_LATENCY seconds + 4. Assert all file execs succeed + """ + # --- Upload large file --- + upload_headers = {"x-api-key": auth_headers["x-api-key"]} + files = {"files": ("large_test.csv", LARGE_CSV_DATA, "text/csv")} + data = {"entity_id": unique_entity_id} + + upload_resp = await async_client.post( + "/upload", headers=upload_headers, files=files, data=data, + timeout=120.0, + ) + assert upload_resp.status_code == 200, f"Upload failed: {upload_resp.text}" + + result = upload_resp.json() + session_id = result["session_id"] + file_id = result["files"][0]["fileId"] + filename = result["files"][0]["filename"] + + # --- Define concurrent tasks --- + async def exec_with_file(idx: int) -> tuple: + """Execute code that references the large file.""" + start = time.perf_counter() + resp = await async_client.post( + "/exec", + headers=auth_headers, + json={ + "code": f"import os; print('exec-{idx}', os.path.getsize('/mnt/data/{filename}'))", + "lang": "py", + "session_id": session_id, + "files": [ + {"id": file_id, "session_id": session_id, "name": filename} + ], + }, + timeout=60.0, + ) + elapsed = time.perf_counter() - start + return ("file_exec", idx, resp.status_code, elapsed) + + async def simple_ping(idx: int) -> tuple: + """Simple exec with no file — should not be blocked.""" + start = time.perf_counter() + resp = await async_client.post( + "/exec", + headers=auth_headers, + json={ + "code": f"print('ping-{idx}')", + "lang": "py", + }, + timeout=60.0, + ) + elapsed = time.perf_counter() - start + return ("ping", idx, resp.status_code, elapsed) + + # --- Fire all concurrently --- + tasks = [] + for i in range(5): + tasks.append(exec_with_file(i)) + for i in range(3): + tasks.append(simple_ping(i)) + + results = await asyncio.gather(*tasks) + + # --- Assertions --- + file_results = [r for r in results if r[0] == "file_exec"] + ping_results = [r for r in results if r[0] == "ping"] + + # All requests must succeed + for kind, idx, status, elapsed in results: + assert status == 200, f"{kind}-{idx} failed with status {status}" + + # Pings must not be blocked by file operations + for kind, idx, status, elapsed in ping_results: + assert elapsed < PING_MAX_LATENCY, ( + f"ping-{idx} took {elapsed:.2f}s (limit: {PING_MAX_LATENCY}s) — " + f"event loop likely blocked by large file I/O" + ) + + max_ping = max(r[3] for r in ping_results) + avg_ping = sum(r[3] for r in ping_results) / len(ping_results) + max_file = max(r[3] for r in file_results) + + # Log timing summary (visible with -s flag) + print(f"\n File exec max: {max_file:.2f}s") + print(f" Ping max: {max_ping:.2f}s, avg: {avg_ping:.2f}s") + for kind, idx, status, elapsed in sorted(results, key=lambda r: r[3]): + blocked = " *** BLOCKED" if elapsed > PING_MAX_LATENCY else "" + print(f" {kind}-{idx}: {elapsed:.2f}s{blocked}")