Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/mcp/server/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,12 @@ async def _handle_get_request(self, request: Request, send: Send) -> None: # pr
# Create SSE stream
sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0)

# Store writer reference so close_standalone_sse_stream() can close it
self._sse_stream_writers[GET_STREAM_KEY] = sse_stream_writer

# Get protocol version from header for priming event decision
protocol_version = request.headers.get(MCP_PROTOCOL_VERSION_HEADER, DEFAULT_NEGOTIATED_VERSION)

async def standalone_sse_writer():
try:
# Create a standalone message stream for server-initiated messages
Expand All @@ -714,6 +720,17 @@ async def standalone_sse_writer():
standalone_stream_reader = self._request_streams[GET_STREAM_KEY][1]

async with sse_stream_writer, standalone_stream_reader:
# Send an immediate event to establish connection and prevent hang
# This is crucial for GET requests which have no initial data to send
if self._event_store and protocol_version >= "2025-11-25":
# Send proper priming event with resumability support
await self._maybe_send_priming_event(GET_STREAM_KEY, sse_stream_writer, protocol_version)
else:
# Send a simple "open" event to confirm connection is established
# Without this, GET requests hang waiting for data
open_event: dict[str, str] = {"event": "open", "data": ""}
await sse_stream_writer.send(open_event)

# Process messages from the standalone stream
async for event_message in standalone_stream_reader:
# For the standalone stream, we handle:
Expand All @@ -724,10 +741,14 @@ async def standalone_sse_writer():
# Send the message via SSE
event_data = self._create_event_data(event_message)
await sse_stream_writer.send(event_data)
except anyio.ClosedResourceError:
# Expected when close_standalone_sse_stream() is called
logger.debug("Standalone SSE stream closed by close_standalone_sse_stream()")
except Exception:
logger.exception("Error in standalone SSE writer")
finally:
logger.debug("Closing standalone SSE writer")
self._sse_stream_writers.pop(GET_STREAM_KEY, None)
await self._clean_up_memory_streams(GET_STREAM_KEY)

# Create and start EventSourceResponse
Expand Down
93 changes: 93 additions & 0 deletions tests/shared/test_streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -2350,3 +2350,96 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers(

assert "content-type" in headers_data
assert headers_data["content-type"] == "application/json"


def test_get_request_receives_priming_event_with_event_store(
event_server: tuple[SimpleEventStore, str],
) -> None:
"""
Test that GET requests to /mcp receive a priming event immediately.

This test verifies the fix for the issue where GET requests would hang
because the standalone_sse_writer didn't send a priming event before
entering the message loop.

With event_store configured and protocol version >= 2025-11-25, the server
should send a priming event (empty data with event id) immediately after
establishing the SSE connection, preventing the client from hanging.
"""
event_store, server_url = event_server
mcp_url = f"{server_url}/mcp"

# Use latest protocol version (2025-11-25) to enable priming events
init_request_latest = {
"jsonrpc": "2.0",
"method": "initialize",
"params": {
"clientInfo": {"name": "test-client", "version": "1.0"},
"protocolVersion": "2025-11-25", # Must be >= 2025-11-25 for priming events
"capabilities": {},
},
"id": "init-1",
}

# First, initialize a session via POST
init_response = requests.post(
mcp_url,
headers={
"Accept": "application/json, text/event-stream",
"Content-Type": "application/json",
},
json=init_request_latest,
)
assert init_response.status_code == 200

# Get session ID
session_id = init_response.headers.get(MCP_SESSION_ID_HEADER)
assert session_id is not None

# Extract negotiated protocol version from SSE response
# Note: With event_store, the POST response includes priming event first (empty data)
# then the actual initialize response
negotiated_version = None
for line in init_response.text.splitlines():
if line.startswith("data: ") and line[6:].strip(): # Skip empty data (priming event)
try:
init_data = json.loads(line[6:])
if "result" in init_data and "protocolVersion" in init_data["result"]:
negotiated_version = init_data["result"]["protocolVersion"]
break
except json.JSONDecodeError:
continue
assert negotiated_version is not None, "Could not extract protocol version from init response"

# Now make a GET request to establish SSE stream with a short timeout
# Before the fix, this would hang indefinitely waiting for messages
# After the fix, we should get the priming event immediately
get_response = requests.get(
mcp_url,
headers={
"Accept": "text/event-stream",
MCP_SESSION_ID_HEADER: session_id,
MCP_PROTOCOL_VERSION_HEADER: negotiated_version,
},
stream=True,
timeout=3, # 3 second timeout - priming event should arrive immediately
)

assert get_response.status_code == 200
assert get_response.headers.get("Content-Type") == "text/event-stream"

# Try to read the first chunk from the stream - should be the priming event
# The priming event format is: "id: <event_id>\ndata: \n\n"
try:
# Read up to 1KB to get the priming event
priming_received = False
for chunk in get_response.iter_content(chunk_size=1024, decode_unicode=True):
if chunk and ("id:" in chunk or "data:" in chunk):
priming_received = True
break

assert priming_received, (
"GET request should receive priming event immediately with event_store configured"
)
finally:
get_response.close()
Loading