传统spring整合 rabbitmq实现生产者确认机制 生产者发送一个 User对象 用xml进行配置 不是springBoot
时间: 2025-07-27 12:13:24 AIGC 浏览: 25
<think>我们是在传统Spring框架(非Spring Boot)中整合RabbitMQ,要求:
1. 实现生产者确认机制(确保消息成功发送到RabbitMQ)
2. 发送User对象(需要序列化)
3. 使用XML配置方式
步骤:
1. 添加依赖
2. 配置RabbitMQ连接工厂、模板、确认回调等(XML配置)
3. 定义User对象(可序列化)
4. 生产者发送消息
5. 消费者监听并处理消息(包括User对象的反序列化)
注意:传统Spring使用RabbitMQ,主要依赖spring-rabbit,注意版本兼容性。
详细步骤:
一、Maven依赖(示例版本,请根据实际情况调整)
```xml
<dependencies>
<!-- Spring核心 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.23</version>
</dependency>
<!-- Spring RabbitMQ集成 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.4.6</version> <!-- 注意:选择与RabbitMQ服务器兼容的版本 -->
</dependency>
<!-- 序列化支持(Jackson用于JSON序列化) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.4</version>
</dependency>
</dependencies>
```
二、XML配置(spring-rabbit.xml)
配置要点:
1. 连接工厂
2. RabbitTemplate(设置消息转换器为JSON,并开启生产者确认模式)
3. 定义队列、交换机、绑定(这里以直连交换机为例)
4. 配置生产者确认回调
```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 配置连接工厂 -->
<rabbit:connection-factory id="connectionFactory"
host="localhost"
port="5672"
username="guest"
password="guest"
publisher-confirms="true" /> <!-- 开启生产者确认 -->
<!-- 配置Rabbit模板 -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
message-converter="jsonMessageConverter"
confirm-callback="confirmCallback" /> <!-- 设置确认回调 -->
<!-- 定义队列 -->
<rabbit:queue name="user.queue" durable="true" />
<!-- 定义直连交换机 -->
<rabbit:direct-exchange name="user.exchange" durable="true">
<rabbit:bindings>
<rabbit:binding queue="user.queue" key="user.routingkey" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 生产者确认回调的Bean -->
<bean id="confirmCallback" class="com.example.rabbitmq.ConfirmCallbackHandler" />
<!-- 消费者监听容器 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="userMessageListener" queue-names="user.queue" />
</rabbit:listener-container>
<!-- 消费者监听器 -->
<bean id="userMessageListener" class="com.example.rabbitmq.UserMessageListener" />
</beans>
```
三、生产者确认回调类(ConfirmCallbackHandler)
实现RabbitTemplate.ConfirmCallback接口,处理确认消息。
```java
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
public class ConfirmCallbackHandler implements ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送成功: " + correlationData);
} else {
System.err.println("消息发送失败: " + cause + " 数据: " + correlationData);
}
}
}
```
四、定义User对象(可序列化)
```java
import java.io.Serializable;
public class User implements Serializable {
private String name;
private Integer age;
// 必须有无参构造器
public User() {
}
// getter和setter
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
@Override
public String toString() {
return "User{name='" + name + "', age=" + age + "}";
}
}
```
五、生产者发送消息
```java
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Producer {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("spring-rabbit.xml");
AmqpTemplate rabbitTemplate = context.getBean("rabbitTemplate", AmqpTemplate.class);
User user = new User();
user.setName("张三");
user.setAge(30);
// 发送消息,指定路由键(与绑定时的路由键一致)
rabbitTemplate.convertAndSend("user.exchange", "user.routingkey", user);
System.out.println("已发送用户消息: " + user);
}
}
```
六、消费者监听器(UserMessageListener)
使用MessageListenerAdapter或者实现MessageListener接口。这里使用实现接口的方式。
```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
public class UserMessageListener implements MessageListener {
private MessageConverter messageConverter = new Jackson2JsonMessageConverter();
@Override
public void onMessage(Message message) {
try {
User user = (User) messageConverter.fromMessage(message);
System.out.println("接收到用户消息: " + user);
} catch (Exception e) {
e.printStackTrace();
}
}
}
```
注意:这里为了简单直接在监听器里创建了MessageConverter,实际可以通过Spring注入。
七、测试
1. 启动RabbitMQ服务器
2. 运行生产者(Producer.main)
3. 查看控制台输出,确认消息发送成功和接收成功。
生产者确认机制说明:
当消息被成功发送到交换机(Exchange)时,RabbitMQ会返回一个确认(ack=true);如果消息发送失败(如交换机不存在),则返回ack=false,并给出原因。
注意事项:
1. 如果消息发送到不存在的交换机,会触发确认回调ack=false。
2. 如果消息发送到交换机,但路由不到队列(比如路由键错误),则不会触发确认回调,而是触发返回模式(ReturnCallback)。如果需要处理这种情况,可以配置ReturnCallback。
扩展:配置ReturnCallback(可选)
在RabbitTemplate上设置mandatory=true(默认false)和return-callback,这样当消息无法路由到队列时,会回调ReturnCallback。
在XML中修改RabbitTemplate配置:
```xml
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
message-converter="jsonMessageConverter"
confirm-callback="confirmCallback"
return-callback="returnCallback"
mandatory="true" />
```
然后定义ReturnCallback:
```java
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
public class ReturnCallbackHandler implements ReturnCallback {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.err.println("消息被退回: " + returned);
}
}
```
在XML中配置returnCallback的Bean:
```xml
<bean id="returnCallback" class="com.example.rabbitmq.ReturnCallbackHandler" />
```
这样,当消息无法路由到任何队列时,就会触发returnedMessage方法。
总结:通过以上步骤,我们在传统Spring项目中通过XML配置整合了RabbitMQ,实现了生产者确认机制,并发送了User对象。
§§相关问题§§
1. RabbitMQ生产者确认机制有哪几种模式?
2. 在传统Spring项目中如何配置RabbitMQ的消息转换器?
3. RabbitMQ消息无法路由到队列时应该如何处理?
4. 如何确保RabbitMQ消费者正确处理JSON序列化的消息?</think>### 传统 Spring 整合 RabbitMQ 实现生产者确认机制(XML 配置)
#### 一、核心步骤
1. **添加 Maven 依赖**
```xml
<dependencies>
<!-- Spring RabbitMQ -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.4.6</version>
</dependency>
<!-- JSON 序列化 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.4</version>
</dependency>
</dependencies>
```
2. **配置 XML(spring-rabbitmq.xml)**
```xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 1. 连接工厂 -->
<rabbit:connection-factory
id="connectionFactory"
host="localhost"
port="5672"
username="guest"
password="guest"
publisher-confirms="true" /> <!-- 开启生产者确认 -->
<!-- 2. 消息转换器(JSON序列化User对象) -->
<bean id="jsonMessageConverter"
class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<!-- 3. Rabbit模板(配置生产者确认回调) -->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
message-converter="jsonMessageConverter"
confirm-callback="confirmCallback" /> <!-- 确认回调 -->
<!-- 4. 声明交换机 -->
<rabbit:direct-exchange name="user.exchange" durable="true">
<rabbit:bindings>
<rabbit:binding queue="user.queue" key="user.routingkey" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 5. 声明队列 -->
<rabbit:queue name="user.queue" durable="true" />
<!-- 6. 生产者确认回调Bean -->
<bean id="confirmCallback" class="com.example.RabbitConfirmCallback" />
</beans>
```
3. **生产者确认回调实现**
```java
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
public class RabbitConfirmCallback implements ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("✅ 消息成功到达交换机: " + correlationData);
} else {
System.err.println("❌ 消息发送失败: " + cause);
// 重发或记录日志
}
}
}
```
4. **User 对象定义**
```java
public class User implements Serializable {
private String name;
private int age;
// 省略getter/setter和toString()
}
```
5. **生产者发送消息**
```java
import org.springframework.amqp.core.AmqpTemplate;
public class Producer {
public void sendUser(User user) {
// 获取XML中配置的RabbitTemplate
AmqpTemplate template = (AmqpTemplate) context.getBean("rabbitTemplate");
// 发送User对象(自动JSON序列化)
template.convertAndSend(
"user.exchange",
"user.routingkey",
user,
new CorrelationData(UUID.randomUUID().toString()) // 关联ID用于确认回调
);
}
}
```
6. **消费者配置**
```xml
<!-- 在spring-rabbitmq.xml中添加 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="userListener" queue-names="user.queue" />
</rabbit:listener-container>
<bean id="userListener" class="com.example.UserMessageListener" />
```
7. **消费者监听器**
```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class UserMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
// 自动反序列化为User对象
User user = (User) new Jackson2JsonMessageConverter()
.fromMessage(message);
System.out.println("收到用户: " + user);
}
}
```
#### 二、关键机制说明
1. **生产者确认流程**
```mermaid
sequenceDiagram
生产者->> RabbitMQ: 发送消息(CorrelationData)
RabbitMQ-->>生产者: 返回ACK/NACK
确认回调->>生产者: 触发confirm()方法
```
2. **XML 配置核心点**:
- `publisher-confirms="true"`:启用生产者确认
- `confirm-callback`:指定回调Bean
- `Jackson2JsonMessageConverter`:对象序列化
- `CorrelationData`:消息唯一标识
3. **错误处理场景**:
- 交换机不存在:立即触发NACK
- 路由键不匹配:消息进入未路由队列(需额外配置)
- 网络故障:抛出`AmqpException`
#### 三、测试验证
```java
public class Test {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("spring-rabbitmq.xml");
Producer producer = new Producer(context);
User user = new User();
user.setName("张三");
user.setAge(30);
producer.sendUser(user); // 触发确认回调
}
}
```
**输出示例**:
```
✅ 消息成功到达交换机: CorrelationData [id=7b5d5e8e-5a4f-4f33]
收到用户: User{name='张三', age=30}
```
> **注意事项**:
> 1. 确保 RabbitMQ 启用 `publisher_confirm` 功能
> 2. 对象必须实现 `Serializable` 接口
> 3. 生产环境建议添加 `mandatory=true` 处理未路由消息[^4]
> 4. 使用 `CorrelationData` 关联业务ID实现消息追踪
阅读全文
相关推荐




















