TOC

Golang RabbitMQ

RabbitMQ 是啥就不说了,怎么安装部署也不说了,就记录一下 RabbitMQ 在 Golang 开发中的应用。

说明:采用 github.com/streadway/amqp 库。

func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)

生产者

生产者基本流程

生产者:连接

  1. amqp.Dial -> amqp.Connection
  2. amqp.Connection.Channel -> amqp.Channel
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
    log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
ch, err := conn.Channel()
if err != nil {
    log.Fatalf("Failed to open a channel: %s", err)
}

生产者:配置(可选)

事先把 MQ 配好就行,但从稳妥起见,还是在连接时加上比较好。

  1. amqp.Channel.QueueDeclare
  2. amqp.Channel.ExchangeDeclare
  3. amqp.Channel.QueueBind
q, err := ch.QueueDeclare(
    "hello", // 队列名称
    true,    // 持久化
    false,   // 自动删除
    false,   // 独占
    false,   // 等待服务器回应
    nil,     // 额外参数
)
if err != nil {
    log.Fatalf("Failed to declare a queue: %s", err)
}

生产者:发送

  1. amqp.Channel.Publish
err = ch.Publish(
    "",     // exchange
    q.Name, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte("hello world"),
    })
if err != nil {
    log.Fatalf("Failed to publish a message: %s", err)
}

生产者:收尾

  1. amqp.Connection.Close
  2. amqp.Channel.Close

消费者

消费者基本流程

和生产者基本一致。只是调用的的是 chan.Consume 而不是 chan.Publish
然后就是配置阶段,消费者只用关心队列在不在。

package main

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/streadway/amqp"
)

// Consumer RabbitMQ 消费者
type Consumer struct {
    conn    *amqp.Connection
    channel *amqp.Channel
    queue   string
}

// NewConsumer 创建消费者
func NewConsumer(conn *amqp.Connection, ch *amqp.Channel, queueName string) (*Consumer, error) {
    // 声明队列
    _, err = DeclareQueue(ch, queueName)
    if err != nil {
        return nil, err
    }

    return &Consumer{
        conn:    conn,
        channel: ch,
        queue:   queueName,
    }, nil
}

// Consume 消费消息
func (c *Consumer) Consume(ctx context.Context, workerID int) error {
    // 设置 QoS
    err := c.channel.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    if err != nil {
        return err
    }

    // 开始消费
    msgs, err := c.channel.Consume(
        c.queue,        // queue
        "",             // consumer
        false,          // auto-ack
        false,          // exclusive
        false,          // no-local
        false,          // no-wait
        nil,            // args
    )
    if err != nil {
        return err
    }

    log.Printf("消费者 %d 已启动,等待消息...", workerID)

    for {
        select {
        case <-ctx.Done():
            log.Printf("消费者 %d 停止", workerID)
            return nil
        case msg, ok := <-msgs:
            if !ok {
                log.Printf("消费者 %d 消息通道关闭", workerID)
                return nil
            }

            // 处理消息
            var message Message
            if err := json.Unmarshal(msg.Body, &message); err != nil {
                log.Printf("解析消息失败: %v", err)
                msg.Nack(false, false) // 拒绝消息,不重新入队
                continue
            }

            // 模拟处理时间
            log.Printf("消费者 %d 处理消息: %+v", workerID, message)
            time.Sleep(1 * time.Second)

            // 确认消息
            if err := msg.Ack(false); err != nil {
                log.Printf("确认消息失败: %v", err)
            }
        }
    }
}

// Close 关闭连接
func (c *Consumer) Close() {
    if c.channel != nil {
        c.channel.Close()
    }
    if c.conn != nil {
        c.conn.Close()
    }
}

复习:Exchange Types

  • Fanout: 忽略 routingKey / bindingKey,将消息广播给所有绑定队列
  • Header: 忽略 routingKey / bindingKey,根据消息和队列绑定时指定的 headers 匹配
    • 支持精准匹配、数字匹配
    • 匹配规则:x-match = all(默认)/ any
    • 据说性能很差,没有使用过,官方文档都没有提供代码示例
  • Direct: routingKey = bindingKey
  • Topic: routingKey 模糊匹配 bindingKey(* 表示一个单词,# 表示若干个单词)

默认 Exchange

  • 名称:空字符串
  • 类型:Direct
  • 特性:所有已声明的队列都会隐式绑定(implicit route)到默认 Exchange,bindingKey queue.Name
    所以 routingKey
    quene.Name 就可以将消息发送给对应的队列

参考资料与拓展阅读

如果你有魔法,你可以看到一个评论框~