diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index 2613b530c4..0b2808ab8c 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -706,6 +706,12 @@ async def _handle_get_request(self, request: Request, send: Send) -> None: # pr # Create SSE stream sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0) + # Store writer reference so close_standalone_sse_stream() can close it + self._sse_stream_writers[GET_STREAM_KEY] = sse_stream_writer + + # Get protocol version from header for priming event decision + protocol_version = request.headers.get(MCP_PROTOCOL_VERSION_HEADER, DEFAULT_NEGOTIATED_VERSION) + async def standalone_sse_writer(): try: # Create a standalone message stream for server-initiated messages @@ -714,6 +720,17 @@ async def standalone_sse_writer(): standalone_stream_reader = self._request_streams[GET_STREAM_KEY][1] async with sse_stream_writer, standalone_stream_reader: + # Send an immediate event to establish connection and prevent hang + # This is crucial for GET requests which have no initial data to send + if self._event_store and protocol_version >= "2025-11-25": + # Send proper priming event with resumability support + await self._maybe_send_priming_event(GET_STREAM_KEY, sse_stream_writer, protocol_version) + else: + # Send a simple "open" event to confirm connection is established + # Without this, GET requests hang waiting for data + open_event: dict[str, str] = {"event": "open", "data": ""} + await sse_stream_writer.send(open_event) + # Process messages from the standalone stream async for event_message in standalone_stream_reader: # For the standalone stream, we handle: @@ -724,10 +741,14 @@ async def standalone_sse_writer(): # Send the message via SSE event_data = self._create_event_data(event_message) await sse_stream_writer.send(event_data) + except anyio.ClosedResourceError: + # Expected when close_standalone_sse_stream() is called + logger.debug("Standalone SSE stream closed by close_standalone_sse_stream()") except Exception: logger.exception("Error in standalone SSE writer") finally: logger.debug("Closing standalone SSE writer") + self._sse_stream_writers.pop(GET_STREAM_KEY, None) await self._clean_up_memory_streams(GET_STREAM_KEY) # Create and start EventSourceResponse diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 0ed4250533..479725d3f9 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -2350,3 +2350,96 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers( assert "content-type" in headers_data assert headers_data["content-type"] == "application/json" + + +def test_get_request_receives_priming_event_with_event_store( + event_server: tuple[SimpleEventStore, str], +) -> None: + """ + Test that GET requests to /mcp receive a priming event immediately. + + This test verifies the fix for the issue where GET requests would hang + because the standalone_sse_writer didn't send a priming event before + entering the message loop. + + With event_store configured and protocol version >= 2025-11-25, the server + should send a priming event (empty data with event id) immediately after + establishing the SSE connection, preventing the client from hanging. + """ + event_store, server_url = event_server + mcp_url = f"{server_url}/mcp" + + # Use latest protocol version (2025-11-25) to enable priming events + init_request_latest = { + "jsonrpc": "2.0", + "method": "initialize", + "params": { + "clientInfo": {"name": "test-client", "version": "1.0"}, + "protocolVersion": "2025-11-25", # Must be >= 2025-11-25 for priming events + "capabilities": {}, + }, + "id": "init-1", + } + + # First, initialize a session via POST + init_response = requests.post( + mcp_url, + headers={ + "Accept": "application/json, text/event-stream", + "Content-Type": "application/json", + }, + json=init_request_latest, + ) + assert init_response.status_code == 200 + + # Get session ID + session_id = init_response.headers.get(MCP_SESSION_ID_HEADER) + assert session_id is not None + + # Extract negotiated protocol version from SSE response + # Note: With event_store, the POST response includes priming event first (empty data) + # then the actual initialize response + negotiated_version = None + for line in init_response.text.splitlines(): + if line.startswith("data: ") and line[6:].strip(): # Skip empty data (priming event) + try: + init_data = json.loads(line[6:]) + if "result" in init_data and "protocolVersion" in init_data["result"]: + negotiated_version = init_data["result"]["protocolVersion"] + break + except json.JSONDecodeError: + continue + assert negotiated_version is not None, "Could not extract protocol version from init response" + + # Now make a GET request to establish SSE stream with a short timeout + # Before the fix, this would hang indefinitely waiting for messages + # After the fix, we should get the priming event immediately + get_response = requests.get( + mcp_url, + headers={ + "Accept": "text/event-stream", + MCP_SESSION_ID_HEADER: session_id, + MCP_PROTOCOL_VERSION_HEADER: negotiated_version, + }, + stream=True, + timeout=3, # 3 second timeout - priming event should arrive immediately + ) + + assert get_response.status_code == 200 + assert get_response.headers.get("Content-Type") == "text/event-stream" + + # Try to read the first chunk from the stream - should be the priming event + # The priming event format is: "id: \ndata: \n\n" + try: + # Read up to 1KB to get the priming event + priming_received = False + for chunk in get_response.iter_content(chunk_size=1024, decode_unicode=True): + if chunk and ("id:" in chunk or "data:" in chunk): + priming_received = True + break + + assert priming_received, ( + "GET request should receive priming event immediately with event_store configured" + ) + finally: + get_response.close()