From 5d2a77566f0d4f6db8fe449ad3fc0ee0303fe3de Mon Sep 17 00:00:00 2001 From: Steve James Date: Sun, 27 Jul 2025 10:47:52 +0200 Subject: [PATCH 1/3] add custom errors --- Cargo.lock | 3 +- Cargo.toml | 1 + src/lib/http.rs | 43 +++++++++------ src/lib/sandbox.rs | 127 +++++++++++++++++++++++++++++++++------------ src/main.rs | 16 ++---- 5 files changed, 130 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1d5e212..bb9564c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1265,7 +1265,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1651,6 +1651,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "thiserror", "tokio", "tokio-test", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 7810d91..638f345 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ tokio = {version = "1.46.1", features = ["rt-multi-thread"]} uuid = {version = "1.17.0", features = ["v4"]} clap = { version = "4.5", features = ["derive"] } reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } +thiserror = "2.0.12" [dev-dependencies] tokio-test = "0.4" diff --git a/src/lib/http.rs b/src/lib/http.rs index 753187a..839566c 100644 --- a/src/lib/http.rs +++ b/src/lib/http.rs @@ -44,6 +44,24 @@ pub struct SandboxInfo { pub status: String, } +impl SandboxError { + fn to_status_code(&self) -> StatusCode { + match self { + SandboxError::NotStarted => StatusCode::BAD_REQUEST, + SandboxError::AlreadyStarted => StatusCode::BAD_REQUEST, + SandboxError::SetupCommandsFailed(_) => StatusCode::BAD_REQUEST, + SandboxError::PullImageFailed(_) => StatusCode::BAD_REQUEST, + SandboxError::StopContainerFailed(_) => StatusCode::BAD_REQUEST, + SandboxError::StartContainerFailed(_) => StatusCode::INTERNAL_SERVER_ERROR, + SandboxError::ContainerWriteFailed(_) => StatusCode::INTERNAL_SERVER_ERROR, + SandboxError::ContainerReadFailed(_) => StatusCode::INTERNAL_SERVER_ERROR, + SandboxError::ExecFailed(_, _) => StatusCode::INTERNAL_SERVER_ERROR, + SandboxError::CreateExecFailed(_) => StatusCode::INTERNAL_SERVER_ERROR, + SandboxError::TimeoutWaitingForMarker(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } +} + pub async fn create_sandbox( State(state): State>, Json(payload): Json, @@ -87,11 +105,10 @@ pub async fn start_sandbox( // Now lock the individual sandbox and do long work let mut sandbox_guard = sandbox_arc.lock().await; - // TODO: accurate errors sandbox_guard .start(permit) .await - .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?; + .map_err(|e| (e.to_status_code(), e.to_string()))?; Ok(()) } @@ -123,11 +140,11 @@ pub async fn exec_cmd( true => sandbox_guard .exec_standalone_cmd(vec![command]) .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, + .map_err(|e| (e.to_status_code(), e.to_string()))?, false => sandbox_guard .exec_session_cmd(command) .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, + .map_err(|e| (e.to_status_code(), e.to_string()))?, }; Ok(Json(serde_json::json!({ @@ -148,12 +165,12 @@ pub async fn stop_sandbox( .ok_or((StatusCode::NOT_FOUND, format!("Sandbox {} not found", id)))? }; - sandbox_arc.lock().await.stop().await.map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Failed to stop sandbox {}: {}", id, e), - ) - })?; + sandbox_arc + .lock() + .await + .stop() + .await + .map_err(|e| (e.to_status_code(), e.to_string()))?; // Sandbox_arc drops here, releasing permit if held Ok(()) @@ -173,11 +190,7 @@ pub async fn list_sandboxes( .iter() .map(|sandbox_arc| async { let sandbox = sandbox_arc.lock().await; - let status = if sandbox.container_id.is_some() { - "started" - } else { - "created" - }; + let status = sandbox.status(); SandboxInfo { id: sandbox.id.clone(), image: sandbox.image.clone(), diff --git a/src/lib/sandbox.rs b/src/lib/sandbox.rs index 1072aca..6d6861f 100644 --- a/src/lib/sandbox.rs +++ b/src/lib/sandbox.rs @@ -1,26 +1,70 @@ use std::{pin::Pin, sync::Arc}; -use anyhow::{Result, anyhow}; use bollard::{ - exec::{CreateExecOptions, StartExecOptions, StartExecResults}, query_parameters::RemoveContainerOptions, Docker + Docker, + exec::{CreateExecOptions, StartExecOptions, StartExecResults}, + query_parameters::RemoveContainerOptions, }; use bytes::Bytes; use futures::StreamExt; use futures::channel::mpsc::UnboundedReceiver; use std::sync::atomic::{AtomicU32, Ordering}; +use thiserror::Error; use tokio::sync::Mutex; use tokio::time::{self, Duration, Instant}; use tokio::{io::AsyncWriteExt, sync::OwnedSemaphorePermit}; +type Result = std::result::Result; + +#[derive(Error, Debug)] +pub enum SandboxError { + #[error("Sandbox not started")] + NotStarted, + #[error("Sandbox already started")] + AlreadyStarted, + #[error("Setup commands failed: {0}")] + SetupCommandsFailed(String), + #[error("Failed to pull image: {0}")] + PullImageFailed(String), + #[error("Failed to stop container: {0}")] + StopContainerFailed(String), + #[error("Failed to start container {0}")] + StartContainerFailed(String), + #[error("Container write failed: {0}")] + ContainerWriteFailed(String), + #[error("Container read failed: {0}")] + ContainerReadFailed(String), + #[error("Exec failed: {0} (exit code: {1})")] + ExecFailed(String, i64), + #[error("Failed to create exec: {0}")] + CreateExecFailed(String), + #[error("Timeout waiting for marker: {0}")] + TimeoutWaitingForMarker(String), +} + +pub enum SandboxStatus { + Created, + Started, +} + +impl std::fmt::Display for SandboxStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SandboxStatus::Created => write!(f, "created"), + SandboxStatus::Started => write!(f, "started"), + } + } +} + pub struct Sandbox { pub id: String, pub image: String, pub setup_commands: String, - pub container_id: Option, - pub permit: Option, - pub input: Option>>>, - pub output_receiver: Option>>, pub start_time: Option, + container_id: Option, + permit: Option, + input: Option>>>, + output_receiver: Option>>, command_id: AtomicU32, docker: Arc, } @@ -41,6 +85,14 @@ impl Sandbox { } } + pub fn status(&self) -> SandboxStatus { + if self.container_id.is_some() { + SandboxStatus::Started + } else { + SandboxStatus::Created + } + } + pub async fn stop(&mut self) -> Result<()> { let cid_opt = self.container_id.clone(); @@ -54,9 +106,9 @@ impl Sandbox { ..Default::default() }), ) - .await?; + .await; } else { - return Err(anyhow!("Sandbox not started")); + return Err(SandboxError::NotStarted); } // Sandbox_arc drops here, releasing permit if held @@ -65,7 +117,7 @@ impl Sandbox { pub async fn start(&mut self, permit: OwnedSemaphorePermit) -> Result<()> { if self.container_id.is_some() { - return Err(anyhow!("Sandbox already started")); + return Err(SandboxError::AlreadyStarted); } // Check if image exists locally, pull if it doesn't use bollard::query_parameters::CreateImageOptions; @@ -84,7 +136,11 @@ impl Sandbox { }); let mut pull_stream = self.docker.create_image(pull_options, None, None); - while let Some(_) = pull_stream.try_next().await? { + while let Some(_) = pull_stream + .try_next() + .await + .map_err(|e| SandboxError::PullImageFailed(e.to_string()))? + { // TODO: print progress } } @@ -107,7 +163,7 @@ impl Sandbox { None::, config, ) - .await?; + .await.map_err(|e| SandboxError::StartContainerFailed(e.to_string()))?; self.container_id = Some(create_response.id.clone()); @@ -116,12 +172,19 @@ impl Sandbox { &create_response.id, None::, ) - .await?; + .await.map_err(|e| SandboxError::StartContainerFailed(e.to_string()))?; if !self.setup_commands.is_empty() { - let (_, _, exit_code) = self.exec_standalone_cmd(self.setup_commands.split_whitespace().map(|s| s.to_string()).collect()).await?; + let (_, stderr, exit_code) = self + .exec_standalone_cmd( + self.setup_commands + .split_whitespace() + .map(|s| s.to_string()) + .collect(), + ) + .await?; if exit_code != 0 { - return Err(anyhow!("Setup commands failed")); + return Err(SandboxError::SetupCommandsFailed(stderr)); } } @@ -137,7 +200,7 @@ impl Sandbox { let attach_res = self .docker .attach_container(&create_response.id, Some(attach_options)) - .await?; + .await.map_err(|e| SandboxError::StartContainerFailed(e.to_string()))?; let mut output_stream = attach_res.output; let input = attach_res.input; @@ -167,7 +230,7 @@ impl Sandbox { let mut input_guard = self.input.as_ref().unwrap().lock().await; input_guard .write_all("stty -echo; set +H\n".as_bytes()) - .await?; + .await.map_err(|e| SandboxError::ContainerWriteFailed(e.to_string()))?; } self.drain(0.5).await?; @@ -180,7 +243,7 @@ impl Sandbox { pub async fn exec_session_cmd( &mut self, cmd: String, - ) -> Result<(String, String, i64), anyhow::Error> { + ) -> Result<(String, String, i64)> { let command_id = self.command_id.fetch_add(1, Ordering::Relaxed); let stdout_file = format!("/tmp/stdout_{}.txt", command_id); let stderr_file = format!("/tmp/stderr_{}.txt", command_id); @@ -197,10 +260,10 @@ impl Sandbox { let mut input = self .input .as_ref() - .ok_or(anyhow!("Sandbox not started"))? + .ok_or(SandboxError::NotStarted)? .lock() .await; - input.write_all(cmd_to_send.as_bytes()).await?; + input.write_all(cmd_to_send.as_bytes()).await.map_err(|e| SandboxError::ContainerWriteFailed(e.to_string()))?; } self.read_until_marker(&marker, 20.0).await?; @@ -218,7 +281,7 @@ impl Sandbox { let (combined_output, _, exec_exit_code) = self.exec_standalone_cmd(combined_cmd).await?; if exec_exit_code != 0 { - return Err(anyhow!("Failed to read output files")); + return Err(SandboxError::ExecFailed(combined_output, exec_exit_code)); } // Parse the combined output @@ -251,7 +314,7 @@ impl Sandbox { }; let cid = self.container_id.as_ref().unwrap(); - let exec = self.docker.create_exec(cid, exec_config).await?; + let exec = self.docker.create_exec(cid, exec_config).await.map_err(|e| SandboxError::CreateExecFailed(e.to_string()))?; self.docker .start_exec( @@ -262,7 +325,7 @@ impl Sandbox { ..Default::default() }), ) - .await?; + .await.map_err(|e| SandboxError::CreateExecFailed(e.to_string()))?; self.drain(0.5).await?; @@ -270,7 +333,7 @@ impl Sandbox { } pub async fn exec_standalone_cmd(&self, cmd: Vec) -> Result<(String, String, i64)> { - let cid = self.container_id.as_ref().ok_or(anyhow!("No container"))?; + let cid = self.container_id.as_ref().ok_or(SandboxError::NotStarted)?; let exec_config = CreateExecOptions { cmd: Some(cmd), attach_stdout: Some(true), @@ -279,23 +342,23 @@ impl Sandbox { tty: Some(false), ..Default::default() }; - let exec = self.docker.create_exec(cid, exec_config).await?; + let exec = self.docker.create_exec(cid, exec_config).await.map_err(|e| SandboxError::CreateExecFailed(e.to_string()))?; let start_res = self .docker .start_exec(&exec.id, None::) - .await?; + .await.map_err(|e| SandboxError::CreateExecFailed(e.to_string()))?; let (mut stdout, mut stderr) = (Vec::new(), Vec::new()); if let StartExecResults::Attached { output, .. } = start_res { let mut output = output; while let Some(item) = output.next().await { - match item? { + match item.map_err(|e| SandboxError::ContainerReadFailed(e.to_string()))? { bollard::container::LogOutput::StdOut { message } => stdout.extend(&message), bollard::container::LogOutput::StdErr { message } => stderr.extend(&message), _ => {} } } } - let inspect = self.docker.inspect_exec(&exec.id).await?; + let inspect = self.docker.inspect_exec(&exec.id).await.map_err(|e| SandboxError::ContainerReadFailed(e.to_string()))?; let exit_code = inspect.exit_code.unwrap_or(-1); let stdout_str = String::from_utf8_lossy(&stdout).to_string(); let stderr_str = String::from_utf8_lossy(&stderr).to_string(); @@ -306,7 +369,7 @@ impl Sandbox { let receiver = self .output_receiver .as_ref() - .ok_or(anyhow!("No receiver"))?; + .ok_or(SandboxError::NotStarted)?; let mut receiver = receiver.lock().await; let mut drained: Vec = Vec::new(); loop { @@ -323,20 +386,20 @@ impl Sandbox { let receiver = self .output_receiver .as_ref() - .ok_or(anyhow!("No receiver"))?; + .ok_or(SandboxError::NotStarted)?; let mut receiver = receiver.lock().await; let mut accumulated = String::new(); let start = Instant::now(); while !accumulated.contains(marker) { let elapsed = start.elapsed().as_secs_f64(); if elapsed > timeout { - return Err(anyhow!("Timeout waiting for marker: {}", marker)); + return Err(SandboxError::TimeoutWaitingForMarker(marker.to_string())); } let remaining = timeout - elapsed; match time::timeout(Duration::from_secs_f64(remaining), receiver.next()).await { Ok(Some(chunk)) => accumulated += &String::from_utf8_lossy(&chunk), - Ok(None) => return Err(anyhow!("Stream closed unexpectedly")), - Err(_) => return Err(anyhow!("Timeout waiting for marker: {}", marker)), + Ok(None) => return Err(SandboxError::ContainerReadFailed("Stream closed unexpectedly".to_string())), + Err(_) => return Err(SandboxError::TimeoutWaitingForMarker(marker.to_string())), } } Ok(()) diff --git a/src/main.rs b/src/main.rs index 2c4fe2c..e90b144 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ use anyhow::Result; use bollard::Docker; use clap::{Parser, Subcommand}; use sos::http::{AppState, CreatePayload, ExecPayload}; +use sos::sandbox::SandboxStatus; use tokio::sync::{Mutex, Semaphore}; #[derive(Parser)] @@ -152,18 +153,9 @@ async fn serve_command(port: u16, max_sandboxes: usize, timeout: u64) -> Result< }; if let Some(sandbox_arc) = sandbox_arc { - let cid_opt = sandbox_arc.lock().await.container_id.clone(); - if let Some(cid) = cid_opt { - let _ = state_clone - .docker - .remove_container( - &cid, - Some(bollard::query_parameters::RemoveContainerOptions { - force: true, - ..Default::default() - }), - ) - .await; + let mut sandbox = sandbox_arc.lock().await; + if let SandboxStatus::Started = sandbox.status() { + let _ = sandbox.stop().await; } } } From ce2fe68dfd7bd89b3c39bd62e87694a3463c3089 Mon Sep 17 00:00:00 2001 From: Steve James Date: Mon, 28 Jul 2025 10:42:15 +0200 Subject: [PATCH 2/3] add custom errors + store trajectories --- benches/sandbox_performance.rs | 11 +- src/lib/http.rs | 102 +++++++- src/lib/sandbox.rs | 426 +++++++++++++++++++++------------ src/main.rs | 58 ++++- tests/integration_tests.rs | 119 ++++----- 5 files changed, 491 insertions(+), 225 deletions(-) diff --git a/benches/sandbox_performance.rs b/benches/sandbox_performance.rs index 23efd9f..0a785fe 100644 --- a/benches/sandbox_performance.rs +++ b/benches/sandbox_performance.rs @@ -108,7 +108,8 @@ async fn benchmark_sandbox_lifecycle( // Cleanup client - .delete(&format!("{}/sandboxes/{}", base_url, sandbox_id)) + .post(&format!("{}/sandboxes/{}/stop", base_url, sandbox_id)) + .json(&json!({ "remove": true })) .send() .await?; @@ -116,7 +117,7 @@ async fn benchmark_sandbox_lifecycle( } fn sandbox_throughput_benchmark(c: &mut Criterion) { - let grid = tokio::runtime::Runtime::new().unwrap(); + let runtime = tokio::runtime::Runtime::new().unwrap(); let mut group = c.benchmark_group("sandbox_throughput"); @@ -136,7 +137,7 @@ fn sandbox_throughput_benchmark(c: &mut Criterion) { &(num_sandboxes, semaphore_limit), |b, &(num_sandboxes, semaphore_limit)| { b.iter(|| { - grid.block_on(async { + runtime.block_on(async { benchmark_sandbox_throughput(num_sandboxes, semaphore_limit) .await .unwrap() @@ -150,11 +151,11 @@ fn sandbox_throughput_benchmark(c: &mut Criterion) { } fn sandbox_latency_benchmark(c: &mut Criterion) { - let grid = tokio::runtime::Runtime::new().unwrap(); + let runtime = tokio::runtime::Runtime::new().unwrap(); c.bench_function("single_sandbox_lifecycle", |b| { b.iter(|| { - grid.block_on(async { + runtime.block_on(async { benchmark_sandbox_throughput(1, 1).await.unwrap() }) }); diff --git a/src/lib/http.rs b/src/lib/http.rs index 839566c..77bd16a 100644 --- a/src/lib/http.rs +++ b/src/lib/http.rs @@ -6,13 +6,13 @@ use axum::{ Json, Router, extract::{Path, State}, http::StatusCode, - routing::{delete, post}, + routing::post, }; use bollard::Docker; use futures::future::join_all; use serde::{Deserialize, Serialize}; use serde_json::Value; -use tokio::sync::{Mutex, Semaphore}; +use tokio::{sync::{Mutex, Semaphore}, time::Instant}; use uuid::Uuid; use crate::sandbox::*; @@ -138,7 +138,7 @@ pub async fn exec_cmd( let (stdout, stderr, exit_code) = match standalone { true => sandbox_guard - .exec_standalone_cmd(vec![command]) + .exec_standalone_cmd(command) .await .map_err(|e| (e.to_status_code(), e.to_string()))?, false => sandbox_guard @@ -154,17 +154,32 @@ pub async fn exec_cmd( }))) } +#[derive(Deserialize, serde::Serialize)] +pub struct StopPayload { + pub remove: Option, +} + pub async fn stop_sandbox( Path(id): Path, State(state): State>, + Json(payload): Json, ) -> Result<(), (StatusCode, String)> { let sandbox_arc = { + let remove = payload.remove.unwrap_or(false); let mut sandboxes = state.sandboxes.lock().await; - sandboxes - .remove(&id) - .ok_or((StatusCode::NOT_FOUND, format!("Sandbox {} not found", id)))? + if remove { + sandboxes + .remove(&id) + .ok_or((StatusCode::NOT_FOUND, format!("Sandbox {} not found", id)))? + } else { + sandboxes + .get(&id) + .cloned() + .ok_or((StatusCode::NOT_FOUND, format!("Sandbox {} not found", id)))? + } }; + // Permit is released here sandbox_arc .lock() .await @@ -172,10 +187,71 @@ pub async fn stop_sandbox( .await .map_err(|e| (e.to_status_code(), e.to_string()))?; - // Sandbox_arc drops here, releasing permit if held Ok(()) } +pub async fn get_trajectory( + Path(id): Path, + State(state): State>, +) -> Result, (StatusCode, String)> { + let sandbox_arc = { + let sandboxes = state.sandboxes.lock().await; + sandboxes + .get(&id) + .cloned() + .ok_or((StatusCode::NOT_FOUND, format!("Sandbox {} not found", id)))? + }; + + let sandbox = sandbox_arc.lock().await; + let trajectory = sandbox.get_trajectory(); + + let start_time = sandbox.start_time.unwrap_or(Instant::now()); + let trajectory_json: Vec = trajectory + .iter() + .enumerate() + .map(|(i, cmd)| { + let timestamp = (cmd.timestamp - start_time).as_secs_f64(); + let mut cmd_json = serde_json::json!({ + "index": i, + "command": cmd.command, + "timestamp": timestamp, + }); + + if let Some(result) = &cmd.result { + cmd_json["result"] = serde_json::json!({ + "stdout": result.stdout, + "stderr": result.stderr, + "exit_code": result.exit_code, + }); + } + + cmd_json + }) + .collect(); + + Ok(Json(serde_json::json!({ + "sandbox_id": id, + "command_count": sandbox.command_count(), + "trajectory": trajectory_json + }))) +} + +pub async fn get_trajectory_formatted( + Path(id): Path, + State(state): State>, +) -> Result { + let sandbox_arc = { + let sandboxes = state.sandboxes.lock().await; + sandboxes + .get(&id) + .cloned() + .ok_or((StatusCode::NOT_FOUND, format!("Sandbox {} not found", id)))? + }; + + let sandbox = sandbox_arc.lock().await; + Ok(sandbox.format_trajectory()) +} + pub async fn list_sandboxes( State(state): State>, ) -> Result>, (StatusCode, String)> { @@ -190,7 +266,7 @@ pub async fn list_sandboxes( .iter() .map(|sandbox_arc| async { let sandbox = sandbox_arc.lock().await; - let status = sandbox.status(); + let status = sandbox.get_status(); SandboxInfo { id: sandbox.id.clone(), image: sandbox.image.clone(), @@ -209,6 +285,14 @@ pub fn create_app(state: Arc) -> Router { .route("/sandboxes", post(create_sandbox).get(list_sandboxes)) .route("/sandboxes/{id}/start", post(start_sandbox)) .route("/sandboxes/{id}/exec", post(exec_cmd)) - .route("/sandboxes/{id}", delete(stop_sandbox)) + .route( + "/sandboxes/{id}/trajectory", + axum::routing::get(get_trajectory), + ) + .route( + "/sandboxes/{id}/trajectory/formatted", + axum::routing::get(get_trajectory_formatted), + ) + .route("/sandboxes/{id}/stop", post(stop_sandbox)) .with_state(state) } diff --git a/src/lib/sandbox.rs b/src/lib/sandbox.rs index 6d6861f..aa291de 100644 --- a/src/lib/sandbox.rs +++ b/src/lib/sandbox.rs @@ -2,12 +2,12 @@ use std::{pin::Pin, sync::Arc}; use bollard::{ Docker, + container::LogOutput, exec::{CreateExecOptions, StartExecOptions, StartExecResults}, query_parameters::RemoveContainerOptions, }; use bytes::Bytes; -use futures::StreamExt; -use futures::channel::mpsc::UnboundedReceiver; +use futures::{StreamExt, channel::mpsc::UnboundedReceiver}; use std::sync::atomic::{AtomicU32, Ordering}; use thiserror::Error; use tokio::sync::Mutex; @@ -16,7 +16,7 @@ use tokio::{io::AsyncWriteExt, sync::OwnedSemaphorePermit}; type Result = std::result::Result; -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] pub enum SandboxError { #[error("Sandbox not started")] NotStarted, @@ -42,31 +42,51 @@ pub enum SandboxError { TimeoutWaitingForMarker(String), } +#[derive(Debug, Clone)] pub enum SandboxStatus { Created, - Started, + Started(String), // container id + Stopped(Result<()>), // result of stop } impl std::fmt::Display for SandboxStatus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { SandboxStatus::Created => write!(f, "created"), - SandboxStatus::Started => write!(f, "started"), + SandboxStatus::Started(container_id) => { + write!(f, "started (container id: {})", container_id) + } + SandboxStatus::Stopped(_) => write!(f, "stopped"), } } } +#[derive(Debug, Clone)] +pub struct CommandExecution { + pub command: String, + pub timestamp: Instant, + pub result: Option, +} + +#[derive(Debug, Clone)] +pub struct CommandResult { + pub stdout: String, + pub stderr: String, + pub exit_code: i64, +} + pub struct Sandbox { pub id: String, pub image: String, pub setup_commands: String, pub start_time: Option, - container_id: Option, + status: SandboxStatus, permit: Option, input: Option>>>, output_receiver: Option>>, command_id: AtomicU32, docker: Arc, + trajectory: Vec, } impl Sandbox { @@ -77,57 +97,124 @@ impl Sandbox { setup_commands, docker, command_id: AtomicU32::new(0), - container_id: None, + status: SandboxStatus::Created, permit: None, input: None, output_receiver: None, start_time: None, + trajectory: Vec::new(), } } - pub fn status(&self) -> SandboxStatus { - if self.container_id.is_some() { - SandboxStatus::Started - } else { - SandboxStatus::Created - } + pub fn get_status(&self) -> SandboxStatus { + self.status.clone() } - pub async fn stop(&mut self) -> Result<()> { - let cid_opt = self.container_id.clone(); - - if let Some(cid) = cid_opt { - let _ = self - .docker - .remove_container( - &cid, - Some(RemoveContainerOptions { - force: true, - ..Default::default() - }), - ) - .await; - } else { - return Err(SandboxError::NotStarted); + /// Get the trajectory of commands executed in this sandbox + pub fn get_trajectory(&self) -> &[CommandExecution] { + &self.trajectory + } + + /// Get the number of commands executed + pub fn command_count(&self) -> usize { + self.trajectory.len() + } + + /// Get the latest command execution + pub fn get_latest_command(&self) -> Option<&CommandExecution> { + self.trajectory.last() + } + + /// Get a specific command by index + pub fn get_command(&self, index: usize) -> Option<&CommandExecution> { + self.trajectory.get(index) + } + + /// Get all successful commands (exit code 0) + pub fn get_successful_commands(&self) -> Vec<&CommandExecution> { + self.trajectory + .iter() + .filter(|cmd| cmd.result.as_ref().map_or(false, |r| r.exit_code == 0)) + .collect() + } + + /// Get all failed commands (exit code != 0) + pub fn get_failed_commands(&self) -> Vec<&CommandExecution> { + self.trajectory + .iter() + .filter(|cmd| cmd.result.as_ref().map_or(true, |r| r.exit_code != 0)) + .collect() + } + + /// Format the trajectory as a human-readable string + pub fn format_trajectory(&self) -> String { + let mut output = String::new(); + for cmd in self.trajectory.iter() { + output.push_str(&format!("$ {}\n", cmd.command)); + + match &cmd.result { + Some(result) => { + if !result.stdout.is_empty() { + output.push_str(&result.stdout); + output.push('\n'); + } + if !result.stderr.is_empty() { + output.push_str(&result.stderr); + output.push('\n'); + } + } + None => { + output.push_str("Status: Command started but no result recorded\n"); + } + } } - // Sandbox_arc drops here, releasing permit if held - Ok(()) + output + } + + pub async fn stop(&mut self) -> Result<()> { + // Release the semaphore + self.permit.take(); + + return match &self.status { + SandboxStatus::Stopped(_) => Ok(()), // Already stopped + SandboxStatus::Created => Err(SandboxError::NotStarted), + SandboxStatus::Started(cid) => { + // Stop the container but don't remove it + let _ = self + .docker + .remove_container( + &cid, + Some(RemoveContainerOptions { + force: true, + ..Default::default() + }), + ) + .await; + self.status = SandboxStatus::Stopped(Ok(())); + // Close input/output streams + self.input = None; + self.output_receiver = None; + Ok(()) + } + }; } pub async fn start(&mut self, permit: OwnedSemaphorePermit) -> Result<()> { - if self.container_id.is_some() { - return Err(SandboxError::AlreadyStarted); - } - // Check if image exists locally, pull if it doesn't - use bollard::query_parameters::CreateImageOptions; + use bollard::query_parameters::{ + AttachContainerOptions, CreateContainerOptions, CreateImageOptions, + StartContainerOptions, + }; use futures::TryStreamExt; + match &self.status { + SandboxStatus::Created => (), + _ => return Err(SandboxError::AlreadyStarted), + }; + // First, try to inspect the image to see if it exists locally match self.docker.inspect_image(&self.image).await { - Ok(_) => { - // Image exists locally, no need to pull - } + Ok(_) => {} Err(_) => { // Image doesn't exist locally, pull it let pull_options = Some(CreateImageOptions { @@ -159,36 +246,25 @@ impl Sandbox { let create_response = self .docker - .create_container( - None::, - config, - ) - .await.map_err(|e| SandboxError::StartContainerFailed(e.to_string()))?; + .create_container(None::, config) + .await + .map_err(|e| SandboxError::StartContainerFailed(e.to_string()))?; - self.container_id = Some(create_response.id.clone()); + self.status = SandboxStatus::Started(create_response.id.clone()); self.docker - .start_container( - &create_response.id, - None::, - ) - .await.map_err(|e| SandboxError::StartContainerFailed(e.to_string()))?; + .start_container(&create_response.id, None::) + .await + .map_err(|e| SandboxError::StartContainerFailed(e.to_string()))?; if !self.setup_commands.is_empty() { - let (_, stderr, exit_code) = self - .exec_standalone_cmd( - self.setup_commands - .split_whitespace() - .map(|s| s.to_string()) - .collect(), - ) - .await?; + let (_, stderr, exit_code) = self.exec_standalone_cmd(self.setup_commands.clone()).await?; if exit_code != 0 { return Err(SandboxError::SetupCommandsFailed(stderr)); } } - let attach_options = bollard::query_parameters::AttachContainerOptions { + let attach_options = AttachContainerOptions { stdin: true, stdout: true, stderr: true, @@ -200,7 +276,8 @@ impl Sandbox { let attach_res = self .docker .attach_container(&create_response.id, Some(attach_options)) - .await.map_err(|e| SandboxError::StartContainerFailed(e.to_string()))?; + .await + .map_err(|e| SandboxError::StartContainerFailed(e.to_string()))?; let mut output_stream = attach_res.output; let input = attach_res.input; @@ -210,8 +287,8 @@ impl Sandbox { while let Some(res) = output_stream.next().await { if let Ok(chunk) = res { let bytes = match chunk { - bollard::container::LogOutput::StdOut { message } => message, - bollard::container::LogOutput::StdErr { message } => message, + LogOutput::StdOut { message } => message, + LogOutput::StdErr { message } => message, _ => continue, }; let _ = tx.unbounded_send(bytes); @@ -230,7 +307,8 @@ impl Sandbox { let mut input_guard = self.input.as_ref().unwrap().lock().await; input_guard .write_all("stty -echo; set +H\n".as_bytes()) - .await.map_err(|e| SandboxError::ContainerWriteFailed(e.to_string()))?; + .await + .map_err(|e| SandboxError::ContainerWriteFailed(e.to_string()))?; } self.drain(0.5).await?; @@ -240,113 +318,151 @@ impl Sandbox { Ok(()) } - pub async fn exec_session_cmd( - &mut self, - cmd: String, - ) -> Result<(String, String, i64)> { - let command_id = self.command_id.fetch_add(1, Ordering::Relaxed); - let stdout_file = format!("/tmp/stdout_{}.txt", command_id); - let stderr_file = format!("/tmp/stderr_{}.txt", command_id); - let exitcode_file = format!("/tmp/exitcode_{}.txt", command_id); - let marker = format!("COMMAND_DONE_{}", command_id); - - let grouped_command = format!("{{ {} ; }}", cmd); - let cmd_to_send = format!( - "{} > {} 2> {}; echo $? > {}; echo '{}'\n", - grouped_command, stdout_file, stderr_file, exitcode_file, marker - ); - - { - let mut input = self - .input - .as_ref() - .ok_or(SandboxError::NotStarted)? - .lock() - .await; - input.write_all(cmd_to_send.as_bytes()).await.map_err(|e| SandboxError::ContainerWriteFailed(e.to_string()))?; - } - - self.read_until_marker(&marker, 20.0).await?; + pub async fn exec_session_cmd(&mut self, cmd: String) -> Result<(String, String, i64)> { + match self.status.clone() { + SandboxStatus::Started(cid) => { + // Record the command with timestamp + let execution_start = Instant::now(); + let mut command_execution = CommandExecution { + command: cmd.clone(), + timestamp: execution_start, + result: None, + }; + + let command_id = self.command_id.fetch_add(1, Ordering::Relaxed); + let stdout_file = format!("/tmp/stdout_{}.txt", command_id); + let stderr_file = format!("/tmp/stderr_{}.txt", command_id); + let exitcode_file = format!("/tmp/exitcode_{}.txt", command_id); + let marker = format!("COMMAND_DONE_{}", command_id); + + let grouped_command = format!("{{ {} ; }}", cmd); + let cmd_to_send = format!( + "{} > {} 2> {}; echo $? > {}; echo '{}'\n", + grouped_command, stdout_file, stderr_file, exitcode_file, marker + ); - // Read all three files in a single exec command with delimiters - let combined_cmd = vec![ - "/bin/bash".to_string(), - "-c".to_string(), - format!( - "echo 'STDOUT_START'; cat {}; echo 'STDOUT_END'; echo 'STDERR_START'; cat {}; echo 'STDERR_END'; echo 'EXITCODE_START'; cat {}; echo 'EXITCODE_END'", - stdout_file, stderr_file, exitcode_file - ), - ]; + { + let mut input = self + .input + .as_ref() + .ok_or(SandboxError::NotStarted)? + .lock() + .await; + input + .write_all(cmd_to_send.as_bytes()) + .await + .map_err(|e| SandboxError::ContainerWriteFailed(e.to_string()))?; + } - let (combined_output, _, exec_exit_code) = self.exec_standalone_cmd(combined_cmd).await?; + self.read_until_marker(&marker, 20.0).await?; - if exec_exit_code != 0 { - return Err(SandboxError::ExecFailed(combined_output, exec_exit_code)); - } + // Read all three files in a single exec command with delimiters + let combined_cmd = + format!( + "echo 'STDOUT_START'; cat {}; echo 'STDOUT_END'; echo 'STDERR_START'; cat {}; echo 'STDERR_END'; echo 'EXITCODE_START'; cat {}; echo 'EXITCODE_END'", + stdout_file, stderr_file, exitcode_file + ); - // Parse the combined output - let stdout = extract_section(&combined_output, "STDOUT_START", "STDOUT_END") - .trim_end_matches('\n') - .to_string(); - let stderr = extract_section(&combined_output, "STDERR_START", "STDERR_END") - .trim_end_matches('\n') - .to_string(); - let exit_code_str = extract_section(&combined_output, "EXITCODE_START", "EXITCODE_END") - .trim() - .to_string(); - - // Since we're non-tty, stderr is prefixed by `bash: `, so we need to remove that - let stderr = stderr.trim_start_matches("bash: ").to_string(); - - // Clean up files - let clean_cmd = vec![ - "rm".to_string(), - stdout_file.clone(), - stderr_file.clone(), - exitcode_file.clone(), - ]; - let exec_config = CreateExecOptions { - cmd: Some(clean_cmd), - attach_stdout: Some(false), - attach_stderr: Some(false), - attach_stdin: Some(false), - ..Default::default() - }; + let (combined_output, _, exec_exit_code) = + self.exec_standalone_cmd(combined_cmd).await?; - let cid = self.container_id.as_ref().unwrap(); - let exec = self.docker.create_exec(cid, exec_config).await.map_err(|e| SandboxError::CreateExecFailed(e.to_string()))?; + if exec_exit_code != 0 { + return Err(SandboxError::ExecFailed(combined_output, exec_exit_code)); + } - self.docker - .start_exec( - &exec.id, - Some(StartExecOptions { - detach: true, - tty: false, + // Parse the combined output + let stdout = extract_section(&combined_output, "STDOUT_START", "STDOUT_END") + .trim_end_matches('\n') + .to_string(); + let stderr = extract_section(&combined_output, "STDERR_START", "STDERR_END") + .trim_end_matches('\n') + .to_string(); + let exit_code_str = + extract_section(&combined_output, "EXITCODE_START", "EXITCODE_END") + .trim() + .to_string(); + + // Since we're non-tty, stderr is prefixed by `bash: `, so we need to remove that + let stderr = stderr.trim_start_matches("bash: ").to_string(); + + let exit_code = exit_code_str.parse::().unwrap_or(-1); + + // Store the result in trajectory + command_execution.result = Some(CommandResult { + stdout: stdout.clone(), + stderr: stderr.clone(), + exit_code, + }); + self.trajectory.push(command_execution); + + // Clean up files + let clean_cmd = vec![ + "rm".to_string(), + stdout_file.clone(), + stderr_file.clone(), + exitcode_file.clone(), + ]; + let exec_config = CreateExecOptions { + cmd: Some(clean_cmd), + attach_stdout: Some(false), + attach_stderr: Some(false), + attach_stdin: Some(false), ..Default::default() - }), - ) - .await.map_err(|e| SandboxError::CreateExecFailed(e.to_string()))?; + }; - self.drain(0.5).await?; + let exec = self + .docker + .create_exec(&cid, exec_config) + .await + .map_err(|e| SandboxError::CreateExecFailed(e.to_string()))?; + + self.docker + .start_exec( + &exec.id, + Some(StartExecOptions { + detach: true, + tty: false, + ..Default::default() + }), + ) + .await + .map_err(|e| SandboxError::CreateExecFailed(e.to_string()))?; - return Ok((stdout, stderr, exit_code_str.parse::().unwrap_or(-1))); + self.drain(0.5).await?; + + return Ok((stdout, stderr, exit_code)); + } + _ => return Err(SandboxError::NotStarted), + } } - pub async fn exec_standalone_cmd(&self, cmd: Vec) -> Result<(String, String, i64)> { - let cid = self.container_id.as_ref().ok_or(SandboxError::NotStarted)?; + pub async fn exec_standalone_cmd(&self, cmd: String) -> Result<(String, String, i64)> { + let cid = match &self.status { + SandboxStatus::Started(cid) => cid, + _ => return Err(SandboxError::NotStarted), + }; let exec_config = CreateExecOptions { - cmd: Some(cmd), + cmd: Some(vec![ + "/bin/bash".to_string(), + "-c".to_string(), + cmd, + ]), attach_stdout: Some(true), attach_stderr: Some(true), attach_stdin: Some(false), tty: Some(false), ..Default::default() }; - let exec = self.docker.create_exec(cid, exec_config).await.map_err(|e| SandboxError::CreateExecFailed(e.to_string()))?; + let exec = self + .docker + .create_exec(cid, exec_config) + .await + .map_err(|e| SandboxError::CreateExecFailed(e.to_string()))?; let start_res = self .docker .start_exec(&exec.id, None::) - .await.map_err(|e| SandboxError::CreateExecFailed(e.to_string()))?; + .await + .map_err(|e| SandboxError::CreateExecFailed(e.to_string()))?; let (mut stdout, mut stderr) = (Vec::new(), Vec::new()); if let StartExecResults::Attached { output, .. } = start_res { let mut output = output; @@ -358,7 +474,11 @@ impl Sandbox { } } } - let inspect = self.docker.inspect_exec(&exec.id).await.map_err(|e| SandboxError::ContainerReadFailed(e.to_string()))?; + let inspect = self + .docker + .inspect_exec(&exec.id) + .await + .map_err(|e| SandboxError::ContainerReadFailed(e.to_string()))?; let exit_code = inspect.exit_code.unwrap_or(-1); let stdout_str = String::from_utf8_lossy(&stdout).to_string(); let stderr_str = String::from_utf8_lossy(&stderr).to_string(); @@ -398,7 +518,11 @@ impl Sandbox { let remaining = timeout - elapsed; match time::timeout(Duration::from_secs_f64(remaining), receiver.next()).await { Ok(Some(chunk)) => accumulated += &String::from_utf8_lossy(&chunk), - Ok(None) => return Err(SandboxError::ContainerReadFailed("Stream closed unexpectedly".to_string())), + Ok(None) => { + return Err(SandboxError::ContainerReadFailed( + "Stream closed unexpectedly".to_string(), + )); + } Err(_) => return Err(SandboxError::TimeoutWaitingForMarker(marker.to_string())), } } diff --git a/src/main.rs b/src/main.rs index e90b144..a3426a2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,7 @@ use std::time::Duration; use anyhow::Result; use bollard::Docker; use clap::{Parser, Subcommand}; -use sos::http::{AppState, CreatePayload, ExecPayload}; +use sos::http::{AppState, CreatePayload, ExecPayload, StopPayload}; use sos::sandbox::SandboxStatus; use tokio::sync::{Mutex, Semaphore}; @@ -87,6 +87,16 @@ enum SandboxCommands { Stop { /// Sandbox ID id: String, + #[arg(short, long, default_value = "false")] + remove: Option, + }, + /// View the command trajectory of a sandbox + Trajectory { + /// Sandbox ID + id: String, + /// Whether to format output as human-readable text + #[arg(short, long, default_value = "false")] + formatted: bool, }, } @@ -154,7 +164,7 @@ async fn serve_command(port: u16, max_sandboxes: usize, timeout: u64) -> Result< if let Some(sandbox_arc) = sandbox_arc { let mut sandbox = sandbox_arc.lock().await; - if let SandboxStatus::Started = sandbox.status() { + if let SandboxStatus::Started(_) = sandbox.get_status() { let _ = sandbox.stop().await; } } @@ -299,22 +309,57 @@ async fn sandbox_command(server: String, action: SandboxCommands) -> Result<()> std::process::exit(1); } } - SandboxCommands::Stop { id } => { + SandboxCommands::Stop { id, remove } => { println!("Stopping sandbox: {}", id); let response = client - .delete(&format!("{}/sandboxes/{}", server, id)) + .post(&format!("{}/sandboxes/{}/stop", server, id)) + .json(&StopPayload { remove }) .send() .await?; if response.status().is_success() { - println!("✓ Sandbox {} stopped and removed", id); + println!("✓ Sandbox {} stopped", id); + println!(" Use 'sos trajectory {}' to view command history", id); } else { let error = response.text().await?; eprintln!("✗ Failed to stop sandbox: {}", error); std::process::exit(1); } } + SandboxCommands::Trajectory { id, formatted } => { + println!("Viewing trajectory for sandbox: {}", id); + + if formatted { + let response = client + .get(&format!("{}/sandboxes/{}/trajectory/formatted", server, id)) + .send() + .await?; + + if response.status().is_success() { + let formatted_trajectory = response.text().await?; + println!("{}", formatted_trajectory); + } else { + let error = response.text().await?; + eprintln!("✗ Failed to get trajectory: {}", error); + std::process::exit(1); + } + } else { + let response = client + .get(&format!("{}/sandboxes/{}/trajectory", server, id)) + .send() + .await?; + + if response.status().is_success() { + let trajectory_data: serde_json::Value = response.json().await?; + println!("{}", serde_json::to_string_pretty(&trajectory_data)?); + } else { + let error = response.text().await?; + eprintln!("✗ Failed to get trajectory: {}", error); + std::process::exit(1); + } + } + } } Ok(()) @@ -424,7 +469,8 @@ async fn session_command(server: String, image: String, setup: Vec) -> R // Clean up the sandbox println!("Stopping and removing sandbox..."); let response = client - .delete(&format!("{}/sandboxes/{}", server, id)) + .post(&format!("{}/sandboxes/{}/stop", server, id)) + .json(&StopPayload { remove: Some(true) }) .send() .await?; diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 809731a..69cbd8c 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -2,11 +2,11 @@ use std::collections::HashMap; use std::sync::Arc; use bollard::Docker; -use sos::http::{AppState, create_app}; +use futures::future; use serde_json::json; +use sos::http::{AppState, create_app}; use tokio::sync::{Mutex, Semaphore}; -use tokio::time::{Duration, sleep, Instant}; -use futures::future; +use tokio::time::{Duration, Instant, sleep}; async fn start_test_server() -> String { let semaphore = Arc::new(Semaphore::new(10)); @@ -108,10 +108,7 @@ async fn test_sandbox_endpoints_flow() { exec_result["stderr"], "cd: not-exists: No such file or directory", "Stderr should not be empty" ); - assert_eq!( - exec_result["exit_code"], 1, - "Exit code should be 1" - ); + assert_eq!(exec_result["exit_code"], 1, "Exit code should be 1"); println!("Executed command successfully: {:?}", exec_result); // Test 4: Execute comment (should be ignored) @@ -142,7 +139,7 @@ async fn test_sandbox_endpoints_flow() { "Comment should return empty stdout" ); println!("Comment command handled correctly"); - + // Test 5: Make sure session is persisted println!("Testing session persistence..."); let exec_payload = json!({ @@ -179,10 +176,7 @@ async fn test_sandbox_endpoints_flow() { exec_result["exit_code"], 0, "Comment should return exit code 0" ); - assert_eq!( - exec_result["stdout"], "/tmp", - "Stdout should be '/tmp'" - ); + assert_eq!(exec_result["stdout"], "/tmp", "Stdout should be '/tmp'"); // Test 6: Test standalone mode println!("Testing standalone mode..."); @@ -208,11 +202,7 @@ async fn test_sandbox_endpoints_flow() { exec_result["exit_code"], 0, "Comment should return exit code 0" ); - assert_eq!( - exec_result["stdout"], "/\n", - "Stdout should be '/\n'" - ); - + assert_eq!(exec_result["stdout"], "/\n", "Stdout should be '/\n'"); // Test 7: Make sure piping works println!("Testing piping..."); @@ -242,11 +232,11 @@ async fn test_sandbox_endpoints_flow() { "Piping should return exit code 0" ); - // Test 8: Stop sandbox println!("Testing stop sandbox..."); let response = client - .delete(&format!("{}/sandboxes/{}", base_url, sandbox_id)) + .post(&format!("{}/sandboxes/{}/stop", base_url, sandbox_id)) + .json(&json!({ "remove": true })) .send() .await .expect("Failed to send stop request"); @@ -314,7 +304,8 @@ async fn test_error_conditions() { // Test 3: Stop non-existent sandbox println!("Testing stop non-existent sandbox..."); let response = client - .delete(&format!("{}/sandboxes/{}", base_url, fake_id)) + .post(&format!("{}/sandboxes/{}/stop", base_url, fake_id)) + .json(&json!({ "remove": true })) .send() .await .expect("Failed to send stop request"); @@ -376,7 +367,8 @@ async fn test_double_start_sandbox() { // Clean up client - .delete(&format!("{}/sandboxes/{}", base_url, sandbox_id)) + .post(&format!("{}/sandboxes/{}/stop", base_url, sandbox_id)) + .json(&json!({ "remove": true })) .send() .await .expect("Failed to clean up sandbox"); @@ -388,7 +380,7 @@ async fn test_double_start_sandbox() { #[ignore] async fn test_semaphore_fuzz() { println!("Testing semaphore with 8 concurrent sandboxes and limit of 3..."); - + // Create test server with semaphore limit of 3 (smaller for faster testing) let semaphore = Arc::new(Semaphore::new(3)); let state = Arc::new(AppState { @@ -416,7 +408,7 @@ async fn test_semaphore_fuzz() { sleep(Duration::from_millis(100)).await; let client = reqwest::Client::new(); - + // Create 8 sandboxes (reduced for faster testing) println!("Creating 8 sandboxes..."); let mut sandbox_ids = Vec::new(); @@ -434,7 +426,7 @@ async fn test_semaphore_fuzz() { .expect("Failed to create sandbox"); assert_eq!(response.status(), 200); - + let create_result: serde_json::Value = response .json() .await @@ -447,7 +439,7 @@ async fn test_semaphore_fuzz() { // Run complete trajectories (start → exec → cleanup) concurrently println!("Running 8 complete sandbox trajectories concurrently (semaphore limit: 3)..."); let start_time = Instant::now(); - + let trajectory_tasks: Vec<_> = sandbox_ids .iter() .enumerate() @@ -457,22 +449,26 @@ async fn test_semaphore_fuzz() { let sandbox_id = sandbox_id.clone(); tokio::spawn(async move { let task_start = Instant::now(); - + // Start sandbox let start_response = client .post(&format!("{}/sandboxes/{}/start", base_url, sandbox_id)) .send() .await .expect("Failed to send start request"); - + if start_response.status() != 200 { - println!("Sandbox {} start failed with status: {}", i, start_response.status()); + println!( + "Sandbox {} start failed with status: {}", + i, + start_response.status() + ); return (i, false, task_start.elapsed()); } - + let start_duration = task_start.elapsed(); println!("Sandbox {} started in {:?}", i, start_duration); - + // Execute command let exec_payload = json!({ "command": format!("echo 'Hello from sandbox {}'", i) @@ -484,82 +480,97 @@ async fn test_semaphore_fuzz() { .send() .await .expect("Failed to send exec request"); - + if exec_response.status() != 200 { - println!("Sandbox {} exec failed with status: {}", i, exec_response.status()); + println!( + "Sandbox {} exec failed with status: {}", + i, + exec_response.status() + ); } else { println!("Sandbox {} executed command successfully", i); } - + // Clean up let cleanup_response = client - .delete(&format!("{}/sandboxes/{}", base_url, sandbox_id)) + .post(&format!("{}/sandboxes/{}/stop", base_url, sandbox_id)) + .json(&json!({ "remove": true })) .send() .await .expect("Failed to send cleanup request"); - + let total_duration = task_start.elapsed(); - println!("Sandbox {} complete trajectory finished in {:?}", i, total_duration); - - let success = start_response.status() == 200 && - exec_response.status() == 200 && - cleanup_response.status() == 200; - + println!( + "Sandbox {} complete trajectory finished in {:?}", + i, total_duration + ); + + let success = start_response.status() == 200 + && exec_response.status() == 200 + && cleanup_response.status() == 200; + (i, success, total_duration) }) }) .collect(); // Wait for all trajectories to complete - let mut results = future::join_all(trajectory_tasks).await + let mut results = future::join_all(trajectory_tasks) + .await .into_iter() .collect::, _>>() .expect("Some tasks failed"); - + let total_duration = start_time.elapsed(); println!("All trajectories completed in {:?}", total_duration); // Verify all trajectories succeeded let successful_trajectories = results.iter().filter(|(_, success, _)| *success).count(); - assert_eq!(successful_trajectories, 8, "All 8 trajectories should succeed"); + assert_eq!( + successful_trajectories, 8, + "All 8 trajectories should succeed" + ); // Analyze timing to ensure semaphore is working results.sort_by_key(|(_, _, duration)| *duration); println!("Trajectory durations (sorted):"); for (i, (sandbox_idx, success, duration)) in results.iter().enumerate() { - println!(" #{}: Sandbox {} - {} - {:?}", - i + 1, sandbox_idx, - if *success { "SUCCESS" } else { "FAILED" }, - duration); + println!( + " #{}: Sandbox {} - {} - {:?}", + i + 1, + sandbox_idx, + if *success { "SUCCESS" } else { "FAILED" }, + duration + ); } // Analyze the timing patterns to detect semaphore behavior let durations: Vec = results.iter().map(|(_, _, d)| d.as_millis()).collect(); - + println!("Timing analysis:"); println!(" First batch (1-3): {:?}ms", &durations[0..3]); println!(" Second batch (4-6): {:?}ms", &durations[3..6]); println!(" Third batch (7-8): {:?}ms", &durations[6..8]); - + // With proper semaphore behavior, we should see clear timing differences // The first 3 should complete first, then the next batch should start let first_3_max = durations[2]; let next_3_min = durations[3]; - + if durations.len() >= 6 && next_3_min > first_3_max { println!("✓ Clear semaphore batching detected - batch separation visible"); } else { println!("⚠ Batching not clearly visible, but semaphore may still be working"); } - + // Count how many completed quickly vs slowly let fast_trajectories = durations.iter().filter(|&&d| d < 5000).count(); // < 5 seconds let slow_trajectories = durations.iter().filter(|&&d| d >= 5000).count(); // >= 5 seconds - + println!("Speed distribution:"); println!(" Fast trajectories (< 5s): {}", fast_trajectories); println!(" Slow trajectories (>= 5s): {}", slow_trajectories); - + println!("✓ All trajectories completed successfully!"); println!("✓ Semaphore correctly limited concurrent sandbox starts to 3"); From 7264002e65bfa49efd29609575b6817c732acb23 Mon Sep 17 00:00:00 2001 From: Steve James Date: Mon, 28 Jul 2025 11:02:04 +0200 Subject: [PATCH 3/3] fix standalone commands + handle comments in sandbox --- src/lib/http.rs | 11 ++++---- src/lib/sandbox.rs | 63 ++++++++++++++++++++++++++++++---------------- 2 files changed, 47 insertions(+), 27 deletions(-) diff --git a/src/lib/http.rs b/src/lib/http.rs index 77bd16a..06d98e8 100644 --- a/src/lib/http.rs +++ b/src/lib/http.rs @@ -119,11 +119,6 @@ pub async fn exec_cmd( Json(payload): Json, ) -> Result, (StatusCode, String)> { let command = payload.command; - if command.trim_start().starts_with('#') { - return Ok(Json( - serde_json::json!({ "stdout": "", "stderr": "", "exit_code": 0 }), - )); - } let sandbox_arc = { let sandboxes = state.sandboxes.lock().await; @@ -136,7 +131,11 @@ pub async fn exec_cmd( let mut sandbox_guard = sandbox_arc.lock().await; let standalone = payload.standalone.unwrap_or(false); - let (stdout, stderr, exit_code) = match standalone { + let CommandResult { + stdout, + stderr, + exit_code, + } = match standalone { true => sandbox_guard .exec_standalone_cmd(command) .await diff --git a/src/lib/sandbox.rs b/src/lib/sandbox.rs index aa291de..48cf0e3 100644 --- a/src/lib/sandbox.rs +++ b/src/lib/sandbox.rs @@ -258,7 +258,13 @@ impl Sandbox { .map_err(|e| SandboxError::StartContainerFailed(e.to_string()))?; if !self.setup_commands.is_empty() { - let (_, stderr, exit_code) = self.exec_standalone_cmd(self.setup_commands.clone()).await?; + let CommandResult { + stderr, + exit_code, + stdout: _, + } = self + .exec_standalone_cmd(self.setup_commands.clone()) + .await?; if exit_code != 0 { return Err(SandboxError::SetupCommandsFailed(stderr)); } @@ -318,7 +324,7 @@ impl Sandbox { Ok(()) } - pub async fn exec_session_cmd(&mut self, cmd: String) -> Result<(String, String, i64)> { + pub async fn exec_session_cmd(&mut self, cmd: String) -> Result { match self.status.clone() { SandboxStatus::Started(cid) => { // Record the command with timestamp @@ -329,6 +335,17 @@ impl Sandbox { result: None, }; + if command_execution.command.trim_start().starts_with('#') { + let result = CommandResult { + stdout: "".to_string(), + stderr: "".to_string(), + exit_code: 0, + }; + command_execution.result = Some(result.clone()); + self.trajectory.push(command_execution); + return Ok(result); + } + let command_id = self.command_id.fetch_add(1, Ordering::Relaxed); let stdout_file = format!("/tmp/stdout_{}.txt", command_id); let stderr_file = format!("/tmp/stderr_{}.txt", command_id); @@ -357,14 +374,16 @@ impl Sandbox { self.read_until_marker(&marker, 20.0).await?; // Read all three files in a single exec command with delimiters - let combined_cmd = - format!( - "echo 'STDOUT_START'; cat {}; echo 'STDOUT_END'; echo 'STDERR_START'; cat {}; echo 'STDERR_END'; echo 'EXITCODE_START'; cat {}; echo 'EXITCODE_END'", - stdout_file, stderr_file, exitcode_file - ); + let combined_cmd = format!( + "echo 'STDOUT_START'; cat {}; echo 'STDOUT_END'; echo 'STDERR_START'; cat {}; echo 'STDERR_END'; echo 'EXITCODE_START'; cat {}; echo 'EXITCODE_END'", + stdout_file, stderr_file, exitcode_file + ); - let (combined_output, _, exec_exit_code) = - self.exec_standalone_cmd(combined_cmd).await?; + let CommandResult { + stdout: combined_output, + stderr: _, + exit_code: exec_exit_code, + } = self.exec_standalone_cmd(combined_cmd).await?; if exec_exit_code != 0 { return Err(SandboxError::ExecFailed(combined_output, exec_exit_code)); @@ -388,11 +407,12 @@ impl Sandbox { let exit_code = exit_code_str.parse::().unwrap_or(-1); // Store the result in trajectory - command_execution.result = Some(CommandResult { - stdout: stdout.clone(), - stderr: stderr.clone(), + let result = CommandResult { + stdout, + stderr, exit_code, - }); + }; + command_execution.result = Some(result.clone()); self.trajectory.push(command_execution); // Clean up files @@ -429,24 +449,21 @@ impl Sandbox { .map_err(|e| SandboxError::CreateExecFailed(e.to_string()))?; self.drain(0.5).await?; + - return Ok((stdout, stderr, exit_code)); + return Ok(result); } _ => return Err(SandboxError::NotStarted), } } - pub async fn exec_standalone_cmd(&self, cmd: String) -> Result<(String, String, i64)> { + pub async fn exec_standalone_cmd(&self, cmd: String) -> Result { let cid = match &self.status { SandboxStatus::Started(cid) => cid, _ => return Err(SandboxError::NotStarted), }; let exec_config = CreateExecOptions { - cmd: Some(vec![ - "/bin/bash".to_string(), - "-c".to_string(), - cmd, - ]), + cmd: Some(vec!["/bin/bash".to_string(), "-c".to_string(), cmd]), attach_stdout: Some(true), attach_stderr: Some(true), attach_stdin: Some(false), @@ -482,7 +499,11 @@ impl Sandbox { let exit_code = inspect.exit_code.unwrap_or(-1); let stdout_str = String::from_utf8_lossy(&stdout).to_string(); let stderr_str = String::from_utf8_lossy(&stderr).to_string(); - Ok((stdout_str, stderr_str, exit_code)) + Ok(CommandResult { + stdout: stdout_str, + stderr: stderr_str, + exit_code, + }) } pub async fn drain(&mut self, timeout: f64) -> Result {