#56 Golang RabbitMQ

2021-06-18

RabbitMQ 是啥就不说了,怎么安装部署也不说了,就记录一下 RabbitMQ 在 Golang 开发中的应用。

说明:采用 github.com/streadway/amqp 库。

func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)

生产者

生产者基本流程

生产者:连接

  1. amqp.Dial -> amqp.Connection
  2. amqp.Connection.Channel -> amqp.Channel
// Dial the broker; the URL carries the user, password, host and port.
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
    log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
// Release the connection when the enclosing function returns.
defer conn.Close()

// All publishing/consuming calls go through a channel opened on the connection.
ch, err := conn.Channel()
if err != nil {
    log.Fatalf("Failed to open a channel: %s", err)
}
// Deferred calls run last-in-first-out, so the channel is closed before the connection.
defer ch.Close()

生产者:配置(可选)

事先把 MQ 配好就行,但从稳妥起见,还是在连接时加上比较好。

  1. amqp.Channel.QueueDeclare
  2. amqp.Channel.ExchangeDeclare
  3. amqp.Channel.QueueBind
// Declare the queue up front so the publish below does not depend on it
// having been configured on the broker beforehand.
q, err := ch.QueueDeclare(
    "hello", // name: queue name
    true,    // durable
    false,   // autoDelete
    false,   // exclusive
    false,   // noWait: false means the call waits for the server's reply
    nil,     // args: extra arguments (none)
)
if err != nil {
    log.Fatalf("Failed to declare a queue: %s", err)
}

生产者:发送

  1. amqp.Channel.Publish
err = ch.Publish(
    "",     // exchange
    q.Name, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte("hello world"),
    })
if err != nil {
    log.Fatalf("Failed to publish a message: %s", err)
}

生产者:收尾

  1. amqp.Channel.Close
  2. amqp.Connection.Close(关闭连接时会一并关闭其上的所有 Channel)

消费者

消费者基本流程

和生产者基本一致,只是调用的是 amqp.Channel.Consume 而不是 amqp.Channel.Publish。
然后就是配置阶段,消费者只用关心队列在不在。

参考资料与拓展阅读

#55 Golang Redis

2021-06-05
package main

import (
    "log"
    "time"

    "github.com/go-redis/redis"
)

// main demonstrates basic go-redis usage against a local server: single
// Set/Get/Del commands first, then the same operations batched in a pipeline.
// Any failed command aborts the demo with a panic.
func main() {
    client := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "",
        DB:       0,
    })
    // Release the client's connections on exit; deferred calls still run
    // while a panic unwinds.
    defer client.Close()

    // Get/Set/Del ==========================================

    if err := client.Set("key", "value", 0).Err(); err != nil {
        panic(err)
    }

    val, err := client.Get("key").Result()
    if err != nil {
        panic(err)
    }
    log.Println("key", val)

    if err := client.Del("key").Err(); err != nil {
        panic(err)
    }

    // Pipeline =============================================

    pipeline := client.Pipeline()
    keys := []string{"key1", "key2", "key3"}

    const ttl = 5 * time.Minute
    pipeline.Set("key1", "value1", ttl)
    pipeline.Set("key2", "value2", ttl)
    pipeline.Set("key3", "value3", ttl)
    if _, err := pipeline.Exec(); err != nil {
        panic(err)
    }

    // Keep the typed command handles returned when queuing, instead of
    // type-asserting the generic results of Exec afterwards.
    gets := make([]*redis.StringCmd, 0, len(keys))
    for _, key := range keys {
        gets = append(gets, pipeline.Get(key))
    }
    if _, err := pipeline.Exec(); err != nil {
        panic(err)
    }
    for i, cmd := range gets {
        v, err := cmd.Result()
        if err != nil {
            panic(err)
        }
        log.Println(keys[i], v)
    }

    pipeline.Del(keys...)
    if _, err := pipeline.Exec(); err != nil {
        panic(err)
    }
}
package main

import (
    "log"

    "github.com/go-redis/redis"
)

// main walks through the four collection types (list, hash, set, sorted set)
// on a local Redis server: write a few entries, read the size, read everything
// back, and log the results. The first failing command panics.
func main() {
    client := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "",
        DB:       0,
    })
    log.Println(client)

    // check aborts the demo as soon as a command reports an error.
    check := func(err error) {
        if err != nil {
            panic(err)
        }
    }

    // List ===============================================
    check(client.RPush("mylist", "item1", "item2", "item3").Err())
    listLen, err := client.LLen("mylist").Result()
    check(err)
    log.Println("Length of mylist:", listLen)
    listItems, err := client.LRange("mylist", 0, -1).Result()
    check(err)
    log.Println("===== Items in mylist:", listItems)

    // Hash ===============================================
    check(client.HSet("myhash", "field1", "value1").Err())
    value1, err := client.HGet("myhash", "field1").Result()
    check(err)
    log.Println("Value of field1:", value1)
    allFields, err := client.HGetAll("myhash").Result()
    check(err)
    log.Println("===== All fields in myhash:", allFields)

    // Set ================================================
    check(client.SAdd("myset", "item1", "item2", "item3").Err())
    setLen, err := client.SCard("myset").Result()
    check(err)
    log.Println("Length of myset:", setLen)
    setItems, err := client.SMembers("myset").Result()
    check(err)
    log.Println("===== Items in myset:", setItems)

    // ZSet ===============================================
    check(client.ZAdd("myzset",
        redis.Z{Score: 1.0, Member: "one"},
        redis.Z{Score: 2.0, Member: "two"},
    ).Err())
    zsetLen, err := client.ZCard("myzset").Result()
    check(err)
    log.Println("Length of myzset:", zsetLen)
    zsetItems, err := client.ZRange("myzset", 0, -1).Result()
    check(err)
    log.Println("===== Items in myzset:", zsetItems)
}

哨兵

这样理论上来说,肯定会有性能损耗,毕竟增加了和哨兵的通信。
具体能差多少,还得实验。

package main

import (
    "context"
    "fmt"

    "github.com/go-redis/redis/v8"
)

// main connects to the master named "mymaster" through three sentinels and
// prints the reply to a PING. It panics if the ping fails.
func main() {
    failoverClient := redis.NewFailoverClient(&redis.FailoverOptions{
        SentinelAddrs: []string{"sentinel1:26379", "sentinel2:26379", "sentinel3:26379"},
        MasterName:    "mymaster",
    })
    defer failoverClient.Close()

    // go-redis v8 commands take a context.Context as their first argument.
    ctx := context.Background()

    pong, err := failoverClient.Ping(ctx).Result()
    if err != nil {
        panic(err)
    }
    fmt.Println(pong)
}

集群

package main

import (
    "context"
    "fmt"

    "github.com/go-redis/redis/v8"
)

// main connects to a Redis cluster through three seed nodes and prints the
// reply to a PING. It panics if the ping fails.
func main() {
    clusterClient := redis.NewClusterClient(&redis.ClusterOptions{
        Addrs: []string{"redis1:6379", "redis2:6379", "redis3:6379"},
    })
    defer clusterClient.Close()

    // go-redis v8 commands take a context.Context as their first argument.
    ctx := context.Background()

    pong, err := clusterClient.Ping(ctx).Result()
    if err != nil {
        panic(err)
    }
    fmt.Println(pong)
}

#54 Golang MySQL

2021-06-04

测试表:

CREATE TABLE `users` (
    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
    `username` VARCHAR(50) NOT NULL COLLATE 'utf8mb4_unicode_ci',
    `password` VARCHAR(100) NOT NULL COLLATE 'utf8mb4_unicode_ci',
    `nickname` VARCHAR(50) NOT NULL COLLATE 'utf8mb4_unicode_ci',
    `email` VARCHAR(100) NOT NULL DEFAULT '' COLLATE 'utf8mb4_unicode_ci',
    `birthday` VARCHAR(10) NOT NULL DEFAULT '0000-00-00' COLLATE 'utf8mb4_unicode_ci',
    `age` TINYINT(3) UNSIGNED NOT NULL DEFAULT '0',
    `level` TINYINT(3) NOT NULL DEFAULT '0',
    `disabled` TINYINT(1) NOT NULL DEFAULT '0',
    `created_at` DATETIME NOT NULL DEFAULT current_timestamp(),
    `updated_at` DATETIME NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp(),
    PRIMARY KEY (`id`) USING BTREE,
    UNIQUE INDEX `username` (`username`) USING BTREE
)
COLLATE='utf8mb4_unicode_ci'
ENGINE=InnoDB;

连接

package main

import (
 "database/sql"
 "fmt"
 "time"

 _ "github.com/go-sql-driver/mysql"
)

// db is the package-level connection pool used by the examples.
var db *sql.DB

// initMySQL opens the pool stored in db, verifies connectivity with Ping and
// applies the pool limits.
//
// It returns a wrapped error when opening or pinging fails; it never panics,
// so the caller decides how to react.
func initMySQL() (err error) {
    const dsn = "root:123456@tcp(127.0.0.1:3306)/test"

    if db, err = sql.Open("mysql", dsn); err != nil {
        return fmt.Errorf("open mysql: %w", err)
    }
    // Ping checks that the server can actually be reached.
    if err = db.Ping(); err != nil {
        return fmt.Errorf("ping mysql: %w", err)
    }

    db.SetMaxOpenConns(200)                 // maximum number of open connections
    db.SetMaxIdleConns(10)                  // maximum idle connections kept in the pool; should not exceed the open limit
    db.SetConnMaxLifetime(time.Second * 10) // maximum time a connection may be reused
    db.SetConnMaxIdleTime(time.Second * 10) // maximum time a connection may sit idle
    return nil
}

// main connects to MySQL, reads one row by primary key and prints it.
// It stops early if the connection or the query fails.
func main() {
    if err := initMySQL(); err != nil {
        fmt.Printf("connect to db failed,err:%v\n", err)
        return // nothing below can work without a connection
    }
    // Register the close right after a successful init so it also runs on
    // the early return below.
    defer db.Close()
    fmt.Println("connect to db success")

    // Query the test table defined at the top of this post (users.username).
    sqlStr := "SELECT id, username FROM users WHERE id=?"
    var u user
    // Important: always call Scan on the result of QueryRow, otherwise the
    // underlying database connection is not released.
    if err := db.QueryRow(sqlStr, 1).Scan(&u.id, &u.name); err != nil {
        fmt.Printf("scan failed, err: %v\n", err)
        return
    }
    // Two verbs for two values; the original format had a stray age:%d.
    fmt.Printf("id:%d,name:%s\n", u.id, u.name)
}

// user receives the two columns scanned from a single query result row.
type user struct {
    id   int
    name string
}

Insert 增

Update 改

Delete 删

Select 查

#53 Go interface

2021-06-02

不像我们常见的一些语言,实现接口需要显式声明,比如 PHP:

// Animal is the contract a class must name explicitly with `implements`.
interface Animal {
    // Both methods take no arguments and declare no return type.
    public function move();
    public function makeSound();
}

// Dog opts into the Animal contract explicitly through `implements Animal`.
class Dog implements Animal {
    // Prints "Dog is moving" followed by a newline.
    public function move() {
        echo "Dog is moving\n";
    }

    // Prints "Woof" followed by a newline.
    public function makeSound() {
        echo "Woof\n";
    }
}

Go 不需要声明,说到不如做到,实现了所有接口方法就行了。
PS:我还是觉得写出来比较好一些,阅读代码的时候更加一目了然,也没有什么副作用。

示例

package main

import "fmt"

// Animal is satisfied by any type that has both methods below; no explicit
// declaration on the implementing type is needed.
type Animal interface {
    // move returns a description of the movement.
    move() string
    // makeSound returns the sound as text.
    makeSound() string
}

// Dog is a named animal; it carries both Animal methods without ever
// mentioning the interface.
type Dog struct {
    name string
}

// move returns "<name> is running".
func (d Dog) move() string {
    return fmt.Sprint(d.name, " is running")
}

// makeSound returns the dog's bark.
func (d Dog) makeSound() string {
    const bark = "woof"
    return bark
}

// main checks at run time whether Dog satisfies Animal, reports the result,
// then prints what the two methods return.
func main() {
    dog := Dog{name: "Fido"}

    // Converting to interface{} first makes the type assertion to Animal legal
    // on a concrete value; the second result tells whether it holds.
    if _, isAnimal := interface{}(dog).(Animal); !isAnimal {
        fmt.Printf("%T is NOT Animal\n", dog)
    } else {
        fmt.Printf("%T is Animal\n", dog)
    }

    for _, line := range []string{dog.move(), dog.makeSound()} {
        fmt.Println(line)
    }
}

interface{}

interface{} says nothing.

Go 谚语第六条是什么意思?

我的理解:

interface{} 可以表示任意类型,有时使用起来会很方便。
但是必须注意这意味着对类型没有任何约束,无论从程序的严谨度还是代码可读性上讲,都是有害的。
如果可以的话,编码中应当尽量采用范围小一些的接口,就比如到处可见的 io.Writer 接口,标准库里有大量实现它、或者以它为写入目标的类型:

  • bytes.Buffer
  • bufio.Writer
  • os.File
  • compress/gzip.Writer
  • crypto/cipher.StreamWriter
  • encoding/csv.Writer
  • net/http.ResponseWriter
  • archive/zip.Writer
  • image/png.Encoder

#52 Golang JSON

2021-06-02

Golang 有一个 encoding/json 标准库,在多数情况下已经够用了。

注意:因为 Golang 是一种静态类型的语言,所以解析 JSON 字符串必须了解里面的数据结构。
如果是用惯了 JS、Python、PHP 等动态语言的话,到这里会有些郁闷,怎么解析一个 JSON 还这么复杂。

#51 Gron 的设计

2021-05-30

2021/08/23,Golang 定时任务 是讲通过 Golang 原生的 time.Ticker 实现定时任务,本文介绍的是通过第三方库 robfig/cron 实现定时任务。

概述

  • 代码:robfig/cron
  • 文档: https://godoc.org/github.com/robfig/cron

标准 Cron 表达式 分成 5 段:分 minutes、时 hours、日 day of month、月 month、周 day of week。
robfig/cron v2 默认支持 6 个字段(包括秒 seconds),从 v3 开始又回到默认 5 个字段(如果需要秒字段,必须显式声明:cron.New(cron.WithSeconds()))。

type SpecSchedule struct {
    Second, Minute, Hour, Dom, Month, Dow uint64
}

使用

// Seconds field, required: specs are written with six fields, seconds first.
cron.New(cron.WithSeconds())

// Seconds field, optional: a custom parser that lists SecondOptional
// alongside the five standard fields and the "@..." descriptors.
cron.New(cron.WithParser(cron.NewParser(
    cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)))
// The first spec below has six fields (seconds first), so the seconds-aware
// parser must be enabled; a plain cron.New() in v3 accepts five fields only.
c := cron.New(cron.WithSeconds())
// NOTE: in v3 AddFunc also returns an error for an invalid spec; check it in real code.
c.AddFunc("0 30 * * * *", func() { fmt.Println("Every hour on the half hour") })
c.AddFunc("@hourly",      func() { fmt.Println("Every hour") })
c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty") })
c.Start()
..
// Funcs are invoked in their own goroutine, asynchronously.
...
// Funcs may also be added to a running Cron
c.AddFunc("@daily", func() { fmt.Println("Every day") })
..
// Inspect the cron job entries' next and previous run times.
inspect(c.Entries())
..
c.Stop()  // Stop the scheduler (does not stop any jobs already running).
Field name   | Mandatory? | Allowed values  | Allowed special characters
----------   | ---------- | --------------  | --------------------------
Seconds      | Yes        | 0-59            | * / , -
Minutes      | Yes        | 0-59            | * / , -
Hours        | Yes        | 0-23            | * / , -
Day of month | Yes        | 1-31            | * / , - ?
Month        | Yes        | 1-12 or JAN-DEC | * / , -
Day of week  | Yes        | 0-6 or SUN-SAT  | * / , - ?

Entry                  | Description                                | Equivalent To
-----                  | -----------                                | -------------
@yearly (or @annually) | Run once a year, midnight, Jan. 1st        | 0 0 0 1 1 *
@monthly               | Run once a month, midnight, first of month | 0 0 0 1 * *
@weekly                | Run once a week, midnight between Sat/Sun  | 0 0 0 * * 0
@daily (or @midnight)  | Run once a day, midnight                   | 0 0 0 * * *
@hourly                | Run once an hour, beginning of hour        | 0 0 * * * *

#50 Golang 时间处理

2021-05-21

主要是 time 包(2021/05/21,Golang time)。

  • time.Time 时间,精确到纳秒
  • time.Location 时区
  • time.Duration 时间间隔,精确到纳秒,[-292.47 years, 292.47 years]
 (1 << 63) / 1_000_000_000 / 3600 / 24 / 365
292.471208677536
src/time/time.go:type Time struct {
src/time/time.go-       wall uint64     // 秒   wall time seconds
src/time/time.go-       ext  int64      // 纳秒 wall time nanoseconds
src/time/time.go-       loc *Location   // 时区
src/time/time.go-}

src/time/zoneinfo.go:type Location struct {
src/time/zoneinfo.go-   name string
src/time/zoneinfo.go-   zone []zone
src/time/zoneinfo.go-   tx   []zoneTrans
src/time/zoneinfo.go-   extend string
src/time/zoneinfo.go-   cacheStart int64
src/time/zoneinfo.go-   cacheEnd   int64
src/time/zoneinfo.go-   cacheZone  *zone
src/time/zoneinfo.go-}
src/time/zoneinfo.go:type zone struct {
src/time/zoneinfo.go-   name   string // abbreviated name, "CET"
src/time/zoneinfo.go-   offset int    // seconds east of UTC
src/time/zoneinfo.go-   isDST  bool   // is this zone Daylight Savings Time?
src/time/zoneinfo.go-}
src/time/zoneinfo.go-type zoneTrans struct {
src/time/zoneinfo.go-   when         int64 // transition time, in seconds since 1970 GMT
src/time/zoneinfo.go-   index        uint8 // the index of the zone that goes into effect at that time
src/time/zoneinfo.go-   isstd, isutc bool  // ignored - no idea what these mean
src/time/zoneinfo.go-}

var UTC *Location = &utcLoc
var utcLoc = Location{name: "UTC"}
var Local *Location = &localLoc
var localLoc Location

src/time/time.go:type Duration int64

基本用法

time.Now()              // time.Time

// type Month int
// const (
//  January Month = 1 + iota
//  February
//  March
//  April
//  May
//  June
//  July
//  August
//  September
//  October
//  November
//  December
// )
// var Local *Location = &localLoc
func Date(year int, month Month, day, hour, min, sec, nsec int, loc *Location) Time // 组装 Time

// Time -> 时间戳
time.Now().Unix()       // int64
time.Now().UnixMilli()  // int64 毫秒
time.Now().UnixMicro()  // int64 微秒
time.Now().UnixNano()   // int64 纳秒

func Unix(sec int64, nsec int64) Time   // 时间戳 -> Time

time.Now().Format("2006-01-02 15:04:05")    // Time -> timestr
timeObj, erro := time.Parse("2006-01-02 15:04:05", timestr)  // timestr -> Time
// 特别注意:time.Parse 是按照 UTC 时区解析
timeObj, erro := time.ParseInLocation("2006-01-02 15:04:05", timeStr, time.Local)

间隔计算

func (t Time) Truncate(d Duration) Time
func (t Time) Round(d Duration) Time

func (d Duration) Truncate(m Duration) Duration
func (d Duration) Round(m Duration) Duration

func (t Time) Add(d Duration) Time  // Time 相加
func (t Time) Sub(u Time) Duration  // Time 相减

func Since(t Time) Duration         // Now - t
func Until(t Time) Duration         // t - Now

time.Now().Truncate(时间间隔)
package main

import (
    "fmt"
    "time"
)

// main prints a fixed instant, then the same instant truncated and rounded
// to a multiple of a 3000-second (50-minute) step.
func main() {
    fmt.Println("Hello, 世界")

    // Alternative step to try: time.Hour.
    const step = 3000 * time.Second

    moment := time.Date(2021, time.May, 21, 15, 46, 47, 0, time.Local)
    fmt.Println(moment)
    fmt.Println(moment.Truncate(step))
    fmt.Println(moment.Round(step))
}
// 2021-05-21 15:46:47 +0000 UTC
// 2021-05-21 15:00:00 +0000 UTC
// 2021-05-21 15:50:00 +0000 UTC

时区