Skip to content

Commit cd69a4a

Browse files
committed
Support DestinationVariable on RSocket handlers
Prior to this commit, the pattern destination variables were not set in the message headers prior to calling the handler. In this case, the `DestinationVariableMethodArgumentResolver` could not get the destination variables from the message headers and resolve those as handler arguments. This commit mutates the message headers if the message destination contains patterns. Fixes gh-22776
1 parent a0826a2 commit cd69a4a

File tree

4 files changed

+47
-4
lines changed

4 files changed

+47
-4
lines changed

spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@
2424
import java.util.Comparator;
2525
import java.util.LinkedHashSet;
2626
import java.util.List;
27+
import java.util.Map;
2728
import java.util.Set;
2829
import java.util.function.Predicate;
2930

31+
import reactor.core.publisher.Mono;
32+
3033
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
3134
import org.springframework.context.ApplicationContext;
3235
import org.springframework.context.ConfigurableApplicationContext;
@@ -39,16 +42,19 @@
3942
import org.springframework.messaging.Message;
4043
import org.springframework.messaging.handler.CompositeMessageCondition;
4144
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
45+
import org.springframework.messaging.handler.HandlerMethod;
4246
import org.springframework.messaging.handler.annotation.MessageMapping;
4347
import org.springframework.messaging.handler.annotation.support.AnnotationExceptionHandlerMethodResolver;
4448
import org.springframework.messaging.handler.invocation.AbstractExceptionHandlerMethodResolver;
4549
import org.springframework.messaging.handler.invocation.reactive.AbstractEncoderMethodReturnValueHandler;
4650
import org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler;
4751
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodArgumentResolver;
4852
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
53+
import org.springframework.messaging.support.MessageHeaderAccessor;
4954
import org.springframework.stereotype.Controller;
5055
import org.springframework.util.AntPathMatcher;
5156
import org.springframework.util.Assert;
57+
import org.springframework.util.CollectionUtils;
5258
import org.springframework.util.PathMatcher;
5359
import org.springframework.util.StringValueResolver;
5460
import org.springframework.validation.Validator;
@@ -311,4 +317,19 @@ protected AbstractExceptionHandlerMethodResolver createExceptionMethodResolverFo
311317
return new AnnotationExceptionHandlerMethodResolver(beanType);
312318
}
313319

320+
@Override
321+
protected Mono<Void> handleMatch(CompositeMessageCondition mapping, HandlerMethod handlerMethod, Message<?> message) {
322+
Set<String> patterns = mapping.getCondition(DestinationPatternsMessageCondition.class).getPatterns();
323+
if (!CollectionUtils.isEmpty(patterns)) {
324+
String pattern = patterns.iterator().next();
325+
String destination = getDestination(message);
326+
Map<String, String> vars = getPathMatcher().extractUriTemplateVariables(pattern, destination);
327+
if (!CollectionUtils.isEmpty(vars)) {
328+
MessageHeaderAccessor mha = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
329+
Assert.state(mha != null && mha.isMutable(), "Mutable MessageHeaderAccessor required");
330+
mha.setHeader(DestinationVariableMethodArgumentResolver.DESTINATION_TEMPLATE_VARIABLES_HEADER, vars);
331+
}
332+
}
333+
return super.handleMatch(mapping, handlerMethod, message);
334+
}
314335
}

spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractMethodMessageHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,11 @@ public Mono<Void> handleMessage(Message<?> message) throws MessagingException {
381381
// handleNoMatch would have been invoked already
382382
return Mono.empty();
383383
}
384-
HandlerMethod handlerMethod = match.getHandlerMethod().createWithResolvedBean();
384+
return handleMatch(match.mapping, match.handlerMethod, message);
385+
}
386+
387+
protected Mono<Void> handleMatch(T mapping, HandlerMethod handlerMethod, Message<?> message) {
388+
handlerMethod = handlerMethod.createWithResolvedBean();
385389
return this.invocableHelper.handleMessage(handlerMethod, message);
386390
}
387391

spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ private DataBuffer retainDataAndReleasePayload(Payload payload) {
180180

181181
private MessageHeaders createHeaders(String destination, @Nullable MonoProcessor<?> replyMono) {
182182
MessageHeaderAccessor headers = new MessageHeaderAccessor();
183+
headers.setLeaveMutable(true);
183184
headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, destination);
184185
if (this.dataMimeType != null) {
185186
headers.setContentType(this.dataMimeType);

spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandlerTests.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,13 @@
3939
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
4040
import org.springframework.messaging.Message;
4141
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
42+
import org.springframework.messaging.handler.annotation.DestinationVariable;
4243
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
4344
import org.springframework.messaging.handler.annotation.MessageMapping;
4445
import org.springframework.messaging.handler.invocation.reactive.TestEncoderMethodReturnValueHandler;
4546
import org.springframework.messaging.support.GenericMessage;
47+
import org.springframework.messaging.support.MessageBuilder;
48+
import org.springframework.messaging.support.MessageHeaderAccessor;
4649
import org.springframework.stereotype.Controller;
4750

4851
import static java.nio.charset.StandardCharsets.*;
@@ -89,6 +92,13 @@ public void handleWithPlaceholderInMapping() {
8992
verifyOutputContent(Collections.singletonList("abcdef::response"));
9093
}
9194

95+
@Test
96+
public void handleWithDestinationVariable() {
97+
MessageMappingMessageHandler messsageHandler = initMesssageHandler();
98+
messsageHandler.handleMessage(message("destination.test", "abcdef")).block(Duration.ofSeconds(5));
99+
verifyOutputContent(Collections.singletonList("test::abcdef::response"));
100+
}
101+
92102
@Test
93103
public void handleException() {
94104
MessageMappingMessageHandler messsageHandler = initMesssageHandler();
@@ -143,9 +153,11 @@ private MessageMappingMessageHandler initMesssageHandler() {
143153
}
144154

145155
private Message<?> message(String destination, String... content) {
146-
return new GenericMessage<>(
147-
Flux.fromIterable(Arrays.asList(content)).map(payload -> toDataBuffer(payload)),
148-
Collections.singletonMap(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, destination));
156+
Flux<DataBuffer> payload = Flux.fromIterable(Arrays.asList(content)).map(parts -> toDataBuffer(parts));
157+
MessageHeaderAccessor headers = new MessageHeaderAccessor();
158+
headers.setLeaveMutable(true);
159+
headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, destination);
160+
return MessageBuilder.createMessage(payload, headers.getMessageHeaders());
149161
}
150162

151163
private DataBuffer toDataBuffer(String payload) {
@@ -181,6 +193,11 @@ String handleWithPlaceholder(String payload) {
181193
return payload + "::response";
182194
}
183195

196+
@MessageMapping("destination.{variable}")
197+
String handleWithDestinationVariable(@DestinationVariable String variable, String payload) {
198+
return variable + "::" + payload + "::response";
199+
}
200+
184201
@MessageMapping("exception")
185202
String handleAndThrow() {
186203
throw new IllegalArgumentException("rejected");

0 commit comments

Comments
 (0)