TOC

使用 Redis 做消息队列

Redis 作者还分叉了项目,专门搞了一个实验项目 disque 来实现一个消息队列管理工具。

https://github.com/antirez/disque

这不是这篇文章的关注重点。这篇文章是想像一下使用 Redis 做 MQ 的情况(没在生产环境使用过)。

一、为什么使用 MQ

无非就是减少耦合,削峰填谷。

中间引入一个组件做代理,自然耦合解开了。
另外,一个稳定高效的 MQ 可以快速吸收消息,避免生产者的等待,同时消费者可以根据自己的处理速度来运行。就好像水库的作用,就是削峰填谷,损有余而补不足。
PS: MQ 好像还起到了一点 HA 的作用。
PS: 一种设计完善的消息推送方案,比如一个事件多方消费,如果没有消息队列则需要多次调用。

至于可能导致的问题嘛,当然也有,就是多了一个组件引入了新的复杂性。
上面提到的好处都是建立在 MQ 稳定可靠运行的基础之上,自然需要很多新的工作来维护消息队列中间件。
小规模系统就应该怎么简单怎么来,但如果系统足够复杂,多点工作总是难免的。

1.1 RabbitMQ

Erlang 所写(我听说过的唯一一个 Erlang 项目),AMQP 0-9-1 (高级消息队列协议) 的开源实现(Mozilla 公共许可证)。

  • PS: 摩根大通在 2003 年设计的 AMQP,然后和红帽合作使用 C++ 开发了一个 AMQP 开源实现,就是现在的 Apache Qpid。
  • PS: AMQP v1.0 于 2004 年成为 ISO 标准 (ISO/IEC 19464:2014)。
    据说和 0-9-1 有很大差别。

  • 生产者发布消息给指定 Exchange

  • Exchange 根据路由规则 (RoutingKey) 将消息投递给 Queue
  • 消费者订阅指定 Queue

1.2 Kafka

据说性能非常好,而且是倍经考验了,时不时看到有大型项目将其作为基础设施使用。
虽然最近两年 Kafka 很热门,但我没有在生产环境使用过。

二、Redis 实现

基础功能:

  • 生产消息
  • 存储消息
  • 消费消息

拓展功能:

  • 重复消费(多个消费者)
  • 确认机制
  • 持久化
  • 广播(消息直接写入多个队列)

2.1 List

采用 List 构建一个简单的 FIFO 队列:

  • LPUSH
  • RPOP/BRPOP 如果没有消息了,会返回 nil
  • RPOPLPUSH / BRPOPLPUSH 可以在处理完成之后再从备份队列删除,实现类似 ACK 的功能

PS: RPOPLPUSH: 弹出一个元素,并将该元素压入另一个列表。BRPOPLPUSH: 如果源列表没有元素会阻塞。

redicConn.lpush('queue', 'a')

while True:
    msg = redisConn.rpop('queue')
    if msg is None:
        time.sleep(1)
        continue
    do_something(msg)

2.2 SortedSet

有序集合,不支持重复消息(有时可以当做功能)。

用时间戳(或加一定权重)当分数,据此排序。


2.3 Pub/Sub

实现了简单的 Pub/Sub 功能。
虽然没有 RabbitMQ 那些多 API,也不支持那些复杂的功能配置,但是在简单的场景下用用还是不错的。

  1. 消费者订阅指定主题 subscribe foo
  2. 生产者发布消息 publish foo ooxxooxx

有两个不可忽视的问题:

  1. Pub 上来的消息必须实时发往订阅者,如果订阅者不在线,那么消息就丢失了。
  2. 如果消息积压到一定程度,Redis 会强制断开消费者连接,清空消息队列。

2.4 Streams

Redis 5.0 引入的新数据类型。
这个名字可能是来自流式计算。
所有提交上来的消息按顺序串成一个消息链。

非常值得一提的是,Stream 实现的队列允许重复消费、支持 ACK

  • XADD
  • XDEL
  • XRANGE

三、总结

  1. Redis 不保证数据完整性,所以如果是对数据完整性有要求的话,不能使用这个方案。
  2. Redis 不支持消息确认机制(Ack/Nack/Reject),需要自己在应用层面实现,但如果这么做的话,不如干脆用 MQ 算了。
  3. 除了 Pub/Sub 之外,ListSortedSetStreams 都支持消息持久化,但是应用时需要注意避免消息堆积,否则会可能会对内存造成压力。

如何保证消息的完整性呢?

参考资料与拓展阅读