spring 整合 rabbitMq实现生产者确认ack 同步 给出完整的代码 springBoot
时间: 2025-07-26 20:20:00 AIGC 浏览: 27
<think>我们之前讨论的是在传统Spring(非Spring Boot)中实现RabbitMQ生产者同步确认。现在用户要求的是在Spring Boot中实现相同的功能。注意:Spring Boot对RabbitMQ的自动配置使得集成更加简便,但我们仍然需要配置生产者确认(尤其是同步确认)和回调。
在Spring Boot中,我们可以通过配置`RabbitTemplate`来启用发布者确认,并实现同步确认机制。不过需要注意的是,RabbitMQ的确认模式有两种:CORRELATED和SIMPLE。在Spring Boot 2.x中,默认使用CORRELATED模式,它支持异步回调。但同步确认需要用到SIMPLE模式,因为SIMPLE模式允许我们调用`waitForConfirms`方法。
然而,在Spring Boot中,我们通常更倾向于使用异步回调(ConfirmCallback)来处理确认。但用户明确要求同步确认,因此我们将使用SIMPLE模式,并在发送消息后调用`waitForConfirms`方法。
步骤:
1. 添加依赖
2. 配置application.properties
3. 配置RabbitTemplate(启用SIMPLE确认模式,并设置Mandatory为true以支持返回消息)
4. 编写一个发送消息并同步等待确认的方法
5. 测试
注意:Spring Boot的自动配置会为我们创建ConnectionFactory和RabbitTemplate,但我们需要自定义一些配置。
完整代码实现如下:
### 1. Maven依赖 (pom.xml)
```xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
```
### 2. 配置文件 (application.properties)
```properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 启用SIMPLE确认模式(注意:Spring Boot默认使用CORRELATED,这里需要覆盖)
spring.rabbitmq.publisher-confirm-type=simple
# 启用返回消息(当消息无法路由到队列时返回给生产者)
spring.rabbitmq.publisher-returns=true
```
### 3. 配置RabbitTemplate(可选,因为我们可以直接使用自动配置的RabbitTemplate,但需要设置Mandatory和回调)
我们可以在配置类中自定义RabbitTemplate,但更简单的方式是直接使用自动配置的RabbitTemplate,然后在代码中设置Mandatory和回调。
但是,由于同步确认需要调用`waitForConfirms`,我们需要在发送消息时使用RabbitTemplate的`invoke`方法,并在其中执行同步等待。
### 4. 编写消息发送服务
```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息并同步等待确认
* @param exchange 交换机名称
* @param routingKey 路由键
* @param message 消息内容
* @param timeout 等待确认的超时时间(毫秒)
* @return 是否成功确认
*/
public boolean sendWithSyncConfirm(String exchange, String routingKey, String message, long timeout) {
// 创建消息
MessageProperties messageProperties = new MessageProperties();
Message amqpMessage = new Message(message.getBytes(), messageProperties);
// 创建CorrelationData,用于跟踪消息
CorrelationData correlationData = new CorrelationData("MSG-" + System.currentTimeMillis());
// 使用RabbitTemplate的invoke方法执行操作,这样我们可以获取到Channel并进行同步等待
return rabbitTemplate.invoke(operations -> {
// 发送消息
operations.send(exchange, routingKey, amqpMessage, correlationData);
try {
// 等待确认,超时时间为timeout毫秒
// 注意:这里使用的Channel是RabbitTemplate内部操作使用的同一个Channel
boolean ack = operations.getConnectionFactory().createConnection().createChannel(false)
.waitForConfirms(timeout);
if (ack) {
System.out.println("消息已同步确认: " + correlationData.getId());
} else {
System.err.println("等待确认超时: " + correlationData.getId());
}
return ack;
} catch (Exception e) {
System.err.println("等待确认异常: " + e.getMessage());
return false;
}
}, connectionFactory -> {
// 这里可以设置一些恢复操作,但通常不需要
});
}
}
```
### 5. 编写测试Controller(可选,用于测试)
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
private MessageSender messageSender;
@GetMapping("/send")
public String send() {
boolean result = messageSender.sendWithSyncConfirm(
"amqp.topic",
"huawei.a",
"Hello, RabbitMQ!",
5000
);
return result ? "发送成功" : "发送失败";
}
}
```
### 6. 主启动类
```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqApplication.class, args);
}
}
```
### 7. 测试
启动应用,访问`http://localhost:8080/send`,观察控制台输出。
### 注意事项:
1. **配置确认模式**:在`application.properties`中设置`spring.rabbitmq.publisher-confirm-type=simple`,这是同步等待确认的关键。
2. **同步等待的阻塞**:`waitForConfirms`方法会阻塞当前线程,因此在并发量大的场景下可能会影响性能。如果不需要同步等待,建议使用异步回调(ConfirmCallback)。
3. **资源管理**:在`invoke`方法中,我们通过`createConnection().createChannel(false)`创建了一个新的Channel,但实际上我们也可以使用RabbitTemplate内部操作使用的Channel。然而,RabbitTemplate的`invoke`方法中,我们无法直接获取到当前操作的Channel。因此,上述代码中我们创建了一个新的Channel,这并不合适。正确的方式是使用`operations.getChannel()`,但注意在RabbitTemplate的`invoke`方法中,传递给lambda的`operations`实际上是`RabbitTemplate`内部的`ChannelCallback`,它并没有直接提供getChannel方法。因此,我们需要调整方法。
修正:实际上,在RabbitTemplate的`invoke`方法中,我们可以通过`ChannelAwareMessageListener`的方式获取Channel,但这里我们可以使用另一种方式:在RabbitTemplate的`doExecute`方法中,它会将Channel暴露给回调。我们可以这样修改:
```java
public boolean sendWithSyncConfirm(String exchange, String routingKey, String message, long timeout) {
return rabbitTemplate.invoke(t -> {
// 发送消息
MessageProperties messageProperties = new MessageProperties();
Message amqpMessage = new Message(message.getBytes(), messageProperties);
CorrelationData correlationData = new CorrelationData("MSG-" + System.currentTimeMillis());
// 发送消息
t.send(exchange, routingKey, amqpMessage, correlationData);
try {
// 获取当前操作的Channel(在同一个事务或确认模式下)
com.rabbitmq.client.Channel channel = t.getConnectionFactory().createConnection().createChannel(false);
// 注意:上面的方式创建了新的Channel,这是错误的。我们应该使用当前操作使用的同一个Channel。
// 实际上,RabbitTemplate在同一个线程中操作使用的是同一个Channel,我们可以通过以下方式获取:
// 但是RabbitTemplate并没有提供直接获取当前Channel的方法。因此,我们需要改变策略。
// 替代方案:使用SIMPLE模式时,我们可以通过CorrelationData来等待确认,但这里我们使用waitForConfirms需要当前Channel。
// 由于RabbitTemplate在发送消息时使用的是同一个连接,我们可以通过创建一个新的Channel来等待,但这样并不准确,因为确认是针对特定Channel的。
// 因此,我们可能需要改变实现:不使用RabbitTemplate的invoke,而是直接使用Channel来发送消息并同步等待。但这会使得代码脱离Spring的封装。
// 考虑到同步确认的需求,我们可能需要在Spring Boot中直接使用RabbitMQ的Java客户端。但这样会失去Spring的封装。
// 另一种思路:在发送消息之前,我们设置一个监听器,当确认到达时通知我们,然后使用CountDownLatch进行同步等待。但这样就是异步转同步。
// 由于时间限制,这里我们采用直接使用RabbitTemplate的异步回调,然后使用CountDownLatch实现同步等待。但注意,这样可能会丢失一些异常情况。
// 重新设计:使用CountDownLatch
CountDownLatch latch = new CountDownLatch(1);
correlationData.getFuture().addCallback(result -> {
if (result.isAck()) {
System.out.println("消息已确认: " + correlationData.getId());
} else {
System.err.println("消息未确认: " + correlationData.getId());
}
latch.countDown();
}, ex -> {
System.err.println("确认过程中发生异常: " + ex.getMessage());
latch.countDown();
});
// 等待确认,最多等待timeout毫秒
if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
System.err.println("等待确认超时: " + correlationData.getId());
return false;
}
return correlationData.getFuture().isAck();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("等待确认被中断: " + e.getMessage());
return false;
}
});
}
```
但是,上述使用CountDownLatch的方式是异步转同步,它依赖于RabbitTemplate的异步回调。而RabbitTemplate的异步回调是在收到Broker的确认后触发的。
因此,我们修改配置:使用CORRELATED模式(默认),然后使用CountDownLatch等待异步回调。
调整配置:
```properties
# 使用correlated模式(默认)
spring.rabbitmq.publisher-confirm-type=correlated
```
然后修改MessageSender:
```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Service
public class MessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public boolean sendWithSyncConfirm(String exchange, String routingKey, String message, long timeout) {
// 创建消息
MessageProperties messageProperties = new MessageProperties();
Message amqpMessage = new Message(message.getBytes(), messageProperties);
// 创建CorrelationData,用于跟踪消息
CorrelationData correlationData = new CorrelationData("MSG-" + System.currentTimeMillis());
// 发送消息
rabbitTemplate.send(exchange, routingKey, amqpMessage, correlationData);
// 使用CountDownLatch等待确认
CountDownLatch latch = new CountDownLatch(1);
correlationData.getFuture().addCallback(
result -> {
if (result.isAck()) {
System.out.println("消息已确认: " + correlationData.getId());
} else {
System.err.println("消息未确认: " + correlationData.getId());
}
latch.countDown();
},
ex -> {
System.err.println("确认过程中发生异常: " + ex.getMessage());
latch.countDown();
}
);
try {
if (latch.await(timeout, TimeUnit.MILLISECONDS)) {
return correlationData.getFuture().isAck();
} else {
System.err.println("等待确认超时: " + correlationData.getId());
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("等待确认被中断: " + e.getMessage());
return false;
}
}
}
```
这种方式的优点:
- 使用Spring Boot默认的CORRELATED模式
- 利用`CorrelationData`的`CompletableFuture`进行异步回调,然后通过`CountDownLatch`转换为同步等待
缺点:
- 每个消息发送都会创建一个CountDownLatch,在高并发场景下可能会有性能开销
因此,根据业务需求选择同步还是异步。如果必须同步确认,这种方式是可行的。
### 最终代码结构:
- 配置文件:使用`spring.rabbitmq.publisher-confirm-type=correlated`(默认也是这个,可以省略)
- MessageSender:使用CountDownLatch等待异步确认
这样,我们就实现了在Spring Boot中同步等待生产者确认的功能。
### 相关问题
1. 在Spring Boot中如何配置RabbitMQ的异步生产者确认?
2. RabbitMQ的生产者确认机制与事务机制有何区别?
3. 如何保证消息100%投递成功(包括路由到队列)?</think># Spring Boot整合RabbitMQ实现生产者同步确认(ACK)
下面我将提供一个完整的Spring Boot实现RabbitMQ生产者同步确认的解决方案。与传统的Spring框架不同,Spring Boot通过自动配置简化了RabbitMQ的集成过程。
## 完整实现代码
### 1. Maven依赖 (pom.xml)
```xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
```
### 2. 配置文件 (application.yml)
```yaml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
# 启用生产者确认机制
publisher-confirm-type: correlated
# 启用返回机制(路由失败时返回消息)
publisher-returns: true
# 强制消息路由(确保返回机制生效)
template:
mandatory: true
```
### 3. RabbitMQ配置类
```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 设置确认回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("✅ 消息已确认到达交换机: " + correlationData.getId());
} else {
System.err.println("❌ 消息未到达交换机: " + cause);
}
});
// 设置返回回调(路由失败时触发)
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
String msgBody = new String(message.getBody());
System.err.println("⚠️ 消息路由失败: " + msgBody +
" | 路由键: " + routingKey +
" | 错误: " + replyText);
});
return rabbitTemplate;
}
}
```
### 4. 消息生产者服务
```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Service
public class MessageProducerService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息并同步等待确认
*
* @param exchange 交换机名称
* @param routingKey 路由键
* @param messageContent 消息内容
* @param timeout 等待确认超时时间(毫秒)
* @return 是否成功确认
*/
public boolean sendWithSyncConfirm(String exchange, String routingKey,
String messageContent, long timeout) {
// 创建消息
Message message = new Message(
messageContent.getBytes(),
new MessageProperties()
);
// 创建唯一ID用于跟踪消息
String correlationId = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(correlationId);
// 使用CountDownLatch实现同步等待
CountDownLatch latch = new CountDownLatch(1);
// 添加自定义回调
correlationData.getFuture().addCallback(
result -> {
if (result.isAck()) {
System.out.println("🔄 异步确认: " + correlationId);
}
latch.countDown();
},
ex -> {
System.err.println("❗ 确认异常: " + ex.getMessage());
latch.countDown();
}
);
// 发送消息
rabbitTemplate.send(exchange, routingKey, message, correlationData);
System.out.println("📤 消息已发送: " + correlationId);
try {
// 同步等待确认结果
boolean confirmed = latch.await(timeout, TimeUnit.MILLISECONDS);
if (confirmed) {
System.out.println("🔄 消息已同步确认: " + correlationId);
return correlationData.getFuture().isAck();
} else {
System.err.println("⏱️ 等待确认超时: " + correlationId);
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("❗ 等待确认被中断: " + e.getMessage());
return false;
}
}
}
```
### 5. 控制器示例
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
@Autowired
private MessageProducerService messageProducerService;
@GetMapping("/send")
public String sendMessage(
@RequestParam(defaultValue = "test-exchange") String exchange,
@RequestParam(defaultValue = "test.routing.key") String routingKey,
@RequestParam(defaultValue = "Hello RabbitMQ!") String message,
@RequestParam(defaultValue = "5000") long timeout) {
boolean confirmed = messageProducerService.sendWithSyncConfirm(
exchange, routingKey, message, timeout
);
return confirmed ? "✅ 消息已确认" : "❌ 消息未确认";
}
}
```
### 6. 主应用类
```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqProducerApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqProducerApplication.class, args);
}
}
```
## 关键机制说明
### 1. 生产者确认流程
```mermaid
sequenceDiagram
participant P as Producer
participant RT as RabbitTemplate
participant B as Broker
P->>RT: 发送消息
RT->>B: 转发消息到Exchange
alt 消息到达Exchange
B-->>RT: 发送ACK
RT->>P: 触发ConfirmCallback
else 消息未到达Exchange
B-->>RT: 发送NACK
RT->>P: 触发ConfirmCallback
end
alt 消息路由到队列
B-->>RT: (无操作)
else 消息未路由到队列
B-->>RT: 返回消息
RT->>P: 触发ReturnCallback
end
```
### 2. 同步确认实现原理
1. **异步转同步**:使用`CountDownLatch`将RabbitMQ的异步确认机制转换为同步等待
2. **双重确认**:
- 通过`ConfirmCallback`实现异步确认
- 通过`CountDownLatch.await()`实现同步等待
3. **超时处理**:设置合理的等待超时时间,防止线程无限期阻塞
### 3. 配置选项说明
| 配置项 | 说明 |
|--------------------------------|---------------------------------------------------------------------|
| `publisher-confirm-type` | 确认类型:`none`(禁用), `simple`(同步), `correlated`(异步) [^1] |
| `publisher-returns` | 是否启用返回机制(当消息无法路由时返回给生产者) |
| `template.mandatory` | 设置为true时,如果消息无法路由到队列,会触发ReturnCallback |
## 测试方法
1. 启动RabbitMQ服务器
2. 运行Spring Boot应用
3. 访问测试端点:
```
http://localhost:8080/send?exchange=test-exchange&routingKey=valid.key&message=TestMessage
```
4. 观察控制台输出,确认消息发送和确认状态
## 性能与可靠性权衡
开启生产者确认机制会增加RabbitMQ服务的负载,可能会影响性能[^1]。在需要高可靠性的场景中,这种性能开销是值得的,但在高性能场景可能需要权衡:
1. **同步确认**:可靠性最高,但性能最低(每次发送都需要等待确认)
2. **异步确认**:性能较好,但需要处理回调逻辑
3. **批量确认**:通过批量发送消息后一次性确认,平衡性能和可靠性
## 常见问题解决方案
1. **消息重复**:使用唯一ID(如UUID)作为消息标识,实现消费端幂等性
2. **确认超时**:合理设置超时时间(通常2-5秒),根据网络环境调整
3. **路由失败**:检查交换机和队列绑定关系,确保路由键正确
4. **资源泄漏**:确保正确处理异常,释放资源
## 最佳实践建议
1. **异步处理**:在不需要严格同步确认的场景,使用异步回调机制
2. **重试机制**:对于未确认的消息,实现指数退避重试策略
3. **监控告警**:记录未确认消息日志,并设置告警机制
4. **资源隔离**:为重要消息使用专用连接和通道,避免资源竞争
阅读全文
相关推荐




















