#3 Golang RabbitMQ

2021-06-18

RabbitMQ 是啥就不说了,怎么安装部署也不说了,就记录一下 RabbitMQ 在 Golang 开发中的应用。

说明:采用 github.com/streadway/amqp 库。

func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)

生产者

生产者基本流程

生产者:连接

  1. amqp.Dial -> amqp.Connection
  2. amqp.Connection.Channel -> amqp.Channel
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
    log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
ch, err := conn.Channel()
if err != nil {
    log.Fatalf("Failed to open a channel: %s", err)
}

生产者:配置(可选)

事先把 MQ 配好就行,但从稳妥起见,还是在连接时加上比较好。

  1. amqp.Channel.QueueDeclare
  2. amqp.Channel.ExchangeDeclare
  3. amqp.Channel.QueueBind
q, err := ch.QueueDeclare(
    "hello", // 队列名称
    true,    // 持久化
    false,   // 自动删除
    false,   // 独占
    false,   // 等待服务器回应
    nil,     // 额外参数
)
if err != nil {
    log.Fatalf("Failed to declare a queue: %s", err)
}

生产者:发送

  1. amqp.Channel.Publish
err = ch.Publish(
    "",     // exchange
    q.Name, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte("hello world"),
    })
if err != nil {
    log.Fatalf("Failed to publish a message: %s", err)
}

生产者:收尾

  1. amqp.Connection.Close
  2. amqp.Channel.Close

消费者

消费者基本流程

和生产者基本一致。只是调用的的是 chan.Consume 而不是 chan.Publish
然后就是配置阶段,消费者只用关心队列在不在。

参考资料与拓展阅读

#2 使用 Redis 做消息队列

2021-03-06

Redis 作者还分叉了项目,专门搞了一个实验项目 disque 来实现一个消息队列管理工具。

https://github.com/antirez/disque

这不是这篇文章的关注重点。这篇文章是想像一下使用 Redis 做 MQ 的情况(没在生产环境使用过)。

一、为什么使用 MQ

无非就是减少耦合,削峰填谷。

中间引入一个组件做代理,自然耦合解开了。
另外,一个稳定高效的 MQ 可以快速吸收消息,避免生产者的等待,同时消费者可以根据自己的处理速度来运行。就好像水库的作用,就是削峰填谷,损有余而补不足。
PS: MQ 好像还起到了一点 HA 的作用。
PS: 一种设计完善的消息推送方案,比如一个事件多方消费,如果没有消息队列则需要多次调用。

至于可能导致的问题嘛,当然也有,就是多了一个组件引入了新的复杂性。
上面提到的好处都是建立在 MQ 稳定可靠运行的基础之上,自然需要很多新的工作来维护消息队列中间件。
小规模系统就应该怎么简单怎么来,但如果系统足够复杂,多点工作总是难免的。

1.1 RabbitMQ

Erlang 所写(我听说过的唯一一个 Erlang 项目),AMQP 0-9-1 (高级消息队列协议) 的开源实现(Mozilla 公共许可证)。

  • PS: 摩根大通在 2003 年设计的 AMQP,然后和红帽合作使用 C++ 开发了一个 AMQP 开源实现,就是现在的 Apache Qpid。
  • PS: AMQP v1.0 于 2004 年成为 ISO 标准 (ISO/IEC 19464:2014)。
    据说和 0-9-1 有很大差别。

  • 生产者发布消息给指定 Exchange

  • Exchange 根据路由规则 (RoutingKey) 将消息投递给 Queue
  • 消费者订阅指定 Queue

1.2 Kafka

据说性能非常好,而且是倍经考验了,时不时看到有大型项目将其作为基础设施使用。
虽然最近两年 Kafka 很热门,但我没有在生产环境使用过。

二、Redis 实现

基础功能:

  • 生产消息
  • 存储消息
  • 消费消息

拓展功能:

  • 重复消费(多个消费者)
  • 确认机制
  • 持久化
  • 广播(消息直接写入多个队列)

2.1 List

采用 List 构建一个简单的 FIFO 队列:

  • LPUSH
  • RPOP/BRPOP 如果没有消息了,会返回 nil
  • RPOPLPUSH / BRPOPLPUSH 可以在处理完成之后再从备份队列删除,实现类似 ACK 的功能

PS: RPOPLPUSH: 弹出一个元素,并将该元素压入另一个列表。BRPOPLPUSH: 如果源列表没有元素会阻塞。

redicConn.lpush('queue', 'a')

while True:
    msg = redisConn.rpop('queue')
    if msg is None:
        time.sleep(1)
        continue
    do_something(msg)

2.2 SortedSet

有序集合,不支持重复消息(有时可以当做功能)。

用时间戳(或加一定权重)当分数,据此排序。


2.3 Pub/Sub

实现了简单的 Pub/Sub 功能。
虽然没有 RabbitMQ 那些多 API,也不支持那些复杂的功能配置,但是在简单的场景下用用还是不错的。

  1. 消费者订阅指定主题 subscribe foo
  2. 生产者发布消息 publish foo ooxxooxx

有两个不可忽视的问题:

  1. Pub 上来的消息必须实时发往订阅者,如果订阅者不在线,那么消息就丢失了。
  2. 如果消息积压到一定程度,Redis 会强制断开消费者连接,清空消息队列。

2.4 Streams

Redis 5.0 引入的新数据类型。
这个名字可能是来自流式计算。
所有提交上来的消息按顺序串成一个消息链。

非常值得一提的是,Stream 实现的队列允许重复消费、支持 ACK

  • XADD
  • XDEL
  • XRANGE

三、总结

  1. Redis 不保证数据完整性,所以如果是对数据完整性有要求的话,不能使用这个方案。
  2. Redis 不支持消息确认机制(Ack/Nack/Reject),需要自己在应用层面实现,但如果这么做的话,不如干脆用 MQ 算了。
  3. 除了 Pub/Sub 之外,ListSortedSetStreams 都支持消息持久化,但是应用时需要注意避免消息堆积,否则会可能会对内存造成压力。

如何保证消息的完整性呢?

参考资料与拓展阅读

#1 RabbitMQ & AMQP

2014-07-11

AMQP

Advanced Message Queuing Protocol, 高级消息队列协议

久负盛名的投资公司摩根大通(JPMorgan Chase)在 2005 年前后设计了 AMQP,并和红帽一同采用 Java 实现了这个协议(没过多久就改用 C++ 重构了一遍),这就是后来的 Apache Qpid。后来的一些消息队列也都支持 AMQP 协议,比如 RabbitMQ(采用 Erlang 开发)、Apache ActiveMQ(Java)。

其他常见的 MQ 协议还有:STOMP 1, MQTT 2,有时也会和 XMPP 3 做对比。

  • 2006/06 版本 0-8
  • 2006/12 版本 0-9
  • 2008/11 版本 0-9-1 RabbitMQ实现
  • 2011/10 版本 1.0
  • AMQP 移交给 OASIS 组织之后发布的第一个版本
  • 2014/04 该版本成为 ISO 国际标准。

基本概念

AMQP Arch

  • Message
  • DeliveryTag 这个标记十分重要
  • Properties
  • Header
  • Body
  • Content Type
  • Content Encoding
  • Message Queue
  • Message Broker
  • Message-Oriented Middleware 消息中间件,有时简写 MOM
  • Connection TCP 连接, 服务器永远不会主动关闭连接
  • Channel 通道,或者叫信道,逻辑连接,不同通道之间是完全隔离的 (ChannelID)
  • 通道的打开关闭
  • 多线程可以使用各自的通道
  • Server 消息队列服务器,就是指 Broker
  • Virtual Host 虚拟主机,消息队列中的逻辑隔离
  • Publisher 消息生产者
  • Exchange 交换机
  • Queue 队列
  • Binding 绑定,指定了队列和交换机之间的关系
  • RoutingKey 路由键,Binding 的附加参数,对消息进行过滤
    注意:有些地方将绑定时指定的 RoutingKey 叫做 BindingKey
  • Comsumer 消息消费者

基本流程

AMQP 协议中,基本数据单位是帧。有 9 种帧结构用来开启、控制、关闭两点之间的信息传输链路:

  1. 打开(连接)open
  2. 开始(会话)begin
  3. 附加(链路)attach
  4. 传输 transfer
  5. 流量控制 flow
  6. 状态通信 disposition
  7. 分离(链路)detach
  8. 结束(会话)end
  9. 关闭(连接)close

连接 Conection,会话 Session,链路 Link。
链路是单向的数据传输通道,消息(Transfer 帧)就在链路上传输。

+ OpenConnection
|   + StartSession
|   |   + AttachLink
|   |   |     Transfer
|   |   |     Flow
|   |   |     Disposition
|   |   + DetachLink
|   + EndSession
+ CloseConnection
  1. 定义 Exchange、Queue、Binding
  2. 生产者将消息投递给 Exchange
  3. Exchange 根据实现定义的路由策略(Binding)将消息转发到 Queue
  4. 消费者从 Queue 中拿到消息

RabbitMQ

实现了 AMQP 0-9-1。

端口

ps -ef | grep rabbitmq | grep -v grep
rabbitmq  966640       1  5 11:09 ?        00:00:12 /usr/lib/erlang/erts-11.1.8/bin/beam.smp -W w -K true -A 64 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa  -noshell -noinput -s rabbit boot -boot start_sasl -lager crash_log false -lager handlers []
rabbitmq  966744  966640  0 11:09 ?        00:00:00 erl_child_setup 65536
rabbitmq  966775       1  0 11:09 ?        00:00:00 /usr/lib/erlang/erts-11.1.8/bin/epmd -daemon
rabbitmq  966802  966744  0 11:09 ?        00:00:00 inet_gethost 4
rabbitmq  966803  966802  0 11:09 ?        00:00:00 inet_gethost 4

sudo nmap -p 1-65535 localhost
4369/tcp  open  epmd
5672/tcp  open  amqp
15672/tcp open  unknown
25672/tcp open  unknown
  • 4369 Erlang 端口映射器守护程序 (epmd)
  • 5672 服务端口
  • 15672 Web 接口
  • 25672 不知道干嘛的,来自 inet_dist_listen_min - inet_dist_listen_max, 我本机默认配置

Exchange 类型

AMQP 中定义的 4 种 Exchange 类型:

  1. direct 需要符合 RoutingKey 的完全匹配
  2. topic 支持模糊匹配:# 代表任意个单词,* 代表一个单词
  3. RoutingKey 应该是小数点隔开的单词,另外,不可超过 255 字节。
  4. BindingKey 可以包含上面说的模糊匹配字符。
  5. fanout 将消息广播给所有绑定到该 Exchange 的 Queue,忽略 RoutingKey
  6. headers 采用消息的 Header 与 Binding 的属性来进行匹配,忽略 RoutingKey
  7. Binding 中 x- 开头的属性不会参与匹配。
  8. Binding 可以定义 x-match 属性,any (默认值) 表示匹配中一个字段就行,all 表示所有字段都匹配。
  9. 奇怪的是官网教程中没有这种类型的示例。

预置 Exchange:

  • (AMQP default) direct
  • amq.direct direct
  • amq.fanout fanout
  • amq.headers headers
  • amq.match headers
  • amq.rabbitmq.log topic
  • amq.rabbitmq.trace topic
  • amq.topic topic

默认 Exchange (AMQP default) 有点特殊,所有的队列都自动绑定在上面,然后又是 direct 类型,这样一来,生产者 publish 时如果将消息投递到名字为空字符串的这个 Exchange,RoutingKey 填写队列名字,就可以直接将消息投递到队列中。

确认机制

  1. 来自 TCP 的启发。
  2. 分成消费者确认和生产者确认两部分。
  3. DeliveryTag 对于确认机制至关重要。

具体下来就是:

  • Basic.Ack basic_ack(delivery_tag=0, multiple=False) 可以一次确认多个消息
  • Basic.Nack basic_nack(delivery_tag=None, multiple=False, requeue=True)
  • Basic.Reject basic.reject(delivery_tag=None, requeue=True)

Nack 是 RabbitMQ 对 AMQP 的拓展,和 Reject 不同的是,Nack 可以一次拒绝该通道所有没有 ACK 的消息。

关于自动确认模式

命令

Connection

  1. Connection.Start
  2. Connection.StartOk
  3. Connection.Secure
  4. Connection.SecureOk
  5. Connection.Tune
  6. Connection.TuneOk
  7. Connection.Open
  8. Connection.OpenOk
  9. Connection.Close
  10. Connection.CloseOk
  11. Connection.Blocked
  12. Connection.Unblocked

Channel

  1. Channel.Open
  2. Channel.OpenOk
  3. Channel.Flow
  4. Channel.FlowOk
  5. Channel.Close
  6. Channel.CloseOk

Access

  1. Access.Request
  2. Access.RequestOk

Exchange 交换器

  1. Exchange.Declare
  2. Exchange.DeclareOk
  3. Exchange.Delete
  4. Exchange.DeleteOk
  5. Exchange.Bind 交换器绑定 (RabbitMQ 拓展)
  6. Exchange.BindOk
  7. Exchange.Unbind 交换器解绑 (RabbitMQ 拓展)
  8. Exchange.UnbindOk

Queue 队列

  1. Queue.Declare 队列声明
  2. Queue.DeclareOk
  3. Queue.Bind 队列绑定 (到交换器)
  4. Queue.BindOk
  5. Queue.Purge 队列清空 (没分配的,也就是 unack 不会被清空)
  6. Queue.PurgeOk
  7. Queue.Delete 队列删除
  8. Queue.DeleteOk
  9. Queue.Unbind 队列解绑
  10. Queue.UnbindOk

Basic

  1. Basic.Qos Quality of Service 设置:
  2. prefetch_size
  3. prefetch_count
  4. Basic.QosOk
  5. Basic.Consume 消费者开始消费
  6. Basic.ConsumeOk
  7. Basic.Cancel 取消消费者订阅
  8. Basic.CancelOk
  9. Basic.Publish 发布消息
  10. Basic.Return
  11. Basic.Deliver
  12. Basic.Get 直接从指定队列获取消息
  13. Basic.GetOk
  14. Basic.GetEmpty
  15. Basic.Ack 确认
  16. Basic.Reject
  17. Basic.RecoverAsync
  18. Basic.Recover
  19. Basic.RecoverOk
  20. Basic.Nack 负确认 (RabbitMQ 拓展)

Tx 事务

  1. Tx.Select
  2. Tx.SelectOk
  3. Tx.Commit
  4. Tx.CommitOk
  5. Tx.Rollback
  6. Tx.RollbackOk

Confirm

  1. Confirm.Select
  2. Confirm.SelectOk

话题 1:持久化

  1. 交换机持久化 durable=true
  2. 队列持久化 durable=true
  3. 绑定持久化
  4. 消息持久化 properties=pika.BasicProperties(delivery_mode = 2)
    basic_publish(exchange, routing_key, body, properties=None,
                  mandatory=False, immediate=False)
    

话题 2:可靠性的保障

  1. 事务
  2. 持久化
  3. 确认模式

参考资料与拓展阅读


  1. Streaming Text Oriented Messaging Protocol 

  2. 曾经是 Message Queuing Telemetry Transport 的简称, 后来不代表任何意义了
    img 

  3. Extensible Messaging and Presence Protocol, 一种通讯协议