springboot3整合kafka2的配置说明
时间: 2025-09-06 14:22:00 AIGC 浏览: 8
<think>
我们正在处理用户关于Spring Boot 3与Kafka 2整合的配置说明的请求。根据用户提供的站内引用,我们可以提取以下关键信息:
引用[1]提到:配置Kafka连接、配置Producer和Consumer,并给出了步骤大纲。
引用[2]提到:在Spring Boot中整合Kafka非常简单,并提到了添加依赖。
引用[3]提到:在配置文件中添加Kafka服务器地址,并创建生产者。
用户明确要求的是Spring Boot 3与Kafka 2的整合配置说明。因此,我们需要确保配置适用于Spring Boot 3和Kafka 2。
步骤:
1. 添加依赖:Spring Boot 3通常使用Spring Kafka 3.x版本,该版本支持Kafka 2.x。
2. 配置Kafka连接:在application.properties或application.yml中配置bootstrap-servers等。
3. 配置生产者:包括创建生产者工厂、KafkaTemplate等(在Spring Boot中通过配置类或属性配置)。
4. 配置消费者:配置消费者工厂、容器工厂、监听器等。
5. 错误处理:如重试机制和消费者异常处理。
注意:由于用户要求的是配置说明,我们将重点放在配置上,并给出示例代码。
根据以上分析,我们组织回答如下:
</think>
### Spring Boot 3与Kafka 2整合配置说明
#### 1. 添加依赖
在`pom.xml`中添加Spring Kafka依赖(兼容Spring Boot 3):
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.0</version> <!-- 兼容Kafka 2.x -->
</dependency>
```
#### 2. 配置文件设置
在`application.yml`中配置Kafka连接(需替换实际服务器地址):
```yaml
spring:
kafka:
bootstrap-servers: your-kafka-server:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: my-consumer-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
```
#### 3. 生产者配置类
创建生产者配置和模板:
```java
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
```
#### 4. 消费者配置类
```java
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
```
#### 5. 消息发送服务
```java
@Service
public class ProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
```
#### 6. 消息监听服务
```java
@Service
public class ConsumerService {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
}
```
#### 关键配置说明
| 配置项 | 说明 |
|---------------------------------|----------------------------------------------------------------------|
| `bootstrap-servers` | Kafka集群地址(多个用逗号分隔) |
| `key-serializer/deserializer` | 键序列化方式(常用`StringSerializer`) |
| `value-serializer/deserializer` | 值序列化方式(JSON建议使用`JsonSerializer`) |
| `auto-offset-reset` | 消费偏移策略(`earliest`从最早开始,`latest`从最新开始) |
| `group-id` | 消费者组ID(同组共享消息) |
#### 错误处理配置
```yaml
spring:
kafka:
listener:
# 批量消费异常重试
type: batch
ack-mode: batch
consumer:
# 自动提交偏移量间隔
auto-commit-interval: 1000
```
[^1]: 该配置基于Spring Boot 3的自动配置机制,默认兼容Kafka 2.8+版本
[^2]: 生产环境建议增加SSL/SASL认证配置和重试机制
阅读全文
相关推荐

















