Skip to content

Commit ef8edd6

Browse files
committed
fix(client): propagate SSE POST errors to caller instead of hanging
In `sse_client`, the message-POST coroutine `_send_message` called `response.raise_for_status()` with no handler. When the server returned a non-2xx (401/403/404/5xx) or the POST hit a network error, the exception propagated into the `post_writer` task group and was swallowed by its `except Exception: logger.exception("Error in post_writer")`. The failure was never delivered through the read stream, so a caller blocked on `read_stream.receive()` (e.g. `ClientSession.initialize()`) hung forever. Catch `httpx.HTTPError` inside `_send_message` and forward it to `read_stream_writer`, the same pattern stdio.py and websocket.py already use, and that `streamable_http.py` uses for its >= 400 responses. The caller now receives the error promptly instead of deadlocking. Adds an in-process regression test (matching the #2765 harness) whose message POST returns 503; it asserts the caller receives the `HTTPStatusError` via the read stream within a bounded timeout, and fails (times out) against the unpatched client. Refs #2110
1 parent 19fe9fa commit ef8edd6

2 files changed

Lines changed: 66 additions & 10 deletions

File tree

src/mcp/client/sse.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -120,15 +120,24 @@ async def post_writer(endpoint_url: str):
120120

121121
async def _send_message(session_message: SessionMessage) -> None:
122122
logger.debug(f"Sending client message: {session_message}")
123-
response = await client.post(
124-
endpoint_url,
125-
json=session_message.message.model_dump(
126-
by_alias=True,
127-
mode="json",
128-
exclude_unset=True,
129-
),
130-
)
131-
response.raise_for_status()
123+
try:
124+
response = await client.post(
125+
endpoint_url,
126+
json=session_message.message.model_dump(
127+
by_alias=True,
128+
mode="json",
129+
exclude_unset=True,
130+
),
131+
)
132+
response.raise_for_status()
133+
except httpx.HTTPError as exc:
134+
# Forward the failure to the caller via the read stream instead of
135+
# letting it surface as a swallowed task-group error, which would
136+
# leave read_stream.receive() blocked forever (#2110). Mirrors the
137+
# stream-error handling in stdio.py and streamable_http.py.
138+
logger.exception("Error sending client message")
139+
await read_stream_writer.send(exc)
140+
return
132141
logger.debug(f"Client message sent successfully: {response.status_code}")
133142

134143
async for session_message in write_stream_reader:

tests/shared/test_sse.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from inline_snapshot import snapshot
1414
from starlette.applications import Starlette
1515
from starlette.requests import Request
16-
from starlette.responses import Response
16+
from starlette.responses import Response, StreamingResponse
1717
from starlette.routing import Mount, Route
1818

1919
import mcp.client.sse
@@ -25,6 +25,7 @@
2525
from mcp.server.transport_security import TransportSecuritySettings
2626
from mcp.shared._httpx_utils import McpHttpClientFactory
2727
from mcp.shared.exceptions import MCPError
28+
from mcp.shared.message import SessionMessage
2829
from mcp.types import (
2930
CallToolRequestParams,
3031
CallToolResult,
@@ -108,6 +109,52 @@ def make_server_app() -> Starlette:
108109
return make_app(Server(SERVER_NAME, on_read_resource=_handle_read_resource))
109110

110111

112+
def make_failing_post_app() -> Starlette:
113+
"""An SSE app that completes the handshake but fails every message POST with 503.
114+
115+
The `/sse` stream announces a valid endpoint (so the client reaches the POST path) and
116+
stays open until the client disconnects; `/messages/` always returns 503. Used to drive
117+
the client's POST-error propagation path (#2110).
118+
"""
119+
120+
async def handle_sse(request: Request) -> Response:
121+
async def event_stream() -> AsyncGenerator[bytes, None]:
122+
yield b"event: endpoint\r\ndata: /messages/\r\n\r\n"
123+
# Hold the stream open with a single, branch-free wait (so coverage is
124+
# deterministic) that comfortably outlasts the in-process POST round trip.
125+
# The client tears the connection down as soon as it receives the error.
126+
await anyio.sleep(1.0)
127+
128+
return StreamingResponse(event_stream(), media_type="text/event-stream")
129+
130+
async def handle_message(request: Request) -> Response:
131+
return Response("upstream exploded", status_code=503)
132+
133+
return Starlette(
134+
routes=[
135+
Route("/sse", endpoint=handle_sse),
136+
Route("/messages/", endpoint=handle_message, methods=["POST"]),
137+
]
138+
)
139+
140+
141+
@pytest.mark.anyio
142+
async def test_sse_client_post_error_propagates_to_caller() -> None:
143+
"""A non-2xx on the message POST surfaces to the caller via the read stream.
144+
145+
Regression test for #2110: the error was previously swallowed by the post_writer task
146+
group and `read_stream.receive()` blocked forever.
147+
"""
148+
factory = in_process_client_factory(make_failing_post_app())
149+
async with sse_client(f"{BASE_URL}/sse", httpx_client_factory=factory) as (read_stream, write_stream):
150+
await write_stream.send(SessionMessage(types.JSONRPCRequest(jsonrpc="2.0", id=1, method="ping")))
151+
with anyio.fail_after(10):
152+
item = await read_stream.receive()
153+
154+
assert isinstance(item, httpx.HTTPStatusError)
155+
assert item.response.status_code == 503
156+
157+
111158
@pytest.mark.anyio
112159
async def test_raw_sse_connection() -> None:
113160
"""The SSE GET responds 200 with an event-stream content type, announcing the session

0 commit comments

Comments
 (0)