TOC

通过 Redis 实现事件广播

我一两年前设计的一个通过 Redis ZSet 做事件广播的方案,刚用 Python 写了一个示例代码贴出来。

  1. 这是一个 Push / Pull 方式的广播机制。
  2. 推送方将消息推送到一个 zset key 中,score 为毫秒时间戳。
  3. key 名为 xxx:timestamp//10,也就是精确到 10 秒的时间戳。
    也就是说每 10 秒一个 key,通过 TTL(5 分钟)实现历史数据自动清除,也避免 event 太多导致大 key 的问题。
  4. 拉取方用上一次拉取时间和当前时间做 score range,从最近的三个 zset 中读到这个时间段内的事件。
import logging
import threading
import time

import redis

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s %(message)s')

redis_host = '127.0.0.1'
redis_port = 6379
redis_db = 1
redis_password = None
redis_prefix = 'broadcast:'
redis_conn = redis.StrictRedis(host=redis_host, port=redis_port, db=redis_db, password=redis_password)


def handle_broadcast(data):
    # 这里是处理收到的广播请求数据的函数
    # 你需要根据具体需求来实现这个函数
    logging.info(f'处理广播请求数据:{data} ===== ===== ===== =====')


def event_broadcast(data):
    now = time.time()
    now_ms = int(now * 1000)
    now_10s = int(now) // 10

    key = redis_prefix + str(now_10s)
    score = now_ms

    pipeline = redis_conn.pipeline()
    pipeline.zadd(key, {data: score})
    pipeline.expire(key, 300)
    pipeline.execute()


# function event_broadcast(data) {
#   const now = Date.now();
#   const now_ms = now;
#   const now_10s = Math.floor(now / 10000);
#
#   const key = redis_prefix + now_10s;
#   const score = now_ms;
#
#   const pipeline = redis_conn.pipeline();
#   pipeline.zadd(key, score, data);
#   pipeline.expire(key, 300);
#   pipeline.exec();
# }

last_score = 0


def event_fetch():
    global last_score

    now = time.time()
    now_ms = int(now * 1000)
    now_10s = int(now) // 10

    keys = [
        redis_prefix + str(now_10s - 2),
        redis_prefix + str(now_10s - 1),
        redis_prefix + str(now_10s),
    ]

    pipeline = redis_conn.pipeline()
    for key in keys:
        logging.info('%s %20s %20s', key, last_score, now_ms)
        pipeline.zrangebyscore(key, last_score, now_ms, withscores=True)

    results = pipeline.execute()
    for data_list in results:
        for data, _ in data_list:
            handle_broadcast(data.decode('utf-8'))

    last_score = now_ms


def broadcast_loop():
    i = 0
    while True:
        i += 1
        data = f'广播请求数据 {i}'
        event_broadcast(data)
        logging.info(f'广播请求:{data}')
        time.sleep(0.5)


def main():
    broadcast_thread = threading.Thread(target=broadcast_loop, daemon=True)
    broadcast_thread.start()
    while True:
        event_fetch()
        time.sleep(5)


main()