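I. Introduction to the asyncio library
asyncio is an asynchronous concurrency library that has been part of the Python standard library since Python 3.4. Its goal is to address high-concurrency workloads in Python. It provides a complete set of programming interfaces for an asynchronous I/O model, so it is less a simple library than a framework. With asyncio, Python became competitive with languages such as Go and Elixir in the concurrency space.
II. Using the asyncio library
A simple example: I want to buy potatoes, but the supply is limited, so we have to wait for new potatoes to be stocked before buying: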
import time

class Potato:
    @classmethod
    def make(cls, num):
        potatos = []
        for i in range(num):
            potatos.append(cls.__new__(cls))
        return potatos

all_potatos = Potato.make(2)

def take_potatos(num):
    count = 0
    while True:
        if len(all_potatos) == 0:
            time.sleep(.1)  # shelf is empty: block and poll (nothing will ever restock it)
            continue
        potato = all_potatos.pop()
        count += 1
        yield potato
        if count == num:
            break

def buy_potatos(num):
    bucket = []
    for p in take_potatos(num):
        bucket.append(p)
    return bucket
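In the code above, when the shelf runs out of potatoes we can only wait indefinitely. In this example the wait never ends, because everything is synchronous and nothing can restock the shelf. Multiple processes or threads might solve this. In real life, though, when potatoes run out we ask the seller to restock, wait for the new supply, and then keep buying. Implemented with asyncio: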
import asyncio
import datetime
import random

class Potato:
    @classmethod
    def make(cls, num):
        potatos = []
        for i in range(num):
            potatos.append(cls.__new__(cls))
        return potatos

all_potatos = Potato.make(2)

async def ask_for_potatos():
    await asyncio.sleep(2)  # waiting for the seller to restock; the loop can run other tasks
    all_potatos.extend(Potato.make(random.randint(1, 10)))

async def take_potatos(num):
    count = 0
    while True:
        if len(all_potatos) == 0:
            await ask_for_potatos()
        potato = all_potatos.pop()
        count += 1
        yield potato
        if count == num:
            break

async def buy_potatos(num):
    bucket = []
    async for p in take_potatos(num):
        bucket.append(p)
        print("add potato is %s, time is %s" % (id(p), datetime.datetime.now()))

def main():
    loop = asyncio.new_event_loop()
    res = loop.run_until_complete(buy_potatos(12))
    loop.close()

if __name__ == '__main__':
    main()
Now let's briefly explain the code above.
First, let's pin down a few concepts:
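(1) A function defined with async def (and no yield) is an asynchronous function, also called a coroutine function. Calling it returns a coroutine object, and both are loosely referred to as "coroutines". A function defined with async def that contains yield is an asynchronous generator function.
(2) The argument passed to the event loop's run_until_complete must be a coroutine object (the result of calling an async function) or a Future/Task. A coroutine is wrapped in a Task automatically. An async generator is not accepted; it is consumed with async for inside a coroutine, as buy_potatos does.
(3) await suspends the current coroutine until the awaited object produces its result. This is similar to yield from: yield from acts as a delegating generator that waits for the subgenerator to finish. The subgenerator's return value travels in the value attribute of the StopIteration it raises, and yield from catches that StopIteration and evaluates to value. Note that await can only be used inside a function defined with async def.
(4) async for iterates over an asynchronous iterable, such as an async generator.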
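The four concepts above can be seen side by side in a small script. This is a minimal sketch: fetch, countdown, sub and outer are illustrative names, not part of the article. It uses asyncio.run (Python 3.7+) only for brevity.

```python
import asyncio
import inspect

async def fetch(x):                    # coroutine function (async def, no yield)
    await asyncio.sleep(0)             # suspension point: control returns to the event loop
    return x * 2

async def countdown(n):                # async generator function (async def + yield)
    while n > 0:
        await asyncio.sleep(0)
        yield n
        n -= 1

def sub():                             # ordinary generator, used with yield from
    yield "step"
    return 42                          # delivered to the caller as StopIteration.value

def outer():
    value = yield from sub()           # yield from catches StopIteration and evaluates to 42
    return value

async def main():
    doubled = await fetch(21)                  # await evaluates to the coroutine's return value
    items = [n async for n in countdown(3)]    # async for drives the async generator
    return doubled, items

coro = fetch(1)                        # calling a coroutine function only creates a coroutine object
is_coro_fn = inspect.iscoroutinefunction(fetch)
is_coro = inspect.iscoroutine(coro)
is_agen_fn = inspect.isasyncgenfunction(countdown)
coro.close()                           # never awaited: close it to avoid a RuntimeWarning

gen = outer()
next(gen)                              # runs until sub() yields "step"
try:
    gen.send(None)                     # sub() returns 42, outer() returns it in turn
except StopIteration as stop:
    outer_value = stop.value

result = asyncio.run(main())
print(is_coro_fn, is_coro, is_agen_fn, outer_value, result)
# True True True 42 (42, [3, 2, 1])
```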
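How the code works:
When buying potatoes (buy_potatos runs), if the shelf is empty we ask the producer to restock and wait for it (async for p in take_potatos(num); note that take_potatos returns an async generator). Once restocking finishes (await ask_for_potatos() returns), buying continues. The difference from the previous code is this extra step of asking the producer to restock. The key statement is await ask_for_potatos: execution suspends there, and what it actually waits on is await asyncio.sleep. While it waits, other tasks can run. For example, if we also want to buy tomatoes, we can add a tomato-buying task. The code becomes: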
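def main():
    loop = asyncio.new_event_loop()
    # buy_tomatos works like buy_potatos (its code is not shown).
    # Passing bare coroutines to asyncio.wait() works up to Python 3.10 (deprecated since 3.8);
    # from 3.11, wrap them in Tasks first, e.g. loop.create_task(buy_potatos(12)).
    res = loop.run_until_complete(asyncio.wait([buy_potatos(12), buy_tomatos(15)]))
    loop.close()
The output now becomes: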
add potato is 1928099938160, time is 2021-06-18 15:29:33.462255
add potato is 1928099938208, time is 2021-06-18 15:29:33.462255 # the shelf's potatoes are used up; while the code waits in await asyncio.sleep for the restock, it moves on to buying tomatoes
add tomato is 1928099935280, time is 2021-06-18 15:29:33.462255
add tomato is 1928099935328, time is 2021-06-18 15:29:33.462255
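add tomato is 1928099935472, time is 2021-06-18 15:29:33.462255 # all tomatoes are bought; the code now waits in await asyncio.sleep for the tomato restock
add potato is 1928132833488, time is 2021-06-18 15:29:35.463107 # 2 s later the potato restock is done; all newly restocked potatoes are bought, then the code waits for the next restock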
add potato is 1928132833440, time is 2021-06-18 15:29:35.463107
add potato is 1928132833392, time is 2021-06-18 15:29:35.463107
add potato is 1928132749824, time is 2021-06-18 15:29:35.463107
add tomato is 1928132833776, time is 2021-06-18 15:29:35.463107
…
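The output shows that during the 2 s spent waiting for the potato restock, we could still buy tomatoes. That is exactly what asynchrony is for.
III. A walkthrough of the asyncio source code
Before analyzing the source, I recommend the blog post 深入理解Python异步编程 ("A Deep Dive into Python Asynchronous Programming"), which summarizes many asynchronous programming concepts well. Beyond those, here are some basic concepts you will meet when reading the asyncio source: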
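For readers who want to run the two-shopper scenario end to end, here is a minimal self-contained sketch. It makes several assumptions that are not in the article. The article never shows buy_tomatos, so Tomato and the generic ask_for/take/buy helpers are hypothetical. The restock delay is shortened to 0.2 s. The coroutines are wrapped in Tasks before calling asyncio.wait, so the script also runs on Python 3.11+.

```python
import asyncio
import datetime
import random

class Potato:
    @classmethod
    def make(cls, num):
        return [cls() for _ in range(num)]

class Tomato(Potato):  # hypothetical: mirrors Potato, the article does not show it
    pass

all_potatos = Potato.make(2)
all_tomatos = Tomato.make(3)

async def ask_for(stock, kind, delay):
    await asyncio.sleep(delay)          # while this task sleeps, the loop runs the other shopper
    stock.extend(kind.make(random.randint(1, 10)))

async def take(stock, kind, num, delay):
    count = 0
    while count < num:
        if not stock:
            await ask_for(stock, kind, delay)
        yield stock.pop()
        count += 1

async def buy(stock, kind, num, delay=0.2):
    bucket = []
    async for item in take(stock, kind, num, delay):
        bucket.append(item)
        print("add %s is %s, time is %s" % (kind.__name__.lower(), id(item), datetime.datetime.now()))
    return bucket

def main():
    loop = asyncio.new_event_loop()
    try:
        # Tasks created explicitly: asyncio.wait() rejects bare coroutines since Python 3.11
        tasks = [loop.create_task(buy(all_potatos, Potato, 12)),
                 loop.create_task(buy(all_tomatos, Tomato, 15))]
        loop.run_until_complete(asyncio.wait(tasks))
        return [t.result() for t in tasks]
    finally:
        loop.close()

if __name__ == "__main__":
    potatoes, tomatoes = main()
    print(len(potatoes), len(tomatoes))  # 12 15
```

The exact interleaving varies from run to run because the restock size is random. The two buckets always end up with 12 and 15 items.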
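(1) coro object: the coroutine, i.e. the business logic we want to run. When the coroutine reaches an await on something that is not ready yet, it suspends and control returns to the event loop, which then runs the next ready callback (for example, another task's step). This switch is cooperative and happens inside a single thread; the GIL plays no part in it.
(2) future object: a class defined by asyncio that holds a result (or exception) that becomes available later, plus callbacks to run when it completes. A Task stores its coroutine's return value in itself when the coroutine finishes, i.e. when the StopIteration raised by the coroutine is caught.
(3) task object: a subclass of Future whose main job is to drive the coroutine forward; the Task class's __step method does this.
(4) loop object: the event loop, which schedules and drives all coroutines.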
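The roles of Future, Task and loop can be checked directly with public APIs. This is a small sketch; compute and seen are illustrative names. A bare Future only stores a result and runs callbacks. A Task is a Future that additionally drives a coroutine.

```python
import asyncio

async def compute():
    await asyncio.sleep(0)        # one suspension point; the Task schedules its next step
    return "done"

seen = []
loop = asyncio.new_event_loop()
try:
    fut = loop.create_future()                          # bare Future: result slot + callbacks
    fut.add_done_callback(lambda f: seen.append(f.result()))
    loop.call_soon(fut.set_result, 7)                   # something else completes it later
    fut_value = loop.run_until_complete(fut)            # a Future is used as-is, not wrapped

    task = loop.create_task(compute())                  # a Task is a Future driving a coroutine
    is_future, done_before = isinstance(task, asyncio.Future), task.done()
    task_value = loop.run_until_complete(task)
finally:
    loop.close()

print(fut_value, seen, is_future, done_before, task_value)
# 7 [7] True False done
```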
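Now to the source code:
To create the event loop object we call asyncio.get_event_loop (or asyncio.new_event_loop). The object returned is asyncio.windows_events.ProactorEventLoop on Windows (the default since Python 3.8) or asyncio.unix_events._UnixSelectorEventLoop on Unix. Both ultimately derive from BaseEventLoop, and run_until_complete is a BaseEventLoop method: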
def run_until_complete(self, future):
    """Run until the Future is done.

    If the argument is a coroutine, it is wrapped in a Task.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.
    """
    self._check_closed()
    self._check_running()

    new_task = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)  # point ①
    if new_task:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)  # point ②
    try:
        self.run_forever()  # point ③
    except:
        if new_task and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)
    if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()
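① Here the future argument is the coroutine we passed in. tasks.ensure_future creates the Task that wraps this coroutine (if a Future is passed, it is returned unchanged). Creating the Task does a lot of work. The most important part is the self._loop.call_soon call in Task.__init__: it wraps the task's __step method in a Handle and appends it to the loop's queue of ready callbacks, self._ready.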
self._coro = coro
self._context = contextvars.copy_context()

self._loop.call_soon(self.__step, context=self._context)
Next, the self.__step method. Its source follows, annotated inline:
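The effect of that call_soon can be observed from the outside: creating a Task does not run the coroutine. It only queues the first step, which runs in FIFO order with other ready callbacks once the loop starts. A small sketch; job and events are illustrative names.

```python
import asyncio

events = []

async def job():
    events.append("coroutine body started")
    return 1

loop = asyncio.new_event_loop()
try:
    task = loop.create_task(job())     # Task.__init__ only queues the first __step via call_soon
    events.append("task created")      # the coroutine body has not run yet
    loop.call_soon(events.append, "callback scheduled after the task")
    loop.run_until_complete(task)
finally:
    loop.close()

print(events)
# ['task created', 'coroutine body started', 'callback scheduled after the task']
```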
def __step(self, exc=None):
    if self.done():
        # __step exists to advance the coroutine; if the task is already done or cancelled, that is an error
        raise exceptions.InvalidStateError(
            f'_step(): already done: {self!r}, {exc!r}')
    if self._must_cancel:
        if not isinstance(exc, exceptions.CancelledError):
            exc = self._make_cancelled_error()
        self._must_cancel = False
    coro = self._coro
    self._fut_waiter = None

    # tasks.py keeps a module-level dict, _current_tasks, recording the task currently
    # running in each loop; this registers self as the current task of its loop
    _enter_task(self._loop, self)
    # Call either coro.throw(exc) or coro.send(None).
    try:
        if exc is None:
            # We use the `send` method directly, because coroutines
            # don't have `__iter__` and `__next__` methods.
            # The crucial step: resume the coroutine and run it to its next suspension point
            result = coro.send(None)
        else:
            # If an exception was passed in, raise it at the suspension point
            result = coro.throw(exc)
    except StopIteration as exc:
        if self._must_cancel:
            # Task is cancelled right before coro stops.
            self._must_cancel = False
            super().cancel(msg=self._cancel_message)
        else:
            # A finished coroutine raises StopIteration carrying its return value;
            # store it in the Future. set_result also schedules the done callbacks
            super().set_result(exc.value)
    except exceptions.CancelledError as exc:
        # Save the original exception so we can chain it later.
        self._cancelled_exc = exc
        super().cancel()  # I.e., Future.cancel(self).
    except (KeyboardInterrupt, SystemExit) as exc:
        super().set_exception(exc)
        raise
    except BaseException as exc:
        super().set_exception(exc)
    else:
        blocking = getattr(result, '_asyncio_future_blocking', None)
        if blocking is not None:
            # Yielded Future must come from Future.__iter__().
            if futures._get_loop(result) is not self._loop:
                new_exc = RuntimeError(
                    f'Task {self!r} got Future '
                    f'{result!r} attached to a different loop')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            elif blocking:
                if result is self:
                    new_exc = RuntimeError(
                        f'Task cannot await on itself: {self!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                else:
                    # result is the Future the coroutine yielded. _asyncio_future_blocking
                    # is a Future class attribute (default False); when the coroutine runs
                    # "await future", Future.__await__ sets it to True, meaning the
                    # coroutine is blocked on this future. Here the flag is reset and
                    # self.__wakeup is registered as the future's done callback.
                    # Whatever completes the future triggers the wakeup: for example,
                    # asyncio.sleep() uses call_later to schedule a TimerHandle that sets
                    # the future's result (to None) when the delay expires, which runs
                    # the done callbacks.
                    result._asyncio_future_blocking = False
                    result.add_done_callback(
                        self.__wakeup, context=self._context)
                    self._fut_waiter = result
                    if self._must_cancel:
                        if self._fut_waiter.cancel(
                                msg=self._cancel_message):
                            self._must_cancel = False
            else:
                new_exc = RuntimeError(
                    f'yield was used instead of yield from '
                    f'in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)

        elif result is None:
            # Bare yield relinquishes control for one event loop iteration.
            # call_soon appends this task's next step to the loop's ready queue self._ready
            self._loop.call_soon(self.__step, context=self._context)
        elif inspect.isgenerator(result):
            # Yielding a generator is just wrong.
            new_exc = RuntimeError(
                f'yield was used instead of yield from for '
                f'generator in task {self!r} with {result!r}')
            self._loop.call_soon(
                self.__step, new_exc, context=self._context)
        else:
            # Yielding something else is an error.
            new_exc = RuntimeError(f'Task got bad yield: {result!r}')
            self._loop.call_soon(
                self.__step, new_exc, context=self._context)
    finally:
        _leave_task(self._loop, self)
        self = None  # Needed to break cycles when an exception occurs.
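The source shows that initializing a Task puts its first step, self.__step, into the loop's ready queue. After each step, if the coroutine has neither finished nor raised, the next step gets scheduled in one of two ways. After a bare yield, it is scheduled directly with call_soon. When the coroutine is waiting on a Future, it is scheduled through the __wakeup done callback once that Future completes.
In this way the coroutine keeps advancing until it produces a result. Note that each step runs the coroutine only up to its next await (or yield). At that point control goes back to the loop's main loop, which takes the next ready callback. This hand-off is cooperative and single-threaded; it has nothing to do with the GIL.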
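To make the stepping logic concrete, here is a deliberately simplified model of what __step does. It is not asyncio's implementation. MiniFuture, MiniTask and MiniLoop are hypothetical classes that keep only three of the branches shown above: StopIteration means store the result, a yielded future means resume via its done callback, and a bare yield means reschedule with call_soon.

```python
import types
from collections import deque

class MiniLoop:
    def __init__(self):
        self.ready = deque()                    # FIFO of (callback, args), like loop._ready

    def call_soon(self, fn, *args):
        self.ready.append((fn, args))

    def run_until_idle(self):
        while self.ready:
            fn, args = self.ready.popleft()
            fn(*args)

class MiniFuture:
    """Hypothetical stand-in for asyncio.Future: a result slot plus done callbacks."""
    def __init__(self, loop):
        self.loop, self.done, self.result, self.callbacks = loop, False, None, []

    def add_done_callback(self, fn):
        self.callbacks.append(fn)

    def set_result(self, value):
        self.done, self.result = True, value
        for fn in self.callbacks:
            self.loop.call_soon(fn, self)       # callbacks are scheduled, not called inline

    def __await__(self):
        if not self.done:
            yield self                          # hand the future itself to the driving task
        return self.result

class MiniTask(MiniFuture):
    """Hypothetical simplified Task: each step sends None into the coroutine."""
    def __init__(self, coro, loop):
        super().__init__(loop)
        self.coro = coro
        loop.call_soon(self.step)               # like Task.__init__: queue the first step

    def step(self, _fut=None):
        try:
            yielded = self.coro.send(None)      # run to the next suspension point
        except StopIteration as stop:
            self.set_result(stop.value)         # coroutine finished: store its return value
            return
        if isinstance(yielded, MiniFuture):
            yielded.add_done_callback(self.step)    # resume only when that future completes
        else:
            self.loop.call_soon(self.step)      # bare yield: run again next time round

@types.coroutine
def yield_once():
    yield                                       # bare yield, like asyncio.sleep(0)

async def worker(loop, log):
    log.append("start")
    await yield_once()                          # rescheduled immediately via call_soon
    fut = MiniFuture(loop)
    loop.call_soon(fut.set_result, "payload")   # something completes the future later
    log.append(await fut)                       # suspended until the future wakes the task
    return len(log)

loop = MiniLoop()
log = []
task = MiniTask(worker(loop, log), loop)
loop.run_until_idle()
print(log, task.done, task.result)  # ['start', 'payload'] True 2
```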
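② future.add_done_callback(_run_until_complete_cb): the _run_until_complete_cb callback stops the event loop once the coroutine has finished.
③ self.run_forever(): processes events in a loop. Because it is called from run_until_complete here, the callback from ② marks the loop as stopping once the coroutine finishes, and run_forever then returns.
run_forever is the heart of the event loop. Its source: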
def run_forever(self):
    """Run until stop() is called."""
    self._check_closed()
    self._check_running()
    self._set_coroutine_origin_tracking(self._debug)
    self._thread_id = threading.get_ident()

    old_agen_hooks = sys.get_asyncgen_hooks()
    sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                           finalizer=self._asyncgen_finalizer_hook)
    try:
        events._set_running_loop(self)
        while True:
            self._run_once()
            if self._stopping:
                break
    finally:
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_origin_tracking(False)
        sys.set_asyncgen_hooks(*old_agen_hooks)
Inside run_forever, the most important step is self._run_once. Its source:
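Points ①, ② and ③ can be reproduced with public APIs. The sketch below is a hypothetical, stripped-down my_run_until_complete with no error handling. It wraps the coroutine in a Task, adds a done callback that calls loop.stop(), and then spins run_forever until that callback fires.

```python
import asyncio

async def answer():
    await asyncio.sleep(0.01)
    return 42

def my_run_until_complete(loop, coro):
    """Hypothetical simplified run_until_complete (no error handling)."""
    task = loop.create_task(coro)                    # ① wrap the coroutine in a Task
    task.add_done_callback(lambda _t: loop.stop())   # ② stop the loop when the Task finishes
    loop.run_forever()                               # ③ repeat _run_once() until stop() is seen
    return task.result()

loop = asyncio.new_event_loop()
try:
    value = my_run_until_complete(loop, answer())
finally:
    loop.close()

print(value)  # 42
```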
def _run_once(self):
    """Run one full iteration of the event loop.

    This calls all currently ready callbacks, polls for I/O,
    schedules the resulting callbacks, and finally schedules
    'call_later' callbacks.
    """

    # self._scheduled holds timer handles, e.g. those created by call_at or call_later
    sched_count = len(self._scheduled)
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Remove delayed calls that were cancelled if their number
        # is too high
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)

        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        # Remove delayed calls that were cancelled from head of queue.
        # Each timer handle has a _when attribute (its due time); handles whose
        # cancel() was called are dropped from the head of the heap here
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    timeout = None
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        when = self._scheduled[0]._when
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

    # Poll the selector for I/O events, then queue the callbacks registered for them
    event_list = self._selector.select(timeout)
    self._process_events(event_list)

    # Move timer handles whose due time has arrived into self._ready; handles that
    # are not yet due stay in the heap. self._scheduled is a min binary heap, so
    # self._scheduled[0] is always the earliest timer.
    # Handle 'later' callbacks that are ready.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # The code below is simple but important: it pops the handles queued in the
    # self._ready deque (FIFO) and runs them.
    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            try:
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                if dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            finally:
                self._current_handle = None
        else:
            handle._run()
    handle = None  # Needed to break cycles when an exception occurs.
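The division of labour inside _run_once (a FIFO ready queue, a min-heap of timers, and "only run what was ready at the start of this iteration") can be modelled in a few dozen lines. ToyLoop below is a hypothetical teaching model, not asyncio code. It has no I/O, and time.sleep stands in for selector.select(timeout). Unlike the real loop, it busy-spins if nothing is ready or scheduled.

```python
import heapq
import itertools
import time
from collections import deque

class ToyLoop:
    """Hypothetical toy loop: mirrors _run_once's ready queue + timer heap, no I/O."""

    def __init__(self):
        self._ready = deque()            # FIFO of (callback, args) ready to run
        self._scheduled = []             # min-heap of (when, seq, callback, args)
        self._seq = itertools.count()    # tie-breaker: equal deadlines keep insertion order
        self._stopping = False

    def time(self):
        return time.monotonic()

    def call_soon(self, fn, *args):
        self._ready.append((fn, args))

    def call_later(self, delay, fn, *args):
        heapq.heappush(self._scheduled, (self.time() + delay, next(self._seq), fn, args))

    def stop(self):
        self._stopping = True

    def _run_once(self):
        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            timeout = max(0.0, self._scheduled[0][0] - self.time())
        if timeout:
            time.sleep(timeout)          # stands in for self._selector.select(timeout)
        end_time = self.time() + 1e-3    # assumed clock resolution
        while self._scheduled and self._scheduled[0][0] < end_time:
            _, _, fn, args = heapq.heappop(self._scheduled)
            self._ready.append((fn, args))   # due timers join the ready queue
        for _ in range(len(self._ready)):    # callbacks added now wait for the next iteration
            fn, args = self._ready.popleft()
            fn(*args)

    def run_forever(self):
        while True:
            self._run_once()
            if self._stopping:
                break
        self._stopping = False

log = []
loop = ToyLoop()

def soon_2():
    log.append("soon 2")
    loop.call_soon(log.append, "added by soon 2")   # runs in the next iteration

loop.call_later(0.2, log.append, "timer 200ms")
loop.call_later(0.1, log.append, "timer 100ms")
loop.call_soon(log.append, "soon 1")
loop.call_soon(soon_2)
loop.call_later(0.3, loop.stop)
loop.run_forever()
print(log)
# ['soon 1', 'soon 2', 'added by soon 2', 'timer 100ms', 'timer 200ms']
```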
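The analysis above covers a single coroutine running in the main event loop. Back to the earlier example: to buy tomatoes while waiting for the potato restock, the code is: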
res = loop.run_until_complete(asyncio.wait([buy_potatos(12), buy_tomatos(15)]))
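First, asyncio's wait function. It is a coroutine function: calling asyncio.wait(...) returns a coroutine object, which is exactly what run_until_complete expects (it wraps it in a Task).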
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):  # fs: the tasks to run concurrently
    """Wait for the Futures and coroutines given by fs to complete.

    The fs iterable must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    # Argument validation
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    fs = set(fs)

    if any(coroutines.iscoroutine(f) for f in fs):
        warnings.warn("The explicit passing of coroutine objects to "
                      "asyncio.wait() is deprecated since Python 3.8, and "
                      "scheduled for removal in Python 3.11.",
                      DeprecationWarning, stacklevel=2)

    # Key line: ensure_future creates one Task per coroutine to be run concurrently.
    # Initializing each Task schedules its task.__step as a ready callback, exactly
    # as in the single-coroutine case described earlier
    fs = {ensure_future(f, loop=loop) for f in fs}

    return await _wait(fs, timeout, return_when, loop)
fs = {ensure_future(f, loop=loop) for f in fs} turns every coroutine into a Task that runs in the main event loop. What remains is the part that controls those coroutines.
So let's look at _wait first:
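The effect of that line, and the (done, pending) return value, can be checked with a short script. It is a sketch with an illustrative work coroutine. It creates the Tasks itself, which is required on Python 3.11+, and confirms that ensure_future hands an existing Task back unchanged.

```python
import asyncio

async def work(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    # Wrap the coroutines in Tasks up front (asyncio.wait() rejects bare coroutines on 3.11+)
    tasks = {asyncio.ensure_future(work("potato", 0.02)),
             asyncio.ensure_future(work("tomato", 0.01))}
    # ensure_future() returns an existing Task/Future unchanged
    unchanged = all(asyncio.ensure_future(t) is t for t in tasks)
    done, pending = await asyncio.wait(tasks)
    return sorted(t.result() for t in done), len(pending), unchanged

print(asyncio.run(main()))  # (['potato', 'tomato'], 0, True)
```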
async def _wait(fs, timeout, return_when, loop):
    # All arguments come from wait(), except that fs is now a set of Tasks,
    # one per coroutine
    """Internal helper for wait().

    The fs argument must be a collection of Futures.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()  # a Future bound to this loop
    timeout_handle = None
    if timeout is not None:
        # With a timeout, schedule a timer handle: after `timeout` seconds the loop
        # calls _release_waiter(waiter), which sets waiter's result to None if it is
        # not done yet, and that runs waiter's done callbacks.
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending
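The counter/waiter pattern in _wait can be reimplemented with public APIs only. simple_wait below is a hypothetical simplified version that supports a timeout and a first_completed flag but not FIRST_EXCEPTION. It shows how completing one shared waiter future, whether from a task's done callback or from a call_later timer, is what wakes the waiting coroutine.

```python
import asyncio

async def simple_wait(fs, timeout=None, first_completed=False):
    """Hypothetical re-implementation of the _wait() pattern (fs: a set of Tasks)."""
    loop = asyncio.get_running_loop()
    waiter = loop.create_future()             # the Task running this coroutine blocks on it
    counter = len(fs)

    def release(_=None):
        if not waiter.done():
            waiter.set_result(None)           # completing waiter wakes the waiting Task

    timeout_handle = loop.call_later(timeout, release) if timeout is not None else None

    def on_completion(_f):
        nonlocal counter
        counter -= 1
        if counter <= 0 or first_completed:
            release()

    for f in fs:
        f.add_done_callback(on_completion)
    try:
        await waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(on_completion)
    done = {f for f in fs if f.done()}
    return done, fs - done

async def main():
    fast = asyncio.ensure_future(asyncio.sleep(0.01, result="fast"))
    slow = asyncio.ensure_future(asyncio.sleep(1, result="slow"))
    done, pending = await simple_wait({fast, slow}, timeout=0.1)
    for t in pending:
        t.cancel()                            # like asyncio.wait, simple_wait never cancels
    return [t.result() for t in done], len(pending)

print(asyncio.run(main()))  # (['fast'], 1)
```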
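One point is tricky and deserves explanation. When timeout_handle fires, it triggers waiter's done callbacks. But when was that done callback added? We have to start from the beginning:
At the start, the coroutine object returned by asyncio.wait() is the argument to run_until_complete, so it is wrapped in a Task. The task.__step call that drives this coroutine is then added to self._ready.
When the event loop runs this task's first __step, the wait coroutine calls _wait and reaches the first await waiter. That enters the Future's __await__ method: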
def __await__(self):
    if not self.done():
        self._asyncio_future_blocking = True
        yield self  # This tells Task to wait for completion.
    if not self.done():
        raise RuntimeError("await wasn't used with future")
    return self.result()  # May raise too.

__iter__ = __await__  # make compatible with 'yield from'.
Because waiter is still pending, __await__ sets self._asyncio_future_blocking to True and yields the future itself. Now look at how task.__step handles this (see the comments):
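The behavior of Future.__await__ can be observed by stepping it by hand, much as Task.__step does with send(None). This is a small sketch. It reads _asyncio_future_blocking only because the article discusses that attribute; it is a private implementation detail, not a public API.

```python
import asyncio

loop = asyncio.new_event_loop()
value = None
try:
    fut = loop.create_future()
    it = fut.__await__()           # the iterator that "await fut" drives
    yielded = next(it)             # pending future: __await__ yields the future itself
    blocking = fut._asyncio_future_blocking   # private flag set by __await__
    fut.set_result("ready")
    try:
        next(it)                   # resumed after completion: __await__ returns the result
    except StopIteration as stop:
        value = stop.value
finally:
    loop.close()

print(yielded is fut, blocking, value)  # True True ready
```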
try:
    if exc is None:
        # We use the `send` method directly, because coroutines
        # don't have `__iter__` and `__next__` methods.
        # result is the object in "yield self", i.e. the loop-bound future waiter
        result = coro.send(None)
    else:
        result = coro.throw(exc)
except StopIteration as exc:
    if self._must_cancel:
        # Task is cancelled right before coro stops.
        self._must_cancel = False
        super().cancel(msg=self._cancel_message)
    else:
        super().set_result(exc.value)
except exceptions.CancelledError as exc:
    # Save the original exception so we can chain it later.
    self._cancelled_exc = exc
    super().cancel()  # I.e., Future.cancel(self).
except (KeyboardInterrupt, SystemExit) as exc:
    super().set_exception(exc)
    raise
except BaseException as exc:
    super().set_exception(exc)
else:
    # waiter is a Future, so _asyncio_future_blocking exists and is True
    blocking = getattr(result, '_asyncio_future_blocking', None)
    if blocking is not None:
        # Yielded Future must come from Future.__iter__().
        if futures._get_loop(result) is not self._loop:
            new_exc = RuntimeError(
                f'Task {self!r} got Future '
                f'{result!r} attached to a different loop')
            self._loop.call_soon(
                self.__step, new_exc, context=self._context)
        elif blocking:
            # self here is the task, which is not the loop-bound future waiter
            if result is self:
                new_exc = RuntimeError(
                    f'Task cannot await on itself: {self!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # This is the branch actually taken: it registers a done callback on
                # waiter. Note there is no self._loop.call_soon here, so the wait task
                # stays suspended at the yield; its next step is scheduled only when
                # waiter completes and calls __wakeup. This does not stop the
                # coroutines passed to wait from running.
                result._asyncio_future_blocking = False
                result.add_done_callback(
                    self.__wakeup, context=self._context)
                self._fut_waiter = result
                if self._must_cancel:
                    if self._fut_waiter.cancel(
                            msg=self._cancel_message):
                        self._must_cancel = False
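The wait task being suspended does no harm, because the coroutines we passed in (buying potatoes, buying tomatoes) keep running normally. Once waiter completes, either because the timeout fires (_release_waiter) or because _on_completion decides the wait is over, its done callback runs. Here that callback is self.__wakeup: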
def __wakeup(self, future):
    try:
        future.result()
    except BaseException as exc:
        # This may also be a cancellation.
        self.__step(exc)
    else:
        self.__step()
    self = None  # Needed to break cycles when an exception occurs.
So the wait task resumes. It goes from suspended to awake because __wakeup calls the step function self.__step(). Once awake, _wait continues:
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending
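At this point the wait task (a subclass of Future) holds, as its result, the (done, pending) sets describing the state of the potato and tomato coroutines.
Does that mean the wait task has no effect on the coroutines passed in? Not at all.
The wait task acts as the overall controller. When it finishes, i.e. once its result records the state of the two coroutines, its done callback runs:
_run_until_complete_cb, which, as explained earlier, stops the whole event loop.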
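This also explains wait's other parameter, return_when. It can be FIRST_COMPLETED, FIRST_EXCEPTION, or ALL_COMPLETED (the default). Suppose we choose FIRST_COMPLETED. As soon as either the potato or the tomato coroutine finishes, _on_completion calls waiter.set_result. Following the analysis above, the wait task is woken up, the event loop is marked as stopping, and the loop exits. The coroutine that has not finished is left pending; wait does not cancel it.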
def _on_completion(f):
    nonlocal counter
    counter -= 1
    if (counter <= 0 or
        return_when == FIRST_COMPLETED or
        return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                            f.exception() is not None)):
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(None)

for f in fs:
    f.add_done_callback(_on_completion)
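A quick way to see FIRST_COMPLETED in action is the sketch below, which uses an illustrative buy coroutine with fixed delays. wait returns as soon as the faster task finishes. The slower one comes back in pending, still running, and is cancelled explicitly here.

```python
import asyncio

async def buy(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    potato = asyncio.ensure_future(buy("potato", 0.01))
    tomato = asyncio.ensure_future(buy("tomato", 0.5))
    done, pending = await asyncio.wait({potato, tomato}, return_when=asyncio.FIRST_COMPLETED)
    names = [t.result() for t in done]
    still_running = [not t.done() for t in pending]
    for t in pending:
        t.cancel()       # wait() leaves unfinished tasks running; clean up explicitly
    return names, still_running

print(asyncio.run(main()))  # (['potato'], [True])
```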
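In summary, timeout bounds how long wait waits. After timeout seconds wait returns, and because wait is the coroutine passed to run_until_complete here, the event loop stops as well. Tasks that have not finished are returned in pending and are not cancelled. asyncio uses coroutines to run tasks concurrently while also providing ways to control them. It achieves this concurrency control without the switching overhead of processes or threads.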
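The claim that a timeout stops the loop but does not cancel the pending work can be checked with a short script. This is a sketch with an illustrative slow coroutine. After run_until_complete(asyncio.wait(..., timeout=...)) returns, the task is still pending, and running the loop again lets it finish.

```python
import asyncio

async def slow():
    await asyncio.sleep(0.5)
    return "slow"

loop = asyncio.new_event_loop()
try:
    task = loop.create_task(slow())
    done, pending = loop.run_until_complete(asyncio.wait({task}, timeout=0.05))
    timed_out = task in pending and not task.cancelled()   # still pending, not cancelled
    result = loop.run_until_complete(task)                 # the loop can resume it later
finally:
    loop.close()

print(timed_out, result)  # True slow
```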
Original article: http://www.cnblogs.com/kevin-zsq/p/16811132.html