python异步编程实战,深入理解python异步编程

  python异步编程实战,深入理解python异步编程

  Yyds干货库存

  Python 3.5引入了两个新的关键字:async和await。这些看似神奇的关键词,完全可以在没有任何线程的情况下实现线程式并发。在本教程中,我们将介绍异步编程的原因,并通过构建我们自己的小型异步类框架来解释Python的async/await关键字如何在内部工作。

  为什么要异步编程?要理解异步编程的动机,我们必须首先理解是什么限制了我们代码的速度。理想情况下,我们希望我们的代码以光速运行,并立即跳过我们的代码,没有任何延迟。然而,由于两个因素,代码实际上运行得要慢得多:

  CPU时间(处理器执行指令的时间)IO时间(等待网络请求或存储读/写的时间)当我们的代码在等待IO时,CPU基本上是空闲的,等待某个外部设备响应。通常,内核会检测到这一点,并立即切换到执行系统中的其他线程。因此,如果我们希望加快一组IO密集型任务的处理速度,我们可以为每个任务创建一个线程。当其中一个线程停止等待IO时,内核会切换到另一个线程继续处理。

  这在实践中效果很好,但是它有两个缺点:

  存在线程开销(尤其是在Python中)。我们无法控制内核何时选择在线程间切换。例如,如果我们想要执行10,000个任务,我们要么必须创建10,000个线程,这将占用大量RAM,要么我们需要创建较少数量的工作线程,并以较少的并发性执行任务。此外,这些线程的初始生成将占用CPU时间。

  因为内核可以随时选择在线程之间切换,所以我们的代码随时都可能存在相互竞争的情况。

  在基于同步线程的传统代码中引入异步,内核必须检测线程何时被IO绑定,并选择随意在线程之间切换。使用Python异步,程序员使用关键字await来确认声明IO绑定的代码行,并确认被授予执行其他任务的权限。例如,考虑以下执行Web请求的代码:

  异步定义请求_google():

  reader,writer=await asyncio . open _ connection( Google . com ,80)

  writer.write(bGET/HTTP/2\n\n )

  等待writer.drain()

  response=await reader.read()

  Return response.decode()在这里,在这里,我们看到代码在两个地方等待。所以在等待我们的字节发送到服务器(writer.drain())的同时,在等待服务器回复一些字节(reader.read())的同时,我们知道其他代码可能会执行,全局变量可能会改变。但是,从函数开始到第一次等待,我们可以保证代码逐行运行,而不会切换到正在运行的程序中的其他代码。这就是异步的美妙之处。

  Asyncio是一个标准库,允许我们用这些异步函数做一些有趣的事情。例如,如果我们想同时向Google发出两个请求,我们可以:

  异步定义请求_google_twice():

  response_1,response _ 2=await asyncio . gather(request_google(),request _ Google())

  返回response_1,response_2当我们调用request_google_twice()时,神奇的asyncio.gather会启动一个函数调用,但当我们调用await writer.drain()时,它会启动第二个函数调用,这样两个请求就并行发生了。然后,它等待第一个或第二个请求的writer.drain()调用完成,并继续执行该函数。

  最后,忽略了一个重要的细节:asyncio.run为了从常规[同步] Python函数中实际调用异步函数,我们将调用包装在asyncio.run(.):

  异步定义async_main():

  r1,r2=await request_google_twice()

  打印(回答一:,r1)

  打印(响应二:,r2)

  返回12

  return _ val=async io . run(async_main())请注意,如果我们只调用async _ main()而不调用await.或者asyncio.run(.),什么都不会发生。这仅仅受到异步工作模式的性质的限制。

  那么,异步是如何工作的,这些神奇的asyncio.run和asyncio.gather函数的作用是什么?详情请阅读下文。

  异步是如何工作的?要理解async的神奇之处,我们首先需要了解一个更简单的Python构造:generator(前面《生成器和协程》,如果你没看过,可以去我的主页看看这篇文章。回来学这个就容易了)。

  生成器是一个Python函数,它逐个返回一系列值(迭代)。例如:

  def get_numbers()。

  打印( get_numbers begin )

  print( get_numbers给出1 . )

  产量1

  print( get_numbers给出2 . )

  产量2

  print( get_numbers给出3 . )

  产量3

  打印( get_numbers end )

  打印( 表示开始)

  对于get_numbers()中的数字:

  print(f“ Got { number }”。)

  打印( 表示结束)表示开始

  获取编号开始

   get_numbers给1.

  得到了1。

   get_numbers给2.

  得到了2。

   get_numbers给3.

  得了3。

  获取号码结束

   for end因此,我们看到,对于for循环的每次迭代,我们只在生成器中执行一次。我们可以使用Python的next()函数更明确地执行这种迭代:

  In [3]: generator=get_numbers()

  在[4]:下一个(生成器)

  获取编号开始

   get_numbers给1.

  Out[4]: 1

  在[5]:下一个(生成器)

   get_numbers给2.

  Out[5]: 2

  在[6]:下一个(生成器)

   get_numbers给3.

  Out[6]: 3

  在[7]:下一个(生成器)

  获取号码结束

  -

  StopIteration Traceback(最近一次调用)

  模块中的ipython-input-154-323ce5d717bb

  - 1下一个(发电机)

  StopIteration:这非常类似于异步函数的行为。就像异步函数从函数开始一直执行代码到第一次等待一样,当我们第一次调用next()时,生成器会从函数的顶部执行到第一条yield语句。然而,现在我们只是从生成器返回数字。我们将使用相同的思想,但是返回不同的内容,使用生成器创建类似异步的函数。

  使用生成器异步让我们使用生成器来创建我们自己的小型异步框架。

  然而,为了简单起见,让我们用sleep(即time.sleep)来代替实际的IO。让我们考虑一个需要定期发送更新的应用程序:

  def send_updates(计数:int,间隔_秒:float):

  对于范围内的I(1,计数1):

  time.sleep(间隔_秒)

  Print ([{}] SendingUpdate {}/{} . 。格式(interval _ seconds,I,count))。因此,如果我们调用send_updates(3,1.0),它将输出这三条消息,每条消息间隔1秒钟:

  [1.0]发送更新1/3。

  [1.0]发送更新2/3。

  [1.0]发送更新3/3。现在,假设我们想同时运行几个不同的时间间隔。比如send_updates(10,1.0),send_updates(5,2.0)和send_updates(4,3.0)。我们可以使用线程来做到这一点,如下所示:

  线程=[

  穿线。Thread(target=send_updates,args=(10,1.0))

  穿线。线程(target=send_updates,args=(5,2.0)),

  穿线。线程(target=send_updates,args=(4,3.0))

  ]

  对于螺纹中的I:

  i.start()

  对于螺纹中的I:

  I.join()这个是可行的,大概12秒就完成了,但是使用的线程有前面提到的缺点。让我们用发电机来建造同样的东西。

  在演示生成器的例子中,我们返回了一个整数。为了获得类似异步的行为,而不是返回任意值,我们希望返回一些描述IO等待的对象。在我们的例子中,我们的“IO”只是一个计时器,它会等待一段时间。因此,让我们为此创建一个计时器对象:

  类异步计时器:

  def __init__(self,duration: float):

  self . done _ time=time . time()duration现在,让我们从我们的函数中生成它,而不是调用time.sleep:

  def send_updates(计数:int,间隔_秒:float):

  对于范围内的I(1,计数1):

  产出异步计时器(间隔_秒)

  Print ([{}] SendingUpdate {}/{} . 。Format (interval _ seconds,I,count))现在,每次我们调用next(.)当我们调用send_updates(.),我们得到一个AsyncTimer对象,它告诉我们应该等待到什么时候:

  generator=send_updates(3,1.5)

  timer=next(generator) # [1.5]发送更新1/3。

  print(timer . done _ time-time . time())# 1.498.因为我们的代码实际上并不调用time.sleep now,所以我们现在可以同时执行另一个send_updates调用。

  因此,为了将所有这些放在一起,我们需要后退一步,认识到一些事情:

  生成器就像一个部分执行的函数,等待一些IO(定时器)。每个部分执行的函数都有一些IO(定时器),在继续执行之前等待。因此,我们程序的当前状态是由每个部分执行的成对函数(生成器)和函数正在等待的IO(定时器)的列表。现在,要运行我们的程序,我们只需要等到一个IO准备好了(也就是我们的一个定时器超时了),然后向前一步执行相应的函数,得到一个阻塞该函数的新IO。这个实现逻辑为我们提供了以下信息:

  #用定时器0初始化每个生成器,以便它立即执行

  generator_timer_pairs=[

  (send_updates(10,1.0),AsyncTimer(0)),

  (send_updates(5,2.0),AsyncTimer(0)),

  (发送更新(4,3.0),异步计时器(0))

  ]

  而发电机计时器对:

  pair=min(generator_timer_pairs,key=lambda x: x[1])。done_time)

  生成器,min_timer=pair

  #等到计时器准备好

  time.sleep(max(0,min_timer.done_time - time.time()))

  del generator _ timer _ pairs[generator _ timer _ pairs . index(pair)]

  try: #再执行一步这个函数

  new_timer=next(生成器)

  generator _ timer _ pairs . append((generator,new_timer))

  函数完成时,StopIteration除外:#

  至此,我们有了一个使用生成器的类似异步函数的工作示例。请注意,当生成器完成时,它将引发StopIteration,当我们不再有部分执行的函数(生成器)时,我们的函数就完成了。

  现在,我们将它包装在一个函数中,我们得到类似于asyncio.run Run与asyncio.gather的内容:

  def async _ run _ all(*生成器):

  generator_timer_pairs=[

  (生成器,异步定时器(0))

  对于发电机中发电机

  ]

  而发电机计时器对:

  pair=min(generator_timer_pairs,key=lambda x: x[1])。done_time)

  生成器,min_timer=pair

  time.sleep(max(0,min_timer.done_time - time.time()))

  del generator _ timer _ pairs[generator _ timer _ pairs . index(pair)]

  尝试:

  new_timer=next(生成器)

  generator _ timer _ pairs . append((generator,new_timer))

  除了StopIteration:

  及格

  异步运行全部(

  发送更新(10,1.0),

  send_updates(5,2.0),

  发送更新(4,3.0)

  )

  Async/await用于异步实现。asyncio的caveman版本的最后一步是支持Python 3.5中引入的async/await语法。Await的行为类似yield,只是它不直接返回提供的值,而是返回next((.).__await__())。异步函数返回“协程”,其行为类似于生成器,但需要使用。发送(无)而不是下一个()。(请注意,就像生成器在最初被调用时不会返回任何东西一样,异步函数在逐步执行之前也不会做任何事情,这就解释了我们前面提到的内容)。

  因此,鉴于这些信息,我们可以将我们的示例转换为async/await,只需做一些调整。以下是最终结果:

  类异步计时器:

  def __init__(self,duration: float):

  self.done_time=time.time()持续时间

  def __await__(self):

  屈服自我

  异步定义send_updates(count: int,interval_seconds: float):

  对于范围内的I(1,计数1):

  等待异步计时器(间隔_秒)

  print([{}]发送更新{}/{}。。格式(间隔_秒,I,计数))

  定义等待直到io就绪(ios):

  min_timer=min(ios,key=lambda x: x.done_time)

  time.sleep(max(0,min_timer.done_time - time.time()))

  返回ios.index(min_timer)

  def async _ run _ all(*协同例程):

  协程_io_pairs=[

  (协同程序,AsyncTimer(0))

  对于协程中的协程

  ]

  while协程io对:

  IOs=[cor的io,协程io对中的io]

  就绪索引=_等待_直到_ io _就绪(ios)

  协程,_=协程_io_pairs.pop(ready_index)

  尝试:

  new_io=coroutine.send(无)

  协程_io_pairs.append((协程,新_io))

  除了StopIteration:

  及格

  异步运行全部(

  发送更新(10,1.0),

  send_updates(5,2.0),

  发送更新(4,3.0)

  )我们有了它,并且使用async/await完成了我们的迷你异步示例。现在,你可能注意到了,我把timer重命名为IO,并把寻找最小计时器的逻辑提取到一个名为_wait_until_io_ready的名字中。这是这个例子和上一个话题:真实IO的有意联系。

  这里,我们已经使用async/await完成了我们的小型异步示例。现在,你可能已经注意到了,我把timer重命名为io,把用来寻找最小计时器的逻辑提取到一个名为_wait_until_io_ready的函数中。这是把这个例子和上一个话题:真实IO联系起来。

  Real IO(不仅仅是timer)所以,所有这些例子都很棒,但是它们和real asyncio有什么关系呢?我们要在真实IO上等待TCP套接字和文件读/写?好吧,美妙之处在于那个_wait_until_io_ready函数。为了让真正的IO正常工作,我们所要做的就是用类似于AsyncTimer的文件描述符创建一些新对象。然后,AsyncReadFile我们等待的对象集对应于一组文件描述符。最后,我们可以使用函数(syscall) select()来等待其中一个文件描述符准备就绪。因为TCP/UDP套接字是使用文件描述符实现的,所以这也包括网络请求。

  所以,所有这些例子都很好,但是它们与真正的异步IO有什么关系呢?我们要等待实际的IO,比如TCP套接字和文件读/写?嗯,它的优势在于_wait_until_io_ready函数。要使真正的IO工作,我们需要做的就是创建一些新的AsyncReadFile,类似于AsyncTimer,它包含一个文件描述符。然后,我们等待的一组AsyncReadFile对象对应一组文件描述符。最后,我们可以使用函数(syscall) select()来等待其中一个文件描述符准备就绪。因为TCP/UDP套接字是使用文件描述符实现的,所以这也包括网络请求。

  总结有了它,Python异步从头开始。虽然我们已经深入研究过了,但还是有很多细微之处没有涉及到。例如,要从另一个生成器函数调用一个类似生成器异步的函数,我们将使用yield from,并且我们可以通过将参数传递给。发送(.).关于异步IO的具体构造还有很多其他的话题,还有很多其他的微妙之处,比如异步生成器和任务取消,但是我们还是留给大家去详细研究吧。

  原创作品来自程,

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: