#2 Golang RabbitMQ

2021-06-18

RabbitMQ 是啥就不说了,怎么安装部署也不说了,就记录一下 RabbitMQ 在 Golang 开发中的应用。

说明:采用 github.com/streadway/amqp 库。

func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)

生产者

生产者基本流程

生产者:连接

  1. amqp.Dial -> amqp.Connection
  2. amqp.Connection.Channel -> amqp.Channel
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
    log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
ch, err := conn.Channel()
if err != nil {
    log.Fatalf("Failed to open a channel: %s", err)
}

生产者:配置(可选)

事先把 MQ 配好就行,但从稳妥起见,还是在连接时加上比较好。

  1. amqp.Channel.QueueDeclare
  2. amqp.Channel.ExchangeDeclare
  3. amqp.Channel.QueueBind
q, err := ch.QueueDeclare(
    "hello", // 队列名称
    true,    // 持久化
    false,   // 自动删除
    false,   // 独占
    false,   // 等待服务器回应
    nil,     // 额外参数
)
if err != nil {
    log.Fatalf("Failed to declare a queue: %s", err)
}

生产者:发送

  1. amqp.Channel.Publish
err = ch.Publish(
    "",     // exchange
    q.Name, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte("hello world"),
    })
if err != nil {
    log.Fatalf("Failed to publish a message: %s", err)
}

生产者:收尾

  1. amqp.Connection.Close
  2. amqp.Channel.Close

消费者

消费者基本流程

和生产者基本一致。只是调用的的是 chan.Consume 而不是 chan.Publish
然后就是配置阶段,消费者只用关心队列在不在。

参考资料与拓展阅读

#1 RabbitMQ & AMQP

2014-07-11

AMQP

Advanced Message Queuing Protocol, 高级消息队列协议

久负盛名的投资公司摩根大通(JPMorgan Chase)在 2005 年前后设计了 AMQP,并和红帽一同采用 Java 实现了这个协议(没过多久就改用 C++ 重构了一遍),这就是后来的 Apache Qpid。后来的一些消息队列也都支持 AMQP 协议,比如 RabbitMQ(采用 Erlang 开发)、Apache ActiveMQ(Java)。

其他常见的 MQ 协议还有:STOMP 1, MQTT 2,有时也会和 XMPP 3 做对比。

  • 2006/06 版本 0-8
  • 2006/12 版本 0-9
  • 2008/11 版本 0-9-1 RabbitMQ实现
  • 2011/10 版本 1.0
  • AMQP 移交给 OASIS 组织之后发布的第一个版本
  • 2014/04 该版本成为 ISO 国际标准。

基本概念

AMQP Arch

  • Message
  • DeliveryTag 这个标记十分重要
  • Properties
  • Header
  • Body
  • Content Type
  • Content Encoding
  • Message Queue
  • Message Broker
  • Message-Oriented Middleware 消息中间件,有时简写 MOM
  • Connection TCP 连接, 服务器永远不会主动关闭连接
  • Channel 通道,或者叫信道,逻辑连接,不同通道之间是完全隔离的 (ChannelID)
  • 通道的打开关闭
  • 多线程可以使用各自的通道
  • Server 消息队列服务器,就是指 Broker
  • Virtual Host 虚拟主机,消息队列中的逻辑隔离
  • Publisher 消息生产者
  • Exchange 交换机
  • Queue 队列
  • Binding 绑定,指定了队列和交换机之间的关系
  • RoutingKey 路由键,Binding 的附加参数,对消息进行过滤
    注意:有些地方将绑定时指定的 RoutingKey 叫做 BindingKey
  • Comsumer 消息消费者

基本流程

AMQP 协议中,基本数据单位是帧。有 9 种帧结构用来开启、控制、关闭两点之间的信息传输链路:

  1. 打开(连接)open
  2. 开始(会话)begin
  3. 附加(链路)attach
  4. 传输 transfer
  5. 流量控制 flow
  6. 状态通信 disposition
  7. 分离(链路)detach
  8. 结束(会话)end
  9. 关闭(连接)close

连接 Conection,会话 Session,链路 Link。
链路是单向的数据传输通道,消息(Transfer 帧)就在链路上传输。

+ OpenConnection
|   + StartSession
|   |   + AttachLink
|   |   |     Transfer
|   |   |     Flow
|   |   |     Disposition
|   |   + DetachLink
|   + EndSession
+ CloseConnection
  1. 定义 Exchange、Queue、Binding
  2. 生产者将消息投递给 Exchange
  3. Exchange 根据实现定义的路由策略(Binding)将消息转发到 Queue
  4. 消费者从 Queue 中拿到消息

RabbitMQ

实现了 AMQP 0-9-1。

端口

ps -ef | grep rabbitmq | grep -v grep
rabbitmq  966640       1  5 11:09 ?        00:00:12 /usr/lib/erlang/erts-11.1.8/bin/beam.smp -W w -K true -A 64 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa  -noshell -noinput -s rabbit boot -boot start_sasl -lager crash_log false -lager handlers []
rabbitmq  966744  966640  0 11:09 ?        00:00:00 erl_child_setup 65536
rabbitmq  966775       1  0 11:09 ?        00:00:00 /usr/lib/erlang/erts-11.1.8/bin/epmd -daemon
rabbitmq  966802  966744  0 11:09 ?        00:00:00 inet_gethost 4
rabbitmq  966803  966802  0 11:09 ?        00:00:00 inet_gethost 4

sudo nmap -p 1-65535 localhost
4369/tcp  open  epmd
5672/tcp  open  amqp
15672/tcp open  unknown
25672/tcp open  unknown
  • 4369 Erlang 端口映射器守护程序 (epmd)
  • 5672 服务端口
  • 15672 Web 接口
  • 25672 不知道干嘛的,来自 inet_dist_listen_min - inet_dist_listen_max, 我本机默认配置

Exchange 类型

AMQP 中定义的 4 种 Exchange 类型:

  1. direct 需要符合 RoutingKey 的完全匹配
  2. topic 支持模糊匹配:# 代表任意个单词,* 代表一个单词
  3. RoutingKey 应该是小数点隔开的单词,另外,不可超过 255 字节。
  4. BindingKey 可以包含上面说的模糊匹配字符。
  5. fanout 将消息广播给所有绑定到该 Exchange 的 Queue,忽略 RoutingKey
  6. headers 采用消息的 Header 与 Binding 的属性来进行匹配,忽略 RoutingKey
  7. Binding 中 x- 开头的属性不会参与匹配。
  8. Binding 可以定义 x-match 属性,any (默认值) 表示匹配中一个字段就行,all 表示所有字段都匹配。
  9. 奇怪的是官网教程中没有这种类型的示例。

预置 Exchange:

  • (AMQP default) direct
  • amq.direct direct
  • amq.fanout fanout
  • amq.headers headers
  • amq.match headers
  • amq.rabbitmq.log topic
  • amq.rabbitmq.trace topic
  • amq.topic topic

默认 Exchange (AMQP default) 有点特殊,所有的队列都自动绑定在上面,然后又是 direct 类型,这样一来,生产者 publish 时如果将消息投递到名字为空字符串的这个 Exchange,RoutingKey 填写队列名字,就可以直接将消息投递到队列中。

确认机制

  1. 来自 TCP 的启发。
  2. 分成消费者确认和生产者确认两部分。
  3. DeliveryTag 对于确认机制至关重要。

具体下来就是:

  • Basic.Ack basic_ack(delivery_tag=0, multiple=False) 可以一次确认多个消息
  • Basic.Nack basic_nack(delivery_tag=None, multiple=False, requeue=True)
  • Basic.Reject basic.reject(delivery_tag=None, requeue=True)

Nack 是 RabbitMQ 对 AMQP 的拓展,和 Reject 不同的是,Nack 可以一次拒绝该通道所有没有 ACK 的消息。

关于自动确认模式

命令

Connection

  1. Connection.Start
  2. Connection.StartOk
  3. Connection.Secure
  4. Connection.SecureOk
  5. Connection.Tune
  6. Connection.TuneOk
  7. Connection.Open
  8. Connection.OpenOk
  9. Connection.Close
  10. Connection.CloseOk
  11. Connection.Blocked
  12. Connection.Unblocked

Channel

  1. Channel.Open
  2. Channel.OpenOk
  3. Channel.Flow
  4. Channel.FlowOk
  5. Channel.Close
  6. Channel.CloseOk

Access

  1. Access.Request
  2. Access.RequestOk

Exchange 交换器

  1. Exchange.Declare
  2. Exchange.DeclareOk
  3. Exchange.Delete
  4. Exchange.DeleteOk
  5. Exchange.Bind 交换器绑定 (RabbitMQ 拓展)
  6. Exchange.BindOk
  7. Exchange.Unbind 交换器解绑 (RabbitMQ 拓展)
  8. Exchange.UnbindOk

Queue 队列

  1. Queue.Declare 队列声明
  2. Queue.DeclareOk
  3. Queue.Bind 队列绑定 (到交换器)
  4. Queue.BindOk
  5. Queue.Purge 队列清空 (没分配的,也就是 unack 不会被清空)
  6. Queue.PurgeOk
  7. Queue.Delete 队列删除
  8. Queue.DeleteOk
  9. Queue.Unbind 队列解绑
  10. Queue.UnbindOk

Basic

  1. Basic.Qos Quality of Service 设置:
  2. prefetch_size
  3. prefetch_count
  4. Basic.QosOk
  5. Basic.Consume 消费者开始消费
  6. Basic.ConsumeOk
  7. Basic.Cancel 取消消费者订阅
  8. Basic.CancelOk
  9. Basic.Publish 发布消息
  10. Basic.Return
  11. Basic.Deliver
  12. Basic.Get 直接从指定队列获取消息
  13. Basic.GetOk
  14. Basic.GetEmpty
  15. Basic.Ack 确认
  16. Basic.Reject
  17. Basic.RecoverAsync
  18. Basic.Recover
  19. Basic.RecoverOk
  20. Basic.Nack 负确认 (RabbitMQ 拓展)

Tx 事务

  1. Tx.Select
  2. Tx.SelectOk
  3. Tx.Commit
  4. Tx.CommitOk
  5. Tx.Rollback
  6. Tx.RollbackOk

Confirm

  1. Confirm.Select
  2. Confirm.SelectOk

话题 1:持久化

  1. 交换机持久化 durable=true
  2. 队列持久化 durable=true
  3. 绑定持久化
  4. 消息持久化 properties=pika.BasicProperties(delivery_mode = 2)
    basic_publish(exchange, routing_key, body, properties=None,
                  mandatory=False, immediate=False)
    

话题 2:可靠性的保障

  1. 事务
  2. 持久化
  3. 确认模式

参考资料与拓展阅读


  1. Streaming Text Oriented Messaging Protocol 

  2. 曾经是 Message Queuing Telemetry Transport 的简称, 后来不代表任何意义了
    img 

  3. Extensible Messaging and Presence Protocol, 一种通讯协议