Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,23 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,

requestSender = setUri(requestSender, uri);
AtomicReference<ReactorClientHttpResponse> responseRef = new AtomicReference<>();
AtomicReference<Connection> connectionRef = new AtomicReference<>();

return requestSender
return requestSender
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
.responseConnection((response, connection) -> {
ReactorClientHttpResponse clientResponse = new ReactorClientHttpResponse(response, connection);
responseRef.set(clientResponse);
registerAttributeCallback(connection);
connectionRef.set(connection);
return Mono.just((ClientHttpResponse) clientResponse);
})
.next()
.doFinally(signal -> {
ReactorClientHttpResponse response = responseRef.get();
if (response != null) {
clearChannelAttribute(response.connection);
}
})
.doOnCancel(() -> {
ReactorClientHttpResponse response = responseRef.get();
if (response != null) {
Expand All @@ -180,6 +187,10 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
});
}

private static void clearChannelAttribute(Connection connection) {
connection.channel().attr(ATTRIBUTES_KEY).set(null);
}

private static HttpClient.RequestSender setUri(HttpClient.RequestSender requestSender, URI uri) {
if (uri.isAbsolute()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1323,7 +1323,6 @@ void retrieveTextDecodedToFlux(ClientHttpConnector connector) throws IOException
.verify(Duration.ofSeconds(3));
}

@Disabled("Disabled because it's flaky (gh-36589)")
@Test // gh-36158
void reactorNettyAttributes() throws IOException {
startServer(new ReactorClientHttpConnector());
Expand All @@ -1334,14 +1333,17 @@ void reactorNettyAttributes() throws IOException {
AtomicReference<Channel> channelRef = new AtomicReference<>();

Mono<String> result = this.webClient.get().uri("/greeting")
.httpRequest(request -> {
HttpClientRequest reactorRequest = request.getNativeRequest();
channelRef.set(((ChannelOperations<?, ?>) reactorRequest).channel());
})
.retrieve()
.bodyToMono(String.class);

StepVerifier.create(result).expectNext("Hello Spring!").expectComplete().verify(Duration.ofSeconds(3));
.httpRequest(request -> {
HttpClientRequest reactorRequest = request.getNativeRequest();
channelRef.set(((ChannelOperations<?, ?>) reactorRequest).channel());
})
.retrieve()
.bodyToMono(String.class);

StepVerifier.create(result)
.expectNext("Hello Spring!")
.expectComplete()
.verify(Duration.ofSeconds(3));

assertThat(channelRef.get().attr(ReactorClientHttpConnector.ATTRIBUTES_KEY).get()).isNull();
}
Expand Down