Redis 作者还分叉了项目,专门搞了一个实验项目 disque 来实现一个消息队列管理工具。
https://github.com/antirez/disque
这不是这篇文章的关注重点。这篇文章是想像一下使用 Redis 做 MQ 的情况(没在生产环境使用过)。
一、为什么使用 MQ
无非就是减少耦合,削峰填谷。
中间引入一个组件做代理,自然耦合解开了。
另外,一个稳定高效的 MQ 可以快速吸收消息,避免生产者的等待,同时消费者可以根据自己的处理速度来运行。就好像水库的作用,就是削峰填谷,损有余而补不足。
PS: MQ 好像还起到了一点 HA 的作用。
PS: 一种设计完善的消息推送方案,比如一个事件多方消费,如果没有消息队列则需要多次调用。
至于可能导致的问题嘛,当然也有,就是多了一个组件引入了新的复杂性。
上面提到的好处都是建立在 MQ 稳定可靠运行的基础之上,自然需要很多新的工作来维护消息队列中间件。
小规模系统就应该怎么简单怎么来,但如果系统足够复杂,多点工作总是难免的。
1.1 RabbitMQ
Erlang 所写(我听说过的唯一一个 Erlang 项目),AMQP 0-9-1 (高级消息队列协议) 的开源实现(Mozilla 公共许可证)。
- PS: 摩根大通在 2003 年设计的 AMQP,然后和红帽合作使用 C++ 开发了一个 AMQP 开源实现,就是现在的 Apache Qpid。
-
PS: AMQP v1.0 于 2004 年成为 ISO 标准 (ISO/IEC 19464:2014)。
据说和 0-9-1 有很大差别。 -
生产者发布消息给指定 Exchange
- Exchange 根据路由规则 (RoutingKey) 将消息投递给 Queue
- 消费者订阅指定 Queue
1.2 Kafka
据说性能非常好,而且是倍经考验了,时不时看到有大型项目将其作为基础设施使用。
虽然最近两年 Kafka 很热门,但我没有在生产环境使用过。
二、Redis 实现
基础功能:
- 生产消息
- 存储消息
- 消费消息
拓展功能:
- 重复消费(多个消费者)
- 确认机制
- 持久化
- 广播(消息直接写入多个队列)
2.1 List
采用 List 构建一个简单的 FIFO 队列:
LPUSH
RPOP
/BRPOP
如果没有消息了,会返回nil
RPOPLPUSH
/BRPOPLPUSH
可以在处理完成之后再从备份队列删除,实现类似 ACK 的功能
PS: RPOPLPUSH
: 弹出一个元素,并将该元素压入另一个列表。BRPOPLPUSH
: 如果源列表没有元素会阻塞。
redicConn.lpush('queue', 'a')
while True:
msg = redisConn.rpop('queue')
if msg is None:
time.sleep(1)
continue
do_something(msg)
2.2 SortedSet
有序集合,不支持重复消息(有时可以当做功能)。
用时间戳(或加一定权重)当分数,据此排序。
2.3 Pub/Sub
实现了简单的 Pub/Sub 功能。
虽然没有 RabbitMQ 那些多 API,也不支持那些复杂的功能配置,但是在简单的场景下用用还是不错的。
- 消费者订阅指定主题
subscribe foo
- 生产者发布消息
publish foo ooxxooxx
有两个不可忽视的问题:
- Pub 上来的消息必须实时发往订阅者,如果订阅者不在线,那么消息就丢失了。
- 如果消息积压到一定程度,Redis 会强制断开消费者连接,清空消息队列。
2.4 Streams
Redis 5.0 引入的新数据类型。
这个名字可能是来自流式计算。
所有提交上来的消息按顺序串成一个消息链。
非常值得一提的是,Stream 实现的队列允许重复消费、支持 ACK
。
XADD
XDEL
XRANGE
三、总结
- Redis 不保证数据完整性,所以如果是对数据完整性有要求的话,不能使用这个方案。
- Redis 不支持消息确认机制(
Ack
/Nack
/Reject
),需要自己在应用层面实现,但如果这么做的话,不如干脆用 MQ 算了。 - 除了
Pub/Sub
之外,List
、SortedSet
、Streams
都支持消息持久化,但是应用时需要注意避免消息堆积,否则会可能会对内存造成压力。