背景 在后端服务中,经常有这样一种场景,写数据库操作在异步队列中执行,且这个异步队列是多进程运行的,这时如果对同一资源进行写库操作,很有可能产生数据被覆盖等问题,于是就需要业务层在更新数据库之前进行加锁,这样保证在更改同一资源时,没有其他更新操作干涉,保证数据一致性。
但如果在更新前对数据库更新加锁,那此时又来了新的更新数据库的请求,但这个更新操作不能丢弃掉,需要延迟执行,那这就需要添加到延迟队列中,延迟执行。
那么如何实现一个延迟队列?利用Redis
的SortedSet
和String
这两种结构,就可以轻松实现。
具体实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 """Delay Queue""" import jsonimport timeimport uuidimport redisclass DelayQueue (object) : """延迟队列""" QUEUE_KEY = 'delay_queue' DATA_PREFIX = 'queue_data' def __init__ (self, conf) : host, port, db = conf['host' ], conf['port' ], conf['db' ] self.client = redis.Redis(host=host, port=port, db=db) def push (self, data) : """push :param data: data """ task_id = str(uuid.uuid4()) data_key = '{}_{}' .format(self.DATA_PREFIX, task_id) self.client.set(data_key, json.dumps(data)) self.client.zadd(self.QUEUE_KEY, data_key, int(time.time())) def pop (self, num=5 , previous=3 ) : """pop多条数据 :param num: pop多少个 :param previous: 获取多少秒前push的数据 """ until_ts = int(time.time()) - previous task_ids = self.client.zrangebyscore( self.QUEUE_KEY, 0 , until_ts, start=0 , num=num) if not task_ids: return [] pipe = self.client.pipeline() for task_id in task_ids: pipe.zrem(self.QUEUE_KEY, task_id) data_keys = [ data_key for data_key, flag in zip(task_ids, pipe.execute()) if flag ] if not data_keys: return [] data = [ json.loads(item) for item in self.client.mget(data_keys) ] self.client.delete(*data_keys) return data
实现思路 push 在push
数据时,执行如下几步:
生成一个唯一key
,这里使用uuid4生成(uuid4是根据随机数生成的,重复概率非常小,具体参考这里 )
把数据序列化后存入这个唯一key
的String
结构中
把这个唯一key
加到SortedSet
中,score
是当前时间戳
这里利用SortedSet
记录添加数据的时间,便于在获取时根据时间获取之前的数据,达到延迟的效果。
而真正的数据则存放在String
结构中,等获取时先拿到数据的key
再获取真正的数据。
这里可能有人会疑问,为什么不把真正的数据放到SortedSet
的name
中?
把数据放入name
中可能会产生瞬间写入相同数据导致数据多条变一条 的情况
把数据序列化放到SortedSet
的name
中有些过大,不太符合使用习惯
pop 此pop
是可以获取多条数据的,上面的代码默认是获取延迟队列中3秒前的5条数据,具体思路如下:
计算previous
秒前的时间戳,使用SortedSet
的zrangebysocre
方法获取previous
秒之前添加的唯一key
如果SortedSet
中有数据,则利用Redis
删除的原子性,使用zrem
依次删除SortedSet
的元素,如果删除成功,则使用,防止多进程并发执行此方法,拿到相同的数据
那到可用的唯一key
,从String
中获取真正的数据即可
这里最重要的是第二步,在拿出SortedSet
的数据后,一定要防止其他进程并发获取到相同的数据,所以在这里使用zrem
依次删除元素,保证只有删除成功的进程才能使用这条数据。
使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import timefrom delay import DelayQueueredis_conf = {'host' : '127.0.0.1' , 'port' : 6379 , 'db' : 0 } queue = DelayQueue(redis_conf) for i in range(20 ): item = {'user' : 'user-{}' .format(i)} queue.push(item) data = queue.pop(num=10 ) assert len(data) == 0 time.sleep(10 ) data = queue.pop(num=10 ) assert len(data) == 10 data = queue.pop(num=10 , previous=5 ) assert len(data) == 10
使用就比较简单了,在实际使用过程中,每次在处理正常队列时,通过上面的方法获取一下延迟队列的数据,如果延迟队列中有数据,那么按照业务正常处理就可以了,这样就达到了数据延迟处理的效果。