Golang RabbitMQ MQ
2021-06-18
- 2014/07/11, RabbitMQ & AMQP
- 2015/02/12, Python RabbitMQ
- 2015/04/01, RabbitMQ 管理命令
- 2015/05/04, pika 01: 基础
- 2015/05/04, pika 02: BlockingConnection
- 2015/05/05, pika 03: SelectConnection
- 2015/05/06, pika 04: TornadoConnection
- 2016/02/12, Java RabbitMQ
- 2018/01/31, RabbitMQ 集群
- 2018/04/01, 面试题:RabbitMQ 相关
- 2018/10/02, pika: AsyncioConnection
- 2018/10/06, 使用 RabbitMQ 遇到的一个小问题
- 2019/01/12, 延迟队列
- 2020/10/03, 常见的消息队列
- 2021/06/18, Golang RabbitMQ
RabbitMQ 是啥就不说了,怎么安装部署也不说了,就记录一下 RabbitMQ 在 Golang 开发中的应用。
说明:采用 github.com/streadway/amqp
库。
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
生产者
生产者基本流程
生产者:连接
amqp.Dial
-> amqp.Connection
amqp.Connection.Channel
-> amqp.Channel
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
生产者:配置(可选)
事先把 MQ 配好就行,但从稳妥起见,还是在连接时加上比较好。
amqp.Channel.QueueDeclare
amqp.Channel.ExchangeDeclare
amqp.Channel.QueueBind
q, err := ch.QueueDeclare(
"hello", // 队列名称
true, // 持久化
false, // 自动删除
false, // 独占
false, // 等待服务器回应
nil, // 额外参数
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
生产者:发送
amqp.Channel.Publish
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("hello world"),
})
if err != nil {
log.Fatalf("Failed to publish a message: %s", err)
}
生产者:收尾
amqp.Connection.Close
amqp.Channel.Close
消费者
消费者基本流程
和生产者基本一致。只是调用的的是 chan.Consume
而不是 chan.Publish
。
然后就是配置阶段,消费者只用关心队列在不在。
参考资料与拓展阅读
MQ RabbitMQ AMQP
2014-07-11
AMQP
Advanced Message Queuing Protocol, 高级消息队列协议
久负盛名的投资公司摩根大通(JPMorgan Chase)在 2005 年前后设计了 AMQP,并和红帽一同采用 Java 实现了这个协议(没过多久就改用 C++ 重构了一遍),这就是后来的 Apache Qpid。后来的一些消息队列也都支持 AMQP 协议,比如 RabbitMQ(采用 Erlang 开发)、Apache ActiveMQ(Java)。
其他常见的 MQ 协议还有:STOMP , MQTT ,有时也会和 XMPP 做对比。
- 2006/06 版本 0-8
- 2006/12 版本 0-9
- 2008/11 版本 0-9-1 RabbitMQ实现
- 2011/10 版本 1.0
- AMQP 移交给 OASIS 组织之后发布的第一个版本
- 2014/04 该版本成为 ISO 国际标准。
基本概念
Message
DeliveryTag
这个标记十分重要
Properties
Header
Body
Content Type
Content Encoding
Message Queue
Message Broker
Message-Oriented Middleware
消息中间件,有时简写 MOM
Connection
TCP 连接, 服务器永远不会主动关闭连接
Channel
通道,或者叫信道,逻辑连接,不同通道之间是完全隔离的 (ChannelID)
- 通道的打开关闭
- 多线程可以使用各自的通道
Server
消息队列服务器,就是指 Broker
Virtual Host
虚拟主机,消息队列中的逻辑隔离
Publisher
消息生产者
Exchange
交换机
Queue
队列
Binding
绑定,指定了队列和交换机之间的关系
RoutingKey
路由键,Binding 的附加参数,对消息进行过滤
注意:有些地方将绑定时指定的 RoutingKey 叫做 BindingKey
Comsumer
消息消费者
基本流程
AMQP 协议中,基本数据单位是帧。有 9 种帧结构用来开启、控制、关闭两点之间的信息传输链路:
- 打开(连接)open
- 开始(会话)begin
- 附加(链路)attach
- 传输 transfer
- 流量控制 flow
- 状态通信 disposition
- 分离(链路)detach
- 结束(会话)end
- 关闭(连接)close
连接 Conection,会话 Session,链路 Link。
链路是单向的数据传输通道,消息(Transfer 帧)就在链路上传输。
+ OpenConnection
| + StartSession
| | + AttachLink
| | | Transfer
| | | Flow
| | | Disposition
| | + DetachLink
| + EndSession
+ CloseConnection
- 定义 Exchange、Queue、Binding
- 生产者将消息投递给 Exchange
- Exchange 根据实现定义的路由策略(Binding)将消息转发到 Queue
- 消费者从 Queue 中拿到消息
RabbitMQ
实现了 AMQP 0-9-1。
端口
ps -ef | grep rabbitmq | grep -v grep
rabbitmq 966640 1 5 11:09 ? 00:00:12 /usr/lib/erlang/erts-11.1.8/bin/beam.smp -W w -K true -A 64 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa -noshell -noinput -s rabbit boot -boot start_sasl -lager crash_log false -lager handlers []
rabbitmq 966744 966640 0 11:09 ? 00:00:00 erl_child_setup 65536
rabbitmq 966775 1 0 11:09 ? 00:00:00 /usr/lib/erlang/erts-11.1.8/bin/epmd -daemon
rabbitmq 966802 966744 0 11:09 ? 00:00:00 inet_gethost 4
rabbitmq 966803 966802 0 11:09 ? 00:00:00 inet_gethost 4
sudo nmap -p 1-65535 localhost
4369/tcp open epmd
5672/tcp open amqp
15672/tcp open unknown
25672/tcp open unknown
4369
Erlang 端口映射器守护程序 (epmd)
5672
服务端口
15672
Web 接口
25672
不知道干嘛的,来自 inet_dist_listen_min
- inet_dist_listen_max
, 我本机默认配置
Exchange 类型
AMQP 中定义的 4 种 Exchange 类型:
direct
需要符合 RoutingKey
的完全匹配
topic
支持模糊匹配:#
代表任意个单词,*
代表一个单词
- RoutingKey 应该是小数点隔开的单词,另外,不可超过 255 字节。
- BindingKey 可以包含上面说的模糊匹配字符。
fanout
将消息广播给所有绑定到该 Exchange 的 Queue,忽略 RoutingKey
headers
采用消息的 Header 与 Binding 的属性来进行匹配,忽略 RoutingKey
- Binding 中
x-
开头的属性不会参与匹配。
- Binding 可以定义
x-match
属性,any
(默认值) 表示匹配中一个字段就行,all
表示所有字段都匹配。
- 奇怪的是官网教程中没有这种类型的示例。
预置 Exchange:
(AMQP default)
direct
amq.direct
direct
amq.fanout
fanout
amq.headers
headers
amq.match
headers
amq.rabbitmq.log
topic
amq.rabbitmq.trace
topic
amq.topic
topic
默认 Exchange (AMQP default
) 有点特殊,所有的队列都自动绑定在上面,然后又是 direct 类型,这样一来,生产者 publish 时如果将消息投递到名字为空字符串的这个 Exchange,RoutingKey 填写队列名字,就可以直接将消息投递到队列中。
确认机制
- 来自 TCP 的启发。
- 分成消费者确认和生产者确认两部分。
- DeliveryTag 对于确认机制至关重要。
具体下来就是:
- Basic.Ack
basic_ack(delivery_tag=0, multiple=False)
可以一次确认多个消息
- Basic.Nack
basic_nack(delivery_tag=None, multiple=False, requeue=True)
- Basic.Reject
basic.reject(delivery_tag=None, requeue=True)
Nack 是 RabbitMQ 对 AMQP 的拓展,和 Reject 不同的是,Nack 可以一次拒绝该通道所有没有 ACK 的消息。
关于自动确认模式
命令
Connection
Connection.Start
Connection.StartOk
Connection.Secure
Connection.SecureOk
Connection.Tune
Connection.TuneOk
Connection.Open
Connection.OpenOk
Connection.Close
Connection.CloseOk
Connection.Blocked
Connection.Unblocked
Channel
Channel.Open
Channel.OpenOk
Channel.Flow
Channel.FlowOk
Channel.Close
Channel.CloseOk
Access
Access.Request
Access.RequestOk
Exchange 交换器
Exchange.Declare
Exchange.DeclareOk
Exchange.Delete
Exchange.DeleteOk
Exchange.Bind
交换器绑定 (RabbitMQ 拓展)
Exchange.BindOk
Exchange.Unbind
交换器解绑 (RabbitMQ 拓展)
Exchange.UnbindOk
Queue 队列
Queue.Declare
队列声明
Queue.DeclareOk
Queue.Bind
队列绑定 (到交换器)
Queue.BindOk
Queue.Purge
队列清空 (没分配的,也就是 unack 不会被清空)
Queue.PurgeOk
Queue.Delete
队列删除
Queue.DeleteOk
Queue.Unbind
队列解绑
Queue.UnbindOk
Basic
Basic.Qos
Quality of Service 设置:
prefetch_size
prefetch_count
Basic.QosOk
Basic.Consume
消费者开始消费
Basic.ConsumeOk
Basic.Cancel
取消消费者订阅
Basic.CancelOk
Basic.Publish
发布消息
Basic.Return
Basic.Deliver
Basic.Get
直接从指定队列获取消息
Basic.GetOk
Basic.GetEmpty
Basic.Ack
确认
Basic.Reject
Basic.RecoverAsync
Basic.Recover
Basic.RecoverOk
Basic.Nack
负确认 (RabbitMQ 拓展)
Tx 事务
Tx.Select
Tx.SelectOk
Tx.Commit
Tx.CommitOk
Tx.Rollback
Tx.RollbackOk
Confirm
Confirm.Select
Confirm.SelectOk
话题 1:持久化
- 交换机持久化
durable=true
- 队列持久化
durable=true
- 绑定持久化
- 消息持久化
properties=pika.BasicProperties(delivery_mode = 2)
basic_publish(exchange, routing_key, body, properties=None,
mandatory=False, immediate=False)
话题 2:可靠性的保障
- 事务
- 持久化
- 确认模式
参考资料与拓展阅读