- 2014/07/11, RabbitMQ & AMQP
- 2015/02/12, Python RabbitMQ
- 2015/04/01, RabbitMQ 管理命令
- 2015/05/04, pika 01: 基础
- 2015/05/04, pika 02: BlockingConnection
- 2015/05/05, pika 03: SelectConnection
- 2015/05/06, pika 04: TornadoConnection
- 2016/02/12, Java RabbitMQ
- 2018/01/31, RabbitMQ 集群
- 2018/04/01, 面试题:RabbitMQ 相关
- 2018/10/02, pika: AsyncioConnection
- 2018/10/06, 使用 RabbitMQ 遇到的一个小问题
- 2019/01/12, 延迟队列
- 2020/10/03, 常见的消息队列
- 2021/06/18, Golang RabbitMQ
RabbitMQ 是啥就不说了,怎么安装部署也不说了,就记录一下 RabbitMQ 在 Golang 开发中的应用。
说明:采用 github.com/streadway/amqp 库。
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
生产者
生产者基本流程
生产者:连接
amqp.Dial->amqp.Connectionamqp.Connection.Channel->amqp.Channel
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
生产者:配置(可选)
事先把 MQ 配好就行,但从稳妥起见,还是在连接时加上比较好。
amqp.Channel.QueueDeclareamqp.Channel.ExchangeDeclareamqp.Channel.QueueBind
q, err := ch.QueueDeclare(
"hello", // 队列名称
true, // 持久化
false, // 自动删除
false, // 独占
false, // 等待服务器回应
nil, // 额外参数
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
生产者:发送
amqp.Channel.Publish
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("hello world"),
})
if err != nil {
log.Fatalf("Failed to publish a message: %s", err)
}
生产者:收尾
amqp.Connection.Closeamqp.Channel.Close
消费者
消费者基本流程
和生产者基本一致。只是调用的的是 chan.Consume 而不是 chan.Publish。
然后就是配置阶段,消费者只用关心队列在不在。
package main
import (
"context"
"encoding/json"
"log"
"time"
"github.com/streadway/amqp"
)
// Consumer RabbitMQ 消费者
type Consumer struct {
conn *amqp.Connection
channel *amqp.Channel
queue string
}
// NewConsumer 创建消费者
func NewConsumer(conn *amqp.Connection, ch *amqp.Channel, queueName string) (*Consumer, error) {
// 声明队列
_, err = DeclareQueue(ch, queueName)
if err != nil {
return nil, err
}
return &Consumer{
conn: conn,
channel: ch,
queue: queueName,
}, nil
}
// Consume 消费消息
func (c *Consumer) Consume(ctx context.Context, workerID int) error {
// 设置 QoS
err := c.channel.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
return err
}
// 开始消费
msgs, err := c.channel.Consume(
c.queue, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return err
}
log.Printf("消费者 %d 已启动,等待消息...", workerID)
for {
select {
case <-ctx.Done():
log.Printf("消费者 %d 停止", workerID)
return nil
case msg, ok := <-msgs:
if !ok {
log.Printf("消费者 %d 消息通道关闭", workerID)
return nil
}
// 处理消息
var message Message
if err := json.Unmarshal(msg.Body, &message); err != nil {
log.Printf("解析消息失败: %v", err)
msg.Nack(false, false) // 拒绝消息,不重新入队
continue
}
// 模拟处理时间
log.Printf("消费者 %d 处理消息: %+v", workerID, message)
time.Sleep(1 * time.Second)
// 确认消息
if err := msg.Ack(false); err != nil {
log.Printf("确认消息失败: %v", err)
}
}
}
}
// Close 关闭连接
func (c *Consumer) Close() {
if c.channel != nil {
c.channel.Close()
}
if c.conn != nil {
c.conn.Close()
}
}
复习:Exchange Types
- Fanout: 忽略 routingKey / bindingKey,将消息广播给所有绑定队列
- Header: 忽略 routingKey / bindingKey,根据消息和队列绑定时指定的 headers 匹配
- 支持精准匹配、数字匹配
- 匹配规则:x-match = all(默认)/ any
- 据说性能很差,没有使用过,官方文档都没有提供代码示例
- Direct: routingKey = bindingKey
- Topic: routingKey 模糊匹配 bindingKey(
*表示一个单词,#表示若干个单词)
默认 Exchange
- 名称:空字符串
- 类型:Direct
- 特性:所有已声明的队列都会隐式绑定(implicit route)到默认 Exchange,bindingKey queue.Name
所以 routingKey quene.Name 就可以将消息发送给对应的队列