我一两年前设计的一个通过 Redis ZSet 做事件广播的方案,刚用 Python 写了一个示例代码贴出来。
- 这是一个 Push / Pull 方式的广播机制。
- 推送方将消息推送到一个 zset key 中,score 为毫秒时间戳。
- key 名为 xxx:timestamp//10,也就是精确到 10 秒的时间戳。
也就是说每 10 秒一个 key,通过 TTL(5 分钟)实现历史数据自动清除,也避免 event 太多导致大 key 的问题。 - 拉取方用上一次拉取时间和当前时间做 score range,从最近的三个 zset 中读到这个时间段内的事件。
import logging
import threading
import time
import redis
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s %(message)s')
redis_host = '127.0.0.1'
redis_port = 6379
redis_db = 1
redis_password = None
redis_prefix = 'broadcast:'
redis_conn = redis.StrictRedis(host=redis_host, port=redis_port, db=redis_db, password=redis_password)
def handle_broadcast(data):
# 这里是处理收到的广播请求数据的函数
# 你需要根据具体需求来实现这个函数
logging.info(f'处理广播请求数据:{data} ===== ===== ===== =====')
def event_broadcast(data):
now = time.time()
now_ms = int(now * 1000)
now_10s = int(now) // 10
key = redis_prefix + str(now_10s)
score = now_ms
pipeline = redis_conn.pipeline()
pipeline.zadd(key, {data: score})
pipeline.expire(key, 300)
pipeline.execute()
# function event_broadcast(data) {
# const now = Date.now();
# const now_ms = now;
# const now_10s = Math.floor(now / 10000);
#
# const key = redis_prefix + now_10s;
# const score = now_ms;
#
# const pipeline = redis_conn.pipeline();
# pipeline.zadd(key, score, data);
# pipeline.expire(key, 300);
# pipeline.exec();
# }
last_score = 0
def event_fetch():
global last_score
now = time.time()
now_ms = int(now * 1000)
now_10s = int(now) // 10
keys = [
redis_prefix + str(now_10s - 2),
redis_prefix + str(now_10s - 1),
redis_prefix + str(now_10s),
]
pipeline = redis_conn.pipeline()
for key in keys:
logging.info('%s %20s %20s', key, last_score, now_ms)
pipeline.zrangebyscore(key, last_score, now_ms, withscores=True)
results = pipeline.execute()
for data_list in results:
for data, _ in data_list:
handle_broadcast(data.decode('utf-8'))
last_score = now_ms
def broadcast_loop():
i = 0
while True:
i += 1
data = f'广播请求数据 {i}'
event_broadcast(data)
logging.info(f'广播请求:{data}')
time.sleep(0.5)
def main():
broadcast_thread = threading.Thread(target=broadcast_loop, daemon=True)
broadcast_thread.start()
while True:
event_fetch()
time.sleep(5)
main()