Condition是一种多线程通信工具,表示多线程下参与数据竞争的线程的一种状态,主要负责多线程环境下对线程的挂起和唤醒工作
实例化
实例Condition时可以指定一个lock,如果没有指定,默认创建RLock的实例。同时Condition拥有与RLock一样的上锁方法acquire()和解锁方法release()。
并初始化双端队列waiters,用于存放 wait的thread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| class Condition: def __init__(self, lock=None): if lock is None: lock = RLock() self._lock = lock self.acquire = lock.acquire self.release = lock.release try: self._release_save = lock._release_save except AttributeError: pass try: self._acquire_restore = lock._acquire_restore except AttributeError: pass try: self._is_owned = lock._is_owned except AttributeError: pass self._waiters = _deque() `
|
实现上下文语法
1 2 3 4 5
| def __enter__(self): return self._lock.__enter__()
def __exit__(self, *args): return self._lock.__exit__(*args)
|
初探notify和wait
条件锁的两个重要方法是notify()和wait()。notify()和wait()必须在条件锁上锁的状态下使用
notify()和wait()被调用,程序会先去调用self._is_owned(),判断当前线程号与条件锁中的self._ower是否一致,如果不一致,抛出异常RuntimeError。
1 2 3 4 5 6 7
| def _is_owned(self): if self._lock.acquire(0): self._lock.release() return False else: return True
|
wait()先回释放第一层Rlock,其他线程就可以获取锁执行,然后会生成一个Lock,写入waiter并将自己锁住,等待其他线程释放
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| def wait(self, timeout=None):
if not self._is_owned(): raise RuntimeError("cannot wait on un-acquired lock") waiter = _allocate_lock() waiter.acquire() self._waiters.append(waiter) saved_state = self._release_save() gotit = False try: if timeout is None: waiter.acquire() gotit = True else: if timeout > 0: gotit = waiter.acquire(True, timeout) else: gotit = waiter.acquire(False) return gotit finally: self._acquire_restore(saved_state) if not gotit: try: self._waiters.remove(waiter) except ValueError: pass
|
notify()是在一个双端队列中进行操作,这个队列在Condition中名为_waiters。默认情况下,notify只会释放一个锁(按先进先出原则)。
如果队列中没有锁,直接退出函数,不报任何异常。
1 2 3 4 5 6 7 8 9 10 11 12 13
| def notify(self, n=1): if not self._is_owned(): raise RuntimeError("cannot notify on un-acquired lock") all_waiters = self._waiters waiters_to_notify = _deque(_islice(all_waiters, n)) if not waiters_to_notify: return for waiter in waiters_to_notify: waiter.release() try: all_waiters.remove(waiter) except ValueError: pass
|
Condition中还有一个notify_all()方法,调用它会释放队列中全部的锁。
1 2 3 4
| def notify_all(self): self.notify(len(self._waiters))
notifyAll = notify_all
|
总结
condition 非常重要,在Queue,Semaphore,ThreadExecutorPool都有应用。
condition实现主要依靠两层锁:
- condition初始化时创建一把锁(外部锁),使用时需要先对外部锁上锁
- 每次调用wait时,会先生成一个lock锁(内部锁),将内部锁放到算双端队列waiters中,
然后上锁,再将外部锁释放。并再次获取内部锁block,等待其他线程调用notify释放该内部锁,
然后该线程会去试图获取内部锁。