Skip to content

Commit 83441b4

Browse files
committed
Align WebSocket session limits with client configuration
This commit ensures that user-defined limits (such as max frame size and message size) are correctly propagated from the client to the WebSocket session for both Reactor Netty and Jetty For Reactor Netty, the maxFramePayloadLength from WebsocketClientSpec is now passed to the ReactorNettyWebSocketSession. For Jetty, the JettyWebSocketClient now configures the native Jetty Session (including frame sizes, message sizes, and idle timeouts) during the session initialization within JettyWebSocketHandlerAdapter. Previously, both clients ignored certain custom configurations and fell back to default limits (e.g., 64KB for Netty), leading to TooLongFrameException or connection drops when receiving larger payloads from servers. Closes gh-36369 Signed-off-by: Artem Voronin <artem.voronin.dev@gmail.com>
1 parent 7299ff9 commit 83441b4

4 files changed

Lines changed: 90 additions & 5 deletions

File tree

spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.eclipse.jetty.client.Response;
2626
import org.eclipse.jetty.http.HttpHeader;
2727
import org.eclipse.jetty.util.component.LifeCycle;
28+
import org.eclipse.jetty.websocket.api.Session;
2829
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
2930
import org.eclipse.jetty.websocket.client.JettyUpgradeListener;
3031
import org.jspecify.annotations.Nullable;
@@ -107,8 +108,10 @@ public void onHandshakeResponse(Request request, Response response) {
107108
};
108109

109110
Sinks.Empty<Void> completion = Sinks.empty();
110-
JettyWebSocketHandlerAdapter handlerAdapter = new JettyWebSocketHandlerAdapter(handler, session ->
111-
new JettyWebSocketSession(session, Objects.requireNonNull(handshakeInfo.get()), DefaultDataBufferFactory.sharedInstance, completion));
111+
JettyWebSocketHandlerAdapter handlerAdapter = new JettyWebSocketHandlerAdapter(handler, session -> {
112+
configureSession(session);
113+
return new JettyWebSocketSession(session, Objects.requireNonNull(handshakeInfo.get()), DefaultDataBufferFactory.sharedInstance, completion);
114+
});
112115
try {
113116
this.client.connect(handlerAdapter, upgradeRequest, jettyUpgradeListener)
114117
.exceptionally(throwable -> {
@@ -123,4 +126,15 @@ public void onHandshakeResponse(Request request, Response response) {
123126
return Mono.error(ex);
124127
}
125128
}
129+
130+
private void configureSession(Session session) {
131+
session.setMaxFrameSize(this.client.getMaxFrameSize());
132+
session.setMaxBinaryMessageSize(this.client.getMaxBinaryMessageSize());
133+
session.setMaxTextMessageSize(this.client.getMaxTextMessageSize());
134+
session.setMaxOutgoingFrames(this.client.getMaxOutgoingFrames());
135+
session.setIdleTimeout(this.client.getIdleTimeout());
136+
session.setAutoFragment(this.client.isAutoFragment());
137+
session.setInputBufferSize(this.client.getInputBufferSize());
138+
session.setOutputBufferSize(this.client.getOutputBufferSize());
139+
}
126140
}

spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,16 +127,18 @@ public Mono<Void> execute(URI url, WebSocketHandler handler) {
127127
@Override
128128
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
129129
String protocols = StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols());
130+
WebsocketClientSpec wsClientSpec = buildSpec(protocols);
130131
return getHttpClient()
131132
.headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
132-
.websocket(buildSpec(protocols))
133+
.websocket(wsClientSpec)
133134
.uri(url.toString())
134135
.handle((inbound, outbound) -> {
135136
HttpHeaders responseHeaders = toHttpHeaders(inbound);
136137
String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
137138
HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
138139
NettyDataBufferFactory factory = new NettyDataBufferFactory(outbound.alloc());
139-
WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory);
140+
WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory,
141+
wsClientSpec.maxFramePayloadLength());
140142
if (logger.isDebugEnabled()) {
141143
logger.debug("Started session '" + session.getId() + "' for " + url);
142144
}

spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractReactiveWebSocketIntegrationTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,9 @@ void stopServer() {
148148
if (this.client instanceof Lifecycle lifecycle) {
149149
lifecycle.stop();
150150
}
151-
this.server.stop();
151+
if (this.server != null) {
152+
this.server.stop();
153+
}
152154
}
153155

154156

spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package org.springframework.web.reactive.socket;
1818

19+
import java.nio.charset.StandardCharsets;
1920
import java.time.Duration;
21+
import java.util.Arrays;
2022
import java.util.Collections;
2123
import java.util.HashMap;
2224
import java.util.List;
@@ -27,21 +29,28 @@
2729
import org.apache.commons.logging.LogFactory;
2830
import reactor.core.publisher.Flux;
2931
import reactor.core.publisher.Mono;
32+
import reactor.netty.http.client.WebsocketClientSpec;
3033
import reactor.util.retry.Retry;
3134

3235
import org.springframework.context.annotation.Bean;
3336
import org.springframework.context.annotation.Configuration;
37+
import org.springframework.core.io.buffer.DataBuffer;
3438
import org.springframework.http.HttpHeaders;
3539
import org.springframework.http.ResponseCookie;
3640
import org.springframework.web.filter.reactive.ServerWebExchangeContextFilter;
3741
import org.springframework.web.reactive.HandlerMapping;
3842
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
43+
import org.springframework.web.reactive.socket.adapter.NettyWebSocketSessionSupport;
44+
import org.springframework.web.reactive.socket.client.JettyWebSocketClient;
45+
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
46+
import org.springframework.web.reactive.socket.client.TomcatWebSocketClient;
3947
import org.springframework.web.reactive.socket.client.WebSocketClient;
4048
import org.springframework.web.server.WebFilter;
4149
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer;
4250
import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer;
4351

4452
import static org.assertj.core.api.Assertions.assertThat;
53+
import static org.assertj.core.api.Assertions.assertThatCode;
4554

4655
/**
4756
* Integration tests with server-side {@link WebSocketHandler}s.
@@ -186,6 +195,51 @@ void cookie(WebSocketClient client, HttpServer server, Class<?> serverConfigClas
186195
assertThat(cookie.get()).isEqualTo("project=spring");
187196
}
188197

198+
@ParameterizedWebSocketTest
199+
void largePayload(WebSocketClient client, HttpServer server, Class<?> serverConfigClass) throws Exception {
200+
201+
int defaultFrameMaxSize = NettyWebSocketSessionSupport.DEFAULT_FRAME_MAX_SIZE;
202+
int extendedLimit = 2 * defaultFrameMaxSize;
203+
204+
WebSocketClient extendedClient = extendLimits(client, extendedLimit);
205+
206+
startServer(extendedClient, server, serverConfigClass);
207+
208+
AtomicReference<Integer> payloadSizeRef = new AtomicReference<>();
209+
assertThatCode(() -> extendedClient.execute(getUrl("/large-payload"),
210+
session -> session.receive()
211+
.map(WebSocketMessage::getPayload)
212+
.map(DataBuffer::readableByteCount)
213+
.reduce(Integer::sum)
214+
.doOnNext(payloadSizeRef::set)
215+
.then())
216+
.block(TIMEOUT))
217+
.doesNotThrowAnyException();
218+
219+
assertThat(payloadSizeRef.get()).isGreaterThan(defaultFrameMaxSize);
220+
assertThat(payloadSizeRef.get()).isEqualTo(extendedLimit);
221+
}
222+
223+
private WebSocketClient extendLimits(WebSocketClient client, int limit) {
224+
if (client instanceof ReactorNettyWebSocketClient netty) {
225+
client = new ReactorNettyWebSocketClient(
226+
netty.getHttpClient(),
227+
() -> WebsocketClientSpec.builder().maxFramePayloadLength(limit));
228+
}
229+
230+
if (client instanceof TomcatWebSocketClient tomcat) {
231+
tomcat.getWebSocketContainer().setDefaultMaxTextMessageBufferSize(limit);
232+
}
233+
234+
if (client instanceof JettyWebSocketClient) {
235+
org.eclipse.jetty.websocket.client.WebSocketClient jetty =
236+
new org.eclipse.jetty.websocket.client.WebSocketClient();
237+
jetty.setMaxTextMessageSize(limit);
238+
client = new JettyWebSocketClient(jetty);
239+
}
240+
241+
return client;
242+
}
189243

190244
@Configuration
191245
static class WebConfig {
@@ -198,6 +252,7 @@ public HandlerMapping handlerMapping() {
198252
map.put("/custom-header", new CustomHeaderHandler());
199253
map.put("/close", new SessionClosingHandler());
200254
map.put("/cookie", new CookieHandler());
255+
map.put("/large-payload", new LargePayloadHandler());
201256
return new SimpleUrlHandlerMapping(map);
202257
}
203258

@@ -274,4 +329,16 @@ public Mono<Void> handle(WebSocketSession session) {
274329
}
275330
}
276331

332+
private static class LargePayloadHandler implements WebSocketHandler {
333+
334+
@Override
335+
public Mono<Void> handle(WebSocketSession session) {
336+
int doubledFrameSize = 2 * NettyWebSocketSessionSupport.DEFAULT_FRAME_MAX_SIZE;
337+
byte[] payload = new byte[doubledFrameSize];
338+
Arrays.fill(payload, (byte) 'x');
339+
String text = new String(payload, StandardCharsets.UTF_8);
340+
WebSocketMessage message = session.textMessage(text);
341+
return session.send(Mono.just(message));
342+
}
343+
}
277344
}

0 commit comments

Comments
 (0)