Skip to content

Commit 9bbfbe5

Browse files
authored
Close McpTransportSession on transport close (#632)
* Client HTTP transports: use McpTransportSession interface instead of concrete types * Streamable HTTP tranports: .closeGracefully closes the session Signed-off-by: Daniel Garnier-Moiroux <[email protected]>
1 parent 2ee5853 commit 9bbfbe5

File tree

8 files changed

+229
-16
lines changed

8 files changed

+229
-16
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
3030
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
3131
import io.modelcontextprotocol.common.McpTransportContext;
32+
import io.modelcontextprotocol.spec.ClosedMcpTransportSession;
3233
import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
3334
import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
3435
import io.modelcontextprotocol.spec.HttpHeaders;
@@ -118,7 +119,7 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
118119

119120
private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;
120121

121-
private final AtomicReference<DefaultMcpTransportSession> activeSession = new AtomicReference<>();
122+
private final AtomicReference<McpTransportSession<Disposable>> activeSession = new AtomicReference<>();
122123

123124
private final AtomicReference<Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>>> handler = new AtomicReference<>();
124125

@@ -163,12 +164,20 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
163164
});
164165
}
165166

166-
private DefaultMcpTransportSession createTransportSession() {
167+
private McpTransportSession<Disposable> createTransportSession() {
167168
Function<String, Publisher<Void>> onClose = sessionId -> sessionId == null ? Mono.empty()
168169
: createDelete(sessionId);
169170
return new DefaultMcpTransportSession(onClose);
170171
}
171172

173+
private McpTransportSession<Disposable> createClosedSession(McpTransportSession<Disposable> existingSession) {
174+
var existingSessionId = Optional.ofNullable(existingSession)
175+
.filter(session -> !(session instanceof ClosedMcpTransportSession<Disposable>))
176+
.flatMap(McpTransportSession::sessionId)
177+
.orElse(null);
178+
return new ClosedMcpTransportSession<>(existingSessionId);
179+
}
180+
172181
private Publisher<Void> createDelete(String sessionId) {
173182

174183
var uri = Utils.resolveUri(this.baseUri, this.endpoint);
@@ -210,9 +219,9 @@ private void handleException(Throwable t) {
210219
public Mono<Void> closeGracefully() {
211220
return Mono.defer(() -> {
212221
logger.debug("Graceful close triggered");
213-
DefaultMcpTransportSession currentSession = this.activeSession.getAndSet(createTransportSession());
222+
McpTransportSession<Disposable> currentSession = this.activeSession.getAndUpdate(this::createClosedSession);
214223
if (currentSession != null) {
215-
return currentSession.closeGracefully();
224+
return Mono.from(currentSession.closeGracefully());
216225
}
217226
return Mono.empty();
218227
});
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*/
4+
package io.modelcontextprotocol.spec;
5+
6+
import java.util.Optional;
7+
8+
import org.reactivestreams.Publisher;
9+
import reactor.core.publisher.Mono;
10+
import reactor.util.annotation.Nullable;
11+
12+
/**
13+
* Represents a closed MCP session, which may not be reused. All calls will throw a
14+
* {@link McpTransportSessionClosedException}.
15+
*
16+
* @param <CONNECTION> the resource representing the connection that the transport
17+
* manages.
18+
* @author Daniel Garnier-Moiroux
19+
*/
20+
public class ClosedMcpTransportSession<CONNECTION> implements McpTransportSession<CONNECTION> {
21+
22+
private final String sessionId;
23+
24+
public ClosedMcpTransportSession(@Nullable String sessionId) {
25+
this.sessionId = sessionId;
26+
}
27+
28+
@Override
29+
public Optional<String> sessionId() {
30+
throw new McpTransportSessionClosedException(sessionId);
31+
}
32+
33+
@Override
34+
public boolean markInitialized(String sessionId) {
35+
throw new McpTransportSessionClosedException(sessionId);
36+
}
37+
38+
@Override
39+
public void addConnection(CONNECTION connection) {
40+
throw new McpTransportSessionClosedException(sessionId);
41+
}
42+
43+
@Override
44+
public void removeConnection(CONNECTION connection) {
45+
throw new McpTransportSessionClosedException(sessionId);
46+
}
47+
48+
@Override
49+
public void close() {
50+
51+
}
52+
53+
@Override
54+
public Publisher<Void> closeGracefully() {
55+
return Mono.empty();
56+
}
57+
58+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.spec;
6+
7+
import reactor.util.annotation.Nullable;
8+
9+
/**
10+
* Exception thrown when trying to use an {@link McpTransportSession} that has been
11+
* closed.
12+
*
13+
* @see ClosedMcpTransportSession
14+
* @author Daniel Garnier-Moiroux
15+
*/
16+
public class McpTransportSessionClosedException extends RuntimeException {
17+
18+
public McpTransportSessionClosedException(@Nullable String sessionId) {
19+
super(sessionId != null ? "MCP session with ID %s has been closed".formatted(sessionId)
20+
: "MCP session has been closed");
21+
}
22+
23+
}

mcp-core/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientResiliencyTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import io.modelcontextprotocol.spec.McpClientTransport;
1111
import io.modelcontextprotocol.spec.McpSchema;
1212
import io.modelcontextprotocol.spec.McpTransport;
13-
import org.junit.jupiter.api.Disabled;
13+
import io.modelcontextprotocol.spec.McpTransportSessionClosedException;
1414
import org.junit.jupiter.api.Test;
1515
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
@@ -222,9 +222,10 @@ void testSessionClose() {
222222
// In case of Streamable HTTP this call should issue a HTTP DELETE request
223223
// invalidating the session
224224
StepVerifier.create(mcpAsyncClient.closeGracefully()).expectComplete().verify();
225-
// The next use should immediately re-initialize with no issue and send the
226-
// request without any broken connections.
227-
StepVerifier.create(mcpAsyncClient.ping()).expectNextCount(1).verifyComplete();
225+
// The next tries to use the closed session and fails
226+
StepVerifier.create(mcpAsyncClient.ping())
227+
.expectErrorMatches(err -> err.getCause() instanceof McpTransportSessionClosedException)
228+
.verify();
228229
});
229230
}
230231

mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,4 +125,39 @@ void testAsyncRequestCustomizer() throws URISyntaxException {
125125
});
126126
}
127127

128+
@Test
129+
void testCloseUninitialized() {
130+
var transport = HttpClientStreamableHttpTransport.builder(host).build();
131+
132+
StepVerifier.create(transport.closeGracefully()).verifyComplete();
133+
134+
var initializeRequest = new McpSchema.InitializeRequest(McpSchema.LATEST_PROTOCOL_VERSION,
135+
McpSchema.ClientCapabilities.builder().roots(true).build(),
136+
new McpSchema.Implementation("Spring AI MCP Client", "0.3.1"));
137+
var testMessage = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE,
138+
"test-id", initializeRequest);
139+
140+
StepVerifier.create(transport.sendMessage(testMessage))
141+
.expectErrorMessage("MCP session has been closed")
142+
.verify();
143+
}
144+
145+
@Test
146+
void testCloseInitialized() {
147+
var transport = HttpClientStreamableHttpTransport.builder(host).build();
148+
149+
var initializeRequest = new McpSchema.InitializeRequest(McpSchema.LATEST_PROTOCOL_VERSION,
150+
McpSchema.ClientCapabilities.builder().roots(true).build(),
151+
new McpSchema.Implementation("Spring AI MCP Client", "0.3.1"));
152+
var testMessage = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE,
153+
"test-id", initializeRequest);
154+
155+
StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete();
156+
StepVerifier.create(transport.closeGracefully()).verifyComplete();
157+
158+
StepVerifier.create(transport.sendMessage(testMessage))
159+
.expectErrorMatches(err -> err.getMessage().matches("MCP session with ID [a-zA-Z0-9-]* has been closed"))
160+
.verify();
161+
}
162+
128163
}

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.modelcontextprotocol.json.TypeRef;
2626
import io.modelcontextprotocol.json.McpJsonMapper;
2727

28+
import io.modelcontextprotocol.spec.ClosedMcpTransportSession;
2829
import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
2930
import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
3031
import io.modelcontextprotocol.spec.HttpHeaders;
@@ -98,7 +99,7 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {
9899

99100
private final boolean resumableStreams;
100101

101-
private final AtomicReference<DefaultMcpTransportSession> activeSession = new AtomicReference<>();
102+
private final AtomicReference<McpTransportSession<Disposable>> activeSession = new AtomicReference<>();
102103

103104
private final AtomicReference<Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>>> handler = new AtomicReference<>();
104105

@@ -143,7 +144,7 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
143144
});
144145
}
145146

146-
private DefaultMcpTransportSession createTransportSession() {
147+
private McpTransportSession<Disposable> createTransportSession() {
147148
Function<String, Publisher<Void>> onClose = sessionId -> sessionId == null ? Mono.empty()
148149
: webClient.delete()
149150
.uri(this.endpoint)
@@ -159,6 +160,14 @@ private DefaultMcpTransportSession createTransportSession() {
159160
return new DefaultMcpTransportSession(onClose);
160161
}
161162

163+
private McpTransportSession<Disposable> createClosedSession(McpTransportSession<Disposable> existingSession) {
164+
var existingSessionId = Optional.ofNullable(existingSession)
165+
.filter(session -> !(session instanceof ClosedMcpTransportSession<Disposable>))
166+
.flatMap(McpTransportSession::sessionId)
167+
.orElse(null);
168+
return new ClosedMcpTransportSession<>(existingSessionId);
169+
}
170+
162171
@Override
163172
public void setExceptionHandler(Consumer<Throwable> handler) {
164173
logger.debug("Exception handler registered");
@@ -182,9 +191,9 @@ private void handleException(Throwable t) {
182191
public Mono<Void> closeGracefully() {
183192
return Mono.defer(() -> {
184193
logger.debug("Graceful close triggered");
185-
DefaultMcpTransportSession currentSession = this.activeSession.getAndSet(createTransportSession());
194+
McpTransportSession<Disposable> currentSession = this.activeSession.getAndUpdate(this::createClosedSession);
186195
if (currentSession != null) {
187-
return currentSession.closeGracefully();
196+
return Mono.from(currentSession.closeGracefully());
188197
}
189198
return Mono.empty();
190199
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2024-2025 the original author or authors.
3+
*/
4+
package io.modelcontextprotocol.client.transport;
5+
6+
import io.modelcontextprotocol.spec.McpSchema;
7+
import org.junit.jupiter.api.AfterAll;
8+
import org.junit.jupiter.api.BeforeAll;
9+
import org.junit.jupiter.api.Test;
10+
import org.testcontainers.containers.GenericContainer;
11+
import org.testcontainers.containers.wait.strategy.Wait;
12+
import reactor.test.StepVerifier;
13+
14+
import org.springframework.web.reactive.function.client.WebClient;
15+
16+
class WebClientStreamableHttpTransportTest {
17+
18+
static String host = "http://localhost:3001";
19+
20+
static WebClient.Builder builder;
21+
22+
@SuppressWarnings("resource")
23+
static GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v3")
24+
.withCommand("node dist/index.js streamableHttp")
25+
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
26+
.withExposedPorts(3001)
27+
.waitingFor(Wait.forHttp("/").forStatusCode(404));
28+
29+
@BeforeAll
30+
static void startContainer() {
31+
container.start();
32+
int port = container.getMappedPort(3001);
33+
host = "http://" + container.getHost() + ":" + port;
34+
builder = WebClient.builder().baseUrl(host);
35+
}
36+
37+
@AfterAll
38+
static void stopContainer() {
39+
container.stop();
40+
}
41+
42+
@Test
43+
void testCloseUninitialized() {
44+
var transport = WebClientStreamableHttpTransport.builder(builder).build();
45+
46+
StepVerifier.create(transport.closeGracefully()).verifyComplete();
47+
48+
var initializeRequest = new McpSchema.InitializeRequest(McpSchema.LATEST_PROTOCOL_VERSION,
49+
McpSchema.ClientCapabilities.builder().roots(true).build(),
50+
new McpSchema.Implementation("Spring AI MCP Client", "0.3.1"));
51+
var testMessage = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE,
52+
"test-id", initializeRequest);
53+
54+
StepVerifier.create(transport.sendMessage(testMessage))
55+
.expectErrorMessage("MCP session has been closed")
56+
.verify();
57+
}
58+
59+
@Test
60+
void testCloseInitialized() {
61+
var transport = WebClientStreamableHttpTransport.builder(builder).build();
62+
63+
var initializeRequest = new McpSchema.InitializeRequest(McpSchema.LATEST_PROTOCOL_VERSION,
64+
McpSchema.ClientCapabilities.builder().roots(true).build(),
65+
new McpSchema.Implementation("Spring AI MCP Client", "0.3.1"));
66+
var testMessage = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE,
67+
"test-id", initializeRequest);
68+
69+
StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete();
70+
StepVerifier.create(transport.closeGracefully()).verifyComplete();
71+
72+
StepVerifier.create(transport.sendMessage(testMessage))
73+
.expectErrorMatches(err -> err.getMessage().matches("MCP session with ID [a-zA-Z0-9-]* has been closed"))
74+
.verify();
75+
}
76+
77+
}

mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientResiliencyTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import io.modelcontextprotocol.spec.McpClientTransport;
1111
import io.modelcontextprotocol.spec.McpSchema;
1212
import io.modelcontextprotocol.spec.McpTransport;
13-
import org.junit.jupiter.api.Disabled;
13+
import io.modelcontextprotocol.spec.McpTransportSessionClosedException;
1414
import org.junit.jupiter.api.Test;
1515
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
@@ -221,9 +221,10 @@ void testSessionClose() {
221221
// In case of Streamable HTTP this call should issue a HTTP DELETE request
222222
// invalidating the session
223223
StepVerifier.create(mcpAsyncClient.closeGracefully()).expectComplete().verify();
224-
// The next use should immediately re-initialize with no issue and send the
225-
// request without any broken connections.
226-
StepVerifier.create(mcpAsyncClient.ping()).expectNextCount(1).verifyComplete();
224+
// The next tries to use the closed session and fails
225+
StepVerifier.create(mcpAsyncClient.ping())
226+
.expectErrorMatches(err -> err.getCause() instanceof McpTransportSessionClosedException)
227+
.verify();
227228
});
228229
}
229230

0 commit comments

Comments
 (0)