Skip to content

Commit c01f350

Browse files
committed
Move MonoToListenableFutureAdapter to spring-core
This was a package private class in spring-messaging since 5.0, and was recently made public in 5.1. This commit promotes it to spring-core where it belongs next to all other ListenableFuture support classes. Follow-up refactoring for SPR-17336
1 parent 928c541 commit c01f350

File tree

6 files changed

+101
-50
lines changed

6 files changed

+101
-50
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright 2002-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.util.concurrent;
18+
19+
import java.time.Duration;
20+
import java.util.concurrent.TimeUnit;
21+
22+
import reactor.core.publisher.Mono;
23+
import reactor.core.publisher.MonoProcessor;
24+
25+
import org.springframework.lang.Nullable;
26+
import org.springframework.util.Assert;
27+
28+
/**
29+
* Adapts a {@link Mono} into a {@link ListenableFuture}.
30+
*
31+
* @author Rossen Stoyanchev
32+
* @author Stephane Maldini
33+
* @since 5.1
34+
* @param <T> the object type
35+
*/
36+
public class MonoToListenableFutureAdapter<T> implements ListenableFuture<T> {
37+
38+
private final MonoProcessor<T> processor;
39+
40+
private final ListenableFutureCallbackRegistry<T> registry = new ListenableFutureCallbackRegistry<>();
41+
42+
43+
public MonoToListenableFutureAdapter(Mono<T> mono) {
44+
Assert.notNull(mono, "Mono must not be null");
45+
this.processor = mono
46+
.doOnSuccess(this.registry::success)
47+
.doOnError(this.registry::failure)
48+
.toProcessor();
49+
}
50+
51+
52+
@Override
53+
@Nullable
54+
public T get() {
55+
return this.processor.block();
56+
}
57+
58+
@Override
59+
@Nullable
60+
public T get(long timeout, TimeUnit unit) {
61+
Assert.notNull(unit, "TimeUnit must not be null");
62+
Duration duration = Duration.ofMillis(TimeUnit.MILLISECONDS.convert(timeout, unit));
63+
return this.processor.block(duration);
64+
}
65+
66+
@Override
67+
public boolean cancel(boolean mayInterruptIfRunning) {
68+
if (isCancelled()) {
69+
return false;
70+
}
71+
this.processor.cancel();
72+
// isCancelled may still return false, if mono completed before the cancel
73+
return this.processor.isCancelled();
74+
}
75+
76+
@Override
77+
public boolean isCancelled() {
78+
return this.processor.isCancelled();
79+
}
80+
81+
@Override
82+
public boolean isDone() {
83+
return this.processor.isTerminated();
84+
}
85+
86+
@Override
87+
public void addCallback(ListenableFutureCallback<? super T> callback) {
88+
this.registry.addCallback(callback);
89+
}
90+
91+
@Override
92+
public void addCallback(SuccessCallback<? super T> success, FailureCallback failure) {
93+
this.registry.addSuccessCallback(success);
94+
this.registry.addFailureCallback(failure);
95+
}
96+
97+
}
Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package org.springframework.messaging.support;
16+
package org.springframework.util.concurrent;
1717

1818
import java.time.Duration;
1919
import java.util.concurrent.Future;
@@ -22,8 +22,6 @@
2222
import org.junit.Test;
2323
import reactor.core.publisher.Mono;
2424

25-
import org.springframework.util.concurrent.ListenableFuture;
26-
2725
import static org.junit.Assert.*;
2826

2927
/**

spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/ReactiveReturnValueHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import org.springframework.core.MethodParameter;
2222
import org.springframework.core.ReactiveAdapter;
2323
import org.springframework.core.ReactiveAdapterRegistry;
24-
import org.springframework.messaging.support.MonoToListenableFutureAdapter;
2524
import org.springframework.util.Assert;
2625
import org.springframework.util.concurrent.ListenableFuture;
26+
import org.springframework.util.concurrent.MonoToListenableFutureAdapter;
2727

2828
/**
2929
* Support for single-value reactive types (like {@code Mono} or {@code Single})

spring-messaging/src/main/java/org/springframework/messaging/support/MonoToListenableFutureAdapter.java

Lines changed: 0 additions & 44 deletions
This file was deleted.

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,13 @@
4949

5050
import org.springframework.lang.Nullable;
5151
import org.springframework.messaging.Message;
52-
import org.springframework.messaging.support.MonoToListenableFutureAdapter;
5352
import org.springframework.messaging.tcp.ReconnectStrategy;
5453
import org.springframework.messaging.tcp.TcpConnection;
5554
import org.springframework.messaging.tcp.TcpConnectionHandler;
5655
import org.springframework.messaging.tcp.TcpOperations;
5756
import org.springframework.util.Assert;
5857
import org.springframework.util.concurrent.ListenableFuture;
58+
import org.springframework.util.concurrent.MonoToListenableFutureAdapter;
5959
import org.springframework.util.concurrent.SettableListenableFuture;
6060

6161
/**

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
import reactor.netty.NettyOutbound;
2424

2525
import org.springframework.messaging.Message;
26-
import org.springframework.messaging.support.MonoToListenableFutureAdapter;
2726
import org.springframework.messaging.tcp.TcpConnection;
2827
import org.springframework.util.concurrent.ListenableFuture;
28+
import org.springframework.util.concurrent.MonoToListenableFutureAdapter;
2929

3030
/**
3131
* Reactor Netty based implementation of {@link TcpConnection}.

0 commit comments

Comments
 (0)