AMQP
Advanced Message Queuing Protocol, 高级消息队列协议
久负盛名的投资公司摩根大通(JPMorgan Chase)在 2005 年前后设计了 AMQP,并和红帽一同采用 Java 实现了这个协议(没过多久就改用 C++ 重构了一遍),这就是后来的 Apache Qpid。后来的一些消息队列也都支持 AMQP 协议,比如 RabbitMQ(采用 Erlang 开发)、Apache ActiveMQ(Java)。
其他常见的 MQ 协议还有:STOMP [^stomp], MQTT [^mqtt],有时也会和 XMPP [^xmpp] 做对比。
- 2006/06 版本 0-8
- 2006/12 版本 0-9
- 2008/11 版本 0-9-1 RabbitMQ 实现
- 2011/10 版本 1.0
- AMQP 移交给 OASIS 组织之后发布的第一个版本
- 2014/04 该版本成为 ISO 国际标准。
基本概念

MessageDeliveryTag这个标记十分重要PropertiesHeaderBodyContent TypeContent Encoding
Message QueueMessage BrokerMessage-Oriented Middleware消息中间件,有时简写 MOMConnectionTCP 连接, 服务器永远不会主动关闭连接Channel通道,或者叫信道,逻辑连接,不同通道之间是完全隔离的 (ChannelID)- 通道的打开关闭
- 多线程可以使用各自的通道
Server消息队列服务器,就是指 BrokerVirtual Host虚拟主机,消息队列中的逻辑隔离Publisher消息生产者Exchange交换机Queue队列Binding绑定,指定了队列和交换机之间的关系RoutingKey路由键,Binding 的附加参数,对消息进行过滤
注意:有些地方将绑定时指定的 RoutingKey 叫做 BindingKeyComsumer消息消费者
基本流程
AMQP 协议中,基本数据单位是帧。有 9 种帧结构用来开启、控制、关闭两点之间的信息传输链路:
- 打开(连接)open
- 开始(会话)begin
- 附加(链路)attach
- 传输 transfer
- 流量控制 flow
- 状态通信 disposition
- 分离(链路)detach
- 结束(会话)end
- 关闭(连接)close
连接 Conection,会话 Session,链路 Link。
链路是单向的数据传输通道,消息(Transfer 帧)就在链路上传输。
+ OpenConnection
| + StartSession
| | + AttachLink
| | | Transfer
| | | Flow
| | | Disposition
| | + DetachLink
| + EndSession
+ CloseConnection
- 定义 Exchange、Queue、Binding
- 生产者将消息投递给 Exchange
- Exchange 根据实现定义的路由策略(Binding)将消息转发到 Queue
- 消费者从 Queue 中拿到消息
RabbitMQ
实现了 AMQP 0-9-1。
端口
ps -ef | grep rabbitmq | grep -v grep
rabbitmq 966640 1 5 11:09 ? 00:00:12 /usr/lib/erlang/erts-11.1.8/bin/beam.smp -W w -K true -A 64 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa -noshell -noinput -s rabbit boot -boot start_sasl -lager crash_log false -lager handlers []
rabbitmq 966744 966640 0 11:09 ? 00:00:00 erl_child_setup 65536
rabbitmq 966775 1 0 11:09 ? 00:00:00 /usr/lib/erlang/erts-11.1.8/bin/epmd -daemon
rabbitmq 966802 966744 0 11:09 ? 00:00:00 inet_gethost 4
rabbitmq 966803 966802 0 11:09 ? 00:00:00 inet_gethost 4
sudo nmap -p 1-65535 localhost
4369/tcp open epmd
5672/tcp open amqp
15672/tcp open unknown
25672/tcp open unknown
4369Erlang 端口映射器守护程序 (epmd)5672服务端口15672Web 接口25672不知道干嘛的,来自inet_dist_listen_min-inet_dist_listen_max, 我本机默认配置
Exchange 与队列
type exchangeDeclare struct {
reserved1 uint16
Exchange string
Type string
Passive bool
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Arguments Table
}
type queueDeclare struct {
reserved1 uint16
Queue string
Passive bool
Durable bool
Exclusive bool
AutoDelete bool
NoWait bool
Arguments Table
}
type queueBind struct {
reserved1 uint16
Queue string
Exchange string
RoutingKey string
NoWait bool
Arguments Table
}
Exchange 类型
AMQP 中定义的 4 种 Exchange 类型:
direct需要符合RoutingKey的完全匹配topic支持模糊匹配:#代表任意个单词,*代表一个单词- RoutingKey 应该是小数点隔开的单词,另外,不可超过 255 字节。
- BindingKey 可以包含上面说的模糊匹配字符。
fanout将消息广播给所有绑定到该 Exchange 的 Queue,忽略 RoutingKeyheaders采用消息的 Header 与 Binding 的属性来进行匹配,忽略 RoutingKey- Binding 中
x-开头的属性不会参与匹配。 - Binding 可以定义
x-match属性,any(默认值) 表示匹配中一个字段就行,all表示所有字段都匹配。 - 奇怪的是官网教程中没有这种类型的示例。
- Binding 中
预置 Exchange:
(AMQP default)directamq.directdirectamq.fanoutfanoutamq.headersheadersamq.matchheadersamq.rabbitmq.logtopicamq.rabbitmq.tracetopicamq.topictopic
默认 Exchange (
AMQP default) 有点特殊,所有的队列都自动绑定在上面,然后又是 direct 类型,这样一来,生产者 publish 时如果将消息投递到名字为空字符串的这个 Exchange,RoutingKey 填写队列名字,就可以直接将消息投递到队列中。
确认机制
- 来自 TCP 的启发。
- 分成消费者确认和生产者确认两部分。
- DeliveryTag 对于确认机制至关重要。
具体下来就是:
- Basic.Ack
basic_ack(delivery_tag=0, multiple=False)可以一次确认多个消息 - Basic.Nack
basic_nack(delivery_tag=None, multiple=False, requeue=True) - Basic.Reject
basic.reject(delivery_tag=None, requeue=True)
Nack 是 RabbitMQ 对 AMQP 的拓展,和 Reject 作用相同,唯一不同的是,Nack 可以一次拒绝该通道之前(小于 delivery_tag)所有没有 ACK 的消息。
关于自动确认模式
命令
Connection
Connection.StartConnection.StartOkConnection.SecureConnection.SecureOkConnection.TuneConnection.TuneOkConnection.OpenConnection.OpenOkConnection.CloseConnection.CloseOkConnection.BlockedConnection.Unblocked
Channel
Channel.OpenChannel.OpenOkChannel.FlowChannel.FlowOkChannel.CloseChannel.CloseOk
Access
Access.RequestAccess.RequestOk
Exchange 交换器
Exchange.DeclareExchange.DeclareOkExchange.DeleteExchange.DeleteOkExchange.Bind交换器绑定 (RabbitMQ 拓展)Exchange.BindOkExchange.Unbind交换器解绑 (RabbitMQ 拓展)Exchange.UnbindOk
Queue 队列
Queue.Declare队列声明Queue.DeclareOkQueue.Bind队列绑定 (到交换器)Queue.BindOkQueue.Purge队列清空 (没分配的,也就是 unack 不会被清空)Queue.PurgeOkQueue.Delete队列删除Queue.DeleteOkQueue.Unbind队列解绑Queue.UnbindOk
Basic
Basic.QosQuality of Service 设置:prefetch_sizeprefetch_count
Basic.QosOkBasic.Consume消费者开始消费Basic.ConsumeOkBasic.Cancel取消消费者订阅Basic.CancelOkBasic.Publish发布消息Basic.ReturnBasic.DeliverBasic.Get直接从指定队列获取消息Basic.GetOkBasic.GetEmptyBasic.Ack确认Basic.RejectBasic.RecoverAsyncBasic.RecoverBasic.RecoverOkBasic.Nack负确认 (RabbitMQ 拓展)
Tx 事务
Tx.SelectTx.SelectOkTx.CommitTx.CommitOkTx.RollbackTx.RollbackOk
Confirm
Confirm.SelectConfirm.SelectOk
消息格式
来自 AMQP 协议文件 amqp-0-9-1.xml 中 Basic 的字段定义:
<!-- These are the properties for a Basic content -->
<!-- MIME typing -->
<field name = "content-type" domain = "shortstr" label = "MIME content type" />
<!-- MIME typing -->
<field name = "content-encoding" domain = "shortstr" label = "MIME content encoding" />
<!-- For applications, and for header exchange routing -->
<field name = "headers" domain = "table" label = "message header field table" />
<!-- For queues that implement persistence -->
<field name = "delivery-mode" domain = "octet" label = "non-persistent (1) or persistent (2)" />
<!-- For queues that implement priorities -->
<field name = "priority" domain = "octet" label = "message priority, 0 to 9" />
<!-- For application use, no formal behaviour -->
<field name = "correlation-id" domain = "shortstr" label = "application correlation identifier" />
<!-- For application use, no formal behaviour but may hold the
name of a private response queue, when used in request messages -->
<field name = "reply-to" domain = "shortstr" label = "address to reply to" />
<!-- For implementation use, no formal behaviour -->
<field name = "expiration" domain = "shortstr" label = "message expiration specification" />
<!-- For application use, no formal behaviour -->
<field name = "message-id" domain = "shortstr" label = "application message identifier" />
<!-- For application use, no formal behaviour -->
<field name = "timestamp" domain = "timestamp" label = "message timestamp" />
<!-- For application use, no formal behaviour -->
<field name = "type" domain = "shortstr" label = "message type name" />
<!-- For application use, no formal behaviour -->
<field name = "user-id" domain = "shortstr" label = "creating user id" />
<!-- For application use, no formal behaviour -->
<field name = "app-id" domain = "shortstr" label = "creating application id" />
<!-- Deprecated, was old cluster-id property -->
<field name = "reserved" domain = "shortstr" label = "reserved, must be empty" />
来自 rabbitmq/amqp091-go@v1.10.0/types.go 中的消息类型定义:
// Publishing captures the client message sent to the server. The fields
// outside of the Headers table included in this struct mirror the underlying
// fields in the content frame. They use native types for convenience and
// efficiency.
type Publishing struct {
// Application or exchange specific fields,
// the headers exchange will inspect this field.
Headers Table
// Properties
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // Transient (0 or 1) or Persistent (2)
Priority uint8 // 0 to 9
CorrelationId string // correlation identifier
ReplyTo string // address to to reply to (ex: RPC)
// Expiration represents the message TTL in milliseconds. A value of "0"
// indicates that the message will immediately expire if the message arrives
// at its destination and the message is not directly handled by a consumer
// that currently has the capacatity to do so. If you wish the message to
// not expire on its own, set this value to any ttl value, empty string or
// use the corresponding constants NeverExpire and ImmediatelyExpire. This
// does not influence queue configured TTL values.
Expiration string
MessageId string // message identifier
Timestamp time.Time // message timestamp
Type string // message type name
UserId string // creating user id - ex: "guest"
AppId string // creating application id
// The application specific payload of the message
Body []byte
}