CORRELATED 和 simple 新版 3.0.0 的 两种生产者确认机制 如何用 两种都给一下 传统spring xml配置 不是springBoot 代码给全点 新版没有publisher-confirms
时间: 2025-07-26 11:19:51 AIGC 浏览: 24
<think>我们注意到在Spring AMQP 3.0.0版本中,`publisher-confirms`属性已被标记为过时,并推荐使用`publisher-confirm-type`属性来替代。因此,在配置连接工厂时,我们需要使用`publisher-confirm-type`来指定确认模式(CORRELATED或SIMPLE)。
根据用户的问题,我们需要在传统Spring XML配置中实现两种生产者确认机制(CORRELATED和SIMPLE),并解决3.0.0版本中`publisher-confirms`不可用的问题。
以下是调整后的配置方案:
1. 对于CORRELATED模式(异步回调):
- 设置`publisher-confirm-type="CORRELATED"`
- 同时设置`publisher-returns="true"`以支持返回机制
2. 对于SIMPLE模式(同步阻塞):
- 设置`publisher-confirm-type="SIMPLE"`
- 注意:SIMPLE模式需要开启事务(`channel-transacted="true"`)
同时,我们还需要注意,在SIMPLE模式下,发送消息后可以通过调用`waitForConfirms`或`waitForConfirmsOrDie`来同步等待确认。
下面我们给出完整的XML配置和代码示例。
注意:由于是传统Spring XML配置,我们将使用`<rabbit:connection-factory>`和`<rabbit:template>`等标签。
首先,我们配置连接工厂,使用新的`publisher-confirm-type`属性:
然后,我们分别配置两种模式的RabbitTemplate。
由于两种模式使用不同的连接工厂配置(主要是确认类型不同),我们可以创建两个连接工厂,分别用于两种模式。但在实际应用中,通常一个应用只会使用一种确认模式。不过,为了演示两种模式,我们可以配置两个连接工厂和两个对应的RabbitTemplate。
但是,同一个连接工厂只能设置一种确认类型。因此,我们需要为两种模式分别配置连接工厂。
然而,在实际应用中,我们也可以只使用一个连接工厂,然后通过设置不同的RabbitTemplate来使用不同的模式(但同一个连接工厂的确认类型是固定的)。所以,这里我们分别配置两个连接工厂。
不过,我们也可以考虑使用同一个连接工厂,然后通过程序的方式设置不同的确认类型?但连接工厂在初始化后就不能更改确认类型了。因此,为了同时支持两种模式,我们需要两个连接工厂。
但是,请注意:连接工厂是重量级资源,创建多个连接工厂会占用较多资源。因此,在实际生产环境中,通常一个应用只使用一种确认模式。但为了演示,我们这里创建两个连接工厂。
下面是完整的配置:
基础配置(rabbitmq-context.xml):</think>### RabbitMQ 生产者确认机制在 Spring 3.0.0+ 的 XML 配置方案
在 Spring AMQP 3.0.0+ 版本中,`publisher-confirms` 属性已被弃用,替代方案是使用 `publisher-confirm-type` 参数。以下是两种模式的完整实现方案:
#### 基础连接工厂配置(解决 3.0.0 兼容问题)
```xml
<!-- 兼容 3.0.0+ 的连接工厂 -->
<rabbit:connection-factory
id="connectionFactory"
host="localhost"
username="guest"
password="guest"
virtual-host="/"
publisher-confirm-type="CORRELATED" <!-- 或 SIMPLE -->
publisher-returns="true"
channel-transacted="true"/>
```
---
### 1. CORRELATED 模式(异步回调)
#### XML 配置
```xml
<!-- CORRELATED 模式专用连接工厂 -->
<rabbit:connection-factory
id="correlatedConnectionFactory"
host="localhost"
publisher-confirm-type="CORRELATED"
publisher-returns="true"/>
<!-- 回调处理器 -->
<bean id="confirmCallback" class="com.example.CorrelatedConfirmCallback"/>
<bean id="returnCallback" class="com.example.CorrelatedReturnCallback"/>
<!-- CORRELATED 模式模板 -->
<rabbit:template id="correlatedTemplate"
connection-factory="correlatedConnectionFactory"
exchange="confirm.exchange"
confirm-callback="confirmCallback"
return-callback="returnCallback"
mandatory="true"/>
```
#### Java 回调实现
```java
public class CorrelatedConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("[CORRELATED] 消息确认成功: " + correlationData.getId());
} else {
System.err.println("[CORRELATED] 消息确认失败: " + correlationData.getId()
+ " | 原因: " + cause);
// 重试逻辑(指数退避)
}
}
}
public class CorrelatedReturnCallback implements RabbitTemplate.ReturnsCallback {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.err.println("[CORRELATED] 消息路由失败! "
+ "\n交换器: " + returned.getExchange()
+ "\n路由键: " + returned.getRoutingKey()
+ "\n错误码: " + returned.getReplyCode()
+ "\n原因: " + returned.getReplyText());
}
}
```
#### 生产者类
```java
public class CorrelatedProducer {
private RabbitTemplate rabbitTemplate;
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String message) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 设置消息持久化[^1]
MessageProperties props = new MessageProperties();
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message msg = new Message(message.getBytes(), props);
rabbitTemplate.send("confirm.exchange", "correlated.key", msg, correlationData);
}
}
```
---
### 2. SIMPLE 模式(同步阻塞)
#### XML 配置
```xml
<!-- SIMPLE 模式专用连接工厂 -->
<rabbit:connection-factory
id="simpleConnectionFactory"
host="localhost"
publisher-confirm-type="SIMPLE"
channel-transacted="true"/>
<!-- SIMPLE 模式模板 -->
<rabbit:template id="simpleTemplate"
connection-factory="simpleConnectionFactory"
exchange="confirm.exchange"/>
```
#### 生产者类
```java
public class SimpleProducer {
private RabbitTemplate rabbitTemplate;
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String message) {
try {
// 在事务内执行
rabbitTemplate.execute(channel -> {
// 设置消息持久化[^1]
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // PERSISTENT
.build();
channel.basicPublish(
"confirm.exchange",
"simple.key",
true, // mandatory
props,
message.getBytes()
);
// 同步等待确认(3秒超时)
if (!channel.waitForConfirms(3000)) {
throw new RuntimeException("消息确认超时");
}
return null;
});
} catch (Exception e) {
System.err.println("同步确认失败: " + e.getMessage());
// 补偿处理
}
}
}
```
---
### 3.0.0+ 版本关键变更说明
| **旧属性 (≤2.x)** | **新属性 (≥3.0)** | **作用** |
|-------------------|------------------------|-----------------------------|
| `publisher-confirms` | `publisher-confirm-type` | 指定确认类型 |
| `N/A` | `CORRELATED` | 异步回调模式 |
| `N/A` | `SIMPLE` | 同步阻塞模式 |
| `N/A` | `NONE` | 禁用确认机制(默认) |
**配置注意事项:**
1. CORRELATED 模式必须设置 `mandatory=true` 才能触发返回回调
2. SIMPLE 模式必须开启事务 (`channel-transacted="true"`)
3. 消息持久化需显式设置 `MessageDeliveryMode.PERSISTENT`[^1]
4. 推荐使用 `CachingConnectionFactory` 提升性能
---
### 完整应用上下文配置
```xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="...">
<!-- CORRELATED 模式配置 -->
<rabbit:connection-factory id="correlatedCF" .../>
<bean id="confirmCallback" class="com.example.CorrelatedConfirmCallback"/>
<rabbit:template id="correlatedTemplate" .../>
<!-- SIMPLE 模式配置 -->
<rabbit:connection-factory id="simpleCF" .../>
<rabbit:template id="simpleTemplate" .../>
<!-- 生产者声明 -->
<bean id="correlatedProducer" class="com.example.CorrelatedProducer">
<property name="rabbitTemplate" ref="correlatedTemplate"/>
</bean>
<bean id="simpleProducer" class="com.example.SimpleProducer">
<property name="rabbitTemplate" ref="simpleTemplate"/>
</bean>
<!-- 交换机和队列 -->
<rabbit:topic-exchange name="confirm.exchange">
<rabbit:bindings>
<rabbit:binding pattern="correlated.*" queue="correlatedQueue"/>
<rabbit:binding pattern="simple.*" queue="simpleQueue"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:queue name="correlatedQueue" durable="true"/>
<rabbit:queue name="simpleQueue" durable="true"/>
</beans>
```
---
### 性能优化建议
```xml
<!-- 使用缓存连接工厂提升性能 -->
<bean id="cachingCF"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg ref="connectionFactory"/>
<property name="channelCacheSize" value="50"/>
</bean>
<!-- 消息压缩配置 -->
<rabbit:template id="compressedTemplate">
<property name="compressor" ref="gzipCompressor"/>
</rabbit:template>
<bean id="gzipCompressor" class="org.springframework.amqp.utils.ZlibCompressor"/>
```
---
### 相关问题
1. CORRELATED 模式中如何实现消息重试的指数退避策略?
*可通过`ScheduledExecutorService`实现延迟重试,每次失败后增加等待时间[^3]*
2. SIMPLE 模式下如何处理事务嵌套和回滚?
*使用`TransactionTemplate`定义事务边界,通过`setRollbackOnly()`标记回滚*
3. 生产者确认机制与 RabbitMQ 事务的主要区别是什么?
*确认机制轻量级(性能好),事务提供强一致性但性能开销大[^4]*
4. 如何在高并发场景下优化 CORRELATED 模式的内存使用?
*增加信道池大小,启用消息压缩,使用`BatchingRabbitTemplate`批量发送*
5. 生产者确认失败后如何将消息存入死信队列?
*配置死信交换机(DLX),在确认失败后重新发布到死信路由[^3]*
6. 如何监控两种确认模式的成功率和延迟指标?
*通过`Micrometer`暴露指标,或在回调中实现统计逻辑*
[^1]: 消息持久化需显式设置 `PERSISTENT` 属性
[^3]: 死信队列和重试策略参考 RabbitMQ 官方文档
[^4]: 事务与确认机制对比参考 AMQP 协议规范
阅读全文
相关推荐



















