diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index f35402efd08e..c10fdddab964 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2900,6 +2900,7 @@ dependencies = [ "serde", "serde_json", "serial_test", + "sha2 0.10.9", "tempfile", "test-case", "thiserror 2.0.18", @@ -2911,6 +2912,7 @@ dependencies = [ "uuid", "windows-sys 0.52.0", "wiremock", + "zeroize", ] [[package]] diff --git a/codex-rs/exec-server/Cargo.toml b/codex-rs/exec-server/Cargo.toml index 31fc4ea0aa9e..d3b12b654b20 100644 --- a/codex-rs/exec-server/Cargo.toml +++ b/codex-rs/exec-server/Cargo.toml @@ -33,6 +33,8 @@ reqwest = { workspace = true, features = ["json", "rustls-tls", "stream"] } prost = "0.14.3" serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +sha2 = { workspace = true } +zeroize = { workspace = true } thiserror = { workspace = true } toml = { workspace = true } tokio = { workspace = true, features = [ diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 88c0c8034288..c568d5ddac8b 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -48,6 +48,9 @@ use crate::protocol::ExecExitedNotification; use crate::protocol::ExecOutputDeltaNotification; use crate::protocol::ExecParams; use crate::protocol::ExecResponse; +use crate::protocol::FILE_TRANSFER_CANCEL_METHOD; +use crate::protocol::FILE_TRANSFER_PREPARE_UPLOAD_METHOD; +use crate::protocol::FILE_TRANSFER_STATUS_METHOD; use crate::protocol::FS_CANONICALIZE_METHOD; use crate::protocol::FS_CLOSE_METHOD; use crate::protocol::FS_COPY_METHOD; @@ -59,6 +62,12 @@ use crate::protocol::FS_READ_DIRECTORY_METHOD; use crate::protocol::FS_READ_FILE_METHOD; use crate::protocol::FS_REMOVE_METHOD; use crate::protocol::FS_WRITE_FILE_METHOD; +use crate::protocol::FileTransferCancelParams; +use crate::protocol::FileTransferCancelResponse; +use crate::protocol::FileTransferPrepareUploadParams; +use crate::protocol::FileTransferPrepareUploadResponse; +use crate::protocol::FileTransferStatusParams; +use crate::protocol::FileTransferStatusResponse; use crate::protocol::FsCanonicalizeParams; use crate::protocol::FsCanonicalizeResponse; use crate::protocol::FsCloseParams; @@ -406,6 +415,27 @@ impl LazyRemoteExecServerClient { pub(crate) async fn environment_info(&self) -> Result { self.get().await?.environment_info().await } + + pub(crate) async fn file_transfer_prepare_upload( + &self, + params: FileTransferPrepareUploadParams, + ) -> Result { + self.get().await?.file_transfer_prepare_upload(params).await + } + + pub(crate) async fn file_transfer_status( + &self, + transfer_id: String, + ) -> Result { + self.get().await?.file_transfer_status(transfer_id).await + } + + pub(crate) async fn file_transfer_cancel( + &self, + transfer_id: String, + ) -> Result { + self.get().await?.file_transfer_cancel(transfer_id).await + } } #[derive(Debug, thiserror::Error)] @@ -509,6 +539,36 @@ impl ExecServerClient { self.call(ENVIRONMENT_INFO_METHOD, &()).await } + pub async fn file_transfer_prepare_upload( + &self, + params: FileTransferPrepareUploadParams, + ) -> Result { + self.call(FILE_TRANSFER_PREPARE_UPLOAD_METHOD, ¶ms) + .await + } + + pub async fn file_transfer_status( + &self, + transfer_id: String, + ) -> Result { + self.call( + FILE_TRANSFER_STATUS_METHOD, + &FileTransferStatusParams { transfer_id }, + ) + .await + } + + pub async fn file_transfer_cancel( + &self, + transfer_id: String, + ) -> Result { + self.call( + FILE_TRANSFER_CANCEL_METHOD, + &FileTransferCancelParams { transfer_id }, + ) + .await + } + pub async fn read(&self, params: ReadParams) -> Result { self.call(EXEC_READ_METHOD, ¶ms).await } diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index 4ac84f9663ca..1c7a2dcae7ef 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -24,6 +24,10 @@ use crate::local_process::LocalProcess; use crate::process::ExecBackend; use crate::protocol::EnvironmentCapabilities; use crate::protocol::EnvironmentInfo; +use crate::protocol::FileTransferCancelResponse; +use crate::protocol::FileTransferPrepareUploadParams; +use crate::protocol::FileTransferPrepareUploadResponse; +use crate::protocol::FileTransferStatusResponse; use crate::protocol::ShellInfo; use crate::remote::NoiseRendezvousEnvironmentConfig; use crate::remote_file_system::RemoteFileSystem; @@ -562,6 +566,42 @@ impl Environment { } } + pub async fn file_transfer_prepare_upload( + &self, + params: FileTransferPrepareUploadParams, + ) -> Result { + let client = self.remote_client.as_ref().ok_or_else(|| { + ExecServerError::Protocol( + "executor-owned file transfer requires a remote environment".to_string(), + ) + })?; + client.file_transfer_prepare_upload(params).await + } + + pub async fn file_transfer_status( + &self, + transfer_id: String, + ) -> Result { + let client = self.remote_client.as_ref().ok_or_else(|| { + ExecServerError::Protocol( + "executor-owned file transfer requires a remote environment".to_string(), + ) + })?; + client.file_transfer_status(transfer_id).await + } + + pub async fn file_transfer_cancel( + &self, + transfer_id: String, + ) -> Result { + let client = self.remote_client.as_ref().ok_or_else(|| { + ExecServerError::Protocol( + "executor-owned file transfer requires a remote environment".to_string(), + ) + })?; + client.file_transfer_cancel(transfer_id).await + } + /// Starts connecting a remote environment without waiting for it. /// Requires an active Tokio runtime when background startup is supported. pub fn start_connecting(&self) { diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 6e3c5830ab1c..0d3eb1b0e8a9 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -149,6 +149,7 @@ pub use protocol::WriteResponse; pub use protocol::WriteStatus; pub use remote::RemoteEnvironmentConfig; pub use remote::run_remote_environment; +pub use rpc::FILE_TRANSFER_SESSION_LOST_ERROR_CODE; pub use runtime_paths::ExecServerRuntimePaths; pub use server::DEFAULT_LISTEN_URL; pub use server::ExecServerListenUrlParseError; diff --git a/codex-rs/exec-server/src/protocol.rs b/codex-rs/exec-server/src/protocol.rs index 31dde7cf494a..0eb48696db09 100644 --- a/codex-rs/exec-server/src/protocol.rs +++ b/codex-rs/exec-server/src/protocol.rs @@ -35,6 +35,9 @@ pub const FS_COPY_METHOD: &str = "fs/copy"; pub const HTTP_REQUEST_METHOD: &str = "http/request"; /// JSON-RPC notification method for streamed executor HTTP response bodies. pub const HTTP_REQUEST_BODY_DELTA_METHOD: &str = "http/request/bodyDelta"; +pub const FILE_TRANSFER_PREPARE_UPLOAD_METHOD: &str = "fileTransfer/prepareUpload"; +pub const FILE_TRANSFER_STATUS_METHOD: &str = "fileTransfer/status"; +pub const FILE_TRANSFER_CANCEL_METHOD: &str = "fileTransfer/cancel"; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(transparent)] diff --git a/codex-rs/exec-server/src/rpc.rs b/codex-rs/exec-server/src/rpc.rs index 82cf2b200a95..8bb6f365b0b7 100644 --- a/codex-rs/exec-server/src/rpc.rs +++ b/codex-rs/exec-server/src/rpc.rs @@ -27,6 +27,7 @@ use crate::connection::JsonRpcConnectionEvent; use crate::connection::JsonRpcTransport; pub(crate) const SESSION_ALREADY_ATTACHED_ERROR_CODE: i64 = -32010; +pub const FILE_TRANSFER_SESSION_LOST_ERROR_CODE: i64 = -32011; #[derive(Debug)] pub(crate) enum RpcCallError { @@ -440,6 +441,14 @@ pub(crate) fn session_already_attached(message: String) -> JSONRPCErrorError { } } +pub(crate) fn file_transfer_session_lost(message: String) -> JSONRPCErrorError { + JSONRPCErrorError { + code: FILE_TRANSFER_SESSION_LOST_ERROR_CODE, + data: None, + message, + } +} + pub(crate) fn method_not_found(message: String) -> JSONRPCErrorError { JSONRPCErrorError { code: -32601, diff --git a/codex-rs/exec-server/src/server.rs b/codex-rs/exec-server/src/server.rs index d443b0ed1ff8..3b70f60223d3 100644 --- a/codex-rs/exec-server/src/server.rs +++ b/codex-rs/exec-server/src/server.rs @@ -1,4 +1,5 @@ mod file_system_handler; +mod file_transfer_handler; mod handler; mod process_handler; mod processor; diff --git a/codex-rs/exec-server/src/server/file_transfer_handler.rs b/codex-rs/exec-server/src/server/file_transfer_handler.rs new file mode 100644 index 000000000000..a97dbcec12c6 --- /dev/null +++ b/codex-rs/exec-server/src/server/file_transfer_handler.rs @@ -0,0 +1,371 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; + +use base64::Engine; +use base64::engine::general_purpose::URL_SAFE_NO_PAD; +use codex_app_server_protocol::JSONRPCErrorError; +use sha2::Digest; +use sha2::Sha256; +use tokio::sync::Mutex; +use tokio::sync::Notify; +use tokio::time::Instant; +use tokio_util::sync::CancellationToken; +use tokio_util::task::TaskTracker; +use uuid::Uuid; +use zeroize::Zeroizing; + +use crate::ExecServerRuntimePaths; +use crate::local_file_system::LocalFileSystem; +use crate::protocol::FileTransferCancelParams; +use crate::protocol::FileTransferCancelResponse; +use crate::protocol::FileTransferDigest; +use crate::protocol::FileTransferDigestAlgorithm; +use crate::protocol::FileTransferOperationState; +use crate::protocol::FileTransferPrepareUploadParams; +use crate::protocol::FileTransferPrepareUploadResponse; +use crate::protocol::FileTransferStatusParams; +use crate::protocol::FileTransferStatusResponse; +use crate::protocol::MAX_PREPARED_FILE_UPLOAD_BYTES; +use crate::rpc::file_transfer_session_lost; +use crate::rpc::internal_error; +use crate::rpc::invalid_params; +use crate::rpc::invalid_request; +use crate::rpc::not_found; + +pub(crate) const FILE_TRANSFER_ENABLED_ENV_VAR: &str = + "CODEX_EXEC_SERVER_PREPARED_FILE_UPLOAD_ENABLED"; +const MAX_PREPARED_BYTES_PER_SESSION: u64 = 32 * 1024 * 1024; +const MAX_OPERATIONS_PER_SESSION: usize = 32; +#[cfg(test)] +const PREPARED_UPLOAD_TTL: Duration = Duration::from_millis(100); +#[cfg(not(test))] +const PREPARED_UPLOAD_TTL: Duration = Duration::from_secs(10 * 60); +const TERMINAL_RESULT_TTL: Duration = Duration::from_secs(10 * 60); + +#[derive(Clone, Copy)] +pub(crate) enum PreparedFileUploadAvailability { + Disabled, + EnabledForDevelopment, +} + +#[derive(Clone)] +pub(crate) struct FileTransferHandler { + inner: Arc, +} + +struct Inner { + session_generation_id: String, + availability: PreparedFileUploadAvailability, + file_system: LocalFileSystem, + operations: Mutex>, + tasks: TaskTracker, + shutdown: CancellationToken, + expiry_changed: Notify, +} + +struct UploadOperation { + deadline: Instant, + bytes: Option>>, + state: FileTransferOperationState, + error: Option, + terminal_at: Option, +} + +impl FileTransferHandler { + pub(crate) fn new( + runtime_paths: ExecServerRuntimePaths, + availability: PreparedFileUploadAvailability, + ) -> Self { + let handler = Self { + inner: Arc::new(Inner { + // This tag distinguishes logical session generations without + // exposing the resume session ID, which is a bearer secret. + session_generation_id: Uuid::new_v4().to_string(), + availability, + file_system: LocalFileSystem::with_runtime_paths(runtime_paths), + operations: Mutex::new(HashMap::new()), + tasks: TaskTracker::new(), + shutdown: CancellationToken::new(), + expiry_changed: Notify::new(), + }), + }; + handler.start_expiry_sweeper(); + handler + } + + pub(crate) async fn prepare_upload( + &self, + params: FileTransferPrepareUploadParams, + ) -> Result { + self.require_enabled()?; + if params.max_bytes == 0 || params.max_bytes > MAX_PREPARED_FILE_UPLOAD_BYTES { + return Err(invalid_params(format!( + "file transfer maxBytes must be between 1 and {MAX_PREPARED_FILE_UPLOAD_BYTES}" + ))); + } + self.prune_operations().await; + { + let operations = self.inner.operations.lock().await; + ensure_prepare_quota(&operations, /*additional_bytes*/ 0)?; + } + + let bytes = self + .inner + .file_system + .read_file_with_limit(¶ms.path, Some(¶ms.sandbox), params.max_bytes) + .await + .map_err(map_prepare_error)?; + let size = bytes.len() as u64; + let name = params + .path + .basename() + .filter(|name| !name.is_empty()) + .unwrap_or_else(|| "upload".to_string()); + let digest = FileTransferDigest { + algorithm: FileTransferDigestAlgorithm::Sha256, + value: URL_SAFE_NO_PAD.encode(Sha256::digest(&bytes)), + }; + let transfer_id = format!("{}:{}", self.inner.session_generation_id, Uuid::new_v4()); + let deadline = Instant::now() + PREPARED_UPLOAD_TTL; + let expires_at_unix_seconds = unix_seconds(SystemTime::now() + PREPARED_UPLOAD_TTL); + let operation = UploadOperation { + deadline, + bytes: Some(Zeroizing::new(bytes)), + state: FileTransferOperationState::Prepared, + error: None, + terminal_at: None, + }; + + let mut operations = self.inner.operations.lock().await; + ensure_prepare_quota(&operations, size)?; + operations.insert(transfer_id.clone(), operation); + drop(operations); + self.inner.expiry_changed.notify_one(); + + Ok(FileTransferPrepareUploadResponse { + transfer_id, + name, + size, + digest, + expires_at_unix_seconds, + }) + } + + pub(crate) async fn status( + &self, + params: FileTransferStatusParams, + ) -> Result { + self.require_enabled()?; + self.validate_transfer_id(¶ms.transfer_id)?; + let mut operations = self.inner.operations.lock().await; + let operation = operations + .get_mut(¶ms.transfer_id) + .ok_or_else(|| not_found("unknown file transfer operation".to_string()))?; + expire_operation(operation, Instant::now()); + Ok(status_response(¶ms.transfer_id, operation)) + } + + pub(crate) async fn cancel( + &self, + params: FileTransferCancelParams, + ) -> Result { + self.require_enabled()?; + self.validate_transfer_id(¶ms.transfer_id)?; + let mut operations = self.inner.operations.lock().await; + let operation = operations + .get_mut(¶ms.transfer_id) + .ok_or_else(|| not_found("unknown file transfer operation".to_string()))?; + expire_operation(operation, Instant::now()); + if operation.state == FileTransferOperationState::Prepared { + operation.bytes = None; + set_terminal( + operation, + FileTransferOperationState::Canceled, + /*error*/ None, + ); + self.inner.expiry_changed.notify_one(); + } + Ok(FileTransferCancelResponse { + state: operation.state, + }) + } + + pub(crate) async fn shutdown(&self) { + self.inner.shutdown.cancel(); + self.inner.tasks.close(); + { + let mut operations = self.inner.operations.lock().await; + for operation in operations.values_mut() { + operation.bytes = None; + } + } + self.inner.tasks.wait().await; + } + + fn require_enabled(&self) -> Result<(), JSONRPCErrorError> { + if matches!( + self.inner.availability, + PreparedFileUploadAvailability::EnabledForDevelopment + ) { + Ok(()) + } else { + Err(invalid_request( + "prepared file upload is disabled on this executor".to_string(), + )) + } + } + + fn validate_transfer_id(&self, transfer_id: &str) -> Result<(), JSONRPCErrorError> { + let Some((session_generation_id, opaque_id)) = transfer_id.split_once(':') else { + return Err(invalid_params("invalid file transfer id".to_string())); + }; + if session_generation_id != self.inner.session_generation_id { + return Err(file_transfer_session_lost( + "file transfer belongs to an expired executor session".to_string(), + )); + } + if Uuid::parse_str(opaque_id).is_err() { + return Err(invalid_params("invalid file transfer id".to_string())); + } + Ok(()) + } + + fn start_expiry_sweeper(&self) { + let inner = Arc::clone(&self.inner); + let tasks = self.inner.tasks.clone(); + let _task = tasks.spawn(async move { + loop { + let notified = inner.expiry_changed.notified(); + tokio::pin!(notified); + notified.as_mut().enable(); + let next_deadline = inner + .operations + .lock() + .await + .values() + .filter(|operation| operation.state == FileTransferOperationState::Prepared) + .map(|operation| operation.deadline) + .min(); + match next_deadline { + Some(deadline) => tokio::select! { + _ = tokio::time::sleep_until(deadline) => { + let now = Instant::now(); + let mut operations = inner.operations.lock().await; + for operation in operations.values_mut() { + expire_operation(operation, now); + } + } + _ = &mut notified => {} + _ = inner.shutdown.cancelled() => break, + }, + None => tokio::select! { + _ = &mut notified => {} + _ = inner.shutdown.cancelled() => break, + }, + } + } + }); + } + + async fn prune_operations(&self) { + let now = Instant::now(); + let mut operations = self.inner.operations.lock().await; + for operation in operations.values_mut() { + expire_operation(operation, now); + } + operations.retain(|_, operation| { + operation + .terminal_at + .is_none_or(|terminal_at| now.duration_since(terminal_at) < TERMINAL_RESULT_TTL) + }); + while operations.len() >= MAX_OPERATIONS_PER_SESSION { + let oldest_terminal = operations + .iter() + .filter_map(|(id, operation)| operation.terminal_at.map(|at| (id.clone(), at))) + .min_by_key(|(_, at)| *at) + .map(|(id, _)| id); + let Some(oldest_terminal) = oldest_terminal else { + break; + }; + operations.remove(&oldest_terminal); + } + } +} + +fn ensure_prepare_quota( + operations: &HashMap, + additional_bytes: u64, +) -> Result<(), JSONRPCErrorError> { + if operations.len() >= MAX_OPERATIONS_PER_SESSION { + return Err(invalid_request( + "file transfer operation quota exceeded".to_string(), + )); + } + if prepared_bytes(operations).saturating_add(additional_bytes) > MAX_PREPARED_BYTES_PER_SESSION + { + return Err(invalid_request( + "prepared upload session quota exceeded".to_string(), + )); + } + Ok(()) +} + +fn prepared_bytes(operations: &HashMap) -> u64 { + operations + .values() + .filter_map(|operation| operation.bytes.as_ref()) + .map(|bytes| bytes.len() as u64) + .sum() +} + +fn status_response(transfer_id: &str, operation: &UploadOperation) -> FileTransferStatusResponse { + FileTransferStatusResponse { + transfer_id: transfer_id.to_string(), + state: operation.state, + error: operation.error.clone(), + } +} + +fn expire_operation(operation: &mut UploadOperation, now: Instant) { + if operation.state == FileTransferOperationState::Prepared && now >= operation.deadline { + operation.bytes = None; + set_terminal( + operation, + FileTransferOperationState::Expired, + /*error*/ None, + ); + } +} + +fn set_terminal( + operation: &mut UploadOperation, + state: FileTransferOperationState, + error: Option, +) { + operation.state = state; + operation.error = error; + operation.terminal_at = Some(Instant::now()); +} + +fn unix_seconds(time: SystemTime) -> i64 { + time.duration_since(UNIX_EPOCH) + .map(|duration| duration.as_secs() as i64) + .unwrap_or(0) +} + +fn map_prepare_error(error: std::io::Error) -> JSONRPCErrorError { + match error.kind() { + std::io::ErrorKind::NotFound => not_found("upload source was not found".to_string()), + std::io::ErrorKind::InvalidInput | std::io::ErrorKind::PermissionDenied => { + invalid_params("upload source is unavailable or exceeds the byte limit".to_string()) + } + _ => internal_error("failed to prepare upload source".to_string()), + } +} + +#[cfg(test)] +#[path = "file_transfer_handler_tests.rs"] +mod tests; diff --git a/codex-rs/exec-server/src/server/file_transfer_handler_tests.rs b/codex-rs/exec-server/src/server/file_transfer_handler_tests.rs new file mode 100644 index 000000000000..2d6b1ca47f11 --- /dev/null +++ b/codex-rs/exec-server/src/server/file_transfer_handler_tests.rs @@ -0,0 +1,200 @@ +use std::time::Duration; + +use codex_file_system::FileSystemSandboxContext; +use codex_protocol::models::PermissionProfile; +use codex_utils_path_uri::PathUri; +use pretty_assertions::assert_eq; +use tokio::time::timeout; + +use super::*; +use crate::rpc::FILE_TRANSFER_SESSION_LOST_ERROR_CODE; + +fn test_runtime_paths() -> ExecServerRuntimePaths { + ExecServerRuntimePaths::new( + std::env::current_exe().expect("current exe"), + /*codex_linux_sandbox_exe*/ None, + ) + .expect("runtime paths") +} + +fn test_handler() -> FileTransferHandler { + FileTransferHandler::new( + test_runtime_paths(), + PreparedFileUploadAvailability::EnabledForDevelopment, + ) +} + +fn prepare_params(path: &std::path::Path, max_bytes: u64) -> FileTransferPrepareUploadParams { + FileTransferPrepareUploadParams { + path: PathUri::from_path(path).expect("path URI"), + sandbox: full_access_context(), + max_bytes, + } +} + +fn full_access_context() -> FileSystemSandboxContext { + FileSystemSandboxContext::from_permission_profile(PermissionProfile::Disabled) +} + +#[tokio::test] +async fn prepare_captures_stable_bytes_and_metadata() { + let source_dir = tempfile::tempdir().expect("source tempdir"); + let source = source_dir.path().join("report.txt"); + tokio::fs::write(&source, b"prepared bytes") + .await + .expect("write source"); + let handler = test_handler(); + let prepared = handler + .prepare_upload(prepare_params(&source, /*max_bytes*/ 1024)) + .await + .expect("prepare upload"); + assert_eq!(prepared.name, "report.txt"); + assert_eq!(prepared.size, 14); + assert_eq!( + prepared.digest.algorithm, + FileTransferDigestAlgorithm::Sha256 + ); + + tokio::fs::write(&source, b"different bytes") + .await + .expect("mutate source"); + { + let operations = handler.inner.operations.lock().await; + let snapshot = operations + .get(&prepared.transfer_id) + .and_then(|operation| operation.bytes.as_ref()) + .expect("prepared snapshot should remain present"); + assert_eq!(snapshot.as_slice(), b"prepared bytes"); + } + handler.shutdown().await; +} + +#[tokio::test] +async fn prepare_enforces_byte_limit_and_cancel_drops_snapshot() { + let source_dir = tempfile::tempdir().expect("source tempdir"); + let source = source_dir.path().join("report.txt"); + tokio::fs::write(&source, b"nine-byte") + .await + .expect("write source"); + let handler = test_handler(); + + let oversized = handler + .prepare_upload(prepare_params(&source, /*max_bytes*/ 8)) + .await + .expect_err("source exceeds requested limit"); + assert_eq!(oversized.code, -32602); + + let prepared = handler + .prepare_upload(prepare_params(&source, /*max_bytes*/ 1024)) + .await + .expect("prepare upload"); + let canceled = handler + .cancel(FileTransferCancelParams { + transfer_id: prepared.transfer_id.clone(), + }) + .await + .expect("cancel prepared upload"); + assert_eq!(canceled.state, FileTransferOperationState::Canceled); + let status = handler + .status(FileTransferStatusParams { + transfer_id: prepared.transfer_id, + }) + .await + .expect("canceled status"); + assert_eq!(status.state, FileTransferOperationState::Canceled); + { + let operations = handler.inner.operations.lock().await; + assert_eq!(prepared_bytes(&operations), 0); + } + handler.shutdown().await; +} + +#[tokio::test] +async fn prepared_snapshot_expires_without_a_follow_up_rpc() { + let source_dir = tempfile::tempdir().expect("source tempdir"); + let source = source_dir.path().join("report.txt"); + tokio::fs::write(&source, b"sensitive bytes") + .await + .expect("write source"); + let handler = test_handler(); + let prepared = handler + .prepare_upload(prepare_params(&source, /*max_bytes*/ 1024)) + .await + .expect("prepare upload"); + + timeout(PREPARED_UPLOAD_TTL + Duration::from_secs(1), async { + loop { + let expired = handler + .inner + .operations + .lock() + .await + .get(&prepared.transfer_id) + .is_some_and(|operation| operation.state == FileTransferOperationState::Expired); + if expired { + break; + } + tokio::task::yield_now().await; + } + }) + .await + .expect("sweeper should expire the prepared snapshot"); + { + let operations = handler.inner.operations.lock().await; + let operation = operations + .get(&prepared.transfer_id) + .expect("terminal status should be retained"); + assert!(operation.bytes.is_none()); + } + handler.shutdown().await; +} + +#[tokio::test] +async fn terminal_records_do_not_poison_the_session_quota() { + let source_dir = tempfile::tempdir().expect("source tempdir"); + let source = source_dir.path().join("report.txt"); + tokio::fs::write(&source, b"bytes") + .await + .expect("write source"); + let handler = test_handler(); + for _ in 0..(MAX_OPERATIONS_PER_SESSION + 8) { + let prepared = handler + .prepare_upload(prepare_params(&source, /*max_bytes*/ 1024)) + .await + .expect("terminal records should be pruned under pressure"); + handler + .cancel(FileTransferCancelParams { + transfer_id: prepared.transfer_id, + }) + .await + .expect("cancel upload"); + } + assert_eq!(handler.inner.tasks.len(), 1); + handler.shutdown().await; +} + +#[tokio::test] +async fn disabled_handler_rejects_before_reading_and_detects_old_session_ids() { + let disabled = FileTransferHandler::new( + test_runtime_paths(), + PreparedFileUploadAvailability::Disabled, + ); + let temp_dir = tempfile::tempdir().expect("create temp directory"); + let missing = temp_dir.path().join("missing"); + let error = disabled + .prepare_upload(prepare_params(&missing, /*max_bytes*/ 1024)) + .await + .expect_err("disabled handler must reject before filesystem access"); + assert_eq!(error.code, -32600); + + let handler = test_handler(); + let error = handler + .status(FileTransferStatusParams { + transfer_id: format!("old-generation:{}", Uuid::new_v4()), + }) + .await + .expect_err("old session ID should be distinguishable from an unknown operation"); + assert_eq!(error.code, FILE_TRANSFER_SESSION_LOST_ERROR_CODE); + disabled.shutdown().await; + handler.shutdown().await; +} diff --git a/codex-rs/exec-server/src/server/handler.rs b/codex-rs/exec-server/src/server/handler.rs index 07575a1c42eb..edaf094555c3 100644 --- a/codex-rs/exec-server/src/server/handler.rs +++ b/codex-rs/exec-server/src/server/handler.rs @@ -17,6 +17,12 @@ use crate::client::http_client::ReqwestHttpRequestRunner; use crate::protocol::EnvironmentInfo; use crate::protocol::ExecParams; use crate::protocol::ExecResponse; +use crate::protocol::FileTransferCancelParams; +use crate::protocol::FileTransferCancelResponse; +use crate::protocol::FileTransferPrepareUploadParams; +use crate::protocol::FileTransferPrepareUploadResponse; +use crate::protocol::FileTransferStatusParams; +use crate::protocol::FileTransferStatusResponse; use crate::protocol::FsCanonicalizeParams; use crate::protocol::FsCanonicalizeResponse; use crate::protocol::FsCloseParams; @@ -164,6 +170,30 @@ impl ExecServerHandler { Ok(EnvironmentInfo::local()) } + pub(crate) async fn file_transfer_prepare_upload( + &self, + params: FileTransferPrepareUploadParams, + ) -> Result { + let session = self.require_initialized_for("file transfer")?; + session.file_transfer().prepare_upload(params).await + } + + pub(crate) async fn file_transfer_status( + &self, + params: FileTransferStatusParams, + ) -> Result { + let session = self.require_initialized_for("file transfer")?; + session.file_transfer().status(params).await + } + + pub(crate) async fn file_transfer_cancel( + &self, + params: FileTransferCancelParams, + ) -> Result { + let session = self.require_initialized_for("file transfer")?; + session.file_transfer().cancel(params).await + } + pub(crate) async fn exec_read( &self, params: ReadParams, diff --git a/codex-rs/exec-server/src/server/registry.rs b/codex-rs/exec-server/src/server/registry.rs index 8f48aeaf99b7..ddfc5a8f22f5 100644 --- a/codex-rs/exec-server/src/server/registry.rs +++ b/codex-rs/exec-server/src/server/registry.rs @@ -7,6 +7,9 @@ use crate::protocol::EXEC_SIGNAL_METHOD; use crate::protocol::EXEC_TERMINATE_METHOD; use crate::protocol::EXEC_WRITE_METHOD; use crate::protocol::ExecParams; +use crate::protocol::FILE_TRANSFER_CANCEL_METHOD; +use crate::protocol::FILE_TRANSFER_PREPARE_UPLOAD_METHOD; +use crate::protocol::FILE_TRANSFER_STATUS_METHOD; use crate::protocol::FS_CANONICALIZE_METHOD; use crate::protocol::FS_CLOSE_METHOD; use crate::protocol::FS_COPY_METHOD; @@ -18,6 +21,9 @@ use crate::protocol::FS_READ_DIRECTORY_METHOD; use crate::protocol::FS_READ_FILE_METHOD; use crate::protocol::FS_REMOVE_METHOD; use crate::protocol::FS_WRITE_FILE_METHOD; +use crate::protocol::FileTransferCancelParams; +use crate::protocol::FileTransferPrepareUploadParams; +use crate::protocol::FileTransferStatusParams; use crate::protocol::FsCanonicalizeParams; use crate::protocol::FsCloseParams; use crate::protocol::FsCopyParams; @@ -69,6 +75,24 @@ pub(crate) fn build_router() -> RpcRouter { ENVIRONMENT_INFO_METHOD, |handler: Arc, _params: ()| async move { handler.environment_info() }, ); + router.request( + FILE_TRANSFER_PREPARE_UPLOAD_METHOD, + |handler: Arc, params: FileTransferPrepareUploadParams| async move { + handler.file_transfer_prepare_upload(params).await + }, + ); + router.request( + FILE_TRANSFER_STATUS_METHOD, + |handler: Arc, params: FileTransferStatusParams| async move { + handler.file_transfer_status(params).await + }, + ); + router.request( + FILE_TRANSFER_CANCEL_METHOD, + |handler: Arc, params: FileTransferCancelParams| async move { + handler.file_transfer_cancel(params).await + }, + ); router.request( EXEC_READ_METHOD, |handler: Arc, params: ReadParams| async move { diff --git a/codex-rs/exec-server/src/server/session_registry.rs b/codex-rs/exec-server/src/server/session_registry.rs index 73b14ff7cd78..742a31314281 100644 --- a/codex-rs/exec-server/src/server/session_registry.rs +++ b/codex-rs/exec-server/src/server/session_registry.rs @@ -11,6 +11,9 @@ use crate::ExecServerRuntimePaths; use crate::rpc::RpcNotificationSender; use crate::rpc::invalid_request; use crate::rpc::session_already_attached; +use crate::server::file_transfer_handler::FILE_TRANSFER_ENABLED_ENV_VAR; +use crate::server::file_transfer_handler::FileTransferHandler; +use crate::server::file_transfer_handler::PreparedFileUploadAvailability; use crate::server::process_handler::ProcessHandler; #[cfg(test)] @@ -20,11 +23,13 @@ const DETACHED_SESSION_TTL: Duration = Duration::from_secs(30); pub(crate) struct SessionRegistry { sessions: Mutex>>, + file_transfer_availability: PreparedFileUploadAvailability, } struct SessionEntry { session_id: String, process: ProcessHandler, + file_transfer: FileTransferHandler, attachment: StdMutex, } @@ -54,6 +59,7 @@ impl SessionRegistry { pub(crate) fn new() -> Arc { Arc::new(Self { sessions: Mutex::new(HashMap::new()), + file_transfer_availability: file_transfer_availability_from_environment(), }) } @@ -97,7 +103,8 @@ impl SessionRegistry { let session_id = Uuid::new_v4().to_string(); let entry = Arc::new(SessionEntry::new( session_id.clone(), - ProcessHandler::new(notifications, runtime_paths), + ProcessHandler::new(notifications, runtime_paths.clone()), + FileTransferHandler::new(runtime_paths, self.file_transfer_availability), connection_id, )); sessions.insert(session_id, Arc::clone(&entry)); @@ -107,7 +114,7 @@ impl SessionRegistry { let entry = match outcome? { AttachOutcome::Attached(entry) => entry, AttachOutcome::Expired { session_id, entry } => { - entry.process.shutdown().await; + entry.shutdown().await; return Err(invalid_request(format!("unknown session id {session_id}"))); } }; @@ -134,7 +141,7 @@ impl SessionRegistry { }; if let Some(entry) = removed { - entry.process.shutdown().await; + entry.shutdown().await; } } } @@ -143,15 +150,36 @@ impl Default for SessionRegistry { fn default() -> Self { Self { sessions: Mutex::new(HashMap::new()), + file_transfer_availability: PreparedFileUploadAvailability::Disabled, } } } +fn file_transfer_availability_from_environment() -> PreparedFileUploadAvailability { + if !cfg!(debug_assertions) { + return PreparedFileUploadAvailability::Disabled; + } + if std::env::var(FILE_TRANSFER_ENABLED_ENV_VAR) + .ok() + .is_some_and(|value| matches!(value.as_str(), "1" | "true" | "TRUE")) + { + PreparedFileUploadAvailability::EnabledForDevelopment + } else { + PreparedFileUploadAvailability::Disabled + } +} + impl SessionEntry { - fn new(session_id: String, process: ProcessHandler, connection_id: ConnectionId) -> Self { + fn new( + session_id: String, + process: ProcessHandler, + file_transfer: FileTransferHandler, + connection_id: ConnectionId, + ) -> Self { Self { session_id, process, + file_transfer, attachment: StdMutex::new(AttachmentState { current_connection_id: Some(connection_id), detached_connection_id: None, @@ -160,6 +188,11 @@ impl SessionEntry { } } + async fn shutdown(&self) { + self.file_transfer.shutdown().await; + self.process.shutdown().await; + } + fn attach(&self, connection_id: ConnectionId) { let mut attachment = self .attachment @@ -244,6 +277,10 @@ impl SessionHandle { &self.entry.process } + pub(crate) fn file_transfer(&self) -> &FileTransferHandler { + &self.entry.file_transfer + } + pub(crate) async fn detach(&self) { if !self.entry.detach(self.connection_id) { return;