并发和并行一直是容易混淆的概念。

并发通常指有多个任务需要同时进行,并行则是同一时刻有多个任务执行.

采用 asyncio 实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作. 创建多个协程的列表,然后将这些协程注册到时间循环中.

1. asyncio实现并发

import asyncio
import time

now  = lambda:time.time()

async def do_some_work(x):
    print('Waiting:',x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

start = now()

#创建多个协程对象
coroutine1=do_some_work(1)
coroutine2=do_some_work(2)
coroutine3=do_some_work(4)

#创建任务列表
tasks=[
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

#将任务列表注册到事件循环中
loop=asyncio.get_event_loop()

#asyncio.wait(tasks) 注册任务
loop.run_until_complete(asyncio.wait(tasks))

#或
#asyncio.gather(*tasks) 注册任务
#loop.run_until_complete(asyncio.gather(*tasks))


#获取返回的结果
for task in tasks:
    print('Task ret:',task.result())

print('TIME:',now()-start)

输出结果如:

Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
TIME: 4.002362251281738

运行结果显示,串行的话应该为1+2+4=7秒;并发每个任务里面执行do_some_work,第一次耗时是1秒,遇到耗时执行其他协程,第二次耗时是2秒,遇到耗时执行其他协程,以此类推. 遇到耗时进行阻塞进行其他协程,最长的耗时为4秒,三个任务4秒多执行完成.

2. uvloop替代asyncio

Python标准库中提供了asyncio模块,用于支持基于协程的异步编程.

uvloop 是 asyncio 中的事件循环的替代方案,替换后可以使得asyncio性能提高.

事实上,uvloop要比nodejs、gevent等其他python异步框架至少要快2倍,性能可以比肩Go语言.

uvloop 安装:

pip3 install uvloop

在项目中使用uvloop替换asyncio的事件循环非常简单,只需:

import asyncio
#import uvloop
import time

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

now  = lambda:time.time()

async def do_some_work(x):
    print('Waiting:',x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

start = now()

#创建多个协程对象
coroutine1=do_some_work(1)
coroutine2=do_some_work(2)
coroutine3=do_some_work(4)

#创建任务列表
tasks=[
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

#将任务列表注册到事件循环中
#loop=asyncio.get_event_loop()
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)

#W1:
results = await asyncio.gather(*tasks)
#获取返回的结果
for result in results:
    print('Task ret:', result)

#W2
#results, pendings = await asyncio.wait(tasks)
#获取返回的结果
#for result in results:
#    print('Task ret:', result.result())
    
loop.close()
print('TIME:', now()-start)

输出如:

Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
TIME: 4.0035035610198975

知名的 ASGI uvicorn 内部就是使用的uvloop的事件循环.

Last modification:June 16th, 2022 at 03:17 pm