Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions lib/mcp/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,13 @@ def server_info
end

def init(params, session: nil)
# MCP spec: the initialization phase MUST be the first interaction between client and server.
# Reject duplicate `initialize` on an already-initialized session so the negotiated
# client identity and capabilities cannot be silently overwritten.
if session&.initialized?
raise RequestHandlerError.new("Invalid Request: Server already initialized", params, error_type: :invalid_request)
end

if params
if session
session.store_client_info(client: params[:clientInfo], capabilities: params[:capabilities])
Expand Down Expand Up @@ -524,6 +531,8 @@ def init(params, session: nil)
response_instructions = nil
end

session&.mark_initialized!

{
protocolVersion: negotiated_version,
capabilities: capabilities,
Expand Down
79 changes: 68 additions & 11 deletions lib/mcp/server/transports/streamable_http_transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,13 @@ def handle_post(request)
return invalid_json_response
end

# Streamable HTTP (2025-11-25) requires a single JSON-RPC message object per POST.
# Batched/array bodies are not supported; reject with `-32600` instead of falling through to
# a malformed Rack response.
unless body.is_a?(Hash)
return invalid_request_response("Invalid Request: JSON-RPC body must be a single request object")
end

# The `MCP-Protocol-Version` header is only meaningful after negotiation, so on `initialize`
# the JSON-RPC body `params.protocolVersion` is authoritative and the header (if any) is ignored.
# This matches the TypeScript and Python SDKs.
Expand All @@ -357,10 +364,18 @@ def handle_post(request)
return protocol_version_error if protocol_version_error
end

# MCP 2025-11-25 does not support JSON-RPC batch, so the body must be a single message object.
return non_hash_body_response unless body.is_a?(Hash)

if initialize_request?(body)
if !@stateless && session_id
# An `initialize` request carrying an `Mcp-Session-Id` header is either a duplicate
# initialization attempt against a live session, or a retry against an unknown/expired
# one. In the live case, reject with `-32600` so the original session is not abandoned.
# In the unknown/expired case, return 404 so the client retries from scratch instead
# of silently inheriting a fresh session under the old ID.
return already_initialized_response(body[:id]) if session_active?(session_id)

return session_not_found_response
end

handle_initialization(body_string, body)
elsif notification?(body)
dispatch_notification(body_string, session_id)
Expand Down Expand Up @@ -523,14 +538,6 @@ def invalid_json_response
[400, { "Content-Type" => "application/json" }, [{ error: "Invalid JSON" }.to_json]]
end

def non_hash_body_response
[
400,
{ "Content-Type" => "application/json" },
[{ error: "Bad Request: request body must be a single JSON-RPC message object" }.to_json],
]
end

def initialize_request?(body)
body.is_a?(Hash) && body[:method] == Methods::INITIALIZE
end
Expand Down Expand Up @@ -617,6 +624,15 @@ def handle_initialization(body_string, body)
@server.handle_json(body_string)
end

# If `Server#init` produced an error response (e.g., malformed JSON-RPC envelope),
# `mark_initialized!` was never called. Discard the orphaned session and omit
# the `Mcp-Session-Id` header so the client retries from a clean state instead of
# reusing a never-initialized ID that would later look like a duplicate `initialize`.
if server_session && !server_session.initialized?
cleanup_session(session_id)
session_id = nil
end

headers = {
"Content-Type" => "application/json",
}
Expand Down Expand Up @@ -751,6 +767,31 @@ def session_exists?(session_id)
@mutex.synchronize { @sessions.key?(session_id) }
end

# Returns true iff a session exists and is not past its idle timeout. Expired sessions
# are evicted as a side effect so a live request never observes a zombie session that
# the reaper hasn't yet pruned. Does NOT update `last_active_at`; callers that are
# rejecting a request must not extend the session's lifetime.
def session_active?(session_id)
removed = nil
active = @mutex.synchronize do
next false unless (session = @sessions[session_id])

if session_expired?(session)
removed = cleanup_session_unsafe(session_id)
next false
end

true
end

if removed
close_stream_safely(removed[:get_sse_stream])
close_post_request_streams(removed)
end

active
end

def method_not_allowed_response
[405, { "Content-Type" => "application/json" }, [{ error: "Method not allowed" }.to_json]]
end
Expand All @@ -763,6 +804,22 @@ def session_not_found_response
[404, { "Content-Type" => "application/json" }, [{ error: "Session not found" }.to_json]]
end

def already_initialized_response(request_id)
invalid_request_response("Invalid Request: Server already initialized", request_id: request_id)
end

def invalid_request_response(message, request_id: nil)
body = {
jsonrpc: "2.0",
id: request_id,
error: {
code: JsonRpcHandler::ErrorCode::INVALID_REQUEST,
message: message,
},
}
[400, { "Content-Type" => "application/json" }, [body.to_json]]
end

def session_already_connected_response
[
409,
Expand Down
13 changes: 13 additions & 0 deletions lib/mcp/server_session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ def initialize(server:, transport:, session_id: nil)
@logging_message_notification = nil
@in_flight = {}
@in_flight_mutex = Mutex.new
@initialized = false
end

# Whether `initialize` has already completed for this session.
def initialized?
@initialized
end

# Called by `Server#init` after a successful `initialize` response, so subsequent
# `initialize` requests on the same session can be rejected per MCP spec
# (the initialization phase MUST be the first interaction).
def mark_initialized!
@initialized = true
end

# Registers a `Cancellation` token for an in-flight request.
Expand Down
49 changes: 49 additions & 0 deletions test/mcp/server/transports/stdio_transport_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,55 @@ class StdioTransportTest < ActiveSupport::TestCase
end
end

test "rejects duplicate initialize on the same stdio session with -32600" do
first = {
jsonrpc: "2.0",
method: "initialize",
id: "first",
params: {
protocolVersion: "2025-11-25",
clientInfo: { name: "original", version: "1.0" },
},
}
second = {
jsonrpc: "2.0",
method: "initialize",
id: "second",
params: {
protocolVersion: "2024-11-05",
clientInfo: { name: "intruder", version: "9.9" },
},
}
input = StringIO.new("#{JSON.generate(first)}\n#{JSON.generate(second)}\n")
output = StringIO.new
original_stdin = $stdin
original_stdout = $stdout

begin
$stdin = input
$stdout = output
@transport.open

lines = output.string.lines
assert_equal(2, lines.length)
first_response = JSON.parse(lines[0], symbolize_names: true)
second_response = JSON.parse(lines[1], symbolize_names: true)

assert_equal("first", first_response[:id])
refute_nil(first_response[:result])

assert_equal("second", second_response[:id])
assert_equal(-32600, second_response[:error][:code])
assert_equal("Invalid Request", second_response[:error][:message])

session = @transport.instance_variable_get(:@session)
assert_equal({ name: "original", version: "1.0" }, session.client)
ensure
$stdin = original_stdin
$stdout = original_stdout
end
end

test "handles invalid JSON requests" do
invalid_json = "invalid json"
output = StringIO.new
Expand Down
127 changes: 125 additions & 2 deletions test/mcp/server/transports/streamable_http_transport_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ def string
assert_equal 400, response[0]

body = JSON.parse(response[2][0])
assert_includes body["error"], "single JSON-RPC message object"
assert_equal JsonRpcHandler::ErrorCode::INVALID_REQUEST, body["error"]["code"]
assert_match(/single request object/i, body["error"]["message"])
end

test "POST request with non-object JSON body returns 400" do
Expand All @@ -147,7 +148,8 @@ def string
assert_equal 400, response[0]

body = JSON.parse(response[2][0])
assert_includes body["error"], "single JSON-RPC message object"
assert_equal JsonRpcHandler::ErrorCode::INVALID_REQUEST, body["error"]["code"]
assert_match(/single request object/i, body["error"]["message"])
end

test "handles POST request with initialize method" do
Expand All @@ -169,6 +171,127 @@ def string
assert_equal Configuration::LATEST_STABLE_PROTOCOL_VERSION, body["result"]["protocolVersion"]
end

test "rejects duplicate initialize with existing Mcp-Session-Id and preserves session" do
init_request = create_rack_request(
"POST",
"/",
{ "CONTENT_TYPE" => "application/json" },
{ jsonrpc: "2.0", method: "initialize", id: "first" }.to_json,
)
init_response = @transport.handle_request(init_request)
session_id = init_response[1]["Mcp-Session-Id"]
assert session_id

duplicate_request = create_rack_request(
"POST",
"/",
{ "CONTENT_TYPE" => "application/json", "HTTP_MCP_SESSION_ID" => session_id },
{ jsonrpc: "2.0", method: "initialize", id: "second" }.to_json,
)
duplicate_response = @transport.handle_request(duplicate_request)

assert_equal 400, duplicate_response[0]
body = JSON.parse(duplicate_response[2][0])
assert_equal "2.0", body["jsonrpc"]
assert_equal "second", body["id"]
assert_equal JsonRpcHandler::ErrorCode::INVALID_REQUEST, body["error"]["code"]
assert_match(/already initialized/i, body["error"]["message"])

# Original session should still be usable.
ping_request = create_rack_request(
"POST",
"/",
{ "CONTENT_TYPE" => "application/json", "HTTP_MCP_SESSION_ID" => session_id },
{ jsonrpc: "2.0", method: "ping", id: "ping-1" }.to_json,
)
ping_response = @transport.handle_request(ping_request)
assert_equal 200, ping_response[0]
end

test "rejects initialize with stale Mcp-Session-Id with 404" do
request = create_rack_request(
"POST",
"/",
{ "CONTENT_TYPE" => "application/json", "HTTP_MCP_SESSION_ID" => "unknown-session" },
{ jsonrpc: "2.0", method: "initialize", id: "1" }.to_json,
)

response = @transport.handle_request(request)
assert_equal 404, response[0]
body = JSON.parse(response[2][0])
assert_equal "Session not found", body["error"]
end

test "rejects duplicate initialize against an idle-expired session with 404 and evicts it" do
transport = StreamableHTTPTransport.new(@server, session_idle_timeout: 0.05)
begin
init_request = create_rack_request(
"POST",
"/",
{ "CONTENT_TYPE" => "application/json" },
{ jsonrpc: "2.0", method: "initialize", id: "first" }.to_json,
)
init_response = transport.handle_request(init_request)
session_id = init_response[1]["Mcp-Session-Id"]
assert(session_id)

sleep(0.1)

duplicate_request = create_rack_request(
"POST",
"/",
{ "CONTENT_TYPE" => "application/json", "HTTP_MCP_SESSION_ID" => session_id },
{ jsonrpc: "2.0", method: "initialize", id: "second" }.to_json,
)
duplicate_response = transport.handle_request(duplicate_request)

assert_equal(404, duplicate_response[0])
body = JSON.parse(duplicate_response[2][0])
assert_equal("Session not found", body["error"])

refute(transport.send(:session_exists?, session_id), "expired session must be evicted")
ensure
transport.close
end
end

test "evicts session and omits Mcp-Session-Id when initialize fails" do
# An `initialize` whose JSON-RPC envelope is rejected (e.g. wrong `jsonrpc` version)
# never reaches `Server#init`, so `mark_initialized!` is never called. The transport
# must drop the registered-but-uninitialized session to keep retries clean.
request = create_rack_request(
"POST",
"/",
{ "CONTENT_TYPE" => "application/json" },
{ jsonrpc: "1.0", method: "initialize", id: "broken" }.to_json,
)

response = @transport.handle_request(request)
assert_equal 200, response[0]
refute response[1].key?("Mcp-Session-Id"), "no session id should leak from a failed init"

body = JSON.parse(response[2][0])
assert_equal JsonRpcHandler::ErrorCode::INVALID_REQUEST, body["error"]["code"]
assert_equal({}, @transport.instance_variable_get(:@sessions))
end

test "rejects non-Hash JSON-RPC body with HTTP 400 and -32600" do
request = create_rack_request(
"POST",
"/",
{ "CONTENT_TYPE" => "application/json" },
[{ jsonrpc: "2.0", method: "initialize", id: "batched" }].to_json,
)

response = @transport.handle_request(request)
assert_equal 400, response[0]
body = JSON.parse(response[2][0])
assert_equal "2.0", body["jsonrpc"]
assert_nil body["id"]
assert_equal JsonRpcHandler::ErrorCode::INVALID_REQUEST, body["error"]["code"]
assert_match(/single request object/i, body["error"]["message"])
end

test "handles GET request with valid session ID" do
# First create a session with initialize
init_request = create_rack_request(
Expand Down
Loading