AMQP
Advanced Message Queuing Protocol, 高级消息队列协议
久负盛名的投资公司摩根大通(JPMorgan Chase)在 2005 年前后设计了 AMQP,并和红帽一同采用 Java 实现了这个协议(没过多久就改用 C++ 重构了一遍),这就是后来的 Apache Qpid。后来的一些消息队列也都支持 AMQP 协议,比如 RabbitMQ(采用 Erlang 开发)、Apache ActiveMQ(Java)。
其他常见的 MQ 协议还有:STOMP 1, MQTT 2,有时也会和 XMPP 3 做对比。
- 2006/06 版本 0-8
- 2006/12 版本 0-9
- 2008/11 版本 0-9-1 RabbitMQ实现
- 2011/10 版本 1.0
- AMQP 移交给 OASIS 组织之后发布的第一个版本
- 2014/04 该版本成为 ISO 国际标准。
基本概念

MessageDeliveryTag这个标记十分重要PropertiesHeaderBodyContent TypeContent EncodingMessage QueueMessage BrokerMessage-Oriented Middleware消息中间件,有时简写 MOMConnectionTCP 连接, 服务器永远不会主动关闭连接Channel通道,或者叫信道,逻辑连接,不同通道之间是完全隔离的 (ChannelID)- 通道的打开关闭
- 多线程可以使用各自的通道
Server消息队列服务器,就是指 BrokerVirtual Host虚拟主机,消息队列中的逻辑隔离Publisher消息生产者Exchange交换机Queue队列Binding绑定,指定了队列和交换机之间的关系RoutingKey路由键,Binding 的附加参数,对消息进行过滤
注意:有些地方将绑定时指定的 RoutingKey 叫做 BindingKeyComsumer消息消费者
基本流程
AMQP 协议中,基本数据单位是帧。有 9 种帧结构用来开启、控制、关闭两点之间的信息传输链路:
- 打开(连接)open
- 开始(会话)begin
- 附加(链路)attach
- 传输 transfer
- 流量控制 flow
- 状态通信 disposition
- 分离(链路)detach
- 结束(会话)end
- 关闭(连接)close
连接 Conection,会话 Session,链路 Link。
链路是单向的数据传输通道,消息(Transfer 帧)就在链路上传输。
+ OpenConnection
| + StartSession
| | + AttachLink
| | | Transfer
| | | Flow
| | | Disposition
| | + DetachLink
| + EndSession
+ CloseConnection
- 定义 Exchange、Queue、Binding
- 生产者将消息投递给 Exchange
- Exchange 根据实现定义的路由策略(Binding)将消息转发到 Queue
- 消费者从 Queue 中拿到消息
RabbitMQ
实现了 AMQP 0-9-1。
端口
ps -ef | grep rabbitmq | grep -v grep
rabbitmq 966640 1 5 11:09 ? 00:00:12 /usr/lib/erlang/erts-11.1.8/bin/beam.smp -W w -K true -A 64 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa -noshell -noinput -s rabbit boot -boot start_sasl -lager crash_log false -lager handlers []
rabbitmq 966744 966640 0 11:09 ? 00:00:00 erl_child_setup 65536
rabbitmq 966775 1 0 11:09 ? 00:00:00 /usr/lib/erlang/erts-11.1.8/bin/epmd -daemon
rabbitmq 966802 966744 0 11:09 ? 00:00:00 inet_gethost 4
rabbitmq 966803 966802 0 11:09 ? 00:00:00 inet_gethost 4
sudo nmap -p 1-65535 localhost
4369/tcp open epmd
5672/tcp open amqp
15672/tcp open unknown
25672/tcp open unknown
4369Erlang 端口映射器守护程序 (epmd)5672服务端口15672Web 接口25672不知道干嘛的,来自inet_dist_listen_min-inet_dist_listen_max, 我本机默认配置
Exchange 类型
AMQP 中定义的 4 种 Exchange 类型:
direct需要符合RoutingKey的完全匹配topic支持模糊匹配:#代表任意个单词,*代表一个单词- RoutingKey 应该是小数点隔开的单词,另外,不可超过 255 字节。
- BindingKey 可以包含上面说的模糊匹配字符。
fanout将消息广播给所有绑定到该 Exchange 的 Queue,忽略 RoutingKeyheaders采用消息的 Header 与 Binding 的属性来进行匹配,忽略 RoutingKey- Binding 中
x-开头的属性不会参与匹配。 - Binding 可以定义
x-match属性,any(默认值) 表示匹配中一个字段就行,all表示所有字段都匹配。 - 奇怪的是官网教程中没有这种类型的示例。
预置 Exchange:
(AMQP default)directamq.directdirectamq.fanoutfanoutamq.headersheadersamq.matchheadersamq.rabbitmq.logtopicamq.rabbitmq.tracetopicamq.topictopic
默认 Exchange (
AMQP default) 有点特殊,所有的队列都自动绑定在上面,然后又是 direct 类型,这样一来,生产者 publish 时如果将消息投递到名字为空字符串的这个 Exchange,RoutingKey 填写队列名字,就可以直接将消息投递到队列中。
确认机制
- 来自 TCP 的启发。
- 分成消费者确认和生产者确认两部分。
- DeliveryTag 对于确认机制至关重要。
具体下来就是:
- Basic.Ack
basic_ack(delivery_tag=0, multiple=False)可以一次确认多个消息 - Basic.Nack
basic_nack(delivery_tag=None, multiple=False, requeue=True) - Basic.Reject
basic.reject(delivery_tag=None, requeue=True)
Nack 是 RabbitMQ 对 AMQP 的拓展,和 Reject 不同的是,Nack 可以一次拒绝该通道所有没有 ACK 的消息。
关于自动确认模式
命令
Connection
Connection.StartConnection.StartOkConnection.SecureConnection.SecureOkConnection.TuneConnection.TuneOkConnection.OpenConnection.OpenOkConnection.CloseConnection.CloseOkConnection.BlockedConnection.Unblocked
Channel
Channel.OpenChannel.OpenOkChannel.FlowChannel.FlowOkChannel.CloseChannel.CloseOk
Access
Access.RequestAccess.RequestOk
Exchange 交换器
Exchange.DeclareExchange.DeclareOkExchange.DeleteExchange.DeleteOkExchange.Bind交换器绑定 (RabbitMQ 拓展)Exchange.BindOkExchange.Unbind交换器解绑 (RabbitMQ 拓展)Exchange.UnbindOk
Queue 队列
Queue.Declare队列声明Queue.DeclareOkQueue.Bind队列绑定 (到交换器)Queue.BindOkQueue.Purge队列清空 (没分配的,也就是 unack 不会被清空)Queue.PurgeOkQueue.Delete队列删除Queue.DeleteOkQueue.Unbind队列解绑Queue.UnbindOk
Basic
Basic.QosQuality of Service 设置:prefetch_sizeprefetch_countBasic.QosOkBasic.Consume消费者开始消费Basic.ConsumeOkBasic.Cancel取消消费者订阅Basic.CancelOkBasic.Publish发布消息Basic.ReturnBasic.DeliverBasic.Get直接从指定队列获取消息Basic.GetOkBasic.GetEmptyBasic.Ack确认Basic.RejectBasic.RecoverAsyncBasic.RecoverBasic.RecoverOkBasic.Nack负确认 (RabbitMQ 拓展)
Tx 事务
Tx.SelectTx.SelectOkTx.CommitTx.CommitOkTx.RollbackTx.RollbackOk
Confirm
Confirm.SelectConfirm.SelectOk
话题 1:持久化
- 交换机持久化
durable=true - 队列持久化
durable=true - 绑定持久化
- 消息持久化
properties=pika.BasicProperties(delivery_mode = 2)basic_publish(exchange, routing_key, body, properties=None, mandatory=False, immediate=False)
话题 2:可靠性的保障
- 事务
- 持久化
- 确认模式
参考资料与拓展阅读
- RabbitMQ, RabbitMQ Tutorials
- RabbitMQ, AMQP 0-9-1 Model Explained
- RabbitMQ, AMQP 0-9-1 Quick Reference
- RabbitMQ, Compatibility and Conformance 兼容性和一致性
- RabbitMQ, AMQP 0-9-1 Specification (pdf)
- RabbitMQ, Consumer Acknowledgements and Publisher Confirms
- GitHub, pika/pika at 0.9.14
- GitHub, pika/channel.py at 0.9.14 · pika/pika
- 维基百科(en),Advanced Message Queuing Protocol
- 维基百科(en),RabbitMQ
- Read the Docs, pika