#15 Broadcasting Events via Redis

2023-04-30

A scheme I designed a year or two ago for broadcasting events via a Redis ZSet; I have just written up a Python example and am posting it here.

  1. This is a push/pull style broadcast mechanism.
  2. The publisher pushes each message into a zset key, with the millisecond timestamp as the score.
  3. The key name is xxx:timestamp//10, i.e. the timestamp truncated to 10 seconds.
    In other words, there is one key per 10 seconds; a TTL (5 minutes) clears historical data automatically, and also avoids the big-key problem that too many events would otherwise cause.
  4. The puller uses the last fetch time and the current time as the score range, and reads the events in that window from the three most recent zsets.
import logging
import threading
import time

import redis

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s %(message)s')

redis_host = '127.0.0.1'
redis_port = 6379
redis_db = 1
redis_password = None
redis_prefix = 'broadcast:'
redis_conn = redis.StrictRedis(host=redis_host, port=redis_port, db=redis_db, password=redis_password)


def handle_broadcast(data):
    # Handle the received broadcast payload here.
    # Implement this according to your actual needs.
    logging.info(f'Handling broadcast payload: {data} ===== ===== ===== =====')


def event_broadcast(data):
    now = time.time()
    now_ms = int(now * 1000)
    now_10s = int(now) // 10

    key = redis_prefix + str(now_10s)
    score = now_ms

    pipeline = redis_conn.pipeline()
    pipeline.zadd(key, {data: score})
    pipeline.expire(key, 300)
    pipeline.execute()


# function event_broadcast(data) {
#   const now = Date.now();
#   const now_ms = now;
#   const now_10s = Math.floor(now / 10000);
#
#   const key = redis_prefix + now_10s;
#   const score = now_ms;
#
#   const pipeline = redis_conn.pipeline();
#   pipeline.zadd(key, score, data);
#   pipeline.expire(key, 300);
#   pipeline.exec();
# }

last_score = 0


def event_fetch():
    global last_score

    now = time.time()
    now_ms = int(now * 1000)
    now_10s = int(now) // 10

    keys = [
        redis_prefix + str(now_10s - 2),
        redis_prefix + str(now_10s - 1),
        redis_prefix + str(now_10s),
    ]

    pipeline = redis_conn.pipeline()
    for key in keys:
        logging.info('%s %20s %20s', key, last_score, now_ms)
        # '(' makes the min exclusive, so events scored exactly at last_score
        # are not delivered a second time on the next fetch
        pipeline.zrangebyscore(key, '(%d' % last_score, now_ms, withscores=True)

    results = pipeline.execute()
    for data_list in results:
        for data, _ in data_list:
            handle_broadcast(data.decode('utf-8'))

    last_score = now_ms


def broadcast_loop():
    i = 0
    while True:
        i += 1
        data = f'broadcast payload {i}'
        event_broadcast(data)
        logging.info(f'Broadcasting: {data}')
        time.sleep(0.5)


def main():
    broadcast_thread = threading.Thread(target=broadcast_loop, daemon=True)
    broadcast_thread.start()
    while True:
        event_fetch()
        time.sleep(5)


main()

#13 Trying Out RedisJSON

2022-01-09

Related reading: Redis Extension Modules, which walks through compiling and installing RedisJSON as its example.

A simple example (GET / SET):

127.0.0.1:6379> json.set abc . 123
OK
127.0.0.1:6379> json.get abc .
"123"
127.0.0.1:6379> json.set xyz . '{"a": 1, "b": true, "c": [{"name": "foo"}, {"name": "bar"}]}'
OK
127.0.0.1:6379> json.get xyz .c[0]
"{\"name\":\"foo\"}"
127.0.0.1:6379> json.set xyz .a 2
OK
127.0.0.1:6379> json.get xyz .c[0].name
"\"foo\""
127.0.0.1:6379> json.set xyz .c[0].name '"loong"'
OK
127.0.0.1:6379> json.get xyz .a .c[0].name
"{\".c[0].name\":\"loong\",\".a\":2}"

JSON path syntax

There is no industry standard yet. RedisJSON implemented its own syntax (v1), then later re-implemented it (v2) modeled on JSONPath.
As for the differences between v1 and v2, there is no need to dig into them.

Knowing roughly this much is enough:

.store.book[0].title
// equivalent to `V['store']['book'][0]['title']`

Extension commands

  1. JSON.ARRAPPEND arrappend
  2. JSON.ARRINDEX arrindex
  3. JSON.ARRINSERT arrinsert
  4. JSON.ARRLEN arrlen
  5. JSON.ARRPOP arrpop
  6. JSON.ARRTRIM arrtrim
  7. JSON.CLEAR clear
  8. JSON.DEBUG debug
  9. JSON.DEL delete
  10. JSON.GET get / jsonget
  11. JSON.MGET mget / jsonmget
  12. JSON.NUMINCRBY numincrby
  13. JSON.NUMMULTBY nummultby
  14. JSON.OBJKEYS objkeys
  15. JSON.OBJLEN objlen
  16. JSON.RESP resp
  17. JSON.SET set / jsonset
  18. JSON.STRAPPEND strappend
  19. JSON.STRLEN strlen
  20. JSON.TOGGLE toggle
  21. JSON.TYPE type

The Python library also offers these two methods:

  1. set_file(name, path, file_name, nx=False, xx=False, decode_keys=False)
    return self.set(name, path, file_content, nx=nx, xx=xx, decode_keys=decode_keys)
  2. set_path(json_path, root_folder, nx=False, xx=False, decode_keys=False)
    Walks root_folder to get file_path and file_name (file_path.split('.', 1)[0]),
    then calls set_file(file_name, json_path, file_path, nx=nx, xx=xx, decode_keys=decode_keys)

Usage (Python)

redis-py, the best-known Python Redis client library, already supports the RedisJSON extension commands.

import redis

r = redis.Redis(host='localhost', port=6379, db=0)
jr = r.json()  # the JSON commands object provided by redis-py

jr.set('foo', '.', {"a": 1, "b": True, "c": [{"name": "foo"}, {"name": "bar"}]})

jr.numincrby('foo', 'a', 1)

jr.toggle('foo', '.b') # only works for booleans

jr.arrappend('foo', '.c', {"name": "air"})

print(jr.get('foo', '.'))
# {'a': 2, 'b': False, 'c': [{'name': 'foo'}, {'name': 'bar'}, {'name': 'air'}]}

print(jr.get('foo', '.c[-1].name'))
# air

The only thing missing is search; add RediSearch and it would be perfect.
PS: newer versions of RediSearch have already improved their JSON support.

References and further reading

#12 Redis Extension Modules

2022-01-09

Redis officially provides the following extension modules:

PS: there are many more third-party extensions; see https://redis.io/modules

About the license

Redis itself is under the BSD license; the Redis extension modules were under AGPL.
In August 2018, to keep cloud vendors from profiting at its expense, the extension modules' license was switched to Apache v2.0 modified with Commons Clause (official announcement). The Commons Clause part forbids selling Redis as a commercial service.
In February 2019, the extension modules' license changed again, to the Redis Source Available License (RSAL) (official announcement), because the previous license was considered unclear, easy to misunderstand, and insufficient to protect the company's interests.

This license puts no restrictions on ordinary users (individuals or companies); it only restricts cloud vendors who make money selling Redis directly as a service. I think this is a reasonable demand from the developers, nothing worth questioning.

Installing a module

If you just want a quick taste without compiling anything, you can download trial builds from https://redis.com/redis-enterprise-software/download-center/modules/.

  1. Download the module source code and compile it into a shared library, xxx.so.
  2. Add loadmodule xxx.so to the Redis config file, then restart the Redis service.
    PS: a module can also be loaded at startup with the --loadmodule argument.

For example, RedisJSON (ReJSON):

wget https://github.com/RedisJSON/RedisJSON/archive/refs/tags/v2.0.6.zip -O redisjson-v2.0.6.zip
unzip redisjson-v2.0.6.zip
cd RedisJSON-2.0.6/

# The build needs libclang.so, which the official docs do not mention
# From: bindgen-0.59.2/src/lib.rs
sudo apt install libclang-dev

# Rust must be installed first (see other references)
# Most other modules I know of are written in C; RedisJSON is unusual: it is written in Rust
cargo build --release
cargo test --features test

redis-server --loadmodule target/release/librejson.so

Related post: Trying Out RedisJSON

#11 Repost: Redis LRU Policies

2021-10-26

This article discusses what happens to Redis data after it expires.
Redis does not delete expired data immediately; it handles it according to the configured policy.

config get maxmemory-policy
# maxmemory-policy
# volatile-ttl

Original text

When Redis is used as a cache, often it is handy to let it automatically evict old data as you add new data. This behavior is very well known in the community of developers, since it is the default behavior of the popular memcached system.
Like Memcached, Redis automatically evicts old data.

LRU is actually only one of the supported eviction methods. This page covers the more general topic of the Redis maxmemory directive that is used in order to limit the memory usage to a fixed amount, and it also covers in depth the LRU algorithm used by Redis, that is actually an approximation of the exact LRU.

Starting with Redis version 4.0, a new LFU (Least Frequently Used) eviction policy was introduced. This is covered in a separated section of this documentation.

Maxmemory configuration directive

The maxmemory configuration directive is used in order to configure Redis to use a specified amount of memory for the data set. It is possible to set the configuration directive using the redis.conf file, or later using the CONFIG SET command at runtime.

For example in order to configure a memory limit of 100 megabytes, the following directive can be used inside the redis.conf file.

maxmemory 100mb

Setting maxmemory to zero results into no memory limits. This is the default behavior for 64 bit systems, while 32 bit systems use an implicit memory limit of 3GB.
Markjour's note: setting it to 0 means no memory limit on 64-bit systems, while 32-bit systems implicitly use a 3 GB limit.

When the specified amount of memory is reached, it is possible to select among different behaviors, called policies. Redis can just return errors for commands that could result in more memory being used, or it can evict some old data in order to return back to the specified limit every time new data is added.

Eviction policies

The exact behavior Redis follows when the maxmemory limit is reached is configured using the maxmemory-policy configuration directive.

The following policies are available:

  • noeviction: return errors when the memory limit was reached and the client is trying to execute commands that could result in more memory to be used (most write commands, but DEL and a few more exceptions).
    Markjour's note: once memory exceeds the limit, write commands return errors, except DEL and a few others.
  • allkeys-lru: evict keys by trying to remove the less recently used (LRU) keys first, in order to make space for the new data added.
    Markjour's note: the regular LRU policy.
  • volatile-lru: evict keys by trying to remove the less recently used (LRU) keys first, but only among keys that have an expire set, in order to make space for the new data added.
    Markjour's note: LRU applied only to keys with a TTL set.
  • allkeys-random: evict keys randomly in order to make space for the new data added.
    Markjour's note: random eviction.
  • volatile-random: evict keys randomly in order to make space for the new data added, but only evict keys with an expire set.
    Markjour's note: random eviction, but only among keys with a TTL set.
  • volatile-ttl: evict keys with an expire set, and try to evict keys with a shorter time to live (TTL) first, in order to make space for the new data added.
    Markjour's note: evict in ascending TTL order.

The policies volatile-lru, volatile-random and volatile-ttl behave like noeviction if there are no keys to evict matching the prerequisites.

Picking the right eviction policy is important depending on the access pattern of your application, however you can reconfigure the policy at runtime while the application is running, and monitor the number of cache misses and hits using the Redis INFO output in order to tune your setup.

In general as a rule of thumb:

  • Use the allkeys-lru policy when you expect a power-law distribution in the popularity of your requests, that is, you expect that a subset of elements will be accessed far more often than the rest. This is a good pick if you are unsure.
  • Use the allkeys-random if you have a cyclic access where all the keys are scanned continuously, or when you expect the distribution to be uniform (all elements likely accessed with the same probability).
  • Use the volatile-ttl if you want to be able to provide hints to Redis about what are good candidate for expiration by using different TTL values when you create your cache objects.

The volatile-lru and volatile-random policies are mainly useful when you want to use a single instance for both caching and to have a set of persistent keys. However it is usually a better idea to run two Redis instances to solve such a problem.

It is also worth noting that setting an expire to a key costs memory, so using a policy like allkeys-lru is more memory efficient since there is no need to set an expire for the key to be evicted under memory pressure.
Markjour's note: tracking TTLs costs extra memory; with allkeys-lru no TTL needs to be set for a key to be evictable under memory pressure, which saves memory.

How the eviction process works

It is important to understand that the eviction process works like this:

  • A client runs a new command, resulting in more data added.
  • Redis checks the memory usage, and if it is greater than the maxmemory limit, it evicts keys according to the policy.
  • A new command is executed, and so forth.

So we continuously cross the boundaries of the memory limit, by going over it, and then by evicting keys to return back under the limits.

If a command results in a lot of memory being used (like a big set intersection stored into a new key) for some time the memory limit can be surpassed by a noticeable amount.

Markjour's note: common expiration/eviction strategies include periodic deletion, lazy expire, LFU, FIFO, random, and TTL-based.

Approximated LRU algorithm

Markjour's note: to save resources (memory), Redis's LRU algorithm is not an exact LRU implementation.

Redis LRU algorithm is not an exact implementation. This means that Redis is not able to pick the best candidate for eviction, that is, the key that was accessed the longest time ago in the past. Instead it will try to run an approximation of the LRU algorithm, by sampling a small number of keys, and evicting the one that is the best (with the oldest access time) among the sampled keys.

However since Redis 3.0 the algorithm was improved to also take a pool of good candidates for eviction. This improved the performance of the algorithm, making it able to approximate more closely the behavior of a real LRU algorithm.

What is important about the Redis LRU algorithm is that you are able to tune the precision of the algorithm by changing the number of samples to check for every eviction. This parameter is controlled by the following configuration directive:

maxmemory-samples 5

The reason why Redis does not use a true LRU implementation is because it costs more memory. However the approximation is virtually equivalent for the application using Redis. The following is a graphical comparison of how the LRU approximation used by Redis compares with true LRU.

LRU comparison

The test to generate the above graphs filled a Redis server with a given number of keys. The keys were accessed from the first to the last, so that the first keys are the best candidates for eviction using an LRU algorithm. Later, 50% more keys are added, in order to force half of the old keys to be evicted.

You can see three kind of dots in the graphs, forming three distinct bands.

  • The light gray band are objects that were evicted.
  • The gray band are objects that were not evicted.
  • The green band are objects that were added.

In a theoretical LRU implementation we expect that, among the old keys, the first half will be expired. The Redis LRU algorithm will instead only probabilistically expire the older keys.

As you can see Redis 3.0 does a better job with 5 samples compared to Redis 2.8, however most objects that are among the latest accessed are still retained by Redis 2.8. Using a sample size of 10 in Redis 3.0 the approximation is very close to the theoretical performance of Redis 3.0.

Note that LRU is just a model to predict how likely a given key will be accessed in the future. Moreover, if your data access pattern closely resembles the power law, most of the accesses will be in the set of keys that the LRU approximated algorithm will be able to handle well.

In simulations we found that using a power law access pattern, the difference between true LRU and Redis approximation were minimal or non-existent.

However you can raise the sample size to 10 at the cost of some additional CPU usage in order to closely approximate true LRU, and check if this makes a difference in your cache misses rate.

Experimenting in production with different sample sizes via the CONFIG SET maxmemory-samples <count> command is very simple.

Markjour's note: 1. Redis 3.0 improved the accuracy of the LRU algorithm. 2. The default of 5 samples is good enough.

The new LFU mode

Starting with Redis 4.0, a new Least Frequently Used eviction mode is available. This mode may work better (provide a better hits/misses ratio) in certain cases, since using LFU Redis will try to track the frequency of access of items, so that the ones used rarely are evicted while the one used often have an higher chance of remaining in memory.

If you think at LRU, an item that was recently accessed but is actually almost never requested, will not get expired, so the risk is to evict a key that has an higher chance to be requested in the future. LFU does not have this problem, and in general should adapt better to different access patterns.

To configure the LFU mode, the following policies are available:

  • volatile-lfu Evict using approximated LFU among the keys with an expire set.
    Markjour's note: LFU eviction among keys with a TTL set.
  • allkeys-lfu Evict any key using approximated LFU.
    Markjour's note: LFU eviction among all keys.

LFU is approximated like LRU: it uses a probabilistic counter, called a Morris counter in order to estimate the object access frequency using just a few bits per object, combined with a decay period so that the counter is reduced over time: at some point we no longer want to consider keys as frequently accessed, even if they were in the past, so that the algorithm can adapt to a shift in the access pattern.
Markjour's note: probabilistic counter (Morris counter).
Approximate counting algorithms count large numbers of events using very little memory. Invented in 1977 by Robert Morris (the cryptographer) at Bell Labs, they use probabilistic techniques to increment the counter.

Those informations are sampled similarly to what happens for LRU (as explained in the previous section of this documentation) in order to select a candidate for eviction.

However unlike LRU, LFU has certain tunable parameters: for instance, how fast should a frequent item lower in rank if it gets no longer accessed? It is also possible to tune the Morris counters range in order to better adapt the algorithm to specific use cases.

By default Redis 4.0 is configured to:

  • Saturate the counter at, around, one million requests. Markjour's note: the counter saturates at around one million requests.
  • Decay the counter every one minute. Markjour's note: the counter decays once per minute.

Markjour's note: LFU has tunable parameters; the above are the two Redis 4.0 defaults.

Those should be reasonable values and were tested experimentally, but the user may want to play with these configuration settings in order to pick optimal values.

Instructions about how to tune these parameters can be found inside the example redis.conf file in the source distribution, but briefly, they are:

lfu-log-factor 10
lfu-decay-time 1

The decay time is the obvious one, it is the amount of minutes a counter should be decayed, when sampled and found to be older than that value. A special value of 0 means: always decay the counter every time is scanned, and is rarely useful.

The counter logarithm factor changes how many hits are needed in order to saturate the frequency counter, which is just in the range 0-255. The higher the factor, the more accesses are needed in order to reach the maximum. The lower the factor, the better is the resolution of the counter for low accesses, according to the following table:

factor | 100 hits | 1000 hits | 100K hits | 1M hits | 10M hits
     0 |      104 |       255 |       255 |     255 |      255
     1 |       18 |        49 |       255 |     255 |      255
    10 |       10 |        18 |       142 |     255 |      255
   100 |        8 |        11 |        49 |     143 |      255

So basically the factor is a trade off between better distinguishing items with low accesses VS distinguishing items with high accesses. More information is available in the self-documenting comments of the example redis.conf file.

Since LFU is a new feature, we'll appreciate any feedback about how it performs in your use case compared to LRU.

Summary

TTL (expiration) related commands:

SET key value [EX seconds|PX milliseconds|EXAT timestamp|PXAT milliseconds-timestamp|KEEPTTL] [NX|XX] [GET]
SETEX key seconds value
PSETEX key milliseconds value
GETEX key [EX seconds|PX milliseconds|EXAT timestamp|PXAT milliseconds-timestamp|PERSIST]

TTL key
PTTL key
EXPIRETIME key  # get the expiration timestamp, in seconds
PEXPIRETIME key # get the expiration timestamp, in milliseconds

EXPIRE key seconds
PEXPIRE key milliseconds
EXPIREAT key timestamp
PEXPIREAT key milliseconds-timestamp
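
As a quick illustration of the SET ... EX / TTL pair above, here is a sketch with redis-py; `set_with_ttl` is a hypothetical helper, and `conn` is assumed to be a redis-py client:

```python
def set_with_ttl(conn, key, value, seconds):
    # Equivalent to: SET key value EX seconds (same effect as SETEX).
    # TTL then reports the remaining lifetime in seconds.
    conn.set(key, value, ex=seconds)
    return conn.ttl(key)

# Usage (assumes a local Redis):
#   conn = redis.StrictRedis(host='127.0.0.1', port=6379)
#   set_with_ttl(conn, 'session:42', 'payload', 60)
```

Calling conn.persist(key) afterwards would drop the TTL again, like GETEX ... PERSIST.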

Compared with LRU (Least Recently Used), Redis's adoption of LFU (Least Frequently Used) is indeed a good fit for its use cases.

Eight policies in total:

  • noeviction: reject new writes, delete nothing automatically
  • volatile-ttl: evict by TTL value
  • volatile-lru / allkeys-lru: LRU
  • volatile-lfu / allkeys-lfu: LFU
  • volatile-random / allkeys-random: random eviction
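
Switching policy at runtime is a single CONFIG SET call; a sketch using the eight names above, where `set_eviction_policy` is a hypothetical helper and `conn` is assumed to be a redis-py client:

```python
EVICTION_POLICIES = {
    'noeviction',
    'volatile-ttl',
    'volatile-lru', 'allkeys-lru',
    'volatile-lfu', 'allkeys-lfu',
    'volatile-random', 'allkeys-random',
}

def set_eviction_policy(conn, policy):
    # Runtime equivalent of: CONFIG SET maxmemory-policy <policy>
    if policy not in EVICTION_POLICIES:
        raise ValueError('unknown policy: %s' % policy)
    conn.config_set('maxmemory-policy', policy)
```
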
evict [ɪˈvɪkt] vt. to expel, drive out
eviction [ɪˈvɪkʃn] n. expulsion; repossession
volatile [ˈvɒlətaɪl]
    adj. (chemistry) volatile; unstable; explosive; fickle
    n. a volatile substance; a winged creature
    n. Volatile, a surname (Italian)

#10 Redis Version History

2021-10-20
Version     | Date
6.2.8       | October 2021
6.2.4       | August 2021
6.0.20      | April 2021
6.0.12      | January 2021
6.0.8       | September 2020
6.0         | May 2020
5.6.0       | April 2020
5.5 Preview | April 2019
5.4.14      | February 2020
5.4.10      | December 2019
5.4.6       | July 2019
5.4.4       | June 2019
5.4.2       | April 2019
5.4         | December 2018
5.2.2       | August 2018
5.3 beta    | July 2018
5.2         | June 2018
5.0.2       | March 2018
5.0         | November 2017
4.5         | May 2017
4.4         | December 2016
4.3.0-230   | August 2, 2016
4.2.1-30    | October 18, 2015
4.0.0-49    | June 18, 2015
0.99.5-24   | February 15, 2015
0.99.5-11   | January 5, 2015

Redis 4

  1. Module system, laying the groundwork for RedisJSON and RediSearch
  2. PSYNC 2.0 (partial resynchronization)
  3. LFU improvements
  4. Async improvements
  5. DEL -> UNLINK
  6. ASYNC option added to FLUSHDB / FLUSHALL
  7. Mixed RDB-AOF persistence
  8. SWAPDB command
  9. MEMORY command

Redis 5

  • Streams (new data type)

Redis 6

  • Multi-threaded I/O
  • SSL
  • ACL access control
  • RESP3
  • Client-side caching

Redis 7

See: 2022/04/29, What's New in Redis 7

References and further reading

#8 Redis Batch Processing

2021-07-15

Write the commands into commands.txt, one per line, no trailing semicolons, then:

cat commands.txt | redis-cli -h 127.0.0.1 -p 6379
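
The same batch can be sent from redis-py in one round trip with a pipeline. A sketch; `parse_commands` and `run_batch` are hypothetical helpers, and `conn` is assumed to be a redis-py client:

```python
def parse_commands(text):
    # Split a commands.txt-style string into argument lists, skipping blank lines.
    return [line.split() for line in text.splitlines() if line.strip()]

def run_batch(conn, text):
    # Queue every command, then send them all in a single round trip.
    pipe = conn.pipeline(transaction=False)
    for args in parse_commands(text):
        pipe.execute_command(*args)
    return pipe.execute()

# Usage (assumes a local Redis):
#   conn = redis.StrictRedis(host='127.0.0.1', port=6379)
#   run_batch(conn, open('commands.txt').read())
```
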

#7 Golang Redis

2021-06-05
package main

import (
    "log"
    "time"

    "github.com/go-redis/redis"
)

func main() {
    client := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "",
        DB:       0,
    })

    var err error

    // Get/Set/Del =========================================

    err = client.Set("key", "value", 0).Err()
    if err != nil {
        panic(err)
    }

    val, err := client.Get("key").Result()
    if err != nil {
        panic(err)
    }
    log.Println("key", val)

    err = client.Del("key").Err()
    if err != nil {
        panic(err)
    }

    // Pipeline ============================================

    pipeline := client.Pipeline()

    pipeline.Set("key1", "value1", time.Minute*5)
    pipeline.Set("key2", "value2", time.Minute*5)
    pipeline.Set("key3", "value3", time.Minute*5)
    _, err = pipeline.Exec()
    if err != nil {
        panic(err)
    }

    pipeline.Get("key1")
    pipeline.Get("key2")
    pipeline.Get("key3")
    vals, err := pipeline.Exec()
    if err != nil {
        panic(err)
    }
    val1, _ := vals[0].(*redis.StringCmd).Result()
    val2, _ := vals[1].(*redis.StringCmd).Result()
    val3, _ := vals[2].(*redis.StringCmd).Result()
    log.Println("key1", val1)
    log.Println("key2", val2)
    log.Println("key3", val3)

    pipeline.Del("key1", "key2", "key3")
    _, err = pipeline.Exec()
    if err != nil {
        panic(err)
    }
}
package main

import (
    "log"

    "github.com/go-redis/redis"
)

func main() {
    client := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "",
        DB:       0,
    })
    log.Println(client)

    var err error
    var items []string

    // List ===============================================
    {
        err = client.RPush("mylist", "item1", "item2", "item3").Err()
        if err != nil {
            panic(err)
        }
        listLen, err := client.LLen("mylist").Result()
        if err != nil {
            panic(err)
        }
        log.Println("Length of mylist:", listLen)
        items, err = client.LRange("mylist", 0, -1).Result()
        if err != nil {
            panic(err)
        }
        log.Println("===== Items in mylist:", items)
    }

    // Hash ===============================================

    {
        err = client.HSet("myhash", "field1", "value1").Err()
        if err != nil {
            panic(err)
        }
        value1, err := client.HGet("myhash", "field1").Result()
        if err != nil {
            panic(err)
        }
        log.Println("Value of field1:", value1)
        allFields, err := client.HGetAll("myhash").Result()
        if err != nil {
            panic(err)
        }
        log.Println("===== All fields in myhash:", allFields)
    }
    // Set ================================================

    {
        err = client.SAdd("myset", "item1", "item2", "item3").Err()
        if err != nil {
            panic(err)
        }
        setLen, err := client.SCard("myset").Result()
        if err != nil {
            panic(err)
        }
        log.Println("Length of myset:", setLen)
        items, err = client.SMembers("myset").Result()
        if err != nil {
            panic(err)
        }
        log.Println("===== Items in myset:", items)
    }
    // ZSet ===============================================
    {
        err = client.ZAdd("myzset", redis.Z{Score: 1.0, Member: "one"}, redis.Z{Score: 2.0, Member: "two"}).Err()
        if err != nil {
            panic(err)
        }
        setLen, err := client.ZCard("myzset").Result()
        if err != nil {
            panic(err)
        }
        log.Println("Length of myzset:", setLen)
        items, err = client.ZRange("myzset", 0, -1).Result()
        if err != nil {
            panic(err)
        }
        log.Println("===== Items in myzset:", items)
    }
}

Sentinel

In theory this is bound to cost some performance, since it adds communication with the sentinels.
How much exactly would take an experiment to find out.

package main

import (
    "context"
    "fmt"

    "github.com/go-redis/redis/v8"
)

func main() {
    failoverClient := redis.NewFailoverClient(&redis.FailoverOptions{
        SentinelAddrs: []string{"sentinel1:26379", "sentinel2:26379", "sentinel3:26379"},
        MasterName:    "mymaster",
    })

    // go-redis v8 requires a context on every command
    pong, err := failoverClient.Ping(context.Background()).Result()
    if err != nil {
        panic(err)
    }
    fmt.Println(pong)
}

Cluster

package main

import (
    "context"
    "fmt"

    "github.com/go-redis/redis/v8"
)

func main() {
    clusterClient := redis.NewClusterClient(&redis.ClusterOptions{
        Addrs: []string{"redis1:6379", "redis2:6379", "redis3:6379"},
    })

    // go-redis v8 requires a context on every command
    pong, err := clusterClient.Ping(context.Background()).Result()
    if err != nil {
        panic(err)
    }
    fmt.Println(pong)
}

#6 Using Redis as a Message Queue

2021-03-06

The author of Redis also spun off an experimental project, disque, specifically to build a message queue manager.

https://github.com/antirez/disque

That is not the focus of this post, though. This post imagines what using Redis as an MQ would look like (I have never done so in production).

1. Why use an MQ

Essentially: to reduce coupling, and to smooth out load peaks.

Putting a component in the middle as a broker naturally breaks the coupling.
Also, a stable and efficient MQ can absorb messages quickly so producers never wait, while consumers run at their own processing speed. It works like a reservoir: shave the peaks, fill the troughs; take from the excess to make up the deficit.
PS: an MQ also seems to provide a bit of HA.
PS: with a well-designed push scheme, e.g. one event consumed by multiple parties, you would need multiple calls if there were no message queue.

As for the problems it can cause: of course there are some, namely one more component and the new complexity it brings.
All the benefits above rest on the MQ running stably and reliably, which naturally requires plenty of new work to operate the middleware.
Small systems should be kept as simple as possible, but once a system is complex enough, some extra work is unavoidable.

1.1 RabbitMQ

Written in Erlang (the only Erlang project I have heard of), an open-source implementation (Mozilla Public License) of AMQP 0-9-1 (Advanced Message Queuing Protocol).

  • PS: JPMorgan Chase designed AMQP in 2003, then partnered with Red Hat on an open-source C++ implementation, which is today's Apache Qpid.
  • PS: AMQP v1.0 became an ISO standard in 2014 (ISO/IEC 19464:2014).
    It is said to differ substantially from 0-9-1.

  • A producer publishes messages to a specified Exchange
  • The Exchange routes messages to Queues according to routing rules (RoutingKey)
  • A consumer subscribes to a specified Queue

1.2 Kafka

Its performance is reportedly excellent and battle-tested; every now and then I see large projects using it as infrastructure.
Although Kafka has been very popular these last two years, I have not used it in production.

2. Implementing a queue with Redis

Basic features:

  • produce messages
  • store messages
  • consume messages

Extended features:

  • repeated consumption (multiple consumers)
  • acknowledgement mechanism
  • persistence
  • broadcast (write a message directly into multiple queues)

2.1 List

Use a List to build a simple FIFO queue:

  • LPUSH
  • RPOP/BRPOP — returns nil once there are no more messages
  • RPOPLPUSH / BRPOPLPUSH — delete from the backup queue only after processing completes, which gives ACK-like behavior

PS: RPOPLPUSH pops an element and pushes it onto another list. BRPOPLPUSH blocks if the source list is empty.

redis_conn.lpush('queue', 'a')

while True:
    msg = redis_conn.rpop('queue')
    if msg is None:
        time.sleep(1)
        continue
    do_something(msg)
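
The RPOPLPUSH/BRPOPLPUSH idea above can be sketched like this; `processing_key` is a hypothetical naming convention, and `conn` is assumed to be a redis-py client:

```python
def processing_key(queue):
    # Hypothetical convention: each queue gets a companion backup list.
    return queue + ':processing'

def consume_reliably(conn, queue, handler, timeout=1):
    # Atomically move one message to the backup list (BRPOPLPUSH),
    # handle it, then "ack" by removing it from the backup (LREM).
    backup = processing_key(queue)
    msg = conn.brpoplpush(queue, backup, timeout=timeout)
    if msg is None:
        return False  # queue empty within the timeout
    handler(msg)
    conn.lrem(backup, 1, msg)
    return True
```

If the consumer crashes between handling and LREM, the message survives in the backup list, and a recovery job can push it back onto the main queue.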

2.2 SortedSet

A sorted set; duplicate messages are not supported (which can sometimes be treated as a feature).

Use the timestamp (possibly plus some weight) as the score, and order by it.
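
A minimal sketch of that idea, with `conn` assumed to be a redis-py client (ZPOPMIN needs Redis >= 5.0); the weight parameter is a hypothetical extension:

```python
import time

def zset_push(conn, queue, msg, weight=0):
    # Score = millisecond timestamp plus an optional weight bias;
    # identical messages collapse into one zset member (deduplication).
    score = int(time.time() * 1000) + weight
    conn.zadd(queue, {msg: score})

def zset_pop(conn, queue):
    # ZPOPMIN atomically removes and returns the lowest-scored member.
    popped = conn.zpopmin(queue, 1)
    return popped[0][0] if popped else None
```
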


2.3 Pub/Sub

Redis implements simple Pub/Sub.
It has nowhere near as many APIs as RabbitMQ, nor RabbitMQ's complex feature configuration, but it works well enough in simple scenarios.

  1. A consumer subscribes to a topic: subscribe foo
  2. A producer publishes a message: publish foo ooxxooxx
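
In redis-py the same subscribe/publish flow looks roughly like this (a sketch; `conn` is assumed to be a redis-py client, and `is_message` is a hypothetical helper):

```python
def is_message(item):
    # pubsub.listen() also yields subscribe/unsubscribe acks;
    # keep only real published messages.
    return item.get('type') == 'message'

def listen_forever(conn, channel, handler):
    pubsub = conn.pubsub()
    pubsub.subscribe(channel)
    for item in pubsub.listen():  # blocks, waiting for publishes
        if is_message(item):
            handler(item['data'])

# Elsewhere, the producer side is one call:
#   conn.publish('foo', 'ooxxooxx')  # returns the number of receivers
```
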

Two problems that cannot be ignored:

  1. Published messages must be delivered to subscribers in real time; if a subscriber is offline, the message is simply lost.
  2. If messages pile up past a certain point, Redis forcibly disconnects the consumer and clears the backlog.

2.4 Streams

A new data type introduced in Redis 5.0.
The name probably comes from stream processing.
All submitted messages are chained together in order.

It is well worth noting that queues built on Streams allow repeated consumption and support ACK.

  • XADD
  • XDEL
  • XRANGE
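
A sketch of the basic commands with redis-py (`conn` is assumed to be a redis-py client; `entry_ms` is a hypothetical helper for the entry ID format):

```python
def entry_ms(entry_id):
    # Stream entry IDs look like b'1680000000000-0':
    # millisecond timestamp, a dash, then a sequence number.
    return int(entry_id.split(b'-')[0])

def stream_add(conn, stream, fields):
    # XADD with an auto-generated ID ('*'); returns the new entry's ID.
    return conn.xadd(stream, fields)

def stream_read_all(conn, stream):
    # XRANGE from '-' to '+' covers every entry, in insertion order.
    return conn.xrange(stream, min='-', max='+')
```

Repeated consumption and ACK come from consumer groups (XREADGROUP / XACK), which are beyond this short sketch.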

3. Summary

  1. Redis does not guarantee data durability, so this scheme is out if data integrity matters to you.
  2. Apart from Streams, Redis has no message acknowledgement mechanism (Ack/Nack/Reject); you would have to build one at the application level, and at that point you might as well use a real MQ.
  3. Except for Pub/Sub, the List, SortedSet, and Streams approaches all support message persistence, but take care to avoid message buildup, which can put pressure on memory.

How, then, do we guarantee message integrity?

References and further reading