Redis Stream消息队列与Pub/Sub发布订阅的区别
在 Redis 中处理消息传递时,开发者经常面临两种选择:Pub/Sub(发布订阅)和 Stream(流)。虽然两者都能实现消息的转发,但它们的底层逻辑和适用场景截然不同。简单来说,Pub/Sub 像是“广播电台”,而 Stream 像是“录播机”或“任务队列”。
1. 理解 Pub/Sub:即时广播,断开即丢
Pub/Sub 是一种典型的“即发即弃”模式。它的核心逻辑是:消息发布者将消息发送给频道,Redis 立即将消息转发给所有当前监听该频道的订阅者。
适用场景:实时通知、即时聊天、非关键数据的广播。
操作步骤:
-
打开终端 A,输入以下命令进入订阅模式:
SUBSCRIBE news_channel此终端将进入阻塞状态,等待消息。
-
打开终端 B,输入以下命令发送消息:
PUBLISH news_channel "Hello World" -
观察终端 A,你会立即看到收到的消息。
1) "message" 2) "news_channel" 3) "Hello World" -
模拟网络中断或消费者重启:
如果在终端 B 发送消息前,你关闭了终端 A,那么这条消息就会彻底消失。当终端 A 再次重新连接时,它无法收到之前错过的任何消息。
核心特征:
- 无状态:Redis 不保存消息历史。
- 无确认:不管消费者是否处理成功,消息发送后即清除。
- 易丢数据:消费者断线期间的消息全部丢失。
2. 理解 Stream:持久化日志,支持回溯
Redis Stream 是 Redis 5.0 引入的数据结构,它提供了一个日志数据结构,类似于 Apache Kafka。它将消息持久化存储在内存中,即使消费者不在线,消息也会保留,直到达到设定的过期时间。
适用场景:任务队列、日志收集、事件溯源、需要可靠传输的场景。
操作步骤:
-
打开终端 A,使用
XADD命令向名为mystream的流中添加一条消息:XADD mystream * sensor_id 1234 temperature 19.8*表示由 Redis 自动生成唯一的 ID(通常包含时间戳)。 -
打开终端 B,使用
XREAD命令读取消息。假设我们想从 ID 为0-0的位置(即开头)开始读取:XREAD COUNT 2 STREAMS mystream 0-0 -
观察返回结果,你会看到刚才添加的消息内容以及其 ID。
-
再次在终端 A 添加一条新消息:
XADD mystream * sensor_id 5678 temperature 20.1 -
回到终端 B,如果你此时再次运行相同的
XREAD命令,你只能读到新产生的消息(或者你可以指定具体 ID 来读取历史)。为了模拟消费者组(Consumer Group)的ack机制,请执行以下步骤:- 创建消费者组:
XGROUP CREATE mystream mygroup 0 - 使用消费者
consumer1读取组内消息:XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >>表示读取尚未传递给组内其他消费者的新消息。 - 处理完消息后,发送确认(ACK)通知 Redis 该消息已处理:
XACK mystream mygroup <消息ID>
- 创建消费者组:
核心特征:
- 有状态:消息被保存在 Stream 中,可被多次读取。
- 支持回溯:可以通过指定 ID 读取历史数据。
- 消费者组:支持 ACK 机制,确保消息至少被处理一次,避免消息丢失。
3. 架构逻辑对比
为了更直观地理解两者的数据流向差异,请参考以下流程图。
4. 如何选择:决策对照表
在项目开发中,请根据业务需求参考下表进行选择。
| 特性维度 | Pub/Sub (发布订阅) | Stream (流) |
|---|---|---|
| 数据持久化 | 不持久化,消息发完即删 | 持久化存储,直到过期或被删除 |
| 消息回溯 | 不支持,断开即无法获取历史 | 支持,可通过 ID 读取任意位置消息 |
| 可靠性 | 低,消费者掉线会丢消息 | 高,支持 ACK 机制和 Pending 列表 |
| 消费者组 | 不支持,所有消费者收到相同消息 | 支持,组内消息通过负载均衡分发 |
| 吞吐量 | 极高,开销极小 | 较高,但略低于 Pub/Sub (因维护状态) |
| 典型用途 | 实时聊天、实时公告、即时通知 | 任务队列、异步处理、日志聚合 |
5. 实战建议:代码层面的选择
场景一:构建简单的实时聊天室
目标:用户 A 发送消息,用户 B、C、D 必须立即看到。
推荐方案:使用 Pub/Sub。
理由:聊天消息的实时性优先级最高。如果用户掉线,重连后通常会拉取历史记录(由数据库或协议处理),而不需要 Redis 缓存所有历史聊天记录。
代码示例 (Node.js):
// 发布者
const redis = require('redis');
const publisher = redis.createClient();
// **发送**消息
publisher.publish('chat_room', 'User A: 大家好');
// 订阅者
const subscriber = redis.createClient();
// **监听**频道
subscriber.subscribe('chat_room');
subscriber.on('message', (channel, message) => {
console.log(`收到消息: ${message}`);
});
场景二:电商系统的订单生成邮件发送
目标:用户下单后,系统必须发送确认邮件。如果邮件服务挂了,恢复后必须补发邮件,不能丢单。
推荐方案:使用 Redis Stream。
理由:可靠性至关重要。订单数据不能因为邮件服务短暂故障而丢失。
代码示例 (Node.js 伪代码):
// 生产者:下单后添加任务
const producer = redis.createClient();
async function placeOrder(orderId) {
// ... 处理订单逻辑 ...
// **添加**任务到 Stream
await producer.xadd('order_email_stream', '*', 'order_id', orderId);
}
// 消费者:邮件服务
const consumer = redis.createClient();
async function startEmailWorker() {
// **创建**消费者组(仅首次执行)
// await consumer.xgroup('CREATE', 'order_email_stream', 'email_group', '0');
while (true) {
// **读取**任务
const results = await consumer.xreadgroup(
'GROUP', 'email_group', 'worker_1',
'COUNT', 1,
'BLOCK', 2000,
'STREAMS', 'order_email_stream', '>'
);
if (results) {
const [stream, messages] = results[0];
const [messageId, fields] = messages[0];
const orderId = fields[1];
// **发送**邮件逻辑
await sendEmail(orderId);
// **确认**消息处理完成
await consumer.xack('order_email_stream', 'email_group', messageId);
}
}
}
通过以上对比和实操步骤,你应该能清晰地区分:Pub/Sub 用于“快”,Stream 用于“稳”。根据你的业务对实时性和可靠性的要求,做出合适的选择。

暂无评论,快来抢沙发吧!