diff --git a/Cargo.lock b/Cargo.lock index c88d744e0d..8d8d039d8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -668,7 +668,7 @@ dependencies = [ "pin-project-lite", "serde_core", "sync_wrapper 1.0.2", - "tower", + "tower 0.5.3", "tower-layer", "tower-service", ] @@ -2503,6 +2503,24 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "forge_jsonrpc" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "forge_api", + "forge_domain", + "forge_stream", + "futures", + "jsonrpsee", + "serde", + "serde_json", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "forge_main" version = "0.1.0" @@ -2529,6 +2547,7 @@ dependencies = [ "forge_domain", "forge_embed", "forge_fs", + "forge_jsonrpc", "forge_markdown_stream", "forge_select", "forge_spinner", @@ -5111,6 +5130,95 @@ dependencies = [ "serde", ] +[[package]] +name = "jsonrpsee" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e281ae70cc3b98dac15fced3366a880949e65fc66e345ce857a5682d152f3e62" +dependencies = [ + "jsonrpsee-core", + "jsonrpsee-proc-macros", + "jsonrpsee-server", + "jsonrpsee-types", + "tokio", + "tracing", +] + +[[package]] +name = "jsonrpsee-core" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "348ee569eaed52926b5e740aae20863762b16596476e943c9e415a6479021622" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "jsonrpsee-types", + "parking_lot", + "rand 0.8.5", + "rustc-hash", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tracing", +] + +[[package]] +name = "jsonrpsee-proc-macros" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7398cddf5013cca4702862a2692b66c48a3bd6cf6ec681a47453c93d63cf8de5" +dependencies = [ + "heck", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "jsonrpsee-server" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21429bcdda37dcf2d43b68621b994adede0e28061f816b038b0f18c70c143d51" +dependencies = [ + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.9.0", + "hyper-util", + "jsonrpsee-core", + "jsonrpsee-types", + "pin-project", + "route-recognizer", + "serde", + "serde_json", + "soketto", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tokio-util", + "tower 0.4.13", + "tracing", +] + +[[package]] +name = "jsonrpsee-types" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0f05e0028e55b15dbd2107163b3c744cd3bb4474f193f95d9708acbf5677e44" +dependencies = [ + "http 1.4.0", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "jsonwebtoken" version = "10.3.0" @@ -6249,6 +6357,15 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "proc-macro-crate" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e67ba7e9b2b56446f1d419b1d807906278ffa1a658a8a5d8a39dcb1f5a78614f" +dependencies = [ + "toml_edit 0.25.11+spec-1.1.0", +] + [[package]] name = "proc-macro-error-attr2" version = "2.0.0" @@ -6809,7 +6926,7 @@ dependencies = [ "tokio", "tokio-rustls 0.26.4", "tokio-util", - "tower", + "tower 0.5.3", "tower-http", "tower-service", "url", @@ -6851,7 +6968,7 @@ dependencies = [ "sync_wrapper 1.0.2", "tokio", "tokio-rustls 0.26.4", - "tower", + "tower 0.5.3", "tower-http", "tower-service", "url", @@ -7017,6 +7134,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "route-recognizer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afab94fb28594581f62d981211a9a4d53cc8130bbcbbb89a0440d9b8e81a7746" + [[package]] name = "rsqlite-vfs" version = "0.1.0" @@ -7724,6 +7847,22 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "soketto" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e859df029d160cb88608f5d7df7fb4753fd20fdfb4de5644f3d8b8440841721" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures", + "http 1.4.0", + "httparse", + "log", + "rand 0.8.5", + "sha1", +] + [[package]] name = "spin" version = "0.9.8" @@ -8387,6 +8526,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -8397,6 +8537,7 @@ checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -8541,7 +8682,7 @@ dependencies = [ "tokio", "tokio-rustls 0.26.4", "tokio-stream", - "tower", + "tower 0.5.3", "tower-layer", "tower-service", "tracing", @@ -8587,6 +8728,21 @@ dependencies = [ "tonic-build", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.3" @@ -8624,7 +8780,7 @@ dependencies = [ "pin-project-lite", "tokio", "tokio-util", - "tower", + "tower 0.5.3", "tower-layer", "tower-service", ] @@ -8647,6 +8803,7 @@ version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/Cargo.toml b/Cargo.toml index 51e3801695..d9f8a96a57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ html2md = "0.2.15" http = "1.2.0" ignore = "0.4.23" is_ci = "1.2.0" +jsonrpsee = { version = "0.24", features = ["server", "macros"] } indexmap = "2.13.0" infer = "0.19.0" insta = { version = "1.47.2", features = ["json", "yaml"] } @@ -101,6 +102,7 @@ tokio = { version = "1.51.0", features = [ "process", "signal", "io-util", + "io-std", ] } tokio-stream = "0.1.18" tokio-util = "0.7" @@ -159,6 +161,7 @@ forge_tool_macros = { path = "crates/forge_tool_macros" } forge_tracker = { path = "crates/forge_tracker" } forge_walker = { path = "crates/forge_walker" } forge_json_repair = { path = "crates/forge_json_repair" } +forge_jsonrpc = { path = "crates/forge_jsonrpc" } forge_select = { path = "crates/forge_select" } forge_test_kit = { path = "crates/forge_test_kit" } diff --git a/crates/forge_api/src/api.rs b/crates/forge_api/src/api.rs index aafb112d49..cf2c44b491 100644 --- a/crates/forge_api/src/api.rs +++ b/crates/forge_api/src/api.rs @@ -61,6 +61,9 @@ pub trait API: Sync + Send { /// Adds a new conversation to the conversation store async fn upsert_conversation(&self, conversation: Conversation) -> Result<()>; + /// Creates a new conversation with an optional title and returns it + async fn create_conversation(&self, title: Option) -> Result; + /// Returns the conversation with the given ID async fn conversation(&self, conversation_id: &ConversationId) -> Result>; diff --git a/crates/forge_api/src/forge_api.rs b/crates/forge_api/src/forge_api.rs index b56d485bfd..1bf2daeca5 100644 --- a/crates/forge_api/src/forge_api.rs +++ b/crates/forge_api/src/forge_api.rs @@ -151,6 +151,13 @@ impl< self.services.upsert_conversation(conversation).await } + async fn create_conversation(&self, title: Option) -> anyhow::Result { + let mut conversation = Conversation::generate(); + conversation.title = title; + self.services.upsert_conversation(conversation.clone()).await?; + Ok(conversation) + } + async fn compact_conversation( &self, conversation_id: &ConversationId, diff --git a/crates/forge_jsonrpc/Cargo.toml b/crates/forge_jsonrpc/Cargo.toml new file mode 100644 index 0000000000..aa55ecf1d8 --- /dev/null +++ b/crates/forge_jsonrpc/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "forge_jsonrpc" +version.workspace = true +edition.workspace = true +rust-version.workspace = true + +[dependencies] +anyhow.workspace = true +serde.workspace = true +serde_json.workspace = true +tokio.workspace = true +tracing.workspace = true +jsonrpsee = { version = "0.24", features = ["server", "macros"] } +futures = { workspace = true, features = ["async-await"] } +chrono.workspace = true +uuid = { workspace = true, features = ["serde"] } + +# Internal crates +forge_api.workspace = true +forge_domain.workspace = true +forge_stream.workspace = true diff --git a/crates/forge_jsonrpc/src/error.rs b/crates/forge_jsonrpc/src/error.rs new file mode 100644 index 0000000000..a89dadd105 --- /dev/null +++ b/crates/forge_jsonrpc/src/error.rs @@ -0,0 +1,58 @@ +use jsonrpsee::types::{ErrorObject, ErrorObjectOwned}; + +/// JSON-RPC error codes +pub struct ErrorCode; + +impl ErrorCode { + pub const PARSE_ERROR: i32 = -32700; + pub const INVALID_REQUEST: i32 = -32600; + pub const METHOD_NOT_FOUND: i32 = -32601; + pub const INVALID_PARAMS: i32 = -32602; + pub const INTERNAL_ERROR: i32 = -32603; + pub const NOT_FOUND: i32 = -32001; + pub const UNAUTHORIZED: i32 = -32002; + pub const VALIDATION_FAILED: i32 = -32003; +} + +/// Convert anyhow errors to JSON-RPC errors +pub fn map_error(err: anyhow::Error) -> ErrorObjectOwned { + // Try to downcast to domain errors + if let Some(domain_err) = err.downcast_ref::() { + return map_domain_error(domain_err); + } + + // Default: internal error + ErrorObject::owned( + ErrorCode::INTERNAL_ERROR, + format!("Internal error: {err}"), + None::<()>, + ) +} + +fn map_domain_error(err: &forge_domain::Error) -> ErrorObjectOwned { + match err { + forge_domain::Error::ConversationNotFound(_) + | forge_domain::Error::AgentUndefined(_) + | forge_domain::Error::WorkspaceNotFound + | forge_domain::Error::HeadAgentUndefined => { + ErrorObject::owned(ErrorCode::NOT_FOUND, err.to_string(), None::<()>) + } + forge_domain::Error::ProviderNotAvailable { .. } + | forge_domain::Error::EnvironmentVariableNotFound { .. } + | forge_domain::Error::AuthTokenNotFound => { + ErrorObject::owned(ErrorCode::UNAUTHORIZED, err.to_string(), None::<()>) + } + _ => { + ErrorObject::owned(ErrorCode::INTERNAL_ERROR, err.to_string(), None::<()>) + } + } +} + +/// Create a not-found error object +pub fn not_found(resource: &str, id: &str) -> ErrorObjectOwned { + ErrorObject::owned( + ErrorCode::NOT_FOUND, + format!("{resource} not found: {id}"), + Some(serde_json::json!({ "resource": resource, "id": id })), + ) +} diff --git a/crates/forge_jsonrpc/src/lib.rs b/crates/forge_jsonrpc/src/lib.rs new file mode 100644 index 0000000000..dca5c7bbad --- /dev/null +++ b/crates/forge_jsonrpc/src/lib.rs @@ -0,0 +1,8 @@ +mod transport; +pub mod types; + +pub mod error; +pub mod server; + +pub use transport::stdio::StdioTransport; +pub use server::JsonRpcServer; diff --git a/crates/forge_jsonrpc/src/server.rs b/crates/forge_jsonrpc/src/server.rs new file mode 100644 index 0000000000..ed51f203f9 --- /dev/null +++ b/crates/forge_jsonrpc/src/server.rs @@ -0,0 +1,327 @@ +use std::sync::Arc; + +use forge_api::API; +use forge_domain::{ChatRequest, ConversationId, Event, EventValue}; +use futures::StreamExt; +use jsonrpsee::types::ErrorObjectOwned; +use jsonrpsee::{RpcModule, SubscriptionMessage}; +use serde_json::{json, Value}; +use tracing::debug; + +use crate::error::{map_error, not_found, ErrorCode}; +use crate::types::*; + +/// Helper to serialize a value, mapping serialization failures to JSON-RPC +/// errors. +fn to_json_response(value: T) -> Result { + serde_json::to_value(value).map_err(|e| { + ErrorObjectOwned::owned( + ErrorCode::INTERNAL_ERROR, + format!("Failed to serialize response: {e}"), + None::<()>, + ) + }) +} + +/// STDIO-based JSON-RPC server wrapping the Forge API. +/// +/// Registers all JSON-RPC methods and subscriptions on an `RpcModule`. +/// The module is then driven by a [`StdioTransport`] that reads requests +/// from stdin and writes responses/subscription notifications to stdout. +pub struct JsonRpcServer { + api: Arc, + module: RpcModule<()>, +} + +impl JsonRpcServer { + /// Create a new JSON-RPC server, register all methods, and return the + /// ready-to-use instance. + pub fn new(api: Arc) -> Self { + let mut server = Self { api, module: RpcModule::new(()) }; + server.register_methods(); + server + } + + /// Consume the server and return the underlying RpcModule. + pub fn into_module(self) -> RpcModule<()> { + self.module + } + + // ------------------------------------------------------------------ + // Registration + // ------------------------------------------------------------------ + + fn register_methods(&mut self) { + self.register_discovery(); + self.register_conversation(); + self.register_chat(); + } + + // ------------------------------------------------------------------ + // Discovery + // ------------------------------------------------------------------ + + fn register_discovery(&mut self) { + // Build methods list once + let methods_list: Vec = vec![ + MethodInfo { + name: "rpc.methods".into(), + description: "List all available JSON-RPC methods.".into(), + params: None, + result: None, + }, + MethodInfo { + name: "rpc.discover".into(), + description: "List all available JSON-RPC methods (alias for rpc.methods).".into(), + params: None, + result: None, + }, + MethodInfo { + name: "get_methods".into(), + description: "List all available JSON-RPC methods (alias for rpc.methods).".into(), + params: None, + result: None, + }, + MethodInfo { + name: "conversation.create".into(), + description: "Create a new conversation with an optional title.".into(), + params: Some(json!({"title": "string (optional)"})), + result: Some(json!({"id": "uuid", "title": "string", "created_at": "rfc3339"})), + }, + MethodInfo { + name: "chat.stream".into(), + description: "Stream chat responses for a conversation. Subscription.".into(), + params: Some(json!({ + "conversation_id": "uuid (required)", + "message": "string (required)", + "include_reasoning": "boolean (optional)" + })), + result: None, + }, + ]; + + // rpc.methods + let methods_for_rpc = methods_list.clone(); + self.module + .register_async_method("rpc.methods", move |_, _, _| { + let methods = methods_for_rpc.clone(); + async move { to_json_response(methods) } + }) + .expect("Failed to register rpc.methods"); + + // rpc.discover — alias + let methods_for_discover = methods_list.clone(); + self.module + .register_async_method("rpc.discover", move |_, _, _| { + let methods = methods_for_discover.clone(); + async move { to_json_response(methods) } + }) + .expect("Failed to register rpc.discover"); + + // get_methods — alias + let methods_for_get = methods_list.clone(); + self.module + .register_async_method("get_methods", move |_, _, _| { + let methods = methods_for_get.clone(); + async move { to_json_response(methods) } + }) + .expect("Failed to register get_methods"); + } + + // ------------------------------------------------------------------ + // Conversation + // ------------------------------------------------------------------ + + fn register_conversation(&mut self) { + let api = self.api.clone(); + self.module + .register_async_method("conversation.create", move |params, _, _| { + let api = api.clone(); + async move { + let p: CreateConversationParams = params.parse()?; + let conversation = api.create_conversation(p.title).await.map_err(map_error)?; + let response = CreateConversationResponse { + id: conversation.id.into_string(), + title: conversation.title, + created_at: conversation.metadata.created_at.to_rfc3339(), + }; + to_json_response(response) + } + }) + .expect("Failed to register conversation.create"); + } + + // ------------------------------------------------------------------ + // Chat + // ------------------------------------------------------------------ + + fn register_chat(&mut self) { + let api = self.api.clone(); + + self.module + .register_subscription( + "chat.stream", + "chat.notification", + "chat.stream.unsubscribe", + move |params, pending, _, _| { + let api = api.clone(); + async move { + // Parse params. Use pending.reject() instead of `?` + // because jsonrpsee's subscription infrastructure + // hangs when the future returns Err without calling + // accept/reject on the pending sink. + let p: ChatStreamParams = match params.parse() { + Ok(p) => p, + Err(e) => { + pending + .reject(ErrorObjectOwned::owned( + ErrorCode::INVALID_PARAMS, + format!("Invalid params: {e}"), + None::<()>, + )) + .await; + return Ok(()); + } + }; + + // Parse and validate conversation_id early. + let conversation_id = match ConversationId::parse(&p.conversation_id) { + Ok(id) => id, + Err(e) => { + pending + .reject(ErrorObjectOwned::owned( + ErrorCode::INVALID_PARAMS, + format!("Invalid conversation_id: {e}"), + None::<()>, + )) + .await; + return Ok(()); + } + }; + + // Validate conversation exists before starting the + // stream. + match api.conversation(&conversation_id).await { + Ok(Some(_)) => { /* OK */ } + Ok(None) => { + pending + .reject(not_found("Conversation", &p.conversation_id)) + .await; + return Ok(()); + } + Err(e) => { + pending + .reject(map_error(e)) + .await; + return Ok(()); + } + } + + let include_reasoning = p.include_reasoning.unwrap_or(false); + + // Accept the subscription + let sink = match pending.accept().await { + Ok(sink) => sink, + Err(_) => return Ok(()), + }; + + let event = Event::new(EventValue::text(p.message)); + let chat_request = ChatRequest::new(event, conversation_id); + + // Start the chat stream + let stream = match api.chat(chat_request).await { + Ok(stream) => stream, + Err(e) => { + let err_msg = StreamMessage::Error { + message: format!("{e:#}"), + }; + let sub_msg = + SubscriptionMessage::from_json(&err_msg).unwrap_or_else(|_| { + SubscriptionMessage::from_json( + &json!({"status": "error"}), + ) + .expect("fallback should never fail") + }); + let _ = sink.send(sub_msg).await; + return Ok(()); + } + }; + + tokio::pin!(stream); + loop { + let item = stream.next().await; + let msg = match item { + Some(Ok(resp)) => { + match resp { + forge_domain::ChatResponse::TaskMessage { content } => { + let is_tool_input = matches!(&content, forge_domain::ChatResponseContent::ToolInput(_)); + if is_tool_input { + // Skip tool input notifications for JSON-RPC + continue; + } + let text = content.as_str().to_string(); + StreamMessage::Chunk { + data: StreamNotification::Message { content: text }, + } + } + forge_domain::ChatResponse::TaskReasoning { content } => { + if !include_reasoning { + continue; + } + StreamMessage::Chunk { + data: StreamNotification::Reasoning { content }, + } + } + forge_domain::ChatResponse::TaskComplete => { + StreamMessage::Complete { typ: "complete".into() } + } + forge_domain::ChatResponse::ToolCallStart { .. } => { + // Skip tool call start events for + // JSON-RPC + continue; + } + forge_domain::ChatResponse::ToolCallEnd(_) => { + // Skip tool call end events for + // JSON-RPC + continue; + } + forge_domain::ChatResponse::RetryAttempt { cause, .. } => { + StreamMessage::Chunk { + data: StreamNotification::Error { + message: cause.into_string(), + }, + } + } + forge_domain::ChatResponse::Interrupt { reason } => { + StreamMessage::Chunk { + data: StreamNotification::Error { + message: format!("{reason:?}"), + }, + } + } + } + } + Some(Err(e)) => StreamMessage::Error { + message: format!("{e:#}"), + }, + None => break, + }; + + let sub_msg = SubscriptionMessage::from_json(&msg).unwrap_or_else(|_| { + SubscriptionMessage::from_json(&json!({"status": "error"})) + .expect("fallback should never fail") + }); + + if sink.send(sub_msg).await.is_err() { + debug!("Client disconnected from chat stream"); + break; + } + } + + Ok(()) + } + }, + ) + .expect("Failed to register chat.stream"); + } +} diff --git a/crates/forge_jsonrpc/src/transport/mod.rs b/crates/forge_jsonrpc/src/transport/mod.rs new file mode 100644 index 0000000000..b1d7f04bbe --- /dev/null +++ b/crates/forge_jsonrpc/src/transport/mod.rs @@ -0,0 +1 @@ +pub mod stdio; diff --git a/crates/forge_jsonrpc/src/transport/stdio.rs b/crates/forge_jsonrpc/src/transport/stdio.rs new file mode 100644 index 0000000000..d69e22a088 --- /dev/null +++ b/crates/forge_jsonrpc/src/transport/stdio.rs @@ -0,0 +1,277 @@ +use std::sync::Arc; +use std::time::Duration; + +use jsonrpsee::server::RpcModule; +use serde_json::Value; +use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; +use tokio::sync::Mutex; + +/// STDIO transport for JSON-RPC. +/// +/// Reads JSON-RPC request objects (one per line) from stdin, dispatches them +/// to the registered `RpcModule`, and writes responses plus subscription +/// notifications (one JSON object per line) to stdout. +pub struct StdioTransport { + module: RpcModule<()>, +} + +impl StdioTransport { + pub fn new(module: RpcModule<()>) -> Self { + Self { module } + } + + /// Run the transport loop over real stdin/stdout until stdin is closed. + pub async fn run(self) -> anyhow::Result<()> { + let stdin = tokio::io::stdin(); + let stdout = tokio::io::stdout(); + self.run_with_io(stdin, stdout).await + } + + /// Run the transport loop over the given reader/writer until the reader + /// reaches EOF. + /// + /// This is the core implementation used by [`run`] and available for + /// testing with synthetic I/O streams. + async fn run_with_io(self, reader: R, writer: W) -> anyhow::Result<()> + where + R: AsyncRead + Unpin + Send + 'static, + W: AsyncWrite + Unpin + Send + 'static, + { + let reader = BufReader::new(reader); + let mut lines = reader.lines(); + let writer = Arc::new(Mutex::new(writer)); + + let mut handles = Vec::new(); + + while let Ok(Some(line)) = lines.next_line().await { + let trimmed: String = line.trim().to_string(); + if trimmed.is_empty() { + continue; + } + + // Parse JSON to detect if this is a notification (no id). + let request: Value = match serde_json::from_str(&trimmed) { + Ok(req) => req, + Err(e) => { + let error_response = serde_json::json!({ + "jsonrpc": "2.0", + "id": null, + "error": { + "code": -32700, + "message": format!("Parse error: {e}") + } + }); + Self::write_line(&writer, &error_response).await?; + continue; + } + }; + + // For notifications (no id), we still process but don't send a + // response. + let is_notification = request.get("id").is_none(); + + let module = self.module.clone(); + let writer_clone = Arc::clone(&writer); + + let handle = tokio::spawn(async move { + match module.raw_json_request(&trimmed, 1024 * 1024).await { + Ok((response_json, mut rx)) => { + // Send the initial response (or subscription + // acceptance). + if !is_notification { + if let Ok(response) = + serde_json::from_str::(&response_json) + { + let _ = Self::write_line(&writer_clone, &response).await; + } + } + + // Forward subscription notifications. + // rx is a tokio::sync::mpsc::Receiver + loop { + match rx.recv().await { + Some(notification) => { + if let Ok(notif_value) = + serde_json::from_str::(¬ification) + { + if Self::write_line(&writer_clone, ¬if_value) + .await + .is_err() + { + break; + } + } + } + None => break, + } + } + } + Err(e) => { + if !is_notification { + let error_response = serde_json::json!({ + "jsonrpc": "2.0", + "id": null, + "error": { + "code": -32603, + "message": format!("Internal error: {e}") + } + }); + let _ = Self::write_line(&writer_clone, &error_response).await; + } + } + } + }); + handles.push(handle); + } + + // Wait for all dispatched requests to complete before returning. + // Without this, the process exits before async work finishes when + // stdin is a pipe (stdin EOF is reached immediately after the last + // line is read). + // + // Use a per-handle timeout to avoid hanging indefinitely on + // long-running subscriptions. + for handle in handles { + match tokio::time::timeout(Duration::from_secs(60), handle).await { + Ok(Ok(())) => {} + Ok(Err(e)) => { + tracing::warn!("JSON-RPC task failed: {e}"); + } + Err(_) => { + tracing::warn!("JSON-RPC task timed out after 60s"); + } + } + } + + Ok(()) + } + + /// Write a single JSON line to writer, flushing afterwards. + async fn write_line( + writer: &Arc>, + value: &Value, + ) -> anyhow::Result<()> { + let json = serde_json::to_string(value)?; + let mut guard: tokio::sync::MutexGuard<'_, W> = writer.lock().await; + guard.write_all(json.as_bytes()).await?; + guard.write_all(b"\n").await?; + guard.flush().await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use jsonrpsee::RpcModule; + use serde_json::{json, Value}; + use tokio::io::AsyncReadExt; + + use super::*; + + #[tokio::test] + async fn test_transport_returns_response_for_valid_request() { + // Given a module with a simple echo method and a transport wrapping it + let mut module = RpcModule::new(()); + module + .register_async_method("echo", |params, _, _| { + let params: Value = params.parse().unwrap_or(Value::Null); + async move { Ok::<_, jsonrpsee::types::ErrorObjectOwned>(params) } + }) + .expect("register echo"); + + let transport = StdioTransport::new(module); + + // When we pipe a JSON-RPC request through synthetic I/O + let request = r#"{"jsonrpc":"2.0","id":1,"method":"echo","params":"hello"}"#; + + let (reader, mut writer) = tokio::io::duplex(4096); + let (mut stdout_reader, stdout_writer) = tokio::io::duplex(4096); + + // Write the request to the reader side + writer.write_all(request.as_bytes()).await.unwrap(); + writer.write_all(b"\n").await.unwrap(); + // Drop writer so the transport sees EOF after the line + drop(writer); + + // Run the transport (this will read the line, process it, and return) + transport + .run_with_io(reader, stdout_writer) + .await + .expect("transport should complete"); + + // Then the response should be available on stdout + let mut output = String::new(); + stdout_reader + .read_to_string(&mut output) + .await + .unwrap(); + let response: Value = serde_json::from_str(output.trim()).unwrap(); + + let expected = json!({"jsonrpc":"2.0","id":1,"result":"hello"}); + assert_eq!(response, expected); + } + + #[tokio::test] + async fn test_transport_returns_error_for_invalid_json() { + // Given a transport wrapping an empty module + let module = RpcModule::new(()); + let transport = StdioTransport::new(module); + + // When invalid JSON is sent + let (reader, mut writer) = tokio::io::duplex(4096); + let (mut stdout_reader, stdout_writer) = tokio::io::duplex(4096); + + writer.write_all(b"not valid json\n").await.unwrap(); + drop(writer); + + transport + .run_with_io(reader, stdout_writer) + .await + .expect("transport should complete"); + + let mut output = String::new(); + stdout_reader + .read_to_string(&mut output) + .await + .unwrap(); + let response: Value = serde_json::from_str(output.trim()).unwrap(); + + // Should be a parse error response + assert_eq!(response["id"], json!(null)); + assert_eq!(response["error"]["code"], -32700); + } + + #[tokio::test] + async fn test_transport_handles_unknown_method() { + // Given a transport with an empty module (no methods registered) + let module = RpcModule::new(()); + let transport = StdioTransport::new(module); + + // When a request for an unknown method is sent + let request = r#"{"jsonrpc":"2.0","id":42,"method":"unknown","params":[]}"#; + + let (reader, mut writer) = tokio::io::duplex(4096); + let (mut stdout_reader, stdout_writer) = tokio::io::duplex(4096); + + writer.write_all(request.as_bytes()).await.unwrap(); + writer.write_all(b"\n").await.unwrap(); + drop(writer); + + transport + .run_with_io(reader, stdout_writer) + .await + .expect("transport should complete"); + + let mut output = String::new(); + stdout_reader + .read_to_string(&mut output) + .await + .unwrap(); + let response: Value = serde_json::from_str(output.trim()).unwrap(); + + // Should be an internal error response (method not found from + // jsonrpsee) + assert_eq!(response["id"], 42); + assert!(response["error"].is_object()); + } +} diff --git a/crates/forge_jsonrpc/src/types.rs b/crates/forge_jsonrpc/src/types.rs new file mode 100644 index 0000000000..0d58e76847 --- /dev/null +++ b/crates/forge_jsonrpc/src/types.rs @@ -0,0 +1,143 @@ +/// JSON-RPC 2.0 request parameter types and response DTOs + +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +/// Params for `conversation.create` +#[derive(Debug, Deserialize)] +pub struct CreateConversationParams { + pub title: Option, +} + +/// Response for `conversation.create` +#[derive(Debug, Clone, Serialize)] +pub struct CreateConversationResponse { + pub id: String, + pub title: Option, + pub created_at: String, +} + +/// Params for `chat.stream` +#[derive(Debug, Deserialize)] +pub struct ChatStreamParams { + pub conversation_id: String, + pub message: String, + #[serde(default)] + pub include_reasoning: Option, +} + +/// A notification emitted as a subscription message during `chat.stream`. +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type")] +pub enum StreamNotification { + #[serde(rename = "message")] + Message { + content: String, + }, + #[serde(rename = "reasoning")] + Reasoning { + content: String, + }, + #[serde(rename = "complete")] + Complete, + #[serde(rename = "error")] + Error { + message: String, + }, +} + +/// A message sent through the subscription channel. +#[derive(Debug, Clone, Serialize)] +#[serde(untagged)] +pub enum StreamMessage { + Chunk { + data: StreamNotification, + }, + Complete { + #[serde(rename = "type")] + typ: String, + }, + Error { + message: String, + }, +} + +/// Response for `rpc.methods` / `get_methods` +#[derive(Debug, Clone, Serialize)] +pub struct MethodInfo { + pub name: String, + pub description: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub params: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub result: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_stream_message_complete_serialization() { + let msg = StreamMessage::Complete { typ: "complete".into() }; + let actual = serde_json::to_value(&msg).unwrap(); + let expected = json!({"type": "complete"}); + assert_eq!(actual, expected); + // Must NOT be null + assert!(!actual.is_null()); + } + + #[test] + fn test_stream_message_chunk_message() { + let msg = StreamMessage::Chunk { + data: StreamNotification::Message { + content: "hello".into(), + }, + }; + let actual = serde_json::to_value(&msg).unwrap(); + let expected = json!({"data": {"type": "message", "content": "hello"}}); + assert_eq!(actual, expected); + } + + #[test] + fn test_stream_message_chunk_reasoning() { + let msg = StreamMessage::Chunk { + data: StreamNotification::Reasoning { + content: "thinking...".into(), + }, + }; + let actual = serde_json::to_value(&msg).unwrap(); + let expected = json!({"data": {"type": "reasoning", "content": "thinking..."}}); + assert_eq!(actual, expected); + } + + #[test] + fn test_stream_message_error() { + let msg = StreamMessage::Error { + message: "something went wrong".into(), + }; + let actual = serde_json::to_value(&msg).unwrap(); + let expected = json!({"message": "something went wrong"}); + assert_eq!(actual, expected); + } + + /// Simulate the full JSON-RPC notification envelope that wraps a + /// StreamMessage to verify the completion result is explicit. + #[test] + fn test_completion_notification_has_explicit_type() { + let complete = StreamMessage::Complete { typ: "complete".into() }; + let notification = json!({ + "jsonrpc": "2.0", + "method": "chat.notification", + "params": { + "subscription": "sub-1", + "result": serde_json::to_value(&complete).unwrap() + } + }); + + let params = ¬ification["params"]; + assert!(params["result"].is_object(), "result should be an object, not null"); + assert_eq!(params["result"]["type"], "complete"); + } +} diff --git a/crates/forge_main/Cargo.toml b/crates/forge_main/Cargo.toml index e136bc306d..02be74d21a 100644 --- a/crates/forge_main/Cargo.toml +++ b/crates/forge_main/Cargo.toml @@ -28,6 +28,7 @@ forge_select.workspace = true merge.workspace = true forge_fs.workspace = true +forge_jsonrpc.workspace = true tokio.workspace = true tokio-stream.workspace = true futures.workspace = true diff --git a/crates/forge_main/src/cli.rs b/crates/forge_main/src/cli.rs index df83a23c25..7d1e6efbb7 100644 --- a/crates/forge_main/src/cli.rs +++ b/crates/forge_main/src/cli.rs @@ -152,6 +152,17 @@ pub enum TopLevelCommand { /// Stream forge log output (defaults to the most recent log file). Logs(LogsArgs), + + /// Start a JSON-RPC server over stdio. + /// + /// Reads JSON-RPC 2.0 requests from stdin and writes responses to stdout. + /// Use --directory to set the working directory for the server. + #[command(name = "json-rpc")] + JsonRpc { + /// Working directory for the JSON-RPC server. + #[arg(long, short = 'C', default_value = ".")] + directory: PathBuf, + }, } /// Command group for custom command management. diff --git a/crates/forge_main/src/main.rs b/crates/forge_main/src/main.rs index 2f618acf01..0ac9bc990e 100644 --- a/crates/forge_main/src/main.rs +++ b/crates/forge_main/src/main.rs @@ -7,7 +7,7 @@ use clap::Parser; use forge_api::ForgeAPI; use forge_config::ForgeConfig; use forge_domain::TitleFormat; -use forge_main::{Cli, Sandbox, TitleDisplayExt, UI, tracker}; +use forge_main::{Cli, Sandbox, TitleDisplayExt, TopLevelCommand, UI, tracker}; /// Enables ENABLE_VIRTUAL_TERMINAL_PROCESSING on the stdout console handle. /// @@ -90,8 +90,13 @@ async fn run() -> Result<()> { // Initialize and run the UI let mut cli = Cli::parse(); - // Check if there's piped input - if !std::io::stdin().is_terminal() { + // Check if there's piped input. + // Skip for json-rpc which uses stdin as its transport protocol. + let is_json_rpc = matches!( + cli.subcommands, + Some(TopLevelCommand::JsonRpc { .. }) + ); + if !std::io::stdin().is_terminal() && !is_json_rpc { let mut stdin_content = String::new(); std::io::stdin().read_to_string(&mut stdin_content)?; let trimmed_content = stdin_content.trim(); diff --git a/crates/forge_main/src/ui.rs b/crates/forge_main/src/ui.rs index 7089fcd3c8..937191286c 100644 --- a/crates/forge_main/src/ui.rs +++ b/crates/forge_main/src/ui.rs @@ -742,6 +742,34 @@ impl A + Send + Sync> UI crate::logs::run(args, log_dir).await?; return Ok(()); } + TopLevelCommand::JsonRpc { directory } => { + // Resolve the working directory + let cwd = if directory.as_os_str().is_empty() { + std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")) + } else { + match directory.canonicalize() { + Ok(cwd) => cwd, + Err(_) => { + anyhow::bail!("Invalid directory: {}", directory.display()) + } + } + }; + + // Build a ForgeAPI for the given directory with the current + // config + use forge_api::ForgeAPI; + use forge_jsonrpc::JsonRpcServer; + use forge_jsonrpc::StdioTransport; + use std::sync::Arc; + + let config = forge_config::ForgeConfig::read() + .context("Failed to read Forge configuration")?; + let api = Arc::new(ForgeAPI::init(cwd, config)); + let server = JsonRpcServer::new(api); + let transport = StdioTransport::new(server.into_module()); + transport.run().await?; + return Ok(()); + } } Ok(()) }