#1 RabbitMQ & AMQP
MQ RabbitMQ AMQP 2014-07-11AMQP
Advanced Message Queuing Protocol, 高级消息队列协议
久负盛名的投资公司摩根大通(JPMorgan Chase)在 2005 年前后设计了 AMQP,并和红帽一同采用 Java 实现了这个协议(没过多久就改用 C++ 重构了一遍),这就是后来的 Apache Qpid。后来的一些消息队列也都支持 AMQP 协议,比如 RabbitMQ(采用 Erlang 开发)、Apache ActiveMQ(Java)。
其他常见的 MQ 协议还有:STOMP 1, MQTT 2,有时也会和 XMPP 3 做对比。
- 2006/06 版本 0-8
- 2006/12 版本 0-9
- 2008/11 版本 0-9-1 RabbitMQ实现
- 2011/10 版本 1.0
- AMQP 移交给 OASIS 组织之后发布的第一个版本
- 2014/04 该版本成为 ISO 国际标准。
基本概念
Message
DeliveryTag
这个标记十分重要Properties
Header
Body
Content Type
Content Encoding
Message Queue
Message Broker
Message-Oriented Middleware
消息中间件,有时简写 MOMConnection
TCP 连接, 服务器永远不会主动关闭连接Channel
通道,或者叫信道,逻辑连接,不同通道之间是完全隔离的 (ChannelID)- 通道的打开关闭
- 多线程可以使用各自的通道
Server
消息队列服务器,就是指 BrokerVirtual Host
虚拟主机,消息队列中的逻辑隔离Publisher
消息生产者Exchange
交换机Queue
队列Binding
绑定,指定了队列和交换机之间的关系RoutingKey
路由键,Binding 的附加参数,对消息进行过滤
注意:有些地方将绑定时指定的 RoutingKey 叫做 BindingKeyComsumer
消息消费者
基本流程
AMQP 协议中,基本数据单位是帧。有 9 种帧结构用来开启、控制、关闭两点之间的信息传输链路:
- 打开(连接)open
- 开始(会话)begin
- 附加(链路)attach
- 传输 transfer
- 流量控制 flow
- 状态通信 disposition
- 分离(链路)detach
- 结束(会话)end
- 关闭(连接)close
连接 Conection,会话 Session,链路 Link。
链路是单向的数据传输通道,消息(Transfer 帧)就在链路上传输。
+ OpenConnection
| + StartSession
| | + AttachLink
| | | Transfer
| | | Flow
| | | Disposition
| | + DetachLink
| + EndSession
+ CloseConnection
- 定义 Exchange、Queue、Binding
- 生产者将消息投递给 Exchange
- Exchange 根据实现定义的路由策略(Binding)将消息转发到 Queue
- 消费者从 Queue 中拿到消息
RabbitMQ
实现了 AMQP 0-9-1。
端口
ps -ef | grep rabbitmq | grep -v grep
rabbitmq 966640 1 5 11:09 ? 00:00:12 /usr/lib/erlang/erts-11.1.8/bin/beam.smp -W w -K true -A 64 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa -noshell -noinput -s rabbit boot -boot start_sasl -lager crash_log false -lager handlers []
rabbitmq 966744 966640 0 11:09 ? 00:00:00 erl_child_setup 65536
rabbitmq 966775 1 0 11:09 ? 00:00:00 /usr/lib/erlang/erts-11.1.8/bin/epmd -daemon
rabbitmq 966802 966744 0 11:09 ? 00:00:00 inet_gethost 4
rabbitmq 966803 966802 0 11:09 ? 00:00:00 inet_gethost 4
sudo nmap -p 1-65535 localhost
4369/tcp open epmd
5672/tcp open amqp
15672/tcp open unknown
25672/tcp open unknown
4369
Erlang 端口映射器守护程序 (epmd)5672
服务端口15672
Web 接口25672
不知道干嘛的,来自inet_dist_listen_min
-inet_dist_listen_max
, 我本机默认配置
Exchange 类型
AMQP 中定义的 4 种 Exchange 类型:
direct
需要符合RoutingKey
的完全匹配topic
支持模糊匹配:#
代表任意个单词,*
代表一个单词- RoutingKey 应该是小数点隔开的单词,另外,不可超过 255 字节。
- BindingKey 可以包含上面说的模糊匹配字符。
fanout
将消息广播给所有绑定到该 Exchange 的 Queue,忽略 RoutingKeyheaders
采用消息的 Header 与 Binding 的属性来进行匹配,忽略 RoutingKey- Binding 中
x-
开头的属性不会参与匹配。 - Binding 可以定义
x-match
属性,any
(默认值) 表示匹配中一个字段就行,all
表示所有字段都匹配。 - 奇怪的是官网教程中没有这种类型的示例。
预置 Exchange:
(AMQP default)
directamq.direct
directamq.fanout
fanoutamq.headers
headersamq.match
headersamq.rabbitmq.log
topicamq.rabbitmq.trace
topicamq.topic
topic
默认 Exchange (
AMQP default
) 有点特殊,所有的队列都自动绑定在上面,然后又是 direct 类型,这样一来,生产者 publish 时如果将消息投递到名字为空字符串的这个 Exchange,RoutingKey 填写队列名字,就可以直接将消息投递到队列中。
确认机制
- 来自 TCP 的启发。
- 分成消费者确认和生产者确认两部分。
- DeliveryTag 对于确认机制至关重要。
具体下来就是:
- Basic.Ack
basic_ack(delivery_tag=0, multiple=False)
可以一次确认多个消息 - Basic.Nack
basic_nack(delivery_tag=None, multiple=False, requeue=True)
- Basic.Reject
basic.reject(delivery_tag=None, requeue=True)
Nack 是 RabbitMQ 对 AMQP 的拓展,和 Reject 不同的是,Nack 可以一次拒绝该通道所有没有 ACK 的消息。
关于自动确认模式
命令
Connection
Connection.Start
Connection.StartOk
Connection.Secure
Connection.SecureOk
Connection.Tune
Connection.TuneOk
Connection.Open
Connection.OpenOk
Connection.Close
Connection.CloseOk
Connection.Blocked
Connection.Unblocked
Channel
Channel.Open
Channel.OpenOk
Channel.Flow
Channel.FlowOk
Channel.Close
Channel.CloseOk
Access
Access.Request
Access.RequestOk
Exchange 交换器
Exchange.Declare
Exchange.DeclareOk
Exchange.Delete
Exchange.DeleteOk
Exchange.Bind
交换器绑定 (RabbitMQ 拓展)Exchange.BindOk
Exchange.Unbind
交换器解绑 (RabbitMQ 拓展)Exchange.UnbindOk
Queue 队列
Queue.Declare
队列声明Queue.DeclareOk
Queue.Bind
队列绑定 (到交换器)Queue.BindOk
Queue.Purge
队列清空 (没分配的,也就是 unack 不会被清空)Queue.PurgeOk
Queue.Delete
队列删除Queue.DeleteOk
Queue.Unbind
队列解绑Queue.UnbindOk
Basic
Basic.Qos
Quality of Service 设置:prefetch_size
prefetch_count
Basic.QosOk
Basic.Consume
消费者开始消费Basic.ConsumeOk
Basic.Cancel
取消消费者订阅Basic.CancelOk
Basic.Publish
发布消息Basic.Return
Basic.Deliver
Basic.Get
直接从指定队列获取消息Basic.GetOk
Basic.GetEmpty
Basic.Ack
确认Basic.Reject
Basic.RecoverAsync
Basic.Recover
Basic.RecoverOk
Basic.Nack
负确认 (RabbitMQ 拓展)
Tx 事务
Tx.Select
Tx.SelectOk
Tx.Commit
Tx.CommitOk
Tx.Rollback
Tx.RollbackOk
Confirm
Confirm.Select
Confirm.SelectOk
话题 1:持久化
- 交换机持久化
durable=true
- 队列持久化
durable=true
- 绑定持久化
- 消息持久化
properties=pika.BasicProperties(delivery_mode = 2)
basic_publish(exchange, routing_key, body, properties=None, mandatory=False, immediate=False)
话题 2:可靠性的保障
- 事务
- 持久化
- 确认模式
参考资料与拓展阅读
- RabbitMQ, RabbitMQ Tutorials
- RabbitMQ, AMQP 0-9-1 Model Explained
- RabbitMQ, AMQP 0-9-1 Quick Reference
- RabbitMQ, Compatibility and Conformance 兼容性和一致性
- RabbitMQ, AMQP 0-9-1 Specification (pdf)
- RabbitMQ, Consumer Acknowledgements and Publisher Confirms
- GitHub, pika/pika at 0.9.14
- GitHub, pika/channel.py at 0.9.14 · pika/pika
- 维基百科(en),Advanced Message Queuing Protocol
- 维基百科(en),RabbitMQ
- Read the Docs, pika