Skip to content

Commit 9ceca4f

Browse files
committed
RSocketRequester, RSocketStrategies, PayloadUtils
See spring-projectsgh-21987
1 parent 483a846 commit 9ceca4f

14 files changed

+1263
-115
lines changed

spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/HandlerMethodReturnValueHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
*/
3232
public interface HandlerMethodReturnValueHandler {
3333

34-
/** Header containing a DataBufferFactory to use. */
35-
public static final String DATA_BUFFER_FACTORY_HEADER = "dataBufferFactoryHeader";
34+
/** Header containing a DataBufferFactory for use in return value handling. */
35+
String DATA_BUFFER_FACTORY_HEADER = "dataBufferFactory";
3636

3737

3838
/**
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.messaging.rsocket;
17+
18+
import java.nio.charset.StandardCharsets;
19+
import java.util.Collections;
20+
import java.util.Map;
21+
22+
import io.rsocket.Payload;
23+
import io.rsocket.RSocket;
24+
import org.reactivestreams.Publisher;
25+
import reactor.core.publisher.Flux;
26+
import reactor.core.publisher.Mono;
27+
28+
import org.springframework.core.ParameterizedTypeReference;
29+
import org.springframework.core.ReactiveAdapter;
30+
import org.springframework.core.ResolvableType;
31+
import org.springframework.core.codec.Decoder;
32+
import org.springframework.core.codec.Encoder;
33+
import org.springframework.core.io.buffer.DataBuffer;
34+
import org.springframework.core.io.buffer.DataBufferUtils;
35+
import org.springframework.lang.Nullable;
36+
import org.springframework.util.Assert;
37+
import org.springframework.util.MimeType;
38+
39+
/**
40+
* Default, package-private {@link RSocketRequester} implementation.
41+
*
42+
* @author Rossen Stoyanchev
43+
* @since 5.2
44+
*/
45+
final class DefaultRSocketRequester implements RSocketRequester {
46+
47+
private static final Map<String, Object> EMPTY_HINTS = Collections.emptyMap();
48+
49+
50+
private final RSocket rsocket;
51+
52+
@Nullable
53+
private final MimeType dataMimeType;
54+
55+
private final RSocketStrategies strategies;
56+
57+
private DataBuffer emptyDataBuffer;
58+
59+
60+
DefaultRSocketRequester(RSocket rsocket, @Nullable MimeType dataMimeType, RSocketStrategies strategies) {
61+
Assert.notNull(rsocket, "RSocket is required");
62+
Assert.notNull(strategies, "RSocketStrategies is required");
63+
this.rsocket = rsocket;
64+
this.dataMimeType = dataMimeType;
65+
this.strategies = strategies;
66+
this.emptyDataBuffer = this.strategies.dataBufferFactory().wrap(new byte[0]);
67+
}
68+
69+
70+
@Override
71+
public RSocket rsocket() {
72+
return this.rsocket;
73+
}
74+
75+
@Override
76+
public RequestSpec route(String route) {
77+
return new DefaultRequestSpec(route);
78+
}
79+
80+
81+
private static boolean isVoid(ResolvableType elementType) {
82+
return Void.class.equals(elementType.resolve()) || void.class.equals(elementType.resolve());
83+
}
84+
85+
86+
private class DefaultRequestSpec implements RequestSpec {
87+
88+
private final String route;
89+
90+
91+
DefaultRequestSpec(String route) {
92+
this.route = route;
93+
}
94+
95+
96+
@Override
97+
public ResponseSpec data(Object data) {
98+
Assert.notNull(data, "'data' must not be null");
99+
return toResponseSpec(data, ResolvableType.NONE);
100+
}
101+
102+
@Override
103+
public <T, P extends Publisher<T>> ResponseSpec data(P publisher, Class<T> dataType) {
104+
Assert.notNull(publisher, "'publisher' must not be null");
105+
Assert.notNull(dataType, "'dataType' must not be null");
106+
return toResponseSpec(publisher, ResolvableType.forClass(dataType));
107+
}
108+
109+
@Override
110+
public <T, P extends Publisher<T>> ResponseSpec data(P publisher, ParameterizedTypeReference<T> dataTypeRef) {
111+
Assert.notNull(publisher, "'publisher' must not be null");
112+
Assert.notNull(dataTypeRef, "'dataTypeRef' must not be null");
113+
return toResponseSpec(publisher, ResolvableType.forType(dataTypeRef));
114+
}
115+
116+
private ResponseSpec toResponseSpec(Object input, ResolvableType dataType) {
117+
ReactiveAdapter adapter = strategies.reactiveAdapterRegistry().getAdapter(input.getClass());
118+
Publisher<?> publisher;
119+
if (input instanceof Publisher) {
120+
publisher = (Publisher<?>) input;
121+
}
122+
else if (adapter != null) {
123+
publisher = adapter.toPublisher(input);
124+
}
125+
else {
126+
Mono<Payload> payloadMono = encodeValue(input, ResolvableType.forInstance(input), null)
127+
.map(this::firstPayload)
128+
.switchIfEmpty(emptyPayload());
129+
return new DefaultResponseSpec(payloadMono);
130+
}
131+
132+
if (isVoid(dataType) || (adapter != null && adapter.isNoValue())) {
133+
Mono<Payload> payloadMono = Mono.when(publisher).then(emptyPayload());
134+
return new DefaultResponseSpec(payloadMono);
135+
}
136+
137+
Encoder<?> encoder = dataType != ResolvableType.NONE && !Object.class.equals(dataType.resolve()) ?
138+
strategies.encoder(dataType, dataMimeType) : null;
139+
140+
if (adapter != null && !adapter.isMultiValue()) {
141+
Mono<Payload> payloadMono = Mono.from(publisher)
142+
.flatMap(value -> encodeValue(value, dataType, encoder))
143+
.map(this::firstPayload)
144+
.switchIfEmpty(emptyPayload());
145+
return new DefaultResponseSpec(payloadMono);
146+
}
147+
148+
Flux<Payload> payloadFlux = Flux.from(publisher)
149+
.concatMap(value -> encodeValue(value, dataType, encoder))
150+
.switchOnFirst((signal, inner) -> {
151+
DataBuffer data = signal.get();
152+
return data != null ?
153+
Flux.concat(Mono.just(firstPayload(data)), inner.skip(1).map(PayloadUtils::asPayload)) :
154+
inner.map(PayloadUtils::asPayload);
155+
})
156+
.switchIfEmpty(emptyPayload());
157+
return new DefaultResponseSpec(payloadFlux);
158+
}
159+
160+
@SuppressWarnings("unchecked")
161+
private <T> Mono<DataBuffer> encodeValue(T value, ResolvableType valueType, @Nullable Encoder<?> encoder) {
162+
if (encoder == null) {
163+
encoder = strategies.encoder(ResolvableType.forInstance(value), dataMimeType);
164+
}
165+
return DataBufferUtils.join(((Encoder<T>) encoder).encode(
166+
Mono.just(value), strategies.dataBufferFactory(), valueType, dataMimeType, EMPTY_HINTS));
167+
}
168+
169+
private Payload firstPayload(DataBuffer data) {
170+
return PayloadUtils.asPayload(getMetadata(), data);
171+
}
172+
173+
private Mono<Payload> emptyPayload() {
174+
return Mono.fromCallable(() -> firstPayload(emptyDataBuffer));
175+
}
176+
177+
private DataBuffer getMetadata() {
178+
return strategies.dataBufferFactory().wrap(this.route.getBytes(StandardCharsets.UTF_8));
179+
}
180+
}
181+
182+
183+
private class DefaultResponseSpec implements ResponseSpec {
184+
185+
@Nullable
186+
private final Mono<Payload> payloadMono;
187+
188+
@Nullable
189+
private final Flux<Payload> payloadFlux;
190+
191+
192+
DefaultResponseSpec(Mono<Payload> payloadMono) {
193+
this.payloadMono = payloadMono;
194+
this.payloadFlux = null;
195+
}
196+
197+
DefaultResponseSpec(Flux<Payload> payloadFlux) {
198+
this.payloadMono = null;
199+
this.payloadFlux = payloadFlux;
200+
}
201+
202+
203+
@Override
204+
public Mono<Void> send() {
205+
Assert.notNull(this.payloadMono, "No RSocket interaction model for one-way send with Flux.");
206+
return this.payloadMono.flatMap(rsocket::fireAndForget);
207+
}
208+
209+
@Override
210+
public <T> Mono<T> retrieveMono(Class<T> dataType) {
211+
return retrieveMono(ResolvableType.forClass(dataType));
212+
}
213+
214+
@Override
215+
public <T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataTypeRef) {
216+
return retrieveMono(ResolvableType.forType(dataTypeRef));
217+
}
218+
219+
@Override
220+
public <T> Flux<T> retrieveFlux(Class<T> dataType) {
221+
return retrieveFlux(ResolvableType.forClass(dataType));
222+
}
223+
224+
@Override
225+
public <T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef) {
226+
return retrieveFlux(ResolvableType.forType(dataTypeRef));
227+
}
228+
229+
@SuppressWarnings("unchecked")
230+
private <T> Mono<T> retrieveMono(ResolvableType elementType) {
231+
Assert.notNull(this.payloadMono,
232+
"No RSocket interaction model for Flux request to Mono response.");
233+
234+
Mono<Payload> payloadMono = this.payloadMono.flatMap(rsocket::requestResponse);
235+
236+
if (isVoid(elementType)) {
237+
return (Mono<T>) payloadMono.then();
238+
}
239+
240+
Decoder<?> decoder = strategies.decoder(elementType, dataMimeType);
241+
return (Mono<T>) decoder.decodeToMono(
242+
payloadMono.map(this::asDataBuffer), elementType, dataMimeType, EMPTY_HINTS);
243+
}
244+
245+
@SuppressWarnings("unchecked")
246+
private <T> Flux<T> retrieveFlux(ResolvableType elementType) {
247+
248+
Flux<Payload> payloadFlux = this.payloadMono != null ?
249+
this.payloadMono.flatMapMany(rsocket::requestStream) :
250+
rsocket.requestChannel(this.payloadFlux);
251+
252+
if (isVoid(elementType)) {
253+
return payloadFlux.thenMany(Flux.empty());
254+
}
255+
256+
Decoder<?> decoder = strategies.decoder(elementType, dataMimeType);
257+
258+
return payloadFlux.map(this::asDataBuffer).concatMap(dataBuffer ->
259+
(Mono<T>) decoder.decodeToMono(Mono.just(dataBuffer), elementType, dataMimeType, EMPTY_HINTS));
260+
}
261+
262+
private DataBuffer asDataBuffer(Payload payload) {
263+
return PayloadUtils.asDataBuffer(payload, strategies.dataBufferFactory());
264+
}
265+
}
266+
267+
}

0 commit comments

Comments
 (0)