Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion docs/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,37 @@ app = Starlette(routes=[Mount("/", app=mcp.streamable_http_app(json_response=Tru

**Note:** DNS rebinding protection is automatically enabled when `host` is `127.0.0.1`, `localhost`, or `::1`. This now happens in `sse_app()` and `streamable_http_app()` instead of the constructor.

### `MCPServer.get_context()` removed

`MCPServer.get_context()` has been removed. Context is now injected by the framework and passed explicitly — there is no ambient ContextVar to read from.

**If you were calling `get_context()` from inside a tool/resource/prompt:** use the `ctx: Context` parameter injection instead.

**Before (v1):**

```python
@mcp.tool()
async def my_tool(x: int) -> str:
ctx = mcp.get_context()
await ctx.info("Processing...")
return str(x)
```

**After (v2):**

```python
@mcp.tool()
async def my_tool(x: int, ctx: Context) -> str:
await ctx.info("Processing...")
return str(x)
```

### `MCPServer.call_tool()`, `read_resource()`, `get_prompt()` now accept a `context` parameter

`MCPServer.call_tool()`, `MCPServer.read_resource()`, and `MCPServer.get_prompt()` now accept an optional `context: Context | None = None` parameter. The framework passes this automatically during normal request handling — you only need to supply it when calling these methods directly.

If the tool, resource template, or prompt you're invoking declares a `ctx: Context` parameter, you must pass a `Context`. Calling without one raises `ToolError` for tools or `ValueError` for prompts and resource templates.

### Replace `RootModel` by union types with `TypeAdapter` validation

The following union types are no longer `RootModel` subclasses:
Expand Down Expand Up @@ -694,7 +725,7 @@ If you prefer the convenience of automatic wrapping, use `MCPServer` which still

### Lowlevel `Server`: `request_context` property removed

The `server.request_context` property has been removed. Request context is now passed directly to handlers as the first argument (`ctx`). The `request_ctx` module-level contextvar is now an internal implementation detail and should not be relied upon.
The `server.request_context` property has been removed. Request context is now passed directly to handlers as the first argument (`ctx`). The `request_ctx` module-level contextvar has been removed entirely.

**Before (v1):**

Expand Down
9 changes: 1 addition & 8 deletions src/mcp/server/lowlevel/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ async def main():

from __future__ import annotations

import contextvars
import logging
import warnings
from collections.abc import AsyncIterator, Awaitable, Callable
Expand Down Expand Up @@ -74,8 +73,6 @@ async def main():

LifespanResultT = TypeVar("LifespanResultT", default=Any)

request_ctx: contextvars.ContextVar[ServerRequestContext[Any]] = contextvars.ContextVar("request_ctx")


class NotificationOptions:
def __init__(self, prompts_changed: bool = False, resources_changed: bool = False, tools_changed: bool = False):
Expand Down Expand Up @@ -474,11 +471,7 @@ async def _handle_request(
close_sse_stream=close_sse_stream_cb,
close_standalone_sse_stream=close_standalone_sse_stream_cb,
)
token = request_ctx.set(ctx)
try:
response = await handler(ctx, req.params)
finally:
request_ctx.reset(token)
response = await handler(ctx, req.params)
except MCPError as err:
response = err.error
except anyio.get_cancelled_exc_class():
Expand Down
3 changes: 2 additions & 1 deletion src/mcp/server/mcpserver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from mcp.types import Icon

from .server import Context, MCPServer
from .context import Context
from .server import MCPServer
from .utilities.types import Audio, Image

__all__ = ["MCPServer", "Context", "Image", "Audio", "Icon"]
281 changes: 281 additions & 0 deletions src/mcp/server/mcpserver/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
from __future__ import annotations

from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, Generic, Literal

from pydantic import AnyUrl, BaseModel

from mcp.server.context import LifespanContextT, RequestT, ServerRequestContext
from mcp.server.elicitation import (
ElicitationResult,
ElicitSchemaModelT,
UrlElicitationResult,
elicit_with_validation,
)
from mcp.server.elicitation import (
elicit_url as _elicit_url,
)
Comment on lines +15 to +17
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this?

Suggested change
from mcp.server.elicitation import (
elicit_url as _elicit_url,
)
from mcp.server.elicitation import elicit_url

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, got it.

Suggested change
from mcp.server.elicitation import (
elicit_url as _elicit_url,
)
from mcp.server.elicitation import elicit_url as _elicit_url

Still...

from mcp.server.lowlevel.helper_types import ReadResourceContents

if TYPE_CHECKING:
from mcp.server.mcpserver.server import MCPServer


class Context(BaseModel, Generic[LifespanContextT, RequestT]):
"""Context object providing access to MCP capabilities.

This provides a cleaner interface to MCP's RequestContext functionality.
It gets injected into tool and resource functions that request it via type hints.

To use context in a tool function, add a parameter with the Context type annotation:

```python
@server.tool()
async def my_tool(x: int, ctx: Context) -> str:
# Log messages to the client
await ctx.info(f"Processing {x}")
await ctx.debug("Debug info")
await ctx.warning("Warning message")
await ctx.error("Error message")

# Report progress
await ctx.report_progress(50, 100)

# Access resources
data = await ctx.read_resource("resource://data")

# Get request info
request_id = ctx.request_id
client_id = ctx.client_id

return str(x)
```

The context parameter name can be anything as long as it's annotated with Context.
The context is optional - tools that don't need it can omit the parameter.
"""

_request_context: ServerRequestContext[LifespanContextT, RequestT] | None
_mcp_server: MCPServer | None

def __init__(
self,
*,
request_context: ServerRequestContext[LifespanContextT, RequestT] | None = None,
mcp_server: MCPServer | None = None,
# TODO(Marcelo): We should drop this kwargs parameter.
**kwargs: Any,
Comment on lines +66 to +67
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should follow what Marcelo said.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I wasn't actually changing the Context class at all, just moving it for now

):
super().__init__(**kwargs)
self._request_context = request_context
self._mcp_server = mcp_server

@property
def mcp_server(self) -> MCPServer:
"""Access to the MCPServer instance."""
if self._mcp_server is None: # pragma: no cover
raise ValueError("Context is not available outside of a request")
return self._mcp_server # pragma: no cover

@property
def request_context(self) -> ServerRequestContext[LifespanContextT, RequestT]:
"""Access to the underlying request context."""
if self._request_context is None: # pragma: no cover
raise ValueError("Context is not available outside of a request")
return self._request_context

async def report_progress(self, progress: float, total: float | None = None, message: str | None = None) -> None:
"""Report progress for the current operation.

Args:
progress: Current progress value (e.g., 24)
total: Optional total value (e.g., 100)
message: Optional message (e.g., "Starting render...")
"""
progress_token = self.request_context.meta.get("progress_token") if self.request_context.meta else None

if progress_token is None: # pragma: no cover
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if progress_token is None: # pragma: no cover
if progress_token is None: # pragma: no branch

return

await self.request_context.session.send_progress_notification(
progress_token=progress_token,
progress=progress,
total=total,
message=message,
related_request_id=self.request_id,
)

async def read_resource(self, uri: str | AnyUrl) -> Iterable[ReadResourceContents]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this AnyUrl?

Suggested change
async def read_resource(self, uri: str | AnyUrl) -> Iterable[ReadResourceContents]:
async def read_resource(self, uri: str) -> Iterable[ReadResourceContents]:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keen to follow up with a different PR to actually refactor the Context class itself, but this one is just about moving it out and removing the context_var access.

"""Read a resource by URI.

Args:
uri: Resource URI to read

Returns:
The resource content as either text or bytes
"""
assert self._mcp_server is not None, "Context is not available outside of a request"
return await self._mcp_server.read_resource(uri, self)

async def elicit(
self,
message: str,
schema: type[ElicitSchemaModelT],
) -> ElicitationResult[ElicitSchemaModelT]:
"""Elicit information from the client/user.

This method can be used to interactively ask for additional information from the
client within a tool's execution. The client might display the message to the
user and collect a response according to the provided schema. If the client
is an agent, it might decide how to handle the elicitation -- either by asking
the user or automatically generating a response.

Args:
message: Message to present to the user
schema: A Pydantic model class defining the expected response structure.
According to the specification, only primitive types are allowed.

Returns:
An ElicitationResult containing the action taken and the data if accepted

Note:
Check the result.action to determine if the user accepted, declined, or cancelled.
The result.data will only be populated if action is "accept" and validation succeeded.
"""

return await elicit_with_validation(
session=self.request_context.session,
message=message,
schema=schema,
related_request_id=self.request_id,
)

async def elicit_url(
self,
message: str,
url: str,
elicitation_id: str,
) -> UrlElicitationResult:
"""Request URL mode elicitation from the client.

This directs the user to an external URL for out-of-band interactions
that must not pass through the MCP client. Use this for:
- Collecting sensitive credentials (API keys, passwords)
- OAuth authorization flows with third-party services
- Payment and subscription flows
- Any interaction where data should not pass through the LLM context

The response indicates whether the user consented to navigate to the URL.
The actual interaction happens out-of-band. When the elicitation completes,
call `ctx.session.send_elicit_complete(elicitation_id)` to notify the client.

Args:
message: Human-readable explanation of why the interaction is needed
url: The URL the user should navigate to
elicitation_id: Unique identifier for tracking this elicitation

Returns:
UrlElicitationResult indicating accept, decline, or cancel
"""
return await _elicit_url(
session=self.request_context.session,
message=message,
url=url,
elicitation_id=elicitation_id,
related_request_id=self.request_id,
)

async def log(
self,
level: Literal["debug", "info", "warning", "error"],
message: str,
*,
logger_name: str | None = None,
extra: dict[str, Any] | None = None,
) -> None:
"""Send a log message to the client.

Args:
level: Log level (debug, info, warning, error)
message: Log message
logger_name: Optional logger name
extra: Optional dictionary with additional structured data to include
"""

if extra:
log_data = {"message": message, **extra}
else:
log_data = message

await self.request_context.session.send_log_message(
level=level,
data=log_data,
logger=logger_name,
related_request_id=self.request_id,
)

@property
def client_id(self) -> str | None:
"""Get the client ID if available."""
return self.request_context.meta.get("client_id") if self.request_context.meta else None # pragma: no cover

@property
def request_id(self) -> str:
"""Get the unique ID for this request."""
return str(self.request_context.request_id)

@property
def session(self):
"""Access to the underlying session for advanced usage."""
return self.request_context.session

async def close_sse_stream(self) -> None:
"""Close the SSE stream to trigger client reconnection.

This method closes the HTTP connection for the current request, triggering
client reconnection. Events continue to be stored in the event store and will
be replayed when the client reconnects with Last-Event-ID.

Use this to implement polling behavior during long-running operations -
the client will reconnect after the retry interval specified in the priming event.

Note:
This is a no-op if not using StreamableHTTP transport with event_store.
The callback is only available when event_store is configured.
"""
if self._request_context and self._request_context.close_sse_stream: # pragma: no cover
await self._request_context.close_sse_stream()

async def close_standalone_sse_stream(self) -> None:
"""Close the standalone GET SSE stream to trigger client reconnection.

This method closes the HTTP connection for the standalone GET stream used
for unsolicited server-to-client notifications. The client SHOULD reconnect
with Last-Event-ID to resume receiving notifications.

Note:
This is a no-op if not using StreamableHTTP transport with event_store.
Currently, client reconnection for standalone GET streams is NOT
implemented - this is a known gap.
"""
if self._request_context and self._request_context.close_standalone_sse_stream: # pragma: no cover
await self._request_context.close_standalone_sse_stream()
Comment on lines +232 to +262
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we find out the answers for what I asked before? When would the developer actually use those???

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iirc I couldn't find actual uses for it publically, but it's a required part of the spec the sdk needs to support

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem a reasonable answer. So we have a public API that is required by a spec that actually no one uses? Is the spec written for the sake of having a spec? Can we please find the use case of it?


# Convenience methods for common log levels
async def debug(self, message: str, *, logger_name: str | None = None, extra: dict[str, Any] | None = None) -> None:
"""Send a debug log message."""
await self.log("debug", message, logger_name=logger_name, extra=extra)

async def info(self, message: str, *, logger_name: str | None = None, extra: dict[str, Any] | None = None) -> None:
"""Send an info log message."""
await self.log("info", message, logger_name=logger_name, extra=extra)

async def warning(
self, message: str, *, logger_name: str | None = None, extra: dict[str, Any] | None = None
) -> None:
"""Send a warning log message."""
await self.log("warning", message, logger_name=logger_name, extra=extra)

async def error(self, message: str, *, logger_name: str | None = None, extra: dict[str, Any] | None = None) -> None:
"""Send an error log message."""
await self.log("error", message, logger_name=logger_name, extra=extra)
11 changes: 9 additions & 2 deletions src/mcp/server/mcpserver/prompts/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

if TYPE_CHECKING:
from mcp.server.context import LifespanContextT, RequestT
from mcp.server.mcpserver.server import Context
from mcp.server.mcpserver.context import Context


class Message(BaseModel):
Expand Down Expand Up @@ -138,7 +138,14 @@ async def render(
arguments: dict[str, Any] | None = None,
context: Context[LifespanContextT, RequestT] | None = None,
) -> list[Message]:
"""Render the prompt with arguments."""
"""Render the prompt with arguments.

Raises:
ValueError: If the prompt requires a Context but none was provided,
if required arguments are missing, or if rendering fails.
"""
if self.context_kwarg is not None and context is None:
raise ValueError(f"Prompt {self.name!r} requires a Context, but none was provided")
Comment on lines +147 to +148
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would the Context be None here? 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, probably it's RuntimeError, if anything.

Copy link
Contributor Author

@maxisbey maxisbey Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly the way Context is optional is rather weird through the whole call chain. If it's None it's sometimes just not injected which also breaks stuff.

After taking a bit of time to walk through it, I think it'd be better to leave it as optional on the MCPServer methods itself, but then make it required on the rest of the call stack, which is essentially the same functionality that existed before. If you called the previous MCPServer.get_context method it would just construct a new one if none existed.

So will do that to restore what was here before and remove some of the weird optional handling through the rest of the call chain.

# Validate required arguments
if self.arguments:
required = {arg.name for arg in self.arguments if arg.required}
Expand Down
Loading