spring 整合 rabbitmq 实现 RPC 传统 spring 不是springBoot 给出生产者和消费者完整代码 生产者和消费者配置要分开
时间: 2025-07-26 11:20:07 AIGC 浏览: 22
<think>我们将在传统Spring框架(非Spring Boot)中整合RabbitMQ实现RPC模式,并将生产者和消费者的配置分开。RPC模式的核心在于生产者发送请求消息,并等待消费者处理后的回复。在RabbitMQ中,通常通过以下步骤实现:
1. 生产者:
- 发送消息到请求队列(request queue),并指定一个回复队列(reply queue)和关联ID(correlation id)。
- 同时,生产者会监听回复队列,当收到回复时,根据关联ID匹配请求。
2. 消费者:
- 从请求队列中获取消息,处理业务逻辑。
- 将处理结果发送到回复队列(通过消息的replyTo属性指定),并设置相同的关联ID。
在Spring中,我们可以使用`RabbitTemplate`和`SimpleMessageListenerContainer`来实现。
### 项目结构
假设项目为Maven项目,需要以下依赖:
```xml
<dependencies>
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.18</version>
</dependency>
<!-- Spring AMQP -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.4.2</version>
</dependency>
</dependencies>
```
### 配置分离
我们将生产者和消费者的配置分别放在两个Spring配置文件中:
- `producer-config.xml`:生产者配置
- `consumer-config.xml`:消费者配置
### 生产者配置(producer-config.xml)
```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 连接工厂 -->
<rabbit:connection-factory id="connectionFactory" host="localhost" port="5672" username="guest" password="guest"/>
<!-- RabbitTemplate 用于发送消息和接收回复 -->
<rabbit:template id="rpcTemplate" connection-factory="connectionFactory"
reply-address="replyQueue" reply-timeout="5000">
<!-- 设置回复监听器容器,用于异步接收回复 -->
<rabbit:reply-listener concurrency="2"/>
</rabbit:template>
<!-- 声明回复队列(匿名队列,独占、自动删除) -->
<rabbit:queue name="replyQueue" declared-by="admin" exclusive="true" auto-delete="true"/>
<!-- 声明请求队列(由消费者声明,这里不声明,但生产者和消费者都需要知道队列名) -->
<!-- 注意:实际项目中,请求队列应由消费者声明,生产者只需发送到该队列 -->
<!-- 管理员,用于声明队列 -->
<rabbit:admin connection-factory="connectionFactory" id="admin"/>
</beans>
```
### 生产者代码
```java
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
public class RpcProducer {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("producer-config.xml");
RabbitTemplate rabbitTemplate = context.getBean("rpcTemplate", RabbitTemplate.class);
// 设置请求队列名称(实际项目中可通过配置注入)
String requestQueue = "rpc.request.queue";
// 发送RPC请求
Object response = rabbitTemplate.convertSendAndReceive(requestQueue, "Hello RPC!");
System.out.println("Received response: " + response);
}
}
```
### 消费者配置(consumer-config.xml)
```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 连接工厂 -->
<rabbit:connection-factory id="connectionFactory" host="localhost" port="5672" username="guest" password="guest"/>
<!-- 声明请求队列 -->
<rabbit:queue name="rpc.request.queue" durable="true"/>
<!-- 监听容器,用于监听请求队列 -->
<rabbit:listener-container connection-factory="connectionFactory" concurrency="2">
<rabbit:listener ref="rpcServer" method="handle" queue-names="rpc.request.queue"/>
</rabbit:listener-container>
<!-- RPC服务端(消费者) -->
<bean id="rpcServer" class="com.example.RpcServer"/>
<!-- 用于发送回复的RabbitTemplate -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!-- 管理员,用于声明队列 -->
<rabbit:admin connection-factory="connectionFactory"/>
</beans>
```
### 消费者代码
```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
public class RpcServer {
private RabbitTemplate rabbitTemplate;
private MessageConverter messageConverter = new SimpleMessageConverter();
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
// 处理请求的方法
public void handle(Message message) {
// 解析请求
Object request = messageConverter.fromMessage(message);
System.out.println("Received request: " + request);
// 处理请求(这里简单将请求转换为大写作为回复)
String response = request.toString().toUpperCase();
// 获取回复队列和关联ID
MessageProperties properties = message.getMessageProperties();
String replyTo = properties.getReplyTo();
String correlationId = properties.getCorrelationId();
// 发送回复
Message reply = messageConverter.toMessage(response, new MessageProperties());
reply.getMessageProperties().setCorrelationId(correlationId);
rabbitTemplate.send(replyTo, reply);
}
}
```
### 消费者启动类
```java
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class RpcConsumer {
public static void main(String[] args) {
new ClassPathXmlApplicationContext("consumer-config.xml");
System.out.println("RPC Consumer started. Waiting for requests...");
}
}
```
### 说明
1. **生产者配置**:
- 配置了连接工厂和RabbitTemplate,其中RabbitTemplate设置了回复队列(replyQueue)和回复监听器(reply-listener),并指定了并发数(concurrency=2)。
- 回复队列是一个匿名队列(exclusive和auto-delete为true),由RabbitAdmin自动声明。
2. **生产者代码**:
- 使用`convertSendAndReceive`方法发送消息并等待回复。该方法会阻塞直到收到回复或超时。
3. **消费者配置**:
- 声明了请求队列(rpc.request.queue)并配置了监听容器,使用RpcServer的handle方法处理消息。
- 注入了RabbitTemplate用于发送回复。
4. **消费者代码**:
- 在handle方法中,从消息中解析请求,处理业务逻辑,然后通过消息的replyTo和correlationId将回复发送到指定的回复队列。
### 注意事项
- 生产者和消费者需要知道请求队列的名称(这里是"rpc.request.queue")。
- 回复队列由生产者创建,消费者只需要将回复发送到该队列。
- 关联ID(correlationId)用于匹配请求和回复,确保生产者能正确识别回复对应的请求。
### 运行步骤
1. 启动RabbitMQ服务器。
2. 先启动消费者(RpcConsumer),创建请求队列。
3. 再启动生产者(RpcProducer),发送请求并接收回复。
### 相关问题
1. 如何确保RPC调用的可靠性(如消息确认机制)?
2. 如何处理RPC调用超时?
3. 如何扩展RPC消费者以处理高并发请求?
4. RPC模式中如何实现负载均衡?
5. 如何监控RPC调用的性能指标?
6. 在分布式系统中,如何跟踪RPC调用的全链路?</think>### Spring 整合 RabbitMQ 实现 RPC 模式(非 Spring Boot)
以下是完整的生产者和消费者配置分离实现方案,基于传统 Spring XML 配置:
#### 一、项目结构
```
src/
├── main/
│ ├── java/
│ │ ├── com/example/producer/
│ │ │ ├── RpcProducer.java # 生产者逻辑
│ │ │ └── RpcClient.java # RPC 调用客户端
│ │ ├── com/example/consumer/
│ │ │ ├── RpcConsumer.java # 消费者逻辑
│ │ │ └── RpcHandler.java # 业务处理器
│ │ └── config/
│ │ ├── RabbitMQConfig.java # 公共配置
│ ├── resources/
│ │ ├── producer-context.xml # 生产者配置
│ │ ├── consumer-context.xml # 消费者配置
│ │ └── rabbitmq.properties # MQ 连接配置
```
#### 二、公共配置 (rabbitmq.properties)
```properties
rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
request.queue=rpc_request_queue
reply.queue=rpc_reply_queue
```
#### 三、生产者端配置 (producer-context.xml)
```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 加载属性文件 -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="location" value="classpath:rabbitmq.properties"/>
</bean>
<!-- 连接工厂 -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}" port="${rabbitmq.port}"
username="${rabbitmq.username}" password="${rabbitmq.password}"/>
<!-- RabbitTemplate 配置 -->
<bean id="rpcTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="replyAddress" value="${reply.queue}"/>
<property name="replyTimeout" value="5000"/>
</bean>
<!-- 回复监听器容器 -->
<bean id="replyListenerContainer"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="queues" value="${reply.queue}"/>
<property name="messageListener" ref="rpcTemplate"/>
<property name="concurrentConsumers" value="3"/>
</bean>
<!-- RPC 客户端 -->
<bean id="rpcClient" class="com.example.producer.RpcClient">
<property name="rabbitTemplate" ref="rpcTemplate"/>
<property name="requestQueue" value="${request.queue}"/>
</bean>
</beans>
```
#### 四、生产者代码
```java
// RpcClient.java
public class RpcClient {
private RabbitTemplate rabbitTemplate;
private String requestQueue;
public Object sendRpcRequest(Object request) {
return rabbitTemplate.convertSendAndReceive(
requestQueue,
request,
message -> {
// 设置回复队列
message.getMessageProperties().setReplyTo(replyQueue);
return message;
}
);
}
// Setter 注入
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void setRequestQueue(String requestQueue) {
this.requestQueue = requestQueue;
}
}
// RpcProducer.java
public class RpcProducer {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("producer-context.xml");
RpcClient client = context.getBean("rpcClient", RpcClient.class);
// 发送 RPC 请求
Object response = client.sendRpcRequest("Process this data");
System.out.println("Received response: " + response);
}
}
```
#### 五、消费者端配置 (consumer-context.xml)
```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 加载属性文件 -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="location" value="classpath:rabbitmq.properties"/>
</bean>
<!-- 连接工厂 -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}" port="${rabbitmq.port}"
username="${rabbitmq.username}" password="${rabbitmq.password}"/>
<!-- 声明队列 -->
<rabbit:queue name="${request.queue}" durable="true"/>
<rabbit:queue name="${reply.queue}" durable="true"/>
<!-- 监听容器 -->
<rabbit:listener-container connection-factory="connectionFactory" concurrency="3">
<rabbit:listener ref="rpcHandler" method="handleRequest" queues="${request.queue}"/>
</rabbit:listener-container>
<!-- 业务处理器 -->
<bean id="rpcHandler" class="com.example.consumer.RpcHandler">
<property name="rabbitTemplate" ref="rabbitTemplate"/>
</bean>
<!-- RabbitTemplate 用于发送回复 -->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
</beans>
```
#### 六、消费者代码
```java
// RpcHandler.java
public class RpcHandler {
private RabbitTemplate rabbitTemplate;
public void handleRequest(Message request) {
// 1. 处理业务逻辑
String requestBody = new String(request.getBody());
String response = "Processed: " + requestBody.toUpperCase();
// 2. 获取回复地址
String replyTo = request.getMessageProperties().getReplyTo();
String correlationId = request.getMessageProperties().getCorrelationId();
// 3. 发送回复
MessageProperties props = new MessageProperties();
props.setCorrelationId(correlationId);
Message reply = new Message(response.getBytes(), props);
rabbitTemplate.send(replyTo, reply);
}
// Setter 注入
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
}
// RpcConsumer.java
public class RpcConsumer {
public static void main(String[] args) {
new ClassPathXmlApplicationContext("consumer-context.xml");
System.out.println("RPC Consumer started. Waiting for requests...");
}
}
```
### 关键配置说明
1. **回复队列机制**:
- 生产者:通过 `setReplyTo()` 指定回复队列
- 消费者:通过 `getReplyTo()` 获取回复地址
- 使用 `correlationId` 匹配请求和响应[^1]
2. **并发控制**:
```xml
<!-- 生产者端 -->
<property name="concurrentConsumers" value="3"/>
<!-- 消费者端 -->
<rabbit:listener-container concurrency="3">
```
控制监听线程数,避免资源浪费[^2]
3. **超时处理**:
```xml
<property name="replyTimeout" value="5000"/>
```
设置 5 秒超时防止永久阻塞
### 工作流程
```mermaid
sequenceDiagram
participant Producer
participant RequestQueue
participant Consumer
participant ReplyQueue
Producer->>RequestQueue: 发送请求(replyTo=ReplyQueue)
Consumer->>RequestQueue: 监听请求
Consumer->>ReplyQueue: 发送响应(correlationId匹配)
Producer->>ReplyQueue: 监听响应
ReplyQueue-->>Producer: 返回处理结果
```
### 注意事项
1. **消息序列化**:默认使用 Java 原生序列化,建议使用 JSON 序列化
2. **错误处理**:添加异常处理逻辑防止消息丢失
3. **资源释放**:关闭 ApplicationContext 时自动释放连接
4. **队列声明**:确保队列在双方都已声明
### 相关问题
1. 如何实现 RabbitMQ RPC 模式中的消息超时处理?
2. 在 RPC 模式中如何保证消息的幂等性?
3. 如何优化 RabbitMQ RPC 模式的性能?
4. RPC 模式中如何处理消费者节点故障?
5. 如何实现 RabbitMQ RPC 的负载均衡?
6. 在分布式系统中如何追踪 RPC 调用的全链路?
阅读全文
相关推荐

















