Skip to content

Commit ce6e204

Browse files
committed
Close servlet streamable HTTP transports on async lifecycle events
1 parent ff78191 commit ce6e204

2 files changed

Lines changed: 312 additions & 31 deletions

File tree

mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import io.modelcontextprotocol.json.McpJsonMapper;
3333
import io.modelcontextprotocol.util.KeepAliveScheduler;
3434
import jakarta.servlet.AsyncContext;
35+
import jakarta.servlet.AsyncEvent;
36+
import jakarta.servlet.AsyncListener;
3537
import jakarta.servlet.ServletException;
3638
import jakarta.servlet.annotation.WebServlet;
3739
import jakarta.servlet.http.HttpServlet;
@@ -317,6 +319,7 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)
317319
// Check if this is a replay request
318320
if (request.getHeader(HttpHeaders.LAST_EVENT_ID) != null) {
319321
String lastId = request.getHeader(HttpHeaders.LAST_EVENT_ID);
322+
registerAsyncLifecycle(asyncContext, sessionId, sessionTransport::close);
320323

321324
try {
322325
session.replay(lastId)
@@ -330,44 +333,21 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)
330333
}
331334
catch (Exception e) {
332335
logger.error("Failed to replay message: {}", e.getMessage());
333-
asyncContext.complete();
336+
sessionTransport.close();
334337
}
335338
});
336339
}
337340
catch (Exception e) {
338341
logger.error("Failed to replay messages: {}", e.getMessage());
339-
asyncContext.complete();
342+
sessionTransport.close();
340343
}
341344
}
342345
else {
343346
// Establish new listening stream
344347
McpStreamableServerSession.McpStreamableServerSessionStream listeningStream = session
345348
.listeningStream(sessionTransport);
346349

347-
asyncContext.addListener(new jakarta.servlet.AsyncListener() {
348-
@Override
349-
public void onComplete(jakarta.servlet.AsyncEvent event) throws IOException {
350-
logger.debug("SSE connection completed for session: {}", sessionId);
351-
listeningStream.close();
352-
}
353-
354-
@Override
355-
public void onTimeout(jakarta.servlet.AsyncEvent event) throws IOException {
356-
logger.debug("SSE connection timed out for session: {}", sessionId);
357-
listeningStream.close();
358-
}
359-
360-
@Override
361-
public void onError(jakarta.servlet.AsyncEvent event) throws IOException {
362-
logger.debug("SSE connection error for session: {}", sessionId);
363-
listeningStream.close();
364-
}
365-
366-
@Override
367-
public void onStartAsync(jakarta.servlet.AsyncEvent event) throws IOException {
368-
// No action needed
369-
}
370-
});
350+
registerAsyncLifecycle(asyncContext, sessionId, listeningStream::close);
371351
}
372352
}
373353
catch (Exception e) {
@@ -519,6 +499,7 @@ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
519499

520500
HttpServletStreamableMcpSessionTransport sessionTransport = new HttpServletStreamableMcpSessionTransport(
521501
sessionId, asyncContext, response.getWriter());
502+
registerAsyncLifecycle(asyncContext, sessionId, sessionTransport::close);
522503

523504
try {
524505
session.responseStream(jsonrpcRequest, sessionTransport)
@@ -527,7 +508,7 @@ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
527508
}
528509
catch (Exception e) {
529510
logger.error("Failed to handle request stream: {}", e.getMessage());
530-
asyncContext.complete();
511+
sessionTransport.close();
531512
}
532513
}
533514
else {
@@ -557,6 +538,32 @@ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
557538
}
558539
}
559540

541+
private void registerAsyncLifecycle(AsyncContext asyncContext, String sessionId, Runnable onClose) {
542+
asyncContext.addListener(new AsyncListener() {
543+
@Override
544+
public void onComplete(AsyncEvent event) throws IOException {
545+
logger.debug("SSE async context completed for session: {}", sessionId);
546+
onClose.run();
547+
}
548+
549+
@Override
550+
public void onTimeout(AsyncEvent event) throws IOException {
551+
logger.debug("SSE async context timed out for session: {}", sessionId);
552+
onClose.run();
553+
}
554+
555+
@Override
556+
public void onError(AsyncEvent event) throws IOException {
557+
logger.debug("SSE async context errored for session: {}", sessionId);
558+
onClose.run();
559+
}
560+
561+
@Override
562+
public void onStartAsync(AsyncEvent event) throws IOException {
563+
}
564+
});
565+
}
566+
560567
/**
561568
* Handles DELETE requests for session deletion.
562569
* @param request The HTTP servlet request
@@ -747,8 +754,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message, String messageId
747754
}
748755
catch (Exception e) {
749756
logger.error("Failed to send message to session {}: {}", this.sessionId, e.getMessage());
750-
HttpServletStreamableServerTransportProvider.this.sessions.remove(this.sessionId);
751-
this.asyncContext.complete();
757+
this.close();
752758
}
753759
finally {
754760
lock.unlock();
@@ -792,8 +798,6 @@ public void close() {
792798
}
793799

794800
this.closed = true;
795-
796-
// HttpServletStreamableServerTransportProvider.this.sessions.remove(this.sessionId);
797801
this.asyncContext.complete();
798802
logger.debug("Successfully completed async context for session {}", sessionId);
799803
}

0 commit comments

Comments
 (0)