|
| 1 | +"""In-memory transport for testing MCP servers without network overhead.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +from collections.abc import AsyncGenerator |
| 6 | +from contextlib import asynccontextmanager |
| 7 | +from typing import Any |
| 8 | + |
| 9 | +import anyio |
| 10 | +from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream |
| 11 | + |
| 12 | +from mcp.server import Server |
| 13 | +from mcp.server.fastmcp import FastMCP |
| 14 | +from mcp.shared.memory import create_client_server_memory_streams |
| 15 | +from mcp.shared.message import SessionMessage |
| 16 | + |
| 17 | + |
| 18 | +class InMemoryTransport: |
| 19 | + """ |
| 20 | + In-memory transport for testing MCP servers without network overhead. |
| 21 | +
|
| 22 | + This transport starts the server in a background task and provides |
| 23 | + streams for client-side communication. The server is automatically |
| 24 | + stopped when the context manager exits. |
| 25 | +
|
| 26 | + Example: |
| 27 | + server = FastMCP("test") |
| 28 | + transport = InMemoryTransport(server) |
| 29 | +
|
| 30 | + async with transport.connect() as (read_stream, write_stream): |
| 31 | + async with ClientSession(read_stream, write_stream) as session: |
| 32 | + await session.initialize() |
| 33 | + # Use the session... |
| 34 | +
|
| 35 | + Or more commonly, use with Client: |
| 36 | + async with Client(server) as client: |
| 37 | + result = await client.call_tool("my_tool", {...}) |
| 38 | + """ |
| 39 | + |
| 40 | + def __init__( |
| 41 | + self, |
| 42 | + server: Server[Any] | FastMCP, |
| 43 | + *, |
| 44 | + raise_exceptions: bool = False, |
| 45 | + ) -> None: |
| 46 | + """ |
| 47 | + Initialize the in-memory transport. |
| 48 | +
|
| 49 | + Args: |
| 50 | + server: The MCP server to connect to (Server or FastMCP instance) |
| 51 | + raise_exceptions: Whether to raise exceptions from the server |
| 52 | + """ |
| 53 | + self._server = server |
| 54 | + self._raise_exceptions = raise_exceptions |
| 55 | + |
| 56 | + @asynccontextmanager |
| 57 | + async def connect( |
| 58 | + self, |
| 59 | + ) -> AsyncGenerator[ |
| 60 | + tuple[ |
| 61 | + MemoryObjectReceiveStream[SessionMessage | Exception], |
| 62 | + MemoryObjectSendStream[SessionMessage], |
| 63 | + ], |
| 64 | + None, |
| 65 | + ]: |
| 66 | + """ |
| 67 | + Connect to the server and return streams for communication. |
| 68 | +
|
| 69 | + Yields: |
| 70 | + A tuple of (read_stream, write_stream) for bidirectional communication |
| 71 | + """ |
| 72 | + # Unwrap FastMCP to get underlying Server |
| 73 | + actual_server: Server[Any] |
| 74 | + if isinstance(self._server, FastMCP): |
| 75 | + actual_server = self._server._mcp_server # type: ignore[reportPrivateUsage] |
| 76 | + else: |
| 77 | + actual_server = self._server |
| 78 | + |
| 79 | + async with create_client_server_memory_streams() as (client_streams, server_streams): |
| 80 | + client_read, client_write = client_streams |
| 81 | + server_read, server_write = server_streams |
| 82 | + |
| 83 | + async with anyio.create_task_group() as tg: |
| 84 | + # Start server in background |
| 85 | + tg.start_soon( |
| 86 | + lambda: actual_server.run( |
| 87 | + server_read, |
| 88 | + server_write, |
| 89 | + actual_server.create_initialization_options(), |
| 90 | + raise_exceptions=self._raise_exceptions, |
| 91 | + ) |
| 92 | + ) |
| 93 | + |
| 94 | + try: |
| 95 | + yield client_read, client_write |
| 96 | + finally: |
| 97 | + tg.cancel_scope.cancel() |
0 commit comments