今天看了 《Python源码剖析》 中多线程的章节,看完头都秃了。但还是感觉自己没有吸收多少,知识一股脑倾泻下来,然后匆匆流过。在两个番茄钟的间隙,我全景思考了下,还是要从上往下入手,先将 threading 模块梳理清楚。其次是带着问题来阅读源码,跟着作者的思路容易迷,而问题最好是从 threading 的使用姿势中来。

来看廖雪峰老师Python教程中的例子:

import time, threading

def loop():
    print('thread %s is running...' % threading.current_thread().name)
    n = 0
    while n < 5:
        n = n + 1
        print('thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('thread %s ended.' % threading.current_thread().name)

print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

看完这段代码,我的脑海中浮现了这几个问题:

  1. Thread 是如何描述线程对象的?
  2. 启动线程的时候做了些啥,线程对象保存在哪里?
  3. 除了 Thread,threading 里还保存了写东西,否则 threading.current_thread() 怎么实现?

除此之外,还有一些线程的常见问题,如:

  1. threading 提供了哪些线程同步方式,锁?

Thread

Thread 类在单独线程中运行的操作,定义操作的方式有两种:

  1. 传入 Thread.__init__() 一个 callable 对象
  2. 子类中重写 run() 对象,其他方法不需要重写。

构造函数

def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None)

调用这个构造函数时,必需带有关键字参数。参数如下:

  • group 应该为 None;为了日后扩展 ThreadGroup 类实现而保留。
  • target 是用于 run() 方法调用的可调用对象。默认是 None,表示不需要调用任何方法。
  • name 是线程名称。默认情况下,由 “Thread-N” 格式构成一个唯一的名称,其中 N 是小的十进制数。
  • args 是用于调用目标函数的参数元组。默认是 ()。
  • kwargs 是用于调用目标函数的关键字参数字典。默认是 {}。
  • 如果 daemon 不是 None,线程将被显式的设置为 守护模式,不管该线程是否是守护模式。如果是 None (默认值),线程将继承当前线程的守护模式属性。

__init__ 的实现,就是一些简单的变量赋值和初始化操作,这其中包括把类变量 _initialized 置为 True

start()

start() 开始线程活动。它在一个线程里最多只能被调用一次。它安排对象的 run() 方法在一个独立的控制进程中调用。

def start(self):

    # 必须先调用 __init__() 方法先
    if not self._initialized:
        raise RuntimeError("thread.__init__() not called")

    # 如果同一个线程对象中调用这个方法的次数大于一次,会抛出 RuntimeError 。
    if self._started.is_set():
        raise RuntimeError("threads can only be started once")

    if self.daemon and not _is_main_interpreter():
        raise RuntimeError("daemon thread are not supported "
                           "in subinterpreters")

    with _active_limbo_lock:
        _limbo[self] = self
    try:
        _start_new_thread(self._bootstrap, ())
    except Exception:
        with _active_limbo_lock:
            del _limbo[self]
        raise
    self._started.wait()

我们把用到的几个全局变量单独拎出来:

# Active thread administration
_active_limbo_lock = _allocate_lock()
_active = {}    # maps thread id to Thread object
_limbo = {}

先说下 _allocate_lock() 就对应的 thread 模块的 allocate_lock_start_new_thread 也指向 thread 模块的 start_new_thread

_start_new_thread = _thread.start_new_thread
_allocate_lock = _thread.allocate_lock

_limbo 中保存的是已创建但为运行的线程对象。在启动一个新线程中执行的 bootstrap()

def _bootstrap(self):
    try:
        self._bootstrap_inner()
    except:
        if self._daemonic and _sys is None:
            return
        raise

def _bootstrap_inner(self):
    try:
        self._set_ident()
        self._set_tstate_lock()

        # ...
        self._started.set()
        with _active_limbo_lock:
            _active[self._ident] = self
            del _limbo[self]

        # ...
        try:
            self.run()
        except:
            self._invoke_excepthook(self)
    finally:
        with _active_limbo_lock:
            try:
                del _active[get_ident()]
            except:
                pass

_bootstrap_inner() 干了这么几件事:

  • 设置新线程的线程标识符
  • 为新线程加锁,线程状态被删除之后,该锁会被解释器释放
  • 将事件标志位置为 True
  • 将线程对象从 _limbo 中移除,并添加到活跃线程 _active
  • 调用 run() 函数,执行 target 参数的来的可调用对象发起调用
def run(self):
    try:
        if self._target:
            self._target(*self._args, **self._kwargs)
    finally:
        # Avoid a refcycle if the thread is running a function with
        # an argument that has a member that points to the thread.
        del self._target, self._args, self._kwargs

join()

join(timeout=None)等待线程终结。这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结 - 不管是正常终结还是抛出未处理异常 – 或者直到发生超时。这里面起作用的是 _wait_for_tsate_lock() 函数,它等待被释放的锁,就是 start() 启动时加的那个。

def _wait_for_tstate_lock(self, block=True, timeout=-1):
    lock = self._tstate_lock
    if lock is None:  # already determined that the C code is done
        assert self._is_stopped
    elif lock.acquire(block, timeout):
        lock.release()
        self._stop()

这个锁是也涉及到 thread 模块的 _set_sentinel 函数,留待下回分解。

Event

Thread 类中使用到了事件对象,刚才被有意地跳过了。事件是线程间通信的机制之一,可能是最简单的机制:一个线程发出事件信号,其他线程则等待该信号。

Thread 的构造函数中,初始化了一个 Event 对象 _started

class Thread:
    def __init__(self, group=None, target=None, name=None,
                args=(), kwargs=None, *, daemon=None):
            # ...
            self._started = Event()

Event 实现事件对象的类。事件对象管理一个内部标志,调用 set() 方法可将其设置为 True。调用 clear() 方法可将其设置为 False。调用 wait() 方法将进入阻塞直到标志为 True。这个标志初始时为 False

class Event:

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = Fals

    # 所有正在等待这个事件的线程将被唤醒。当标志为true时,调用 wait() 方法的线程不会被被阻塞。
    def set(self):
        with self._cond:
            self._flag = True
            self._cond.notify_all()

    # 之后调用 wait() 方法的线程将会被阻塞,直到调用 set() 方法将内部标志再次设置为true。
    def clear(self):
        with self._cond:
            self._flag = False

    # 如果调用时内部标志为true,将立即返回。否则将阻塞线程,直到调用 set() 方法将标志设置为true或者发生可选的超时。
    def wait(self, timeout=None):
        with self._cond:
            signaled = self._flag
            if not signaled:
                signaled = self._cond.wait(timeout)
            return signaled

Event 的实现主要依赖条件对象 Condition

Condition

一个条件变量对象允许一个或多个线程在被其它线程所通知之前进行等待。

Condition 类的构造函数接收 lock 参数,它必须为 Lock 或者 RLock 对象,并且它将被用作底层锁。否则,将会创建新的 RLock 对象,并将其用作底层锁。_waiters 定义了一个双向队列来保存等待线程。

class Condition:
    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock

        # 请求底层锁。此方法调用底层锁的相应方法,返回值是底层锁相应方法的返回值。
        self.acquire = lock.acquire
        self.release = lock.release

        # ...
        self._waiters = _deque()

wait()

wait() 等待直到被通知或发生超时。如果线程在调用此方法时没有获得锁,将会引发 RuntimeError 异常。

这个方法释放底层锁,然后阻塞,直到在另外一个线程中调用同一个条件变量的 notify()notify_all() 唤醒它,或者直到可选的超时发生。一旦被唤醒或者超时,它重新获得锁并返回。

class Condition:
    def wait(self, timeout=None):
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")

        # 获取锁
        waiter = _allocate_lock()
        waiter.acquire()

        # 加入等待队列
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass    

notify()

notify(n=1) 默认唤醒一个等待这个条件的线程。这个函数会遍历等待列表 self._waiters,然后依次调用 release() 通知等待线程。

class Condition:
    def notify(self, n=1):
        """Wake up one or more threads waiting on this condition, if any.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method wakes up at most n of the threads waiting for the condition
        variable; it is a no-op if no threads are waiting.

        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
            return
        for waiter in waiters_to_notify:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass

总结

回到文章开头的问题,现在应该都有了答案。

Thread 是如何描述线程对象的?

除了构造函数的参数 grouptarget 等,还包括了锁 _tsstate_lock,事件 _started,初始化标志位 _initialized 等。

启动线程的时候做了些啥?

这里主要是 start() 完成的工作,包括设置标识符、加锁、设置事件标志、添加到活跃线程、调用 run() 函数等。

线程对象保存在哪里?

线程创建之后保存在 _limbo 数组中,start() 运行之后从 _limbo 中踢除,进入 _active 字典。

除了 Thread,threading 里还保存了写东西,否则 threading.current_thread() 怎么实现?

新建的线程都保存在 _active 字典中,根据线程描述符,可以直接拿到对应的线程对象。

threading 提供了哪些线程同步方式,锁?

threading 提供了 EventCondition 来实线程通信,归根结底还是依赖 thread 模块的锁。

参考