redis stream的用法
时间: 2025-04-29 19:25:41 浏览: 37
<think>好的,用户想了解Redis Stream的使用方法,需要教程或示例代码。首先,我需要回忆一下Redis Stream的基本概念和相关命令。记得Redis Stream是Redis 5.0引入的消息队列结构,支持消费者组和持久化。
用户可能希望从基础操作开始,比如如何添加消息、读取消息,然后进阶到消费者组的使用。可能需要分步骤讲解,并附上示例代码,这样用户更容易理解。还要注意引用之前的资料,比如引用[1]和引用[2]提到的特性,比如基数树存储、命令语法等。
先整理核心命令。添加消息用XADD,读取用XREAD,创建消费者组用XGROUP CREATE,消费消息用XREADGROUP。这些是基本操作。示例代码部分可以用Python的redis模块展示,这样用户能直接看到如何应用。
需要确保示例代码正确,比如连接Redis的部分,命令参数是否正确。比如XADD的语法是XADD stream名 * key1 value1 key2 value2,其中*表示自动生成消息ID。在Python中,参数可能需要以字典形式传递。
另外,消费者组的示例要说明如何创建组,如何让不同的消费者读取消息,并处理ACK确认。这部分容易出错,要检查命令参数是否正确,比如消费者组的名称、消费者的名称,以及ACK的机制。
还要考虑用户可能的后续问题,比如性能优化、应用场景,所以在相关问题里可以列出这些,比如如何保证消息可靠性,消费者组如何处理故障,或者如何监控Stream的状态。这些能帮助用户深入理解。
最后,确保回答结构清晰,分步骤讲解,代码示例正确,并且引用标记正确。检查是否有遗漏的重要命令或使用场景,确保覆盖基本使用方法,让用户能够快速上手使用Redis Stream。</think>### Redis Stream 使用教程
#### 一、基础操作
1. **添加消息**
```python
import redis
r = redis.Redis()
# 添加消息(自动生成消息ID)
msg_id = r.xadd('mystream', {'field1': 'value1', 'field2': 'value2'})
print(f"消息ID: {msg_id}") # 格式如 1651234567890-0
```
2. **读取消息**
```python
# 读取最新消息(阻塞式,最多等待5000ms)
messages = r.xread({'mystream': '$'}, block=5000, count=1)
print("最新消息:", messages)
# 读取历史消息(从ID 0开始)
messages = r.xread({'mystream': '0'}, count=10)
```
#### 二、消费者组模式
1. **创建消费者组**
```python
# 创建消费者组(需指定起始ID)
r.xgroup_create('mystream', 'mygroup', id='0')
```
2. **消费者消费消息**
```python
# 消费者A读取未确认消息
messages = r.xreadgroup('mygroup', 'consumerA', {'mystream': '>'}, count=1)
# 处理消息后确认
for msg in messages:
r.xack('mystream', 'mygroup', msg[1][0][0]) # 参数为消息ID
```
#### 三、高级特性
1. **查看流信息**
```python
print(r.xinfo_stream('mystream')) # 包含长度、消费者组数量等
print(r.xinfo_groups('mystream')) # 显示所有消费者组
```
2. **消息ID范围查询**
```python
# 查询指定ID范围内的消息
messages = r.xrange('mystream', min='1651234567890-0', max='+', count=5)
```
#### 四、Python完整示例
```python
# 生产者
for i in range(3):
r.xadd('mystream', {'sensor': f"data{i}", 'ts': str(time.time())})
# 消费者组处理
while True:
msgs = r.xreadgroup('mygroup', 'worker1', {'mystream': '>'}, count=1)
if not msgs:
break
for msg in msgs:
process_message(msg[1][0][1]) # 自定义处理函数
r.xack('mystream', 'mygroup', msg[1][0][0])
```
> 注意:Redis Stream底层使用基数树(Radix Tree)存储,内存效率比普通列表高50%以上[^2]
阅读全文
相关推荐



















