55(server→client and client→server) using the streamable HTTP transport.
66"""
77
8- import multiprocessing
9- import socket
10- from collections .abc import Generator
8+ import gc
9+ from collections .abc import AsyncIterator , Iterator
10+ from contextlib import asynccontextmanager
11+ from typing import Any
1112
13+ import httpx
1214import pytest
15+ from sse_starlette .sse import AppStatus
16+ from starlette .applications import Starlette
17+ from starlette .routing import Mount
1318
19+ import mcp .types as types
1420from mcp .client .session import ClientSession
1521from mcp .client .streamable_http import streamable_http_client
16- from tests .test_helpers import wait_for_server
22+ from mcp .server import Server
23+ from mcp .server .streamable_http_manager import StreamableHTTPSessionManager
24+ from mcp .types import CallToolResult , TextContent , Tool
25+ from tests .interaction .transports import StreamingASGITransport
26+
27+ # The in-process app is mounted at this origin purely so URLs are well-formed; nothing listens here.
28+ BASE_URL = "http://127.0.0.1:8000"
29+
30+ # v1's streamable-HTTP server transport leaks a handful of anyio memory streams on teardown when
31+ # run in process; the old subprocess harness never observed them. The interaction suite registers
32+ # the same two scoped filters globally from tests/interaction/conftest.py (see the comment there),
33+ # but they only take effect when that package's conftest is loaded; these markers keep the tests
34+ # themselves passing in isolated runs. Markers are item-scoped, so the autouse
35+ # `_collect_leaked_streams` fixture below garbage-collects each test's leaks inside its own
36+ # teardown, where these filters apply; without it, leaks GC'd at session cleanup escape the
37+ # scoped ignores. The filters are scoped to anyio's MemoryObject*Stream leak signature so an
38+ # unrelated leak still fails the suite.
39+ pytestmark = [
40+ pytest .mark .filterwarnings ("ignore:.*MemoryObject(Send|Receive)Stream:pytest.PytestUnraisableExceptionWarning" ),
41+ pytest .mark .filterwarnings ("ignore:.*MemoryObject(Send|Receive)Stream:ResourceWarning" ),
42+ ]
43+
44+
45+ @pytest .fixture (autouse = True )
46+ def _collect_leaked_streams () -> Iterator [None ]:
47+ """Garbage-collect each test's leaked memory streams inside its own teardown.
48+
49+ The filterwarnings marks above only apply while a test in this file is the
50+ active warning context. The leaked streams sit in reference cycles, so without
51+ a forced collection their deallocator warnings fire wherever the garbage
52+ collector happens to run next: during an unrelated test (failing it, since the
53+ global ``filterwarnings = ["error"]`` has no ignore there) or at pytest's
54+ session-unconfigure unraisable sweep (exit code 1 after all tests passed when
55+ running without xdist, e.g. ``-n 0`` for ``--pdb`` debugging).
56+ """
57+ yield
58+ gc .collect ()
59+
60+
61+ @pytest .fixture (autouse = True )
62+ def _reset_sse_starlette_exit_event () -> Iterator [None ]:
63+ """Reset sse-starlette's module-global exit Event around each test.
64+
65+ sse-starlette <3.0 (allowed by this branch's dependency floor; CI's lowest-direct leg
66+ installs it) stores an `anyio.Event` on the `AppStatus` class the first time an
67+ `EventSourceResponse` runs; that Event is bound to the test's event loop and breaks every
68+ subsequent in-process SSE response (and `json_response=False` below means every request
69+ in this module is served as one). sse-starlette 3.x switched to a ContextVar and has no
70+ such attribute. Resetting on both sides of the test keeps this module immune to a stale
71+ Event left behind by an earlier test on the same worker as well as cleaning up after its
72+ own. This mirrors the autouse fixtures in tests/shared/test_sse.py and
73+ tests/interaction/conftest.py.
74+ """
75+ if hasattr (AppStatus , "should_exit_event" ): # pragma: no branch
76+ # setattr keeps pyright happy: the locked sse-starlette 3.x has no such attribute.
77+ setattr (AppStatus , "should_exit_event" , None ) # pragma: lax no cover
78+ yield
79+ if hasattr (AppStatus , "should_exit_event" ): # pragma: no branch
80+ setattr (AppStatus , "should_exit_event" , None ) # pragma: lax no cover
81+
1782
1883# Test constants with various Unicode characters
1984UNICODE_TEST_STRINGS = {
35100}
36101
37102
38- def run_unicode_server (port : int ) -> None : # pragma: no cover
39- """Run the Unicode test server in a separate process."""
40- # Import inside the function since this runs in a separate process
41- from collections .abc import AsyncGenerator
42- from contextlib import asynccontextmanager
43- from typing import Any
44-
45- import uvicorn
46- from starlette .applications import Starlette
47- from starlette .routing import Mount
48-
49- import mcp .types as types
50- from mcp .server import Server
51- from mcp .server .streamable_http_manager import StreamableHTTPSessionManager
52- from mcp .types import TextContent , Tool
53-
54- # Need to recreate the server setup in this process
55- server = Server (name = "unicode_test_server" )
103+ def make_unicode_server () -> Server [object , object ]:
104+ """The Unicode echo server: tool and prompt contents that exercise non-ASCII round trips."""
105+ server : Server [object , object ] = Server (name = "unicode_test_server" )
56106
57107 @server .list_tools ()
58- async def list_tools () -> list [Tool ]:
59- """List tools with Unicode descriptions."""
108+ async def handle_list_tools () -> list [Tool ]:
60109 return [
61110 Tool (
62111 name = "echo_unicode" ,
@@ -72,22 +121,12 @@ async def list_tools() -> list[Tool]:
72121 ]
73122
74123 @server .call_tool ()
75- async def call_tool (name : str , arguments : dict [str , Any ] | None ) -> list [TextContent ]:
76- """Handle tool calls with Unicode content."""
77- if name == "echo_unicode" :
78- text = arguments .get ("text" , "" ) if arguments else ""
79- return [
80- TextContent (
81- type = "text" ,
82- text = f"Echo: { text } " ,
83- )
84- ]
85- else :
86- raise ValueError (f"Unknown tool: { name } " )
124+ async def handle_call_tool (name : str , arguments : dict [str , Any ]) -> CallToolResult :
125+ assert name == "echo_unicode"
126+ return CallToolResult (content = [TextContent (type = "text" , text = f"Echo: { arguments ['text' ]} " )])
87127
88128 @server .list_prompts ()
89- async def list_prompts () -> list [types .Prompt ]:
90- """List prompts with Unicode names and descriptions."""
129+ async def handle_list_prompts () -> list [types .Prompt ]:
91130 return [
92131 types .Prompt (
93132 name = "unicode_prompt" ,
@@ -97,137 +136,90 @@ async def list_prompts() -> list[types.Prompt]:
97136 ]
98137
99138 @server .get_prompt ()
100- async def get_prompt (name : str , arguments : dict [str , Any ] | None ) -> types .GetPromptResult :
101- """Get a prompt with Unicode content."""
102- if name == "unicode_prompt" :
103- return types .GetPromptResult (
104- messages = [
105- types .PromptMessage (
106- role = "user" ,
107- content = types .TextContent (
108- type = "text" ,
109- text = "Hello世界🌍Привет안녕مرحباשלום" ,
110- ),
111- )
112- ]
113- )
114- raise ValueError (f"Unknown prompt: { name } " )
115-
116- # Create the session manager
117- session_manager = StreamableHTTPSessionManager (
118- app = server ,
119- json_response = False , # Use SSE for testing
120- )
121-
122- @asynccontextmanager
123- async def lifespan (app : Starlette ) -> AsyncGenerator [None , None ]:
124- async with session_manager .run ():
125- yield
126-
127- # Create an ASGI application
128- app = Starlette (
129- debug = True ,
130- routes = [
131- Mount ("/mcp" , app = session_manager .handle_request ),
132- ],
133- lifespan = lifespan ,
134- )
135-
136- # Run the server
137- config = uvicorn .Config (
138- app = app ,
139- host = "127.0.0.1" ,
140- port = port ,
141- log_level = "error" ,
142- )
143- uvicorn_server = uvicorn .Server (config )
144- uvicorn_server .run ()
145-
146-
147- @pytest .fixture
148- def unicode_server_port () -> int :
149- """Find an available port for the Unicode test server."""
150- with socket .socket () as s :
151- s .bind (("127.0.0.1" , 0 ))
152- return s .getsockname ()[1 ]
153-
154-
155- @pytest .fixture
156- def running_unicode_server (unicode_server_port : int ) -> Generator [str , None , None ]:
157- """Start a Unicode test server in a separate process."""
158- proc = multiprocessing .Process (target = run_unicode_server , kwargs = {"port" : unicode_server_port }, daemon = True )
159- proc .start ()
160-
161- # Wait for server to be ready
162- wait_for_server (unicode_server_port )
163-
164- try :
165- yield f"http://127.0.0.1:{ unicode_server_port } "
166- finally :
167- # Clean up - try graceful termination first
168- proc .terminate ()
169- proc .join (timeout = 2 )
170- if proc .is_alive (): # pragma: no cover
171- proc .kill ()
172- proc .join (timeout = 1 )
139+ async def handle_get_prompt (name : str , arguments : dict [str , str ] | None ) -> types .GetPromptResult :
140+ assert name == "unicode_prompt"
141+ return types .GetPromptResult (
142+ messages = [
143+ types .PromptMessage (
144+ role = "user" ,
145+ content = types .TextContent (type = "text" , text = "Hello世界🌍Привет안녕مرحباשלום" ),
146+ )
147+ ]
148+ )
149+
150+ return server
151+
152+
153+ @asynccontextmanager
154+ async def unicode_session () -> AsyncIterator [ClientSession ]:
155+ """Yield an initialized ClientSession speaking streamable HTTP (SSE responses) to the
156+ Unicode test server, entirely in process."""
157+ # SSE response mode, so Unicode rides the SSE event encoding rather than a plain JSON body.
158+ session_manager = StreamableHTTPSessionManager (app = make_unicode_server (), json_response = False )
159+ app = Starlette (routes = [Mount ("/mcp" , app = session_manager .handle_request )])
160+
161+ async with (
162+ session_manager .run (),
163+ # follow_redirects matches the SDK's own client factory; Starlette's Mount 307-redirects
164+ # the bare /mcp path to /mcp/.
165+ httpx .AsyncClient (
166+ transport = StreamingASGITransport (app ), base_url = BASE_URL , follow_redirects = True
167+ ) as http_client ,
168+ streamable_http_client (f"{ BASE_URL } /mcp" , http_client = http_client ) as (
169+ read_stream ,
170+ write_stream ,
171+ _get_session_id ,
172+ ),
173+ ClientSession (read_stream , write_stream ) as session ,
174+ ):
175+ await session .initialize ()
176+ yield session
173177
174178
175179@pytest .mark .anyio
176- async def test_streamable_http_client_unicode_tool_call (running_unicode_server : str ) -> None :
180+ async def test_streamable_http_client_unicode_tool_call () -> None :
177181 """Test that Unicode text is correctly handled in tool calls via streamable HTTP."""
178- base_url = running_unicode_server
179- endpoint_url = f"{ base_url } /mcp"
180-
181- async with streamable_http_client (endpoint_url ) as (read_stream , write_stream , _get_session_id ):
182- async with ClientSession (read_stream , write_stream ) as session :
183- await session .initialize ()
184-
185- # Test 1: List tools (server→client Unicode in descriptions)
186- tools = await session .list_tools ()
187- assert len (tools .tools ) == 1
182+ async with unicode_session () as session :
183+ # Test 1: List tools (server→client Unicode in descriptions)
184+ tools = await session .list_tools ()
185+ assert len (tools .tools ) == 1
188186
189- # Check Unicode in tool descriptions
190- echo_tool = tools .tools [0 ]
191- assert echo_tool .name == "echo_unicode"
192- assert echo_tool .description is not None
193- assert "🔤" in echo_tool .description
194- assert "👋" in echo_tool .description
187+ # Check Unicode in tool descriptions
188+ echo_tool = tools .tools [0 ]
189+ assert echo_tool .name == "echo_unicode"
190+ assert echo_tool .description is not None
191+ assert "🔤" in echo_tool .description
192+ assert "👋" in echo_tool .description
195193
196- # Test 2: Send Unicode text in tool call (client→server→client)
197- for test_name , test_string in UNICODE_TEST_STRINGS .items ():
198- result = await session .call_tool ("echo_unicode" , arguments = {"text" : test_string })
194+ # Test 2: Send Unicode text in tool call (client→server→client)
195+ for test_name , test_string in UNICODE_TEST_STRINGS .items ():
196+ result = await session .call_tool ("echo_unicode" , arguments = {"text" : test_string })
199197
200- # Verify server correctly received and echoed back Unicode
201- assert len (result .content ) == 1
202- content = result .content [0 ]
203- assert content .type == "text"
204- assert f"Echo: { test_string } " == content .text , f"Failed for { test_name } "
198+ # Verify server correctly received and echoed back Unicode
199+ assert len (result .content ) == 1
200+ content = result .content [0 ]
201+ assert content .type == "text"
202+ assert f"Echo: { test_string } " == content .text , f"Failed for { test_name } "
205203
206204
207205@pytest .mark .anyio
208- async def test_streamable_http_client_unicode_prompts (running_unicode_server : str ) -> None :
206+ async def test_streamable_http_client_unicode_prompts () -> None :
209207 """Test that Unicode text is correctly handled in prompts via streamable HTTP."""
210- base_url = running_unicode_server
211- endpoint_url = f"{ base_url } /mcp"
212-
213- async with streamable_http_client (endpoint_url ) as (read_stream , write_stream , _get_session_id ):
214- async with ClientSession (read_stream , write_stream ) as session :
215- await session .initialize ()
216-
217- # Test 1: List prompts (server→client Unicode in descriptions)
218- prompts = await session .list_prompts ()
219- assert len (prompts .prompts ) == 1
220-
221- prompt = prompts .prompts [0 ]
222- assert prompt .name == "unicode_prompt"
223- assert prompt .description is not None
224- assert "Слой хранилища, где располагаются" in prompt .description
225-
226- # Test 2: Get prompt with Unicode content (server→client)
227- result = await session .get_prompt ("unicode_prompt" , arguments = {})
228- assert len (result .messages ) == 1
229-
230- message = result .messages [0 ]
231- assert message .role == "user"
232- assert message .content .type == "text"
233- assert message .content .text == "Hello世界🌍Привет안녕مرحباשלום"
208+ async with unicode_session () as session :
209+ # Test 1: List prompts (server→client Unicode in descriptions)
210+ prompts = await session .list_prompts ()
211+ assert len (prompts .prompts ) == 1
212+
213+ prompt = prompts .prompts [0 ]
214+ assert prompt .name == "unicode_prompt"
215+ assert prompt .description is not None
216+ assert "Слой хранилища, где располагаются" in prompt .description
217+
218+ # Test 2: Get prompt with Unicode content (server→client)
219+ result = await session .get_prompt ("unicode_prompt" , arguments = {})
220+ assert len (result .messages ) == 1
221+
222+ message = result .messages [0 ]
223+ assert message .role == "user"
224+ assert message .content .type == "text"
225+ assert message .content .text == "Hello世界🌍Привет안녕مرحباשלום"
0 commit comments