一、asyncio库的介绍  

  asyncio是一个异步并发库,从Py3.4之后开始加入这个内置并发库,目的是解决Python中高并发的问题。它提供了一整套异步IO模型的编程接口,可以说它不是一个简单的库更多的是一个框架。asyncio库的出现,使得python和go、Elixir语言在并发领域有竞争能力。

二、asyncio库的使用 

  举个简单的例子:我需要购买土豆,但是土豆数量有限,这个时候我们可以等待土豆上线再购买:

class Potato:
    @classmethod
    def make(cls, num):
        potatos = []
        for i in range(num):
            potatos.append(cls.__new__(cls))
        return potatos

all_potatos
= Potato.make(2)
def take_potatos(num): count = 0 while True: if len(all_potatos) == 0: time.sleep(.1) continue potato = all_potatos.pop() count += 1 yield potato if count == num: break
def buy_potatos(num): bucket = [] for p in take_potatos(num): bucket.append(p)

  从上面看,当货架上的土豆不够的时候,这时只能够死等,而且在上面例子中等多长时间都不会有结果(因为一切都是同步的,没办法补充土豆),也许可以用多进程和多线程解决,而在现实生活中,情况应该是当土豆没有了,我们向商家请求进货,然后再等待商家补充土豆供应后再购买,利用asyncio库实现如下:

class Potato:
    @classmethod
    def make(cls, num):
        potatos = []
        for i in range(num):
            potatos.append(cls.__new__(cls))
        return potatos


all_potatos = Potato.make(2)


async def ask_for_potatos():
    await asyncio.sleep(2)
    all_potatos.extend(Potato.make(random.randint(1, 10)))


async def take_potatos(num):
   count = 0
    while True:
        if len(all_potatos) == 0:
            await ask_for_potatos()
        potato = all_potatos.pop()
        count += 1
            yield potato
        if count == num:
            break


async def buy_potatos(num):
    bucket = []
    async for p in take_potatos(num):
        bucket.append(p)
        print("add potato is %s, time is %s" % (id(p), datetime.datetime.now()))


def main():
    import asyncio
    loop = asyncio.get_event_loop()
    res = loop.run_until_complete(buy_potatos(12))
    loop.close()


if __name__ == '__main__':
    main()

接下来我们来简单解释下以上代码:

解释代码前先明确几个概念:

(1)使用async def 定义的普通函数,我们称为异步函数,也称为协程;使用async def 定义的生成器函数,我们也称为异步生成器函数;

(2)事件循坏loop调用的run_until_complete函数的参数必须为异步函数或者异步生成器函数;

(3)await关键字表示等待await 后面的程序返回执行结果,和yield from 作用类似(yield from作为一个委托生成器,会等待子生成器返回结果,返回的结果在异常的value值中,但是yield from会吃掉异常取出value 返回)。注意 await 关键字只能在async def定义的函数中使;

(4)async for语法表示我们要后面迭代的是一个异步生成器;

代码思路:

在购买土豆的时候(buy_potatos函数执行),如果没有土豆,请求并等待生产者生产(async for p in take_potatos(num),注意take_potatos返回的是一个生成器),生产结束(await ask_for_potatos()执行结束)后继续购买。这里和之前代码的区别在于多了一个请求生产者生产的过程。这里面要注意的一个地方就是await ask_for_potatos , 当代码执行到这条语句时会陷入等待。其实实际上等待的是await asyncio.sleep , 当然在等待过程中,可以运行其他的任务,比如我们想买番茄,这个时候可以加入买番茄的任务。代码变成:

def main():
    loop = asyncio.get_event_loop()
    res = loop.run_until_complete(asyncio.wait([buy_potatos(12), buy_tomatos(15)]))
    loop.close()

此时运行结果变成:

add potato is 1928099938160, time is 2021-06-18 15:29:33.462255

add potato is 1928099938208, time is 2021-06-18 15:29:33.462255            #在当土豆购买完后,代码在运行await asyncio.sleep,也就是等待土豆补给过程中,会继续执行下面的购买番茄

add tomato is 1928099935280, time is 2021-06-18 15:29:33.462255

add tomato is 1928099935328, time is 2021-06-18 15:29:33.462255

add tomato is 1928099935472, time is 2021-06-18 15:29:33.462255           #在所有番茄购买完后,代码在运行await asyncio.sleep,也就是等待番茄补给过程

add potato is 1928132833488, time is 2021-06-18 15:29:35.463107            #在2s后,也就是土豆补给结束后,继续购买所有补给的5个土豆,然后又陷入等待补给过程中

add potato is 1928132833440, time is 2021-06-18 15:29:35.463107            

add potato is 1928132833392, time is 2021-06-18 15:29:35.463107

add potato is 1928132749824, time is 2021-06-18 15:29:35.463107

add tomato is 1928132833776, time is 2021-06-18 15:29:35.463107

从运行结果可以看到,在等待土豆补给的2s内,还可以去购买番茄。这也就达到了异步的目的。

三、asyncio库源码解析

在分析源码前,推荐阅读一篇博客-深入理解Python异步编程,里面对一些异步编程的概念做了很好的概括。除了异步编程的概念,笔者也在这补充一些asyncio库源码阅读时牵涉到的一些基本概念:

(1)coro对象:协程,就是我们需要运行的业务代码。当代码运行到core协程中的await处时,GIL锁将控制权交给主循环,这个时候主循环会进行协程切换;

(2)future对象:未来对象,asyncio框架中定义的类对象,可以储存协程的执行结果,只有在捕获到stopIteration时,会将结果写入future中;

(3)task对象:继承自future类,最主要的作用就是触发协程往后执行(task类有个__step函数主要就是做这件事情);

(4)loop对象:事件循环,控制驱动整个协程的进行;

现在开始阅读源码:

在创建事件循环对象loop的时候,我们调用的是asyncio 库中的get_event_loop方法,返回的对象是asyncio.windows_events.ProactorEventLoop(windows系统)或者asyncio._UnixSelectorEventLoop(unix系统),但两者的基类均为BaseEventLoop,run_until_complete就是BaseEventLoop的方法,代码如下:

def run_until_complete(self, future):
    """Run until the Future is done.
    If the argument is a coroutine, it is wrapped in a Task.
    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.
    Return the Future's result, or raise its exception.
    """
    self._check_closed()
    self._check_running()

    new_task = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)     # 关注点①
    if new_task:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)        #  关注点②
    try:
        self.run_forever()        # 关注点③
    except:
        if new_task and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)
    if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()

① 这里的future参数就是我们传进去的协程,task.ensure_future就是创建一个协程对应的task任务(如果传过来的是future类对象就直接返回),在创建task对象的时候做了很多事情,

最需要关注的是self._loop.call_soon方法,这个方法主要是将task的__step函数封装成一个事件,加入到loop的已经准备好的事件列表self.ready中。

self._coro = coro
self._context = contextvars.copy_context()
self._loop.call_soon(self.__step, context=self._context)

接下来讲讲self.__step函数,源码如下(直接在源码上备注解析):

def __step(self, exc=None):
    if self.done():                               #__step主要起一个协程触发的功能,如果协程已经结束或者取消了,这个时候报错
        raise exceptions.InvalidStateError(
            f'_step(): already done: {self!r}, {exc!r}')
    if self._must_cancel:
        if not isinstance(exc, exceptions.CancelledError):
            exc = self._make_cancelled_error()
        self._must_cancel = False
    coro = self._coro
    self._fut_waiter = None

    _enter_task(self._loop, self)         # 在asyncio的tasks.py文件中有一个全局变量_current_tasks,用来记录不同loop下正在运行的task,这里就是给全局变量添加一条记录
    # Call either coro.throw(exc) or coro.send(None).
    try:
        if exc is None:
            # We use the `send` method directly, because coroutines
            # don't have `__iter__` and `__next__` methods.
            result = coro.send(None)         #这一步非常关键,协程的触发,使协程运行到下一个yield语句处
        else:
            result = coro.throw(exc)        #如果传参中带有异常,在yield语句处抛出
    except StopIteration as exc:
        if self._must_cancel:
            # Task is cancelled right before coro stops.
            self._must_cancel = False
            super().cancel(msg=self._cancel_message)
        else:
          super().set_result(exc.value)    #当协程结束后,会抛出StopIteration异常,其中异常的值就是返回值,将值取出,放入future中,调用set_result的时候也会调用结束回调函数
    except exceptions.CancelledError as exc:
        # Save the original exception so we can chain it later.
        self._cancelled_exc = exc
        super().cancel()  # I.e., Future.cancel(self).
    except (KeyboardInterrupt, SystemExit) as exc:
        super().set_exception(exc)
        raise
    except BaseException as exc:
        super().set_exception(exc)
    else:
        blocking = getattr(result, '_asyncio_future_blocking', None)
        if blocking is not None:
            # Yielded Future must come from Future.__iter__().
            if futures._get_loop(result) is not self._loop:
                new_exc = RuntimeError(
                    f'Task {self!r} got Future '
                    f'{result!r} attached to a different loop')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)    
            elif blocking:
                if result is self:
                    new_exc = RuntimeError(
                        f'Task cannot await on itself: {self!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                else:
                    result._asyncio_future_blocking = False        # result就是yield后面的值,_asyncio_future_blocking属于future类的类属性,
                                                             coro协程中出现await future语句的时候,首先会将_asyncio_future_blocking标志位
                                                             置为true,代表协程阻塞。然后添加协程的结束回调函数。协程的结束回调往往通过在协程代码
                                                             中添加一个TimeHandler事件,该事件判断如果协程未结束,自动将协程结果写成none,并调用结束
                                                             回调                                                                          
                            result.add_done_callback(
                        self.__wakeup, context=self._context)
                    self._fut_waiter = result
                    if self._must_cancel:
                        if self._fut_waiter.cancel(
                                msg=self._cancel_message):
                            self._must_cancel = False
            else:
                new_exc = RuntimeError(
                    f'yield was used instead of yield from '
                    f'in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)

        elif result is None:
            # Bare yield relinquishes control for one event loop iteration.
            self._loop.call_soon(self.__step, context=self._context)         #这里面的call_soon函数,主要是添加下一个协程触发事件到loop的待执行事件集self.ready中
        elif inspect.isgenerator(result):
            # Yielding a generator is just wrong.
            new_exc = RuntimeError(
                f'yield was used instead of yield from for '
                f'generator in task {self!r} with {result!r}')
            self._loop.call_soon(
                self.__step, new_exc, context=self._context)
        else:
            # Yielding something else is an error.
            new_exc = RuntimeError(f'Task got bad yield: {result!r}')
            self._loop.call_soon(
                self.__step, new_exc, context=self._context)
    finally:
        _leave_task(self._loop, self)
        self = None  # Needed to break cycles when an exception occurs.

从源码可以看出,初始化task的时候,会将协程触发事件self.__step放到loop的待执行事件中,当第一次协程触发事件结束后,如果该协程不抛异常或者结束,会将下一次协程触发事件加入到loop的待执行事件集中,

从而达到协程的不断触发执行,直到产生结果。这里面要注意的是,每次协程触发到下一次await或者yield处停顿,这个时候协程会切换到loop主循环(GIL锁进行切换),然后取下一个事件执行。

②future.add_done_callback(_run_until_complete_cb) :_run_until_complete_cb函数,在协程结束后,将loop主循环停止

③self.run_forever():不断循环调用事件的处理,这边因为在run_until_complete中调用的,所以在协程结束后会设置loop的stop状态,此时run_forever会停止

这个函数是事件循环的关键,源码如下:

def run_forever(self):
    """Run until stop() is called."""
    self._check_closed()
    self._check_running()
    self._set_coroutine_origin_tracking(self._debug)
    self._thread_id = threading.get_ident()

    old_agen_hooks = sys.get_asyncgen_hooks()
    sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                           finalizer=self._asyncgen_finalizer_hook)
    try:
        events._set_running_loop(self)
        while True:
            self._run_once()
            if self._stopping:
                break
    finally:
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_origin_tracking(False)
        sys.set_asyncgen_hooks(*old_agen_hooks)

其中最关键的步骤就是self._run_once,源码如下:

def _run_once(self):
    """Run one full iteration of the event loop.

    This calls all currently ready callbacks, polls for I/O,
    schedules the resulting callbacks, and finally schedules
    'call_later' callbacks.
    """

    sched_count = len(self._scheduled)                          #self._scheduled里面事件对象为定时事件,比如调用call_at或者call_later生成的定时事件
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Remove delayed calls that were cancelled if their number
        # is too high
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)

        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        # Remove delayed calls that were cancelled from head of queue.  #定时事件都有一个属性_when,代表事件发生时间,如果超时会被取消
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    timeout = None
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        when = self._scheduled[0]._when
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

    event_list = self._selector.select(timeout)     #取出selector中由IO触发的事件对应的回调函数,然后调用IO事件对应的回调函数     
    self._process_events(event_list)

    #在定时事件发生的时间点取出定时事件,将定时事件加入到self.ready队列中执行,如果定时事件时间还未到,不执行,需要注意的是,self._scheduled是一个最小二根堆,所以
    self._scheduled[0]代表最先发生的定时事件。
    # Handle 'later' callbacks that are ready.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
      handle._scheduled = False
        self._ready.append(handle)

            
    #下面这一片代码比较简单但是也比较重要,就是取出self._ready双向队列(FIFO)中的事件执行
   # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            try:
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                if dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            finally:
                self._current_handle = None
        else:
            handle._run()
    handle = None  # Needed to break cycles when an exception occurs.

以上的源码解析只是针对于单个协程在主事件循环中执行,回到上面的例子,如果在买土豆等待土豆补充过程中也进行西红柿的购买,代码:

res = loop.run_until_complete(asyncio.wait([buy_potatos(12), buy_tomatos(15)]))

首先来看下asyncio的wait方法,该方法为协程,因为run_until_complete方法使用协程函数作为参数。

async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):      #这里的fs是一个列表,代表并发的任务列表
    """Wait for the Futures and coroutines given by fs to complete.

    The fs iterable must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    #参数校验
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    fs = set(fs)

    if any(coroutines.iscoroutine(f) for f in fs):
        warnings.warn("The explicit passing of coroutine objects to "
                      "asyncio.wait() is deprecated since Python 3.8, and "
                      "scheduled for removal in Python 3.11.",
                      DeprecationWarning, stacklevel=2)

    fs = {ensure_future(f, loop=loop) for f in fs}                #这句代码很关键,对于需要并发执行的协程列表,ensure_future其实就是生成一个task对象,每一个
                                                           task对象对应一个协程,在初始化task对象的时候,会将task.__step函数生成一个事件等待执行,具体
                                                           步骤和前面讲的单个协程执行一致

    return await _wait(fs, timeout, return_when, loop)

fs = {ensure_future(f, loop=loop) for f in fs} 能够将所有协程生成任务对象,并在主事件循环中执行。剩下的就是属于协程控制的一部分了。

所以我们先来看_wait函数:

async def _wait(fs, timeout, return_when, loop):  #可以看出,_wait的所有传参都是wait函数的传参,但是fs经过处理,变成task的集合,也可以看成协程对应的任务集合
    """Internal helper for wait().

    The fs argument must be a collection of Futures.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()                  #这一步创建了和loop绑定的future对象
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)      #我们传参timeout,会创建一个定时事件,从现在起,经过timeout时间后,执行事件对应回调函数
                                                                    #_release_waiter(waiter)。_release_waiter函数就是当future未结束时将future对象waiter的执行结果
                                                                     # 置为none,这样调用waiter的结束回调函数。
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending

有一个比较难理解的点需要说明下,我们的time_handle事件执行后,会调用waiter的结束回调函数,结束回调函数是在什么时候添加的呢?这个得从头说起:

一开始的时候,asyncio.wait()作为run_until_complete方法的参数,其实就会将asyncio.wait()作为协程,生成对应的task对象,并将触发协程的task.__step事件添加到self.ready中。

loop事件循环取到第一个task.__step事件执行的时候,wait函数代码执行到第一个await waiter处,也就是执行到future的__await__方法,代码如下:

def __await__(self):
    if not self.done():
        self._asyncio_future_blocking = True
        yield self  # This tells Task to wait for completion.
    if not self.done():
        raise RuntimeError("await wasn't used with future")
    return self.result()  # May raise too.

__iter__ = __await__  # make compatible with 'yield from'.

此时会将 self._asyncio_future_blocking 标志位置为 True。这个时候我们看task.__step的处理,见注释:

try:
    if exc is None:
        # We use the `send` method directly, because coroutines
        # don't have `__iter__` and `__next__` methods.
        result = coro.send(None)                          #这里的result就是yield self 后面的self的值,即result的值为绑定loop的future对象
    else:
        result = coro.throw(exc)
except StopIteration as exc:
    if self._must_cancel:
        # Task is cancelled right before coro stops.
        self._must_cancel = False
        super().cancel(msg=self._cancel_message)
    else:
        super().set_result(exc.value)
except exceptions.CancelledError as exc:
    # Save the original exception so we can chain it later.
    self._cancelled_exc = exc
    super().cancel()  # I.e., Future.cancel(self).
except (KeyboardInterrupt, SystemExit) as exc:
    super().set_exception(exc)
    raise
except BaseException as exc:
    super().set_exception(exc)
else:
    blocking = getattr(result, '_asyncio_future_blocking', None)   #很显然,future的_asyncio_future_blocking存在,且置为true
    if blocking is not None:
        # Yielded Future must come from Future.__iter__().
        if futures._get_loop(result) is not self._loop:
            new_exc = RuntimeError(
                f'Task {self!r} got Future '
                f'{result!r} attached to a different loop')
            self._loop.call_soon(
                self.__step, new_exc, context=self._context)
        elif blocking:
            if result is self:                                    #这里的self指的是task对象,和我们的绑定loop的future对象不等
                new_exc = RuntimeError(
                    f'Task cannot await on itself: {self!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                result._asyncio_future_blocking = False        #这段代码就是我们最终走进去的,可以看到,这块给我们的waiter加了结束回调函数,另外还需要注意的是
                                                        这块代码没有加 self._loop.call_soon这段代码,这代表wait对应的task其实会一致停留在yield处,已经
                                                        没有下一次wait协程的触发了。但这并不影响我们wait传参中的协程列表执行。
                result.add_done_callback(
                    self.__wakeup, context=self._context)
                self._fut_waiter = result
                if self._must_cancel:
                    if self._fut_waiter.cancel(
                            msg=self._cancel_message):
                        self._must_cancel = False

如果wait对应的task阻塞,其实并没有多大影响, 因为我们传进去的协程列表买土豆,买西红柿还在正常执行。如果我们传参中加了timeout这个参数,那么按照前面的分析,loop绑定的future的结束回调会执行,也就是这里的self.__wakeup函数执行:

def __wakeup(self, future):
    try:
        future.result()
    except BaseException as exc:
        # This may also be a cancellation.
        self.__step(exc)
    else:
        self.__step()
    self = None  # Needed to break cycles when an exception occurs.

可以看到,wait对应的task会继续执行,也就是从阻塞状态变成唤醒,因为执行了协程触发函数self.__step()。唤醒后,_wait继续往下执行:

finally:
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending

这个时候,wait对应的task对象(属于future的子类)结果里面保存了当前买土豆买西红柿这两个协程执行的情况。

难道wait对应的task对象不会对传进去的协程列表造成影响吗?非也。

wait对应的task对象起了一个整体控制的作用,如果wait对应的task对象结束了,也就是task对象里面保存了当前买土豆买西红柿这两个协程执行的情况后,会调用结束回调函数:

_run_until_complete_cb, 前面讲了,这个回调会时整个事件循环loop结束。

讲到这里,可以讲下wait函数的另一个传参return_when,这个传参里面可以选择FIRST_COMPLETED 、FIRST_EXCEPTION和ALL_COMPLETE(默认)三个。比如我们选择FIRST_COMPLETED ,在购买土豆和西红柿中任意一个协程结束后,调用waiter的set_result,按照之前的分析,wait对应的task会被唤醒,然后整个事件循环loop状态变成stop,事件循环结束。

def _on_completion(f):
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

综上,timeout代表事件循环loop在timeout后会终止。我们可以看到asyncio利用协程完成了任务的并发,同时也添加了任务的控制。既没有进程或者线程的切换开销,又能够实现并发的控制。

原文地址:http://www.cnblogs.com/kevin-zsq/p/16811132.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性