Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-streamable-http-error-response.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@modelcontextprotocol/client': patch
---

Fix StreamableHTTPClientTransport to handle error responses in SSE streams
4 changes: 3 additions & 1 deletion packages/client/src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { FetchLike, JSONRPCMessage, Transport } from '@modelcontextprotocol
import {
createFetchWithInit,
isInitializedNotification,
isJSONRPCErrorResponse,
isJSONRPCRequest,
isJSONRPCResultResponse,
JSONRPCMessageSchema,
Expand Down Expand Up @@ -361,7 +362,8 @@ export class StreamableHTTPClientTransport implements Transport {
if (!event.event || event.event === 'message') {
try {
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
if (isJSONRPCResultResponse(message)) {
// Handle both success AND error responses for completion detection and ID remapping
if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
// Mark that we received a response - no need to reconnect for this request
receivedResponse = true;
if (replayMessageId !== undefined) {
Expand Down
72 changes: 72 additions & 0 deletions packages/client/test/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,78 @@ describe('StreamableHTTPClientTransport', () => {
expect(fetchMock.mock.calls[0]![1]?.method).toBe('POST');
});

it('should NOT reconnect a POST stream when error response was received', async () => {
// ARRANGE
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
reconnectionOptions: {
initialReconnectionDelay: 10,
maxRetries: 1,
maxReconnectionDelay: 1000,
reconnectionDelayGrowFactor: 1
}
});

const messageSpy = vi.fn();
transport.onmessage = messageSpy;

// Create a stream that sends:
// 1. Priming event with ID (enables potential reconnection)
// 2. An error response (should also prevent reconnection, just like success)
// 3. Then closes
const streamWithErrorResponse = new ReadableStream({
start(controller) {
// Priming event with ID
controller.enqueue(new TextEncoder().encode('id: priming-123\ndata: \n\n'));
// An error response to the request (tool not found, for example)
controller.enqueue(
new TextEncoder().encode(
'id: error-456\ndata: {"jsonrpc":"2.0","error":{"code":-32602,"message":"Tool not found"},"id":"request-1"}\n\n'
)
);
// Stream closes normally
controller.close();
}
});

const fetchMock = global.fetch as Mock;
fetchMock.mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ 'content-type': 'text/event-stream' }),
body: streamWithErrorResponse
});

const requestMessage: JSONRPCRequest = {
jsonrpc: '2.0',
method: 'tools/call',
id: 'request-1',
params: { name: 'nonexistent-tool' }
};

// ACT
await transport.start();
await transport.send(requestMessage);
await vi.advanceTimersByTimeAsync(50);

// ASSERT
// THE KEY ASSERTION: Fetch was called ONCE only - no reconnection!
// The error response was received, so no need to reconnect.
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(fetchMock.mock.calls[0]![1]?.method).toBe('POST');

// Verify the error response was delivered to the message handler
expect(messageSpy).toHaveBeenCalledWith(
expect.objectContaining({
jsonrpc: '2.0',
error: expect.objectContaining({
code: -32602,
message: 'Tool not found'
}),
id: 'request-1'
})
);
});

it('should not attempt reconnection after close() is called', async () => {
// ARRANGE
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
Expand Down
Loading