
Conversation

@jokemanfire
Collaborator

Support a task manager to control tasks; it will be very useful for long-running and highly concurrent environments.
#528

Motivation and Context

How Has This Been Tested?

Breaking Changes

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation update

Checklist

  • I have read the MCP Documentation
  • My code follows the repository's style guidelines
  • New and existing tests pass locally
  • I have added appropriate error handling
  • I have added or updated documentation as needed

Additional context

@jokemanfire jokemanfire marked this pull request as draft November 11, 2025 03:10
@github-actions github-actions bot added T-core Core library changes T-model Model/data structure changes labels Nov 11, 2025
@github-actions github-actions bot added T-dependencies Dependencies related changes T-test Testing related changes T-config Configuration file changes T-handler Handler implementation changes labels Nov 14, 2025
@github-actions github-actions bot added the T-examples Example code changes label Dec 5, 2025
@github-actions github-actions bot added T-macros Macro changes T-transport Transport layer changes labels Dec 12, 2025
@jokemanfire jokemanfire marked this pull request as ready for review December 12, 2025 07:30
Copilot AI review requested due to automatic review settings December 12, 2025 07:30
@jokemanfire
Collaborator Author

The basic functionality is done; I will add some examples. Please take a look when you have time @4t145 @alexhancock

Contributor

Copilot AI left a comment


Pull request overview

This PR implements task support (SEP-1686) to enable asynchronous execution of long-running operations in the MCP protocol. The implementation adds a task manager for coordinating operations, new model types for task status and results, procedural macros for automatic task handler generation, and protocol extensions for task-related requests (list, get, cancel).

Key Changes

  • Added OperationProcessor to manage async task execution with timeout and cancellation support
  • Introduced task-related models (Task, TaskStatus, TaskResult, CreateTaskResult) and request/response types
  • Created #[task_handler] macro to auto-generate task management methods for server handlers
  • Extended CallToolRequestParam with optional task field to trigger async execution
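
The extended CallToolRequestParam drives the routing between the two execution paths. A minimal sketch of that shape, using illustrative stand-in types (TaskMetadata, ttl_secs, and is_async_call are placeholder names, not the crate's real API):

```rust
// Minimal mirror of the request shape this PR describes (SEP-1686).
// `TaskMetadata` and `ttl_secs` are illustrative names, not the crate's API.
#[derive(Debug, Clone)]
struct TaskMetadata {
    ttl_secs: Option<u64>,
}

#[derive(Debug, Clone)]
struct CallToolRequestParam {
    name: String,
    // `Some(..)` asks the server to enqueue the call and answer with a
    // CreateTaskResult immediately; `None` keeps the classic blocking path.
    task: Option<TaskMetadata>,
}

fn is_async_call(params: &CallToolRequestParam) -> bool {
    params.task.is_some()
}

fn main() {
    let sync_call = CallToolRequestParam { name: "echo".into(), task: None };
    let async_call = CallToolRequestParam {
        name: "long_task".into(),
        task: Some(TaskMetadata { ttl_secs: Some(600) }),
    };
    assert!(!is_async_call(&sync_call));
    assert!(is_async_call(&async_call));
}
```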

Reviewed changes

Copilot reviewed 26 out of 26 changed files in this pull request and generated 22 comments.

Summary per file

  • crates/rmcp/src/task_manager.rs — Core task processor implementation managing running tasks, timeouts, and result collection
  • crates/rmcp/src/model/task.rs — Task lifecycle models including status enum and result types per SEP-1686
  • crates/rmcp/src/model.rs — Added task-related request/response types (GetTaskInfo, ListTasks, GetTaskResult, CancelTask)
  • crates/rmcp/src/model/capabilities.rs — Added TasksCapability for capability negotiation of task support
  • crates/rmcp/src/model/meta.rs — Extended variant list with task request types
  • crates/rmcp/src/handler/server.rs — Added task request routing and enqueue_task branching logic
  • crates/rmcp/src/handler/server/tool.rs — Extended ToolCallContext with task metadata field
  • crates/rmcp/src/error.rs — Added TaskError variant to error enum
  • crates/rmcp-macros/src/task_handler.rs — Procedural macro generating task handler methods (enqueue, list, get_info, get_result, cancel)
  • crates/rmcp-macros/src/lib.rs — Exported task_handler attribute macro
  • crates/rmcp/src/lib.rs — Exposed task_manager module
  • crates/rmcp/Cargo.toml — Added test configuration for task tests
  • crates/rmcp/tests/test_task.rs — Unit tests for operation processor basics
  • crates/rmcp/src/transport/streamable_http_client.rs — Added clippy allow for large enum variant
  • examples/servers/src/common/counter.rs — Integration test demonstrating task enqueueing with long_task tool
  • examples/*/src/*.rs — Updated all CallToolRequestParam usage to include task: None


Comment on lines +24 to 38
struct ToolCallOperationResult {
    id: String,
    result: Result<CallToolResult, McpError>,
}

impl OperationResultTransport for ToolCallOperationResult {
    fn operation_id(&self) -> &String {
        &self.id
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}


Copilot AI Dec 12, 2025


The ToolCallOperationResult struct (lines 24-37) is defined but never used in this file. The task_handler macro generates code that uses ToolCallTaskResult from the task_manager module instead. This is dead code and should be removed to avoid confusion.

Suggested change
struct ToolCallOperationResult {
    id: String,
    result: Result<CallToolResult, McpError>,
}

impl OperationResultTransport for ToolCallOperationResult {
    fn operation_id(&self) -> &String {
        &self.id
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

Copilot uses AI. Check for mistakes.
Comment on lines +46 to +47
#[error("Task error: {0}")]
TaskError(String),

Copilot AI Dec 12, 2025


The TaskError variant only contains a String message, which loses the source error information. This makes debugging difficult when tasks fail due to underlying errors (like I/O errors, network errors, etc.). Consider changing this to store a boxed error like other variants, or adding a separate variant for errors with sources: TaskError { message: String, source: Option<Box<dyn std::error::Error + Send + Sync>> }.

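The shape the reviewer suggests can be sketched in pure std. This is an illustration of the pattern, not the crate's actual error type; the struct and field names are placeholders:

```rust
use std::error::Error;
use std::fmt;

// Sketch of the reviewer's suggestion: keep the readable message but also
// preserve the underlying cause so `source()` chains survive.
#[derive(Debug)]
struct TaskError {
    message: String,
    source: Option<Box<dyn Error + Send + Sync>>,
}

impl fmt::Display for TaskError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Task error: {}", self.message)
    }
}

impl Error for TaskError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        // Dropping the Send + Sync auto traits is a plain coercion.
        self.source.as_ref().map(|b| b.as_ref() as &(dyn Error + 'static))
    }
}

fn main() {
    let io = std::io::Error::new(std::io::ErrorKind::TimedOut, "socket timed out");
    let err = TaskError {
        message: "operation failed".into(),
        source: Some(Box::new(io)),
    };
    assert_eq!(err.to_string(), "Task error: operation failed");
    assert!(err.source().is_some()); // the I/O cause is still reachable
}
```

With this shape, `{:?}` debugging and error-report crates can walk the source chain instead of seeing only a flattened string.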
Comment on lines +65 to +76
ClientRequest::CallToolRequest(request) => {
    if request.params.task.is_some() {
        tracing::info!("Enqueueing task for tool call: {}", request.params.name);
        self.enqueue_task(request.params, context.clone())
            .await
            .map(ServerResult::CreateTaskResult)
    } else {
        self.call_tool(request.params, context)
            .await
            .map(ServerResult::CallToolResult)
    }
}

Copilot AI Dec 12, 2025


The handler accepts task requests without checking if task support was negotiated in capabilities. When a client sends a CallToolRequest with the task field populated, the server will attempt to enqueue it regardless of whether task capabilities were advertised during initialization. This violates the MCP capability negotiation contract. The handler should verify that task support is enabled in capabilities before processing task-augmented requests, or return a method_not_found/not_supported error.

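The gate the reviewer asks for can be sketched as a small routing function. All types here (TasksCapability, ServerCapabilities, Routing) are illustrative stand-ins for the crate's capability model:

```rust
// Sketch of a capability gate: a task-augmented request is only enqueued
// when task support was negotiated during initialization.
#[derive(Debug, Default)]
struct TasksCapability;

#[derive(Debug, Default)]
struct ServerCapabilities {
    tasks: Option<TasksCapability>,
}

#[derive(Debug, PartialEq)]
enum Routing {
    Enqueue,
    CallDirectly,
    RejectNotSupported,
}

fn route(has_task_field: bool, caps: &ServerCapabilities) -> Routing {
    match (has_task_field, caps.tasks.is_some()) {
        (true, true) => Routing::Enqueue,
        // task requested but never advertised during initialize
        (true, false) => Routing::RejectNotSupported,
        (false, _) => Routing::CallDirectly,
    }
}

fn main() {
    let with_tasks = ServerCapabilities { tasks: Some(TasksCapability) };
    let without = ServerCapabilities::default();
    assert_eq!(route(true, &with_tasks), Routing::Enqueue);
    assert_eq!(route(true, &without), Routing::RejectNotSupported);
    assert_eq!(route(false, &without), Routing::CallDirectly);
}
```

Checking the negotiated capabilities before branching keeps the server honest about what it advertised, rather than silently accepting task requests it never claimed to support.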
Comment on lines +208 to +232
/// Check for tasks that have exceeded their timeout and handle them appropriately.
pub fn check_timeouts(&mut self) {
    let now = std::time::Instant::now();
    let mut timed_out_tasks = Vec::new();

    for (task_id, task) in &self.running_tasks {
        if let Some(timeout_duration) = task.timeout {
            if now.duration_since(task.started_at).as_secs() > timeout_duration {
                task.task_handle.abort();
                timed_out_tasks.push(task_id.clone());
            }
        }
    }

    for task_id in timed_out_tasks {
        if let Some(task) = self.running_tasks.remove(&task_id) {
            let timeout_result = TaskResult {
                descriptor: task.descriptor,
                result: Err(Error::TaskError("Operation timed out".to_string())),
            };
            self.completed_results.push(timeout_result);
        }
    }
}


Copilot AI Dec 12, 2025


The check_timeouts method is defined but never called anywhere in the codebase. Tasks can timeout via tokio::time::timeout in spawn_async_task, but this method provides an additional timeout checking mechanism that remains unused. Either this method should be called periodically (e.g., in a background task or when collecting results), or it should be removed to avoid confusion and dead code.

Suggested change
/// Check for tasks that have exceeded their timeout and handle them appropriately.
pub fn check_timeouts(&mut self) {
    let now = std::time::Instant::now();
    let mut timed_out_tasks = Vec::new();
    for (task_id, task) in &self.running_tasks {
        if let Some(timeout_duration) = task.timeout {
            if now.duration_since(task.started_at).as_secs() > timeout_duration {
                task.task_handle.abort();
                timed_out_tasks.push(task_id.clone());
            }
        }
    }
    for task_id in timed_out_tasks {
        if let Some(task) = self.running_tasks.remove(&task_id) {
            let timeout_result = TaskResult {
                descriptor: task.descriptor,
                result: Err(Error::TaskError("Operation timed out".to_string())),
            };
            self.completed_results.push(timeout_result);
        }
    }
}

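If the method is kept rather than removed, one way to resolve the dead-code concern is to drive the sweep from result collection. A pure-std stand-in for OperationProcessor showing that wiring (every name here is hypothetical; a background tokio interval would be the async alternative):

```rust
use std::time::{Duration, Instant};

// Stand-in for OperationProcessor: deadlines for running tasks, plus the
// ids of tasks that were swept for exceeding their timeout.
struct Processor {
    deadlines: Vec<(String, Instant)>,
    timed_out: Vec<String>,
}

impl Processor {
    fn check_timeouts(&mut self) {
        let now = Instant::now();
        // split running tasks into expired and still-live
        let (expired, live): (Vec<_>, Vec<_>) =
            self.deadlines.drain(..).partition(|(_, deadline)| *deadline <= now);
        self.deadlines = live;
        self.timed_out.extend(expired.into_iter().map(|(id, _)| id));
    }

    fn collect_completed_results(&mut self) -> Vec<String> {
        self.check_timeouts(); // sweep before reporting, so callers drive it
        std::mem::take(&mut self.timed_out)
    }
}

fn main() {
    let past = Instant::now() - Duration::from_millis(5);
    let mut p = Processor {
        deadlines: vec![("t1".to_string(), past)],
        timed_out: Vec::new(),
    };
    assert_eq!(p.collect_completed_results(), vec!["t1".to_string()]);
    assert!(p.deadlines.is_empty());
}
```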
/// Currently running tasks keyed by id
running_tasks: HashMap<String, RunningTask>,
/// Completed results waiting to be collected
completed_results: Vec<TaskResult>,

Copilot AI Dec 12, 2025


The completed_results Vec grows unbounded as tasks complete. Without any cleanup mechanism or TTL enforcement on completed results, this will cause a memory leak in long-running servers that process many tasks. Consider implementing automatic cleanup of old completed results based on TTL, or providing a method to periodically purge old results.

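The TTL-based purge the reviewer suggests is a small retain pass. A sketch with illustrative field names (the real TaskResult does not currently carry a completion timestamp, so finished_at is an assumed addition):

```rust
use std::time::{Duration, Instant};

// Hypothetical completed-result record that remembers when it finished.
struct CompletedResult {
    task_id: String,
    finished_at: Instant,
}

fn purge_expired(results: &mut Vec<CompletedResult>, ttl: Duration) {
    let now = Instant::now();
    // keep only results still inside their retention window
    results.retain(|r| now.duration_since(r.finished_at) < ttl);
}

fn main() {
    let mut results = vec![
        CompletedResult {
            task_id: "old".into(),
            finished_at: Instant::now() - Duration::from_secs(120),
        },
        CompletedResult { task_id: "fresh".into(), finished_at: Instant::now() },
    ];
    purge_expired(&mut results, Duration::from_secs(60));
    assert_eq!(results.len(), 1);
    assert_eq!(results[0].task_id, "fresh");
}
```

Calling such a purge from collect_completed_results (or a periodic sweep) bounds memory in long-running servers without requiring clients to poll every result.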
Comment on lines +169 to +180
if running.into_iter().any(|id| id == task_id) {
    let timestamp = current_timestamp();
    let task = rmcp::model::Task {
        task_id,
        status: rmcp::model::TaskStatus::Working,
        status_message: None,
        created_at: timestamp.clone(),
        last_updated_at: Some(timestamp),
        ttl: None,
        poll_interval: None,
    };
    return Ok(rmcp::model::GetTaskInfoResult { task: Some(task) });

Copilot AI Dec 12, 2025


The generated created_at and last_updated_at timestamps use current_timestamp() which calls chrono::Utc::now() at the time of query, not when the task was actually created. This means the timestamps don't reflect the true task creation or update times, but rather when the status was queried. These timestamps should be stored in the RunningTask structure and retrieved from there for accuracy.

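The fix amounts to stamping created_at once at enqueue time and only bumping last_updated_at on transitions. A minimal sketch with hypothetical helpers (register, touch) and string timestamps standing in for the chrono values:

```rust
// Timestamps stored in the running-task record, not computed at query time.
#[derive(Debug, Clone)]
struct RunningTask {
    task_id: String,
    created_at: String,      // fixed at enqueue time
    last_updated_at: String, // bumped on each status transition
}

fn register(task_id: &str, now: &str) -> RunningTask {
    RunningTask {
        task_id: task_id.to_string(),
        created_at: now.to_string(),
        last_updated_at: now.to_string(),
    }
}

fn touch(task: &mut RunningTask, now: &str) {
    task.last_updated_at = now.to_string(); // created_at is never rewritten
}

fn main() {
    let mut task = register("t1", "2025-12-12T07:30:00Z");
    touch(&mut task, "2025-12-12T07:31:00Z");
    assert_eq!(task.created_at, "2025-12-12T07:30:00Z");
    assert_eq!(task.last_updated_at, "2025-12-12T07:31:00Z");
}
```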
Comment on lines +68 to +70
self.enqueue_task(request.params, context.clone())
    .await
    .map(ServerResult::CreateTaskResult)

Copilot AI Dec 12, 2025


The task routing logic clones the RequestContext (line 68) when enqueueing a task, but this clone contains a CancellationToken which doesn't behave as expected when cloned. Cloning a CancellationToken creates a new token that shares the same cancellation state, so cancelling the original context will affect the cloned one. However, the task spawned with the cloned context might outlive the original request, leading to unexpected cancellation behavior. Consider creating a new CancellationToken for the task or document this behavior clearly.

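The sharing pitfall can be demonstrated without tokio_util. Here an Arc&lt;AtomicBool&gt; stands in for a CancellationToken (an analogy, not the real type): a clone shares cancellation state with the original, so cancelling the request also cancels the spawned task, while a freshly minted token stays independent.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

fn is_cancelled(token: &Arc<AtomicBool>) -> bool {
    token.load(Ordering::SeqCst)
}

fn main() {
    let request_token = Arc::new(AtomicBool::new(false));
    let cloned = Arc::clone(&request_token); // like CancellationToken::clone
    let fresh = Arc::new(AtomicBool::new(false)); // token minted just for the task

    request_token.store(true, Ordering::SeqCst); // the request is cancelled
    assert!(is_cancelled(&cloned)); // the clone observes it: task dies with the request
    assert!(!is_cancelled(&fresh)); // an independent token lets the task outlive the request
}
```

Whether a task should die with its originating request (share the token) or outlive it (mint a new one) is a policy choice; the review's point is that the choice should be made and documented, not inherited accidentally from a clone.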
Comment on lines +208 to +230
/// Check for tasks that have exceeded their timeout and handle them appropriately.
pub fn check_timeouts(&mut self) {
    let now = std::time::Instant::now();
    let mut timed_out_tasks = Vec::new();

    for (task_id, task) in &self.running_tasks {
        if let Some(timeout_duration) = task.timeout {
            if now.duration_since(task.started_at).as_secs() > timeout_duration {
                task.task_handle.abort();
                timed_out_tasks.push(task_id.clone());
            }
        }
    }

    for task_id in timed_out_tasks {
        if let Some(task) = self.running_tasks.remove(&task_id) {
            let timeout_result = TaskResult {
                descriptor: task.descriptor,
                result: Err(Error::TaskError("Operation timed out".to_string())),
            };
            self.completed_results.push(timeout_result);
        }
    }

Copilot AI Dec 12, 2025


The task timeout mechanism has a race condition. A task is aborted in check_timeouts (line 216), but spawn_async_task also applies timeout via tokio::time::timeout (lines 170-173). If check_timeouts is ever called, it could abort a task that's also being timed out by tokio, leading to two timeout results being generated. The timeout responsibility should be handled in one place, not both.

Suggested change (replace the body above with a no-op so timeout handling lives in one place):

/// Check for tasks that have exceeded their timeout.
/// (No-op: timeout is now handled exclusively in spawn_async_task via tokio::time::timeout.)
pub fn check_timeouts(&mut self) {
    // Timeout handling is now managed by tokio::time::timeout in spawn_async_task.
    // This method is retained for API compatibility but does nothing.
}
Comment on lines +34 to +62
async fn list_tasks(
    &self,
    _request: Option<rmcp::model::PaginatedRequestParam>,
    _: rmcp::service::RequestContext<rmcp::RoleServer>,
) -> Result<rmcp::model::ListTasksResult, McpError> {
    let running_ids = (#processor).lock().await.list_running();
    let total = running_ids.len() as u64;
    let tasks = running_ids
        .into_iter()
        .map(|task_id| {
            let timestamp = rmcp::task_manager::current_timestamp();
            rmcp::model::Task {
                task_id,
                status: rmcp::model::TaskStatus::Working,
                status_message: None,
                created_at: timestamp.clone(),
                last_updated_at: Some(timestamp),
                ttl: None,
                poll_interval: None,
            }
        })
        .collect::<Vec<_>>();

    Ok(rmcp::model::ListTasksResult {
        tasks,
        next_cursor: None,
        total: Some(total),
    })
}

Copilot AI Dec 12, 2025


The generated list_tasks method assumes all running tasks have status Working, but it doesn't check completed results that might not have been collected yet. This means tasks that have just completed but haven't been polled yet won't appear in the list, which could confuse clients. The method should call collect_completed_results first and include recently completed tasks in the listing.

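The suggested merge is a simple chain over both collections. A sketch with stand-in types (TaskView and list_all are illustrative; the real method would build rmcp::model::Task values after calling collect_completed_results):

```rust
// Listing should cover both still-running tasks and already-collected
// results, instead of reporting Working for everything.
#[derive(Debug, PartialEq)]
enum TaskStatus {
    Working,
    Completed,
}

#[derive(Debug)]
struct TaskView {
    task_id: String,
    status: TaskStatus,
}

fn list_all(running: &[String], completed: &[String]) -> Vec<TaskView> {
    running
        .iter()
        .map(|id| TaskView { task_id: id.clone(), status: TaskStatus::Working })
        .chain(completed.iter().map(|id| TaskView {
            task_id: id.clone(),
            status: TaskStatus::Completed,
        }))
        .collect()
}

fn main() {
    let tasks = list_all(&["a".to_string()], &["b".to_string()]);
    assert_eq!(tasks.len(), 2);
    assert_eq!(tasks[0].status, TaskStatus::Working);
    assert_eq!(tasks[1].status, TaskStatus::Completed);
}
```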
Comment on lines +190 to +243
let get_result_fn = quote! {
    async fn get_task_result(
        &self,
        request: rmcp::model::GetTaskResultParam,
        _context: rmcp::service::RequestContext<rmcp::RoleServer>,
    ) -> Result<rmcp::model::TaskResult, McpError> {
        use std::time::Duration;
        let task_id = request.task_id.clone();

        loop {
            // Scope the lock so we can await outside if needed
            {
                let mut processor = (#processor).lock().await;
                processor.collect_completed_results();

                if let Some(task_result) = processor.take_completed_result(&task_id) {
                    match task_result.result {
                        Ok(boxed) => {
                            if let Some(tool) = boxed.as_any().downcast_ref::<rmcp::task_manager::ToolCallTaskResult>() {
                                match &tool.result {
                                    Ok(call_tool) => {
                                        let value = ::serde_json::to_value(call_tool).unwrap_or(::serde_json::Value::Null);
                                        return Ok(rmcp::model::TaskResult {
                                            content_type: "application/json".to_string(),
                                            value,
                                            summary: None,
                                        });
                                    }
                                    Err(err) => return Err(McpError::internal_error(
                                        format!("task failed: {}", err),
                                        None,
                                    )),
                                }
                            } else {
                                return Err(McpError::internal_error("unsupported task result transport", None));
                            }
                        }
                        Err(err) => return Err(McpError::internal_error(
                            format!("task execution error: {}", err),
                            None,
                        )),
                    }
                }

                // Not completed yet: if not running, return not found
                let running = processor.list_running();
                if !running.iter().any(|id| id == &task_id) {
                    return Err(McpError::resource_not_found(format!("task not found: {}", task_id), None));
                }
            }

            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }

Copilot AI Dec 12, 2025


The get_task_result method implementation lacks test coverage. While there's a basic integration test that verifies task enqueueing and listing, there's no test that validates the actual result retrieval mechanism via GetTaskResultRequest. This is a critical path that involves complex polling logic and should be tested to ensure it correctly waits for and returns task results.

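The behaviour such a test needs to pin down can be sketched with plain threads: a worker finishes after a delay, and the caller polls shared state until the result appears. In the crate this would be an async test issuing GetTaskResultRequest; every name below is illustrative.

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Poll a shared result slot, sleeping between attempts, mirroring the
// generated get_task_result loop above.
fn poll_for_result(slot: &Arc<Mutex<Option<String>>>, max_polls: u32) -> Option<String> {
    for _ in 0..max_polls {
        if let Some(result) = slot.lock().unwrap().clone() {
            return Some(result);
        }
        thread::sleep(Duration::from_millis(10)); // poll interval
    }
    None // caller should map this to a timeout error
}

fn main() {
    let slot: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
    let writer = Arc::clone(&slot);
    let worker = thread::spawn(move || {
        thread::sleep(Duration::from_millis(30)); // simulated long-running tool
        *writer.lock().unwrap() = Some("done".to_string());
    });
    assert_eq!(poll_for_result(&slot, 100), Some("done".to_string()));
    worker.join().unwrap();
}
```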