Skip to content

Conversation

@MackinnonBuck
Copy link
Collaborator

Opening initially as a draft while adding/debugging remaining E2E tests

Fixes #510
Fixes #1020

/// For other transports, this property will be <see langword="null"/>.
/// </para>
/// </remarks>
public Action? CloseStandaloneSseStream { get; set; }
Copy link
Contributor

@halter73 halter73 Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the changes to the JsonRpcMessageContext necessary? I think I see the purpose. You cannot always rely on the request ID from the POST body to identify the final message in the SSE stream, but I wonder if there isn't a cleaner abstraction. Maybe disposing the RelatedTransport? That wouldn't cover the GET stream, but I don't know why you need to be able to close the GET stream via a random JsonRpcMessage at all.

public void CloseStandaloneSseStream()
{
JsonRpcRequest.Context?.CloseStandaloneSseStream?.Invoke();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can probably be reverted for the same reasons as the JsonRpcMessageContext changes

}

// If this is a POST stream, we're done - the replay was the complete response
if (streamId != GetStreamId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't there be similar changes to HandlePostRequestAsync? Also, we need to do more to avoid stream ID collisions. Even though there's one "get" per session, IEventStore is supplied via HttpServerTransportOptions.EventStore which makes it quite difficult to configure a per-session store.

I don't think people want to be forced to configure the store per-session anyway. I think we need to include the session ID or maybe a random GUID for "Stateless" requests if we support stateless resumption at all.

I think it could make sense to support stateless, but we need to have tests for stateless and non-stateless tests. I recommend looking at the MapMcpTests and MapMcpStreamableHttpTests. By adding test cases to MapMcpStreamableHttpTests, you should get both stateless and non-stateless test coverage.

/// Priming events are only supported in protocol version &gt;= 2025-11-25.
/// Older clients may crash when receiving SSE events with empty data.
/// </remarks>
internal static bool SupportsResumability(string? protocolVersion)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This should be renamed. It's not about whether the protocol supports resumability, but it's specifically about the "priming" event.

/// In-memory event store for testing resumability.
/// This is a simple implementation intended for testing, not for production use.
/// </summary>
public class InMemoryEventStore : IEventStore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about the idea of including a default implementation of IEventStore based on IDistributedCache? It'd be opt-in ofc. I could definitely see it being a follow up. Figuring out how to get the usability of that right might influence the design of the abstraction.

/// Gets or sets the retry interval to suggest to clients in SSE retry field.
/// </summary>
/// <value>
/// The retry interval. The default is <see langword="null"/>, meaning no retry field is sent.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spec states "it SHOULD send an SSE event with a standard retry field". Does that mean our default is going against the spec's recommendation?

response = await _httpClient.SendAsync(request, message: null, _connectionCts.Token).ConfigureAwait(false);
}
catch (HttpRequestException)
// Continuously receive unsolicited messages until cancelled
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: canceled

else
{
// We have an event ID, so reconnection should work - reset attempts
attempt = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What prevents us from ending up in an infinite loop?

/// Implementations should be thread-safe, as events may be stored and replayed concurrently.
/// </para>
/// </remarks>
public interface IEventStore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a fairly general name. Could we make it ISseEventStore or something similarly more specific?

Comment on lines 25 to 34
/// Stores an event for later retrieval.
/// </summary>
/// <param name="streamId">
/// The ID of the stream the event belongs to. This is typically the JSON-RPC request ID
/// for POST SSE responses, or a special identifier for the standalone GET SSE stream.
/// </param>
/// <param name="message">
/// The JSON-RPC message to store, or <see langword="null"/> for priming events.
/// Priming events establish the event ID without carrying a message payload.
/// </param>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the definition of "event" here? It says this is for storing an event, but then accepts the JSON-RPC message to store. Is the intimation that the event contains that message?

Comment on lines 52 to 55
ValueTask<string?> ReplayEventsAfterAsync(
string lastEventId,
Func<JsonRpcMessage, string, CancellationToken, ValueTask> sendCallback,
CancellationToken cancellationToken = default);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious why you chose to expose it like this rather than, say, exposing a GetEventsAfter method that returned an enumerable.

await foreach (var item in messages.WithCancellation(cancellationToken).ConfigureAwait(false))
{
// Skip endpoint events, priming events, and replayed events (which already have IDs)
if (item.EventType == "endpoint" || item.EventType == "priming" || item.EventId is not null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
if (item.EventType == "endpoint" || item.EventType == "priming" || item.EventId is not null)
if (item.EventType is "endpoint" or "priming" || item.EventId is not null)

Copy link
Contributor

@mikekistler mikekistler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get through all of this but want to share the comments I've made so far.

/// </param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>The generated event ID for the stored event.</returns>
ValueTask<string> StoreEventAsync(string sessionId, string streamId, JsonRpcMessage? message, CancellationToken cancellationToken = default);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If sessionId could be null, should it be declared as string??

/// </param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>The generated event ID for the stored event.</returns>
ValueTask<string> StoreEventAsync(string sessionId, string streamId, JsonRpcMessage? message, CancellationToken cancellationToken = default);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should allow the EventStore to selectively store events. For example, we should allow the EventStore to choose to not store Progress notifications. In these cases, it isn't necessary to assign an id to the event -- I'm pretty sure the SSE spec allows this. So could we make the return value a string? to permit the EventStore to choose to not assign an id to the event (which it may not have stored).

/// Priming events establish the event ID without carrying a message payload.
/// </param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>The generated event ID for the stored event.</returns>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are some requirements on the event ID, like it must be unique within the session, if the server uses session management. It might be good to add these requirements in the doc here.

/// <summary>
/// Gets the session ID that the events belong to.
/// </summary>
public required string SessionId { get; init; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here too, should SessionId be a string??

/// This class simplifies event storage by binding the session ID, stream ID, and retry interval
/// so that callers only need to provide the message when storing events.
/// </remarks>
internal sealed class SseStreamEventStore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be really helpful to expose a base class for the event store that users could easily extend with custom behavior to suit their needs, such as an event filter, id generator, and retryInterval delegate.


// Create a priming event: empty data with an event ID
// We use a special "priming" event type that the formatter will handle
var primingItem = new SseItem<JsonRpcMessage?>(null, "priming")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it was decided in Discord that the event name should be "prime".

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

SEP-1699: Support SSE polling via server-side disconnect Add support for resumability and redelivery to Streamable HTTP transport

5 participants