Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 82 additions & 23 deletions src/api/exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,20 @@

This is a thin endpoint that delegates to ExecutionOrchestrator for
the actual execution workflow logic.

Uses a streaming response with keepalive whitespace to prevent client
socket timeouts (Node.js 20 defaults to 5s) during long-running
executions like large file operations or cold sandbox starts.
"""

import asyncio

import structlog
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse

from ..models import ExecRequest, ExecResponse
from ..models.errors import ErrorResponse, ValidationError, ServiceUnavailableError
from ..services.orchestrator import ExecutionOrchestrator
from ..dependencies.services import (
SessionServiceDep,
Expand All @@ -21,8 +29,12 @@
logger = structlog.get_logger(__name__)
router = APIRouter()

# Keepalive interval: send a space every 3 seconds to prevent
# Node.js 20's default 5-second socket timeout from firing.
_KEEPALIVE_INTERVAL = 3

@router.post("/exec", response_model=ExecResponse)

@router.post("/exec", responses={200: {"model": ExecResponse}})
async def execute_code(
request: ExecRequest,
http_request: Request,
Expand All @@ -42,17 +54,9 @@ async def execute_code(
State is stored in Redis (2 hour TTL) with automatic archival to MinIO for
long-term storage (7 day TTL).

Args:
request: Execution request with code, language, and optional files
http_request: HTTP request for accessing state (api_key_hash)
session_service: Session management service
file_service: File storage service
execution_service: Code execution service
state_service: Python state persistence service (Redis)
state_archival_service: Python state archival service (MinIO)

Returns:
ExecResponse with session_id, stdout, stderr, and generated files
Returns a streaming response that sends keepalive whitespace before the
JSON body to prevent client socket timeouts during long operations.
JSON parsers ignore leading whitespace, so this is fully compatible.
"""
request_id = generate_request_id()[:8]

Expand All @@ -79,15 +83,70 @@ async def execute_code(
state_archival_service=state_archival_service,
)

# Execute via orchestrator (handles validation, session, files, execution, cleanup)
response = await orchestrator.execute(
request, request_id, api_key_hash=api_key_hash, is_env_key=is_env_key
)

logger.info(
"Code execution completed",
request_id=request_id,
session_id=response.session_id,
async def _stream_response():
    """Run the execution and stream its JSON result behind keepalive padding.

    While the orchestrator is still working, a single space is emitted every
    ``_KEEPALIVE_INTERVAL`` seconds. Once the execution finishes, the JSON
    body is emitted as the final chunk. Leading whitespace is ignored by
    JSON parsers, so the padding is transparent to clients.

    Yields:
        ``b" "`` keepalive chunks, followed by exactly one JSON document:
        the serialized execution response on success, or a serialized
        ``ErrorResponse`` (``error_type="execution"``) on an unexpected
        failure.

    Raises:
        ValidationError, ServiceUnavailableError: re-raised unchanged when
            the orchestrator raises them.

    NOTE(review): these errors are re-raised so that exception handlers can
    map them to HTTP status codes, but a streaming response has normally
    sent its status line before the body generator runs. Confirm the
    handlers can still change the status at this point.

    NOTE(review): if this generator is closed early (e.g. the client goes
    away) the execution task is left running and its result is discarded.
    Confirm that is the intended behaviour.
    """

    async def _run():
        # Capture ordinary failures inside the task so they come back as a
        # value; cancellation is not an Exception and still propagates.
        try:
            outcome = await orchestrator.execute(
                request,
                request_id,
                api_key_hash=api_key_hash,
                is_env_key=is_env_key,
            )
        except Exception as exc:
            return None, exc
        return outcome, None

    task = asyncio.create_task(_run())

    # asyncio.wait() does not cancel the task when the timeout elapses and
    # never raises the task's exception, so no shield() or TimeoutError
    # handling is needed around the keepalive loop.
    while True:
        done, _ = await asyncio.wait({task}, timeout=_KEEPALIVE_INTERVAL)
        if done:
            break
        # Execution still running: send a keepalive space.
        yield b" "

    # Raises CancelledError if the task itself was cancelled.
    response, error = task.result()

    # Re-raise validation/service errors unchanged (see NOTE above).
    if isinstance(error, (ValidationError, ServiceUnavailableError)):
        raise error

    if error is not None:
        # Record the failure; otherwise it would only ever be visible in
        # the body returned to the client.
        logger.error(
            "Code execution failed",
            request_id=request_id,
            error=str(error),
        )
        error_resp = ErrorResponse(
            error=str(error),
            error_type="execution",
        )
        yield error_resp.model_dump_json().encode()
        return

    # Send the JSON response.
    logger.info(
        "Code execution completed",
        request_id=request_id,
        session_id=response.session_id,
    )
    yield response.model_dump_json().encode()

return StreamingResponse(
_stream_response(),
media_type="application/json",
)

return response
61 changes: 37 additions & 24 deletions src/services/execution/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,11 +701,23 @@ async def _mount_files_to_sandbox(
files: List[Dict[str, Any]],
language: str = "py",
) -> None:
"""Mount files to sandbox workspace."""
"""Mount files to sandbox workspace.

Uses streaming (MinIO fget_object) to transfer files directly to the
sandbox data directory without loading entire files into memory. This
avoids blocking the asyncio event loop during large file transfers.
"""
try:
from ..file import FileService
from ...config.languages import get_user_id_for_language

file_service = FileService()
user_id = get_user_id_for_language(language)

def _set_file_perms(path, uid):
os.chown(path, uid, uid)
os.chmod(path, 0o644)
return os.path.getsize(path)

for file_info in files:
filename = file_info.get("filename", "unknown")
Expand All @@ -717,36 +729,37 @@ async def _mount_files_to_sandbox(
continue

try:
file_content = await file_service.get_file_content(
session_id, file_id
normalized_filename = OutputProcessor.sanitize_filename(filename)
dest_path = str(sandbox_info.data_dir / normalized_filename)

file_size = file_info.get("size", 0)
if file_size > 10 * 1024 * 1024:
logger.info(
"Mounting large file",
filename=filename,
size_mb=round(file_size / 1024 / 1024, 1),
)

# Stream directly from MinIO to sandbox directory (non-blocking)
success = await file_service.stream_file_to_path(
session_id, file_id, dest_path
)

if file_content is not None:
# Direct memory-to-sandbox transfer (no tempfiles)
normalized_filename = OutputProcessor.sanitize_filename(
filename
if success:
actual_size = await asyncio.to_thread(
_set_file_perms, dest_path, user_id
)
dest_path = f"/mnt/data/{normalized_filename}"

if self.sandbox_manager.copy_content_to_sandbox(
sandbox_info, file_content, dest_path, language=language
):
logger.debug(
"Mounted file",
filename=filename,
size=len(file_content),
)
else:
logger.warning("Failed to mount file", filename=filename)
await self._create_placeholder_file(sandbox_info, filename)
else:
logger.warning(
f"Could not retrieve content for file {filename}"
logger.debug(
"Mounted file",
filename=filename,
size=actual_size,
)
else:
logger.warning("Failed to mount file", filename=filename)
await self._create_placeholder_file(sandbox_info, filename)

except Exception as file_error:
logger.error(f"Error retrieving file {filename}: {file_error}")
logger.error(f"Error mounting file {filename}: {file_error}")
await self._create_placeholder_file(sandbox_info, filename)

except Exception as e:
Expand Down
58 changes: 51 additions & 7 deletions src/services/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,16 +546,17 @@ async def get_file_content(self, session_id: str, file_id: str) -> Optional[byte
object_key = metadata["object_key"]

try:
# Get object content
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None, self.minio_client.get_object, self.bucket_name, object_key
)

content = response.read()
response.close()
response.release_conn()
def _download():
    """Read the whole object and always release its connection.

    Intended to run off the event loop: both fetching the object and
    ``read()`` are blocking calls.
    """
    obj = self.minio_client.get_object(self.bucket_name, object_key)
    try:
        payload = obj.read()
    finally:
        # Close and hand the connection back even when read() fails.
        obj.close()
        obj.release_conn()
    return payload

content = await loop.run_in_executor(None, _download)
return content

except S3Error as e:
Expand All @@ -567,6 +568,49 @@ async def get_file_content(self, session_id: str, file_id: str) -> Optional[byte
)
return None

async def stream_file_to_path(
    self, session_id: str, file_id: str, dest_path: str
) -> bool:
    """Stream a stored file from MinIO directly to a local file path.

    Uses the MinIO client's ``fget_object`` so the object is written to
    ``dest_path`` without first being loaded into memory as a whole. The
    blocking call runs in the default executor so it does not block the
    event loop.

    Args:
        session_id: Session identifier
        file_id: File identifier
        dest_path: Local filesystem path to write the file to

    Returns:
        True if the file was written. False if no metadata exists for the
        file, if MinIO reports an error, or if writing to ``dest_path``
        fails with an OS-level error.
    """
    metadata = await self.get_file_metadata(session_id, file_id)
    if not metadata:
        return False

    object_key = metadata["object_key"]

    try:
        await asyncio.get_running_loop().run_in_executor(
            None,
            self.minio_client.fget_object,
            self.bucket_name,
            object_key,
            dest_path,
        )
    except (S3Error, OSError) as e:
        # OSError covers local failures (missing directory, permissions,
        # full disk) so the documented bool contract holds for them too.
        logger.error(
            "Failed to stream file to path",
            error=str(e),
            session_id=session_id,
            file_id=file_id,
            dest_path=dest_path,
        )
        return False
    return True

async def store_uploaded_file(
self,
session_id: str,
Expand Down
7 changes: 7 additions & 0 deletions src/services/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ async def get_file_content(self, session_id: str, file_id: str) -> Optional[byte
"""Get actual file content."""
pass

@abstractmethod
async def stream_file_to_path(
    self, session_id: str, file_id: str, dest_path: str
) -> bool:
    """Write a stored file's content straight to a local path.

    Args:
        session_id: Session identifier
        file_id: File identifier
        dest_path: Local filesystem path to write the file to

    Returns:
        A bool result; what it signals is defined by the implementation.
    """
    ...

@abstractmethod
async def delete_file(self, session_id: str, file_id: str) -> bool:
"""Delete a file from storage."""
Expand Down
Loading
Loading