文章目录

Redis Stream消息队列与Pub/Sub发布订阅的区别

发布于 2026-05-03 02:26:52 · 浏览 4 次 · 评论 0 条

Redis Stream消息队列与Pub/Sub发布订阅的区别

在 Redis 中处理消息传递时,开发者经常面临两种选择:Pub/Sub(发布订阅)和 Stream(流)。虽然两者都能实现消息的转发,但它们的底层逻辑和适用场景截然不同。简单来说,Pub/Sub 像是“广播电台”,而 Stream 像是“录播机”或“任务队列”。


1. 理解 Pub/Sub:即时广播,断开即丢

Pub/Sub 是一种典型的“即发即弃”模式。它的核心逻辑是:消息发布者将消息发送给频道,Redis 立即将消息转发给所有当前监听该频道的订阅者。

适用场景:实时通知、即时聊天、非关键数据的广播。

操作步骤

  1. 打开终端 A,输入以下命令进入订阅模式:

    SUBSCRIBE news_channel

    此终端将进入阻塞状态,等待消息。

  2. 打开终端 B,输入以下命令发送消息:

    PUBLISH news_channel "Hello World"
  3. 观察终端 A,你会立即看到收到的消息。

    1) "message"
    2) "news_channel"
    3) "Hello World"
  4. 模拟网络中断或消费者重启:
    如果在终端 B 发送消息前,你关闭了终端 A,那么这条消息就会彻底消失。当终端 A 再次重新连接时,它无法收到之前错过的任何消息。

核心特征

  • 无状态:Redis 不保存消息历史。
  • 无确认:不管消费者是否处理成功,消息发送后即清除。
  • 易丢数据:消费者断线期间的消息全部丢失。

2. 理解 Stream:持久化日志,支持回溯

Redis Stream 是 Redis 5.0 引入的数据结构,它提供了一个日志数据结构,类似于 Apache Kafka。它将消息持久化存储在内存中,即使消费者不在线,消息也会保留,直到达到设定的过期时间。

适用场景:任务队列、日志收集、事件溯源、需要可靠传输的场景。

操作步骤

  1. 打开终端 A,使用 XADD 命令向名为 mystream 的流中添加一条消息:

    XADD mystream * sensor_id 1234 temperature 19.8

    * 表示由 Redis 自动生成唯一的 ID(通常包含时间戳)。

  2. 打开终端 B,使用 XREAD 命令读取消息。假设我们想从 ID 为 0-0 的位置(即开头)开始读取:

    XREAD COUNT 2 STREAMS mystream 0-0
  3. 观察返回结果,你会看到刚才添加的消息内容以及其 ID。

  4. 再次在终端 A 添加一条新消息:

    XADD mystream * sensor_id 5678 temperature 20.1
  5. 回到终端 B,如果你此时再次运行相同的 XREAD 命令,你只能读到新产生的消息(或者你可以指定具体 ID 来读取历史)。为了模拟消费者组(Consumer Group)的ack机制,请执行以下步骤:

    1. 创建消费者组:
      XGROUP CREATE mystream mygroup 0
    2. 使用消费者 consumer1 读取组内消息:
      XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >

      > 表示读取尚未传递给组内其他消费者的新消息。

    3. 处理完消息后,发送确认(ACK)通知 Redis 该消息已处理:
      XACK mystream mygroup <消息ID>

核心特征

  • 有状态:消息被保存在 Stream 中,可被多次读取。
  • 支持回溯:可以通过指定 ID 读取历史数据。
  • 消费者组:支持 ACK 机制,确保消息至少被处理一次,避免消息丢失。

3. 架构逻辑对比

为了更直观地理解两者的数据流向差异,请参考以下流程图。

graph LR subgraph "Pub/Sub (广播模式)" P1["生产者"] -- "PUBLISH" --> Redis1((Redis)) Redis1 -- "立即转发" --> C1["在线消费者"] Redis1 -- "立即转发" --> C2["在线消费者"] Redis1 -. "不存储消息" .-> Lost["离线消费者: 丢失消息"] end subgraph "Stream (队列模式)" P2["生产者"] -- "XADD" --> Redis2((Redis)) Redis2 -- "写入内存日志" --> StreamStore["Stream 数据结构"] StreamStore -- "XREADGROUP" --> CG["消费者组"] CG -- "ACK 确认" --> StreamStore end

4. 如何选择:决策对照表

在项目开发中,请根据业务需求参考下表进行选择。

特性维度 Pub/Sub (发布订阅) Stream (流)
数据持久化 不持久化,消息发完即删 持久化存储,直到过期或被删除
消息回溯 不支持,断开即无法获取历史 支持,可通过 ID 读取任意位置消息
可靠性 低,消费者掉线会丢消息 高,支持 ACK 机制和 Pending 列表
消费者组 不支持,所有消费者收到相同消息 支持,组内消息通过负载均衡分发
吞吐量 极高,开销极小 较高,但略低于 Pub/Sub (因维护状态)
典型用途 实时聊天、实时公告、即时通知 任务队列、异步处理、日志聚合

5. 实战建议:代码层面的选择

场景一:构建简单的实时聊天室

目标:用户 A 发送消息,用户 B、C、D 必须立即看到。

推荐方案:使用 Pub/Sub

理由:聊天消息的实时性优先级最高。如果用户掉线,重连后通常会拉取历史记录(由数据库或协议处理),而不需要 Redis 缓存所有历史聊天记录。

代码示例 (Node.js)

// 发布者
const redis = require('redis');
const publisher = redis.createClient();

// **发送**消息
publisher.publish('chat_room', 'User A: 大家好');
// 订阅者
const subscriber = redis.createClient();

// **监听**频道
subscriber.subscribe('chat_room');

subscriber.on('message', (channel, message) => {
    console.log(`收到消息: ${message}`);
});

场景二:电商系统的订单生成邮件发送

目标:用户下单后,系统必须发送确认邮件。如果邮件服务挂了,恢复后必须补发邮件,不能丢单。

推荐方案:使用 Redis Stream

理由:可靠性至关重要。订单数据不能因为邮件服务短暂故障而丢失。

代码示例 (Node.js 伪代码)

// 生产者:下单后添加任务
const producer = redis.createClient();

async function placeOrder(orderId) {
    // ... 处理订单逻辑 ...

    // **添加**任务到 Stream
    await producer.xadd('order_email_stream', '*', 'order_id', orderId);
}
// 消费者:邮件服务
const consumer = redis.createClient();

async function startEmailWorker() {
    // **创建**消费者组(仅首次执行)
    // await consumer.xgroup('CREATE', 'order_email_stream', 'email_group', '0');

    while (true) {
        // **读取**任务
        const results = await consumer.xreadgroup(
            'GROUP', 'email_group', 'worker_1',
            'COUNT', 1,
            'BLOCK', 2000,
            'STREAMS', 'order_email_stream', '>'
        );

        if (results) {
            const [stream, messages] = results[0];
            const [messageId, fields] = messages[0];
            const orderId = fields[1];

            // **发送**邮件逻辑
            await sendEmail(orderId);

            // **确认**消息处理完成
            await consumer.xack('order_email_stream', 'email_group', messageId);
        }
    }
}

通过以上对比和实操步骤,你应该能清晰地区分:Pub/Sub 用于“快”,Stream 用于“稳”。根据你的业务对实时性和可靠性的要求,做出合适的选择。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文