写在前面:
花了一周的时间,对协程做了一个简单的梳理,特别是异步编程asyncio库的使用,做了详细的说明。本文主要包括的知识点有:yield生成器的复习并实现协程的功能、greenlet库实现协程、gevent库实现协程、asyncio异步协程的介绍、异步协程的创建与运行、任务的创建与运行、并发运行gather/wait/as_complete/wait_for等方法的实现、异步协程的嵌套、await关键字的理解等等,这些都是基础。由于篇幅比较长,打算分为两篇,第二篇在介绍一下asyncio的其他用法。
协程
,又称为微线程,它是实现多任务的另一种方式,只不过是比线程更小的执行单元。因为它自带CPU的上下文,这样只要在合适的时机,我们可以把一个协程切换到另一个协程。
通俗的理解:
在一个线程中的某个函数中,我们可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,
注意不是通过调用函数的方式做到的
,并且切换的次数以及什么时候再切换到原来的函数都由开发者自己确定。
协程与线程的差异:
在实现多任务时, 线程切换__从系统层面__远不止保存和恢复CPU上下文这么简单。操作系统为了程序运行的高效性,
每个线程都有自己缓存Cache等等数据,操作系统还会帮你做这些数据的恢复操作
,所以线程的切换非常耗性能。但是__协程的切换只是单纯地操作CPU的上下文__,所以一秒钟切换个上百万次系统都抗的住。
在python中,yield(生成器)可以很容易的实现上述的功能,从一个函数切换到另外一个函数。
import time
def task_1():
while True:
print("--This is task 1!--before")
yield
print("--This is task 1!--after")
time.sleep(0.5)
def task_2():
while True:
print("--This is task 2!--before")
yield
print("--This is task 2!--after")
time.sleep(0.5)
if __name__ == "__main__":
t1 = task_1()
t2 = task_2()
while True:
next(t1)
print("\nThe main thread!\n")
next(t2)
运行结果如下:
生成器的回顾,yield方法的执行流程
def generate():
i = 0
while i < 5:
print("我在这。。")
xx = yield i
print(xx)
i += 1
g = generate()
g.send(None)
g.send("lalala")
我在这。。
lalala
我在这。。
通过生成器实现生产者-消费者模型,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
def consumer():
print('--4、开始执行生成器代码--')
response = None
while True:
print('--5、yield,中断,保存上下文--')
n = yield response
print('--8、获取上下文,继续往下执行--')
if not n:
return
print("[Consumer]: consuming {} ..".format(n))
response = "OK"
def produce(c):
print("--3、启动生成器,开始执行生成器consumer--")
c.send(None)
print("--6、继续往下执行--")
n = 0
while n < 5:
n += 1
print("[Producer]: producing {} ..".format(n))
print("--7、第{}次唤醒生成器,从yield位置继续往下执行!--".format(n+1))
r = c.send(n)
print("--9、从第8步往下--")
print("[Producer]: consumer return {} ..".format(r))
c.close()
if __name__ == "__main__":
c = consumer()
produce(c)
--3、启动生成器,开始执行生成器consumer--
--4、开始执行生成器代码--
--5、yield,中断,保存上下文--
--6、继续往下执行--
[Producer]: producing 1 ..
--7、第2次唤醒生成器,从yield位置继续往下执行!--
--8、获取上下文,继续往下执行--
[Consumer]: consuming 1 ..
--5、yield,中断,保存上下文--
--9、从第8步往下--
[Producer]: consumer return OK ..
[Producer]: producing 2 ..
--7、第3次唤醒生成器,从yield位置继续往下执行!--
--8、获取上下文,继续往下执行--
[Consumer]: consuming 2 ..
--5、yield,中断,保存上下文--
--9、从第8步往下--
[Producer]: consumer return OK ..
[Producer]: producing 3 ..
--7、第4次唤醒生成器,从yield位置继续往下执行!--
--8、获取上下文,继续往下执行--
[Consumer]: consuming 3 ..
--5、yield,中断,保存上下文--
--9、从第8步往下--
[Producer]: consumer return OK ..
[Producer]: producing 4 ..
--7、第5次唤醒生成器,从yield位置继续往下执行!--
--8、获取上下文,继续往下执行--
[Consumer]: consuming 4 ..
--5、yield,中断,保存上下文--
--9、从第8步往下--
[Producer]: consumer return OK ..
[Producer]: producing 5 ..
--7、第6次唤醒生成器,从yield位置继续往下执行!--
--8、获取上下文,继续往下执行--
[Consumer]: consuming 5 ..
--5、yield,中断,保存上下文--
--9、从第8步往下--
[Producer]: consumer return OK ..
使用greenlet实现协程
from greenlet import greenlet
import time
def task_1():
while True:
print("--This is task 1!--")
g2.switch()
time.sleep(0.5)
def task_2():
while True:
print("--This is task 2!--")
g1.switch()
time.sleep(0.5)
if __name__ == "__main__":
g1 = greenlet(task_1)
g2 = greenlet(task_2)
g1.switch()
--This is task 1!--
--This is task 2!--
--This is task 1!--
--This is task 2!--
--This is task 1!--
--This is task 2!--
--This is task 1!--
--This is task 2!--
greenlet已经实现了协程,但是这个需要人工切换,很麻烦。python中还有一个比greenlet更强大的并且能够自动切换任务的模块gevent,其原理是当一个greenlet遇到IO(比如网络、文件操作等)操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程 ,就保证总有greenlet在运行,而不是等待IO。
import gevent
def task_1(num):
for i in range(num):
print(gevent.getcurrent(), i)
gevent.sleep(1)
if __name__ == "__main__":
g1 = gevent.spawn(task_1, 5)
g2 = gevent.spawn(task_1, 5)
g3 = gevent.spawn(task_1, 5)
g1.join()
g2.join()
g3.join()
上述结果,在不添加gevent.sleep(1)时,是3个greenlet依次运行,而不是交替运行的。在添加gevent.sleep(1)后,程序运行到这后,交出控制权,执行下一个协程,等待这个耗时操作完成后再重新回到上一个协程,运行结果时交替运行。
monkey补丁 不必强制使用gevent里面的sleep、sorcket等等了
from gevent import monkey
import gevent
import random
import time
def task_1(name):
for i in range(5):
print(name, i)
time.sleep(1)
def task_2(name):
for i in range(3):
print(name, i)
time.sleep(1)
if __name__ == "__main__":
monkey.patch_all()
gevent.joinall([
gevent.spawn(task_1, "task_1"),
gevent.spawn(task_2, "task_2")
print("the main thread!")
task_1 0
task_2 0
task_1 1
task_2 1
task_1 2
task_2 2
task_1 3
task_1 4
the main thread!
python中使用协程最常用的库就是asyncio,首先先介绍几个概念:
- 1、event_loop 事件循环:相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件时,就会调用对应的处理方法。
- 2、coroutine 协程:协程对象,只一个使用async关键字定义的函数,他的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环中,由事件循环调用。
- 3、task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程的进一步封装,其中包含任务的各种状态。
- 4、future:代表将来执行或没有执行的任务结果。它与task没有本质的区别。
- 5、async/await 关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。
1、定义一个协程 通过async定义一个协程,协程是一个对象,不能直接运行,需要把协程加入到事件循环(loop)中,由loop在适当的时候调用协程。asyncio.get_event_loop()方法可以创建一个事件循环,然后由run_until_complete(协程对象)将协程注册到事件循环中,并启动事件循环。
run_until_complete根据传递的参数的不同,返回的结果也有所不同
- 1、run_until_complete()传递的是一个协程对象或task对象,则返回他们finished的返回结果(前提是他们得有return的结果,否则返回None)
- 2、run_until_complete(asyncio.wait(多个协程对象或任务)),函数会返回一个元组包括(done, pending),通过访问done里的task对象,获取返回值
- 3、run_until_complete(asyncio.gather(多个协程对象或任务)),函数会返回一个列表,列表里面包括各个任务的返回结果,按顺序排列
python 3.7 以前的版本调用异步函数的步骤:
- 1、调用asyncio.get_event_loop()函数获取事件循环loop对象
- 2、通过不同的策略调用loop.run_forever()方法或者loop.run_until_complete()方法执行异步函数
python3.7 以后的版本使用asyncio.run即可。此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。
import asyncio
async def work(x):
for _ in range(3):
print('Work {} is running ..'.format(x))
coroutine_1 = work(1)
loop = asyncio.get_event_loop()
result = loop.run_until_complete(coroutine_1)
print(result)
Work 1 is running ..
Work 1 is running ..
Work 1 is running ..
import asyncio
async def main():
print("hello")
await asyncio.sleep(1)
print("world")
asyncio.run(main())
hello
world
2、创建一个task 协程对象不能直接运行,在注册到事件循环的时候,其实是run_until_complete方法将协程包装成一个task对象,所谓的task对象就是Future类的子类,它保存了协程运行后的状态,用于未来获取协程的结果。
创建task后,task在加入事件循环之前是pending状态,因为下例中没有耗时操作,task很快会完成,后面打印finished状态。
import asyncio
async def work(x):
for _ in range(3):
print('Work {} is running ..'.format(x))
coroutine_1 = work(1)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine_1)
print(task)
loop.run_until_complete(task)
print(task)
<Task pending coro=<work() running at <ipython-input-9-bebcb42450f1>:3>>
Work 1 is running ..
Work 1 is running ..
Work 1 is running ..
<Task finished coro=<work() done, defined at <ipython-input-9-bebcb42450f1>:3> result=None>
isinstance(task, asyncio.Future)
补:isinstance() 函数来判断一个对象是否是一个已知的类型,类似 type()。
isinstance() 与 type() 区别:
- type() 不会认为子类是一种父类类型,不考虑继承关系。
- isinstance() 会认为子类是一种父类类型,考虑继承关系。
class A():
class B(A):
b = B()
isinstance(b, A)
3、绑定回调 在task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过这个对象可以获取协程的返回值,如果回调函数需要多个参数,可以通过偏函数导入。
从下例可以看出,coroutine执行结束时候会调用回调函数,并通过future获取协程返回(return)的结果。我们创建的task和回调里面的future对象,实际上是同一个对象。
import asyncio
async def work(x):
for _ in range(3):
print('Work {} is running ..'.format(x))
return "Work {} is finished".format(x)
def call_back(future):
print("Callback: {}".format(future.result()))
coroutine = work(1)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(call_back)
loop.run_until_complete(task)
Work 1 is running ..
Work 1 is running ..
Work 1 is running ..
Callback: Work 1 is finished
'Work 1 is finished'
当回调函数需要传递多个参数时,可以使用functools里的partial方法(偏函数导入这些参数)
functools.partial(func, * args, * * keywords),函数装饰器,返回一个新的partial对象。调用partial对象和调用被修饰的函数func相同,只不过调用partial对象时传入的参数个数通常要少于调用func时传入的参数个数。当一个函数func可以接收很多参数,而某一次使用只需要更改其中的一部分参数,其他的参数都保持不变时,partial对象就可以将这些不变的对象冻结起来,这样调用partial对象时传入未冻结的参数,partial对象调用func时连同已经被冻结的参数一同传给func函数,从而可以简化调用过程。
如果调用partial对象时提供了更多的参数,那么他们会被添加到args的后面,如果提供了更多的关键字参数,那么它们将扩展或者覆盖已经冻结的关键字参数。
具体的偏函数使用方法见下例:
from functools import partial
def func(a, b):
return a + b
result = func(1, 2)
new_func = partial(func, 1)
result2 = new_func(2)
print(result, result2)
import asyncio
import functools
async def work(x):
for _ in range(3):
print('Work {} is running ..'.format(x))
return "Work {} is finished".format(x)
def call_back_2(num, future):
print("Callback_2: {}, the num is {}".format(future.result(), num))
coroutine = work(1)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(functools.partial(call_back_2, 100))
loop.run_until_complete(task)
Work 1 is running ..
Work 1 is running ..
Work 1 is running ..
Callback_2: Work 1 is finished, the num is 100
'Work 1 is finished'
在不绑定回调函数的时候,当task处于finished的状态时,可以直接读取task的result的值
import asyncore
async def work(x):
for _ in range(3):
print("Work {} is running ..".format(x))
return "Work {} is finished".format(x)
coroutine = work(1)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)
print("The task's result is '{}'".format(task.result()))
Work 1 is running ..
Work 1 is running ..
Work 1 is running ..
The task's result is 'Work 1 is finished'
4、阻塞和await 使用async可以定义协程对象,使用await可以正对耗时操作进行挂起,就像生成器里的yield一样,函数让出控制权。 协程遇到await,事件循环就会挂起这个协程,执行别协程,直到其他协程也挂起或执行完毕,在进行下一个协程的执行。
如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。
耗时操作一般指IO操作: 网络请求,文件读取等,使用asyncio.sleep模拟耗时操作。协程的目的也是让这些IO操作异步化。
4-1、并发运行任务:
asyncio.gather(* aws,loop = None,return_exceptions = False ) 同时在aws 序列中运行等待对象。
- 如果在aws中等待的是协程,它将自动调度为任务。
- 如果所有等待都成功完成,则结果是返回值的汇总列表。结果值的顺序对应于aws中的等待顺序。
- 如果return_exceptions是False(默认),则第一个引发的异常会立即传播到等待的任务gather()。aws序列 中的其他等待项将不会被取消并继续运行。
- 如果return_exceptions是True,异常的处理方式一样成功的结果,并在结果列表汇总。
- 如果gather()被取消,所有提交的awaitables(尚未完成)也被取消。
- 如果aws序列中的任何任务或未来被取消,则将其视为已引发CancelledError- 在这种情况下不会取消gather() 呼叫。这是为了防止取消一个提交的任务/未来以导致其他任务/期货被取消。
4-2、屏蔽取消操作:
asyncio.shield(aw, * , loop=None) 保护一个 可等待对象 防止其被 取消。如果 aw 是一个协程,它将自动作为任务加入日程。
- res = await shield(something()) 相当于: res = await something()
不同之处 在于如果包含它的协程被取消,在 something() 中运行的任务不会被取消。从 something() 的角度看来,取消操作并没有发生。然而其调用者已被取消,因此 “await” 表达式仍然会引发 CancelledError。
try:
res = await shield(something())
except CancelledError:
res = None
4-3、超时:
asyncio.wait_for(aw, timeout, * , loop=None) 等待 aw 可等待对象 完成,指定 timeout 秒数后超时。
- 如果 aw 是一个协程,它将自动作为任务加入日程。
- timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeout 为 None,则等待直到完成。
- 如果发生超时,任务将取消并引发 asyncio.TimeoutError.
- 要避免任务 取消,可以加上 shield()。函数将等待直到目标对象确实被取消,所以总等待时间可能超过 timeout 指定的秒数。如果等待被取消,则 aw 指定的对象也会被取消。
- loop 参数已弃用,计划在 Python 3.10 中移除。
4-4、简单等待:
asyncio.wait(aws,* , loop = None,timeout = None,return_when = ALL_COMPLETED ) 同时运行aws中的等待对象 并阻塞 ,直到return_when指定的条件。
- 返回两组tasks/futures:(done,pending)
- 用法:done, pending = await asyncio.wait(aws)
- return_when 指定此函数应在何时返回。它必须为以下常数之一:
- FIRST_COMPLETED 函数将在任意可等待对象结束或取消时返回。
- FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED。
- ALL_COMPLETED 函数将在所有可等待对象结束或取消时返回。
与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象。
asyncio.as_completed(aws, * , loop=None, timeout=None) 并发地运行 aws 集合中的 可等待对象。返回一个 Future 对象的迭代器。返回的每个 Future 对象代表来自剩余可等待对象集合的最早结果。
- 如果在所有 Future 对象完成前发生超时则将引发 asyncio.TimeoutError。
- 示例:
for f in as_completed(aws):
earliest_result = await f
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
return f"Task {name}: Finished!"
async def main():
results = await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
print(results)
asyncio.run(main())
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24
['Task A: Finished!', 'Task B: Finished!', 'Task C: Finished!']
import asyncio
async def eternity():
await asyncio.sleep(3600)
print('yay!')
async def main():
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')
asyncio.run(main())
timeout!
import asyncio
async def foo():
return 42
task = asyncio.create_task(foo())
done, pending = await asyncio.wait((task, ))
if task in done:
print(f"The task's result is {task.result()}")
The task's result is 42
import asyncio, time
async def work_1(x):
print(f"Starting {x}")
time.sleep(1)
print(f"Starting {x}")
for _ in range(3):
print(f"Work {x} is running ..")
await asyncio.sleep(2)
return f"Work {x} is finished"
async def work_2(x):
print(f"Starting {x}")
for _ in range(3):
await asyncio.sleep(1)
print(f"Work {x} is running ..")
return f"Work {x} is finished"
coroutine_1 = work_1(1)
coroutine_2 = work_2(2)
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(coroutine_1),
asyncio.ensure_future(coroutine_2),
dones, pendings = loop.run_until_complete(asyncio.wait(tasks))
for task in dones:
print(task.result())
Starting 1
Starting 1
Work 1 is running ..
Starting 2
Work 2 is running ..
Work 1 is running ..
Work 2 is running ..
Work 2 is running ..
Work 1 is running ..
Work 1 is finished
Work 2 is finished
上面的执行结果是: 先打印Starting 1,然后等待1秒再次打印Starting 1,Work 1 is running …,Starting 2(这三个是一起出现的,应该是执行太快的原因),由于work2的耗时操作比较短,等待完成后打印Work 2 is running …,接着for循环,再来一轮,work2中再次碰到await,挂起任务,但是work1中的耗时操作还没结束,大家都在等待耗时操作结束,work2正好是2次,2秒,与work1耗时操作同时完成,所以打印Work 1 is running …Work 2 is running …同时出现,最后,第三轮循环,work2等待1秒后打印Work 2 is running …,等待一秒后,work1完成耗时操作,打印Work 1 is running …,异步任务完成。
5、协程嵌套 使用async可以定义协程,协程用于耗时的IO操作。我们也可以封装更多的IO操作过程,在一个协程中await另外一个协程,实现协程的嵌套。
import asyncio, time
async def work(x):
for _ in range(3):
print("Work {} is running ..".format(x))
await asyncio.sleep(1)
return "Work {} is finished!".format(x)
async def main_work():
coroutine_1 = work(1)
coroutine_2 = work(2)
coroutine_3 = work(3)
tasks = [
asyncio.ensure_future(coroutine_1),
asyncio.ensure_future(coroutine_2),
asyncio.ensure_future(coroutine_3),
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print("The task's result is : {}".format(task.result()))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main_work())
Work 1 is running ..
Work 2 is running ..
Work 3 is running ..
Work 1 is running ..
Work 2 is running ..
Work 3 is running ..
Work 1 is running ..
Work 2 is running ..
Work 3 is running ..
The task's result is : Work 2 is finished!
The task's result is : Work 3 is finished!
The task's result is : Work 1 is finished!
import asyncio
async def work(x):
for _ in range(3):
print("Work {} is running ..".format(x))
await asyncio.sleep(1)
return "Work {} is finished!".format(x)
async def main_work():
coroutine_1 = work(1)
coroutine_2 = work(2)
coroutine_3 = work(3)
tasks = [
asyncio.ensure_future(coroutine_2),
asyncio.ensure_future(coroutine_1),
asyncio.ensure_future(coroutine_3),
for task in asyncio.as_completed(tasks):
result = await task
print(f"The task's result is : {result}")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main_work())
Work 2 is running ..
Work 1 is running ..
Work 3 is running ..
Work 2 is running ..
Work 1 is running ..
Work 3 is running ..
Work 2 is running ..
Work 1 is running ..
Work 3 is running ..
The task's result is : Work 2 is finished!
The task's result is : Work 1 is finished!
The task's result is : Work 3 is finished!
以上事例,对asyncio的异步协程有了基本的了解,这里,结合python3.7的说明文档,对部分知识再做说明。
运行协程,asyncio提供了三种主要的机制:
import asyncio
import time
async def work(delay, msg):
print(f"Task receives the message :'{msg}' ")
await asyncio.sleep(delay)
print(msg)
async def main():
print(f"Started at {time.strftime('%X')}")
await work(1, "hello")
await work(2, "world")
print(f"Finished at time {time.strftime('%X')}")
asyncio.run(main())
Started at 20:03:44
Task receives the message :'hello'
hello
Task receives the message :'world'
world
Finished at time 20:03:47
import asyncio
async def work(x):
for _ in range(3):
print('Work {} is running ..'.format(x))
await asyncio.sleep(1)
coroutine_1 = work(1)
coroutine_2 = work(2)
await coroutine_1
await coroutine_2
print("The main thread")
Work 1 is running ..
Work 1 is running ..
Work 1 is running ..
Work 2 is running ..
Work 2 is running ..
Work 2 is running ..
The main thread
- 3、asyncio.create_task() 函数用来并发运行作为 asyncio 任务 的多个协程。下例并发运行两个work协程
import asyncio
import time
async def work(delay, msg):
print(f"Task receives the message :'{msg}' ")
print("----1----")
await asyncio.sleep(delay)
print("----2----")
print(msg)
async def main():
task1 = asyncio.create_task(work(1, "hello"))
task2 = asyncio.create_task(work(3, "world"))
print(f"Started at {time.strftime('%X')}")
await task1
print("----3----")
await task2
print("----4----")
print(f"Finished at time {time.strftime('%X')}")
asyncio.run(main())
Started at 20:42:50
Task receives the message :'hello'
----1----
Task receives the message :'world'
----1----
----2----
hello
----3----
----2----
world
----4----
Finished at time 20:42:53
import asyncio
async def work(x):
for _ in range(3):
print('Work {} is running ..'.format(x))
await asyncio.sleep(x)
coroutine_1 = work(1)
coroutine_2 = work(2)
task1 = asyncio.create_task(coroutine_1)
task2 = asyncio.create_task(coroutine_2)
await task1
await task2
print("The main thread")
Work 1 is running ..
Work 2 is running ..
Work 1 is running ..
Work 2 is running ..
Work 1 is running ..
Work 2 is running ..
The main thread
可等待对象: 如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象。
可等待 对象有三种主要类型: 协程, 任务 和 Future .
协程:python中的协程属于 可等待 对象,所以可以在其他协程中被等待
import asyncio
async def nested():
return 42
async def main():
print(await nested())
asyncio.run(main())
任务: 是用来设置日程以便 并发 执行协程
当一个协程通过 asyncio.create_task() 等函数被打包为一个 任务,该协程将自动排入日程准备立即运行:
import asyncio
async def nested1():
print("nested1")
await asyncio.sleep(0.5)
print("nested1 is finished!")
return 1
async def nested2():
print("nested2")
await asyncio.sleep(0.5)
print("nested2 is finished!")
return 2
async def nested3():
print("nested3")
await asyncio.sleep(0.5)
print("nested3 is finished!")
return 3
async def nested4():
print("nested4")
await asyncio.sleep(0.5)
print("nested4 is finished!")
return 4
async def main():
print("main")
task1 = asyncio.create_task(nested1())
task2 = asyncio.create_task(nested2())
task3 = asyncio.create_task(nested3())
task4 = asyncio.create_task(nested4())
await asyncio.sleep(1)
print(await task1)
print(await task2)
print(await task3)
print(await task4)
asyncio.run(main())
nested1
nested2
nested3
nested4
nested1 is finished!
nested2 is finished!
nested4 is finished!
nested3 is finished!
Future对象 Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。
当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。
async def main():
await function_that_returns_a_future_object()
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
写在前面: 花了一周的时间,对协程做了一个简单的梳理,特别是异步编程asyncio库的使用,做了详细的说明。本文主要包括的知识点有:yield生成器的复习并实现协程的功能、greenlet库实现协程、gevent库实现协程、asyncio异步协程的介绍、异步协程的创建与运行、任务的创建与运行、并发运行gather/wait/as_complete/wait_for等方法的实现、异步协程的嵌套、aw...
async def job(t): # async 形式的功能
print('Start job ', t)
await asyncio.sleep(t) # 等待 "t" 秒, 期间切换其他任务
print('Job ', t...
上篇文章我们说过由于GIL锁的限制,导致Python不能充分利用多线程来实现高并发,在某些情况下使用多线程可能比单线程效率更低,所以Python中出现了协程。协程(coroutine)又称微线程,是一中轻量级的线程,它可以在函数的特定位置暂停或恢复,同时调用者可以从协程中获取状态或将状态传递给协程。进程和线程都是通过CPU的调度实现不同任务的有序执行,而协程是由用户程序自己控制调度的,也没有线程切换的开销,所以执行效率极高。
生成器方式实现
早先的协程是使用生成器关键字yield来实现的,和生成器很相.
tasks.py是一个简单快速的任务队列,用于并行执行多个任务。 您需要做的就是将任务指定为一个简单的函数,该函数接受一个参数,您将获得即时并行性。
基于eventlet、多处理和redis。
它非常适合从单个节点并行执行多个网络绑定任务,而无需经历设置 map reduce 集群的痛苦。 它同时使用进程和绿色线程来从单个节点设置中提取最大值。
安装 redis 并启动服务器, tasks使用 redis 来排队作业。 如果您已经设置了 redis 服务器,请调用tasks.set_redis并传递一个 redis 连接对象,该对象的数据库/命名空间与您通常在应用程序中使用的数据库/命名空间不同。
安装 redis-py 和 eventlet 库。
pip install redis eventlet
安装任务或将此包复制到您的源代码中。
pip install t
在Python中可以使用协程实现多任务,协程是一种轻量级的线程,可以在一个线程中实现多个任务的并发执行。
下面是一个简单的示例,其中使用asyncio模块创建了两个协程函数,然后使用asyncio.gather()方法并发执行这两个协程函数:
```python
import asyncio
async def task1():
for i in range(5):
print("Task 1 executed")
await asyncio.sleep(1)
async def task2():
for i in range(5):
print("Task 2 executed")
await asyncio.sleep(1)
async def main():
await asyncio.gather(task1(), task2())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
在这个示例中,task1和task2都是协程函数,使用async关键字定义。在这两个协程函数中,我们使用了await语句来暂停协程的执行,并在一秒钟后再次恢复协程的执行。
在main()函数中,我们使用asyncio.gather()方法并发执行task1和task2协程函数。asyncio.gather()方法返回一个协程,我们需要使用await关键字来等待这个协程的执行完成。
最后,我们使用asyncio.get_event_loop()方法获取一个事件循环对象,然后使用loop.run_until_complete()方法来运行main()协程函数。
这样,就可以通过协程实现循环执行多个任务的效果。