Implementing a Consumer Priority Queue in Python
This article is shared from the Tianyi Cloud Developer Community: "Implementing a Consumer Priority Queue in Python", by Frost.
Keywords
condition variable, semaphore, consumer priority, fairness, heap queue algorithm
Background
Message queues commonly let you assign a priority to messages, but support for assigning priorities to consumers is rare. Searching online, the author mostly found material on RabbitMQ's consumer priorities and nothing comparable for other languages.
The queues in Python's standard library, by contrast, are all fair and offer no option for non-fair behavior, so they may not meet the needs of some scenarios.
So what do "fair" and "non-fair" mean? The terms usually describe mutexes. The threads trying to acquire a mutex behave much like the consumers of a queue, so we will refer to both as "waiters".
Suppose there are four waiters, A, B, C and D, which call acquire()/get() in alphabetical order. When a thread releases the lock, or a message is put on the queue, the waiters are woken in order of arrival: A first, and then, following the queueing order, B -> C -> D. Put simply, this is FIFO.
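The FIFO wake-up order can be observed directly with threading.Semaphore. This is a sketch that assumes CPython's current pure-Python Semaphore; its own docstring says the wake-up order should not be relied on. The helper fifo_wakeup_order is hypothetical, and it peeks at the private attribute sem._cond._waiters only to make sure A, B, C and D really block in that order.

```python
import threading
import time


def fifo_wakeup_order(names):
    """Park one thread per name on a Semaphore(0), in order, then release one
    permit at a time and return the names in the order the threads woke up."""
    sem = threading.Semaphore(0)
    woken = []

    def waiter(name):
        sem.acquire()          # blocks: the counter starts at zero
        woken.append(name)

    threads = [threading.Thread(target=waiter, args=(n,)) for n in names]
    for i, t in enumerate(threads):
        t.start()
        # CPython-specific peek at a private attribute: wait until this thread
        # is parked in the semaphore's condition variable before starting the next.
        while len(sem._cond._waiters) <= i:
            time.sleep(0.001)
    for i in range(len(threads)):
        sem.release()           # wakes exactly one parked thread
        while len(woken) <= i:  # wait until it has recorded itself
            time.sleep(0.001)
    for t in threads:
        t.join()
    return woken


print(fifo_wakeup_order(["A", "B", "C", "D"]))  # ['A', 'B', 'C', 'D']
```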
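In general, FIFO is a sensible default and prevents any consumer from starving. In some scenarios, however, we want to give the queue's consumers priorities, so that each time the highest-priority consumer that is still waiting is woken first.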
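To make the difference between the two policies concrete, here is a toy model without threads. It assumes all four waiters are already blocked and only compares the order in which each policy would wake them. Following the convention of the implementation later in this article, a smaller number means a higher priority. The names, priority values and helper functions are hypothetical.

```python
import heapq
import itertools
from collections import deque

# Waiters in arrival order, each with a priority (smaller = more urgent).
ARRIVALS = [("A", 10), ("B", 1), ("C", 5), ("D", 1)]


def fifo_order(arrivals):
    """Fair policy: wake waiters strictly in arrival order."""
    waiting = deque(name for name, _ in arrivals)
    return [waiting.popleft() for _ in range(len(waiting))]


def priority_order(arrivals):
    """Priority policy: wake the most urgent waiter first; a monotonically
    increasing counter keeps equal priorities in arrival order."""
    counter = itertools.count()
    heap = [(prio, next(counter), name) for name, prio in arrivals]
    heapq.heapify(heap)
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]


print(fifo_order(ARRIVALS))      # ['A', 'B', 'C', 'D']
print(priority_order(ARRIVALS))  # ['B', 'D', 'C', 'A']
```

In a real queue, the priority only matters among the consumers that are blocked at the moment an item arrives; a consumer that starts waiting later cannot jump ahead of an item that has already been handed out.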
Below is a pure-Python implementation.
How it works
First, let's read the source of Python's built-in SimpleQueue (the pure-Python version, in Lib\queue.py).
```python
# Names this excerpt relies on (module-level in Lib/queue.py)
import threading
import types
from collections import deque
from queue import Empty


class _PySimpleQueue:
    '''Simple, unbounded FIFO queue.

    This pure Python implementation is not reentrant.
    '''
    # Note: while this pure Python version provides fairness
    # (by using a threading.Semaphore which is itself fair, being based
    #  on threading.Condition), fairness is not part of the API contract.
    # This allows the C version to use a different implementation.

    def __init__(self):
        self._queue = deque()
        self._count = threading.Semaphore(0)

    def put(self, item, block=True, timeout=None):
        '''Put the item on the queue.

        The optional 'block' and 'timeout' arguments are ignored, as this method
        never blocks.  They are provided for compatibility with the Queue class.
        '''
        self._queue.append(item)
        self._count.release()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        '''
        if timeout is not None and timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        if not self._count.acquire(block, timeout):
            raise Empty
        return self._queue.popleft()

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        This is exactly equivalent to `put(item, block=False)` and is only provided
        for compatibility with the Queue class.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).'''
        return len(self._queue) == 0

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        return len(self._queue)

    __class_getitem__ = classmethod(types.GenericAlias)
```
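The comment at the top of the class explains that this pure-Python version is fair because the threading.Semaphore it uses is itself fair (being based on threading.Condition), although fairness is not part of the API contract.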
The intuitive approach is to add a numeric priority parameter to get() and to the semaphore's acquire(). So let's look at the semaphore's implementation to see whether that is possible:
```python
# Names this excerpt relies on (module-level in Lib/threading.py)
from threading import Condition, Lock
from time import monotonic as _time


class Semaphore:
    """This class implements semaphore objects.

    Semaphores manage a counter representing the number of release() calls minus
    the number of acquire() calls, plus an initial value. The acquire() method
    blocks if necessary until it can return without making the counter
    negative. If not given, value defaults to 1.

    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        """Acquire a semaphore, decrementing the internal counter by one.

        When invoked without arguments: if the internal counter is larger than
        zero on entry, decrement it by one and return immediately. If it is zero
        on entry, block, waiting until some other thread has called release() to
        make it larger than zero. This is done with proper interlocking so that
        if multiple acquire() calls are blocked, release() will wake exactly one
        of them up. The implementation may pick one at random, so the order in
        which blocked threads are awakened should not be relied on. There is no
        return value in this case.

        When invoked with blocking set to true, do the same thing as when called
        without arguments, and return true.

        When invoked with blocking set to false, do not block. If a call without
        an argument would block, return false immediately; otherwise, do the
        same thing as when called without arguments, and return true.

        When invoked with a timeout other than None, it will block for at
        most timeout seconds.  If acquire does not complete successfully in
        that interval, return false.  Return true otherwise.

        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                self._value -= 1
                rc = True
        return rc

    __enter__ = acquire

    def release(self, n=1):
        """Release a semaphore, incrementing the internal counter by one or more.

        When the counter is zero on entry and another thread is waiting for it
        to become larger than zero again, wake up that thread.

        """
        if n < 1:
            raise ValueError('n must be one or more')
        with self._cond:
            self._value += n
            for i in range(n):
                self._cond.notify()

    def __exit__(self, t, v, tb):
        self.release()
```
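As you can see, the semaphore is itself built on a condition variable: acquire() blocks in self._cond.wait() and release() calls self._cond.notify(), so the decision about which waiter to wake is made entirely inside Condition. The priority value therefore has to be passed to the condition variable's wait() method. Below is an incomplete excerpt of Condition showing only the key parts: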
```python
class Condition:
    """Class that implements a condition variable.

    A condition variable allows one or more threads to wait until they are
    notified by another thread.

    If the lock argument is given and not None, it must be a Lock or RLock
    object, and it is used as the underlying lock. Otherwise, a new RLock object
    is created and used as the underlying lock.

    """

    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self._waiters = _deque()

    def _at_fork_reinit(self): ...
    def __enter__(self): ...
    def __exit__(self, *args): ...
    def __repr__(self): ...
    def _release_save(self): ...
    def _acquire_restore(self, x): ...
    def _is_owned(self): ...

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.

        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass

    def wait_for(self, predicate, timeout=None): ...

    def notify(self, n=1):
        """Wake up one or more threads waiting on this condition, if any.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method wakes up at most n of the threads waiting for the condition
        variable; it is a no-op if no threads are waiting.

        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        waiters = self._waiters
        while waiters and n > 0:
            waiter = waiters[0]
            try:
                waiter.release()
            except RuntimeError:
                # gh-92530: The previous call of notify() released the lock,
                # but was interrupted before removing it from the queue.
                # It can happen if a signal handler raises an exception,
                # like CTRL+C which raises KeyboardInterrupt.
                pass
            else:
                n -= 1
            try:
                waiters.remove(waiter)
            except ValueError:
                pass

    def notify_all(self): ...
    def notifyAll(self): ...
```
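It is easy to see that the key part is notify(): it decides which waiter gets notified, and each waiter corresponds to one consumer of the queue. The `waiter = waiters[0]` in notify(), together with `self._waiters.append(waiter)` in wait(), is FIFO in action: new waiters join at the tail of the deque and the one at the head is woken first. So all we need to change is the strategy by which waiters are inserted into, and taken out of, the deque self._waiters.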
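This can be done with the standard library's heapq module, i.e. with the heap queue algorithm. In addition, wait() needs a new priority parameter. A smaller priority value is woken first, and an itertools.count() sequence number breaks ties so that waiters with equal priority keep their FIFO order. The implementation is as follows: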
```python
import heapq
import itertools
from _thread import allocate_lock as _allocate_lock
from dataclasses import dataclass, field
from threading import RLock
from typing import Any, Tuple


@dataclass(order=True)
class PrioritizedWaiter:
    priority: Tuple                     # (priority, sequence number)
    waiter: Any = field(compare=False)  # the waiter lock is never compared


class Condition:
    """Class that implements a condition variable.

    A condition variable allows one or more threads to wait until they are
    notified by another thread.

    If the lock argument is given and not None, it must be a Lock or RLock
    object, and it is used as the underlying lock. Otherwise, a new RLock object
    is created and used as the underlying lock.

    """

    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self._waiters = []                 # a heap of PrioritizedWaiter items
        self._counter = itertools.count()  # FIFO tie-break among equal priorities

    # Unchanged from threading.Condition (bodies omitted here).
    def _at_fork_reinit(self): ...
    def __enter__(self): ...
    def __exit__(self, *args): ...
    def __repr__(self): ...
    def _release_save(self): ...
    def _acquire_restore(self, x): ...
    def _is_owned(self): ...

    def wait(self, timeout=None, priority=10):
        """Wait until notified or until a timeout occurs.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.

        Waiters with a smaller priority value are notified first; waiters with
        equal priority are notified in FIFO order.

        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        item = PrioritizedWaiter((priority, next(self._counter)), waiter)
        heapq.heappush(self._waiters, item)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                try:
                    self._waiters.remove(item)
                except ValueError:
                    pass
                else:
                    # remove() can break the heap invariant, so restore it
                    heapq.heapify(self._waiters)

    # Unchanged from threading.Condition (body omitted here).
    def wait_for(self, predicate, timeout=None): ...

    def notify(self, n=1):
        """Wake up one or more threads waiting on this condition, if any.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method wakes up at most n of the threads waiting for the condition
        variable; it is a no-op if no threads are waiting.

        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        waiters = self._waiters
        while waiters and n > 0:
            waiter = waiters[0].waiter  # heap top: smallest (priority, seq)
            try:
                waiter.release()
            except RuntimeError:
                # gh-92530: The previous call of notify() released the lock,
                # but was interrupted before removing it from the queue.
                # It can happen if a signal handler raises an exception,
                # like CTRL+C which raises KeyboardInterrupt.
                pass
            else:
                n -= 1
            # We still hold the lock, so the heap top is the waiter just handled.
            heapq.heappop(waiters)

    # Unchanged from threading.Condition (bodies omitted here).
    def notify_all(self): ...
    def notifyAll(self): ...
```
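To see why each heap entry carries a (priority, sequence number) tuple, the sketch below drives PrioritizedWaiter through the same heap operations that the modified wait() and notify() perform, with names standing in for the per-thread locks. The helper simulate is hypothetical. Because `order=True` compares only the priority tuple and the counter makes every tuple unique, list.remove() (which uses ==) withdraws exactly the timed-out entry, and the lock objects, which are not orderable, are never compared.

```python
import heapq
import itertools
import threading
from dataclasses import dataclass, field
from typing import Any, Tuple


@dataclass(order=True)
class PrioritizedWaiter:
    priority: Tuple                     # (priority, arrival sequence number)
    waiter: Any = field(compare=False)  # excluded from <, == and friends


def simulate(arrivals, timed_out=()):
    """Push waiters the way the modified wait() does, withdraw the ones that
    time out the way its finally-block does, then pop the rest the way
    notify() does. Names stand in for the locks a real wait() would store."""
    counter = itertools.count()
    heap, items = [], {}
    for name, prio in arrivals:
        item = PrioritizedWaiter((prio, next(counter)), name)
        items[name] = item
        heapq.heappush(heap, item)
    for name in timed_out:
        heap.remove(items[name])  # == compares only the unique priority tuple
        heapq.heapify(heap)       # remove() may break the heap invariant
    return [heapq.heappop(heap).waiter for _ in range(len(heap))]


arrivals = [("A", 10), ("B", 1), ("C", 10), ("D", 1)]
print(simulate(arrivals))                   # ['B', 'D', 'A', 'C']
print(simulate(arrivals, timed_out=["D"]))  # ['B', 'A', 'C']

# Lock objects are not orderable, yet they are never compared here:
a = PrioritizedWaiter((5, 0), threading.Lock())
b = PrioritizedWaiter((5, 1), threading.Lock())
print(a < b)  # True
```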
With that, the most important part is done. All that remains is to add a priority parameter to _PySimpleQueue.get and pass it on to Semaphore.acquire, and to add a priority parameter to Semaphore.acquire and pass it on to Condition.wait. For reasons of space, the full code is not written out here.
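As one possible way to wire up those remaining steps, here is an end-to-end sketch. It deviates from the article's approach in a few deliberate ways: instead of copying threading.Condition wholesale, the hypothetical PriorityCondition subclasses it and overrides only wait() and notify(); heap entries are plain (priority, seq, lock) tuples rather than PrioritizedWaiter (the unique seq still guarantees the locks are never compared); PrioritySemaphore subclasses threading.Semaphore and swaps in a PriorityCondition; and CPython's gh-92530 interrupt guard in notify() is omitted for brevity. Both subclasses lean on CPython private attributes (_waiters, _is_owned, _release_save, _acquire_restore, _cond, _value), so treat this as a CPython-specific sketch rather than a portable implementation. All class and function names are hypothetical.

```python
import heapq
import itertools
import threading
import time
from collections import deque
from queue import Empty


class PriorityCondition(threading.Condition):
    # Assumption: relies on CPython's private Condition internals.
    def __init__(self, lock=None):
        super().__init__(lock)
        self._waiters = []                 # heap of (priority, seq, lock)
        self._counter = itertools.count()  # seq keeps equal priorities FIFO

    def wait(self, timeout=None, priority=10):
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = threading.Lock()
        waiter.acquire()
        entry = (priority, next(self._counter), waiter)
        heapq.heappush(self._waiters, entry)
        saved_state = self._release_save()
        gotit = False
        try:
            if timeout is None or timeout > 0:
                gotit = waiter.acquire(True, -1 if timeout is None else timeout)
            else:
                gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit and entry in self._waiters:  # timed out: withdraw
                self._waiters.remove(entry)
                heapq.heapify(self._waiters)

    def notify(self, n=1):
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        while self._waiters and n > 0:
            self._waiters[0][-1].release()  # wake the most urgent waiter,
            heapq.heappop(self._waiters)    # then drop it from the heap
            n -= 1


class PrioritySemaphore(threading.Semaphore):
    # Assumption: uses Semaphore's private _cond/_value; release() is inherited.
    def __init__(self, value=1):
        super().__init__(value)
        self._cond = PriorityCondition(threading.Lock())

    def acquire(self, blocking=True, timeout=None, priority=10):
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._cond:
            while self._value == 0:
                left = None if deadline is None else deadline - time.monotonic()
                if not blocking or (left is not None and left <= 0):
                    return False
                self._cond.wait(left, priority=priority)
            self._value -= 1
            return True


class PrioritySimpleQueue:
    def __init__(self):
        self._queue = deque()
        self._count = PrioritySemaphore(0)

    def put(self, item):
        self._queue.append(item)
        self._count.release()

    def get(self, block=True, timeout=None, priority=10):
        if not self._count.acquire(block, timeout, priority):
            raise Empty
        return self._queue.popleft()


def wake_order(consumers):
    """Park consumers in list order, then feed one item at a time."""
    q, served = PrioritySimpleQueue(), []
    parked = q._count._cond._waiters  # our own heap, read only to sync the demo
    def consume(name, priority):
        q.get(priority=priority)
        served.append(name)

    threads = [threading.Thread(target=consume, args=c) for c in consumers]
    for i, t in enumerate(threads):
        t.start()
        while len(parked) <= i:  # until this consumer is blocked in get()
            time.sleep(0.001)
    for i in range(len(threads)):
        q.put(i)
        while len(served) <= i:  # until the woken consumer took the item
            time.sleep(0.001)
    for t in threads:
        t.join()
    return served
```

On CPython, wake_order([("A", 10), ("B", 1), ("C", 5), ("D", 1)]) returns ['B', 'D', 'C', 'A'], while giving every consumer the default priority 10 returns ['A', 'B', 'C', 'D'], i.e. plain FIFO. As with the standard Semaphore, the priority only decides which parked waiter is notified: a thread that calls get() just as an item arrives can still take it before the notified waiter reacquires the lock.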
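Note also that although a priority parameter has been added, if it is never used (so every waiter gets the default value of 10), the behavior is identical to the original version: waiters are still woken in FIFO order.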