原文:大型fastapi项目实战 高并发请求神器之aiohttp(上) - 2021.02.02

原文:大型fastapi项目实战 高并发请求神器之aiohttp(下) - 2021.03.03

在 Python 众多的 HTTP 客户端中,最有名的莫过于 requests、aiohttp 和 httpx。在不借助其他第三方库的情况下,requests 只能发送同步请求;aiohttp 只能发送异步请求;httpx 既能发送同步请求,又能发送异步请求。在并发量大的情况下,如何高效的处理数据,异步是优选,在生产环境广泛使用 aiohttp。

aiohttp 是一个为 Python 提供异步HTTP 客户端/服务端编程,基于 asyncio(Python用于支持异步编程的标准库)的异步库。

aiohttp 核心功能:

  • 同时支持客户端使用和服务端使用
  • 同时支持服务端 WebSockets 组件和客户端 WebSockets 组件,开箱即用
  • web 服务器具有中间件,信号组件和可插拔路由的功能。

以下的案例都是基于客户端展开,在生产中主要是用 aiohttp 来做客户端用。

aiohttp 安装:

pip install aiohttp
pip install aiodns #更快的客户端API DNS 解析方案,推荐

1. aiohttp 业务核心功能

1.1. 发起 get 请求

# -*- encoding: utf-8 -*-
import asyncio
import aiohttp


async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://www.baidu.com') as resp:
            print(resp.status)
            res = await resp.text()
            print(res[:100])


if __name__ == '__main__':
    # 注意:
    # python3.7+ 支持写法
    # asyncio.run(main())
    
    # python3.6及以下版本写法
    event_loop = asyncio.get_event_loop()
    result = event_loop.run_until_complete(asyncio.gather(main()))
    event_loop.close()

[1]. 先通过 event_loop = asyncio.get_event_loop() 创建了一个事件循环

[2]. 通过 asyncio.gather 接受多个 future 或 coro 组成的列表任务

[3]. 通过 event_loop.run_until_complete(task) 开启事件循环 直到这个任务执行结束

[4]. async with aiohttp.ClientSession() as session: 是创建了一个异步的网络请求的上线文管理句柄

[5]. async with session.get('http://www.baidu.com') as resp: 异步请求数据

[6]. res = await resp.text() 异步的接收数据

两个关键词

[1]. async 如果一个函数被这个async 关键词修饰 那这个函数就是一个 future object

[2]. await 协程对象执行到这个关键词定义之处就会做挂起操作,原理是与yield /yield from 类似的。

1.2. 发起 post 请求

# -*- encoding: utf-8 -*-
import asyncio
import aiohttp


async def post_v1():
    data = b'\x00Binary-data\x00'  # 未经编码的数据通过bytes数据上传
    data = 'text'  # 传递文本数据
    data = {'key': 'value'}  # 传递form表单
    async with aiohttp.ClientSession() as sess:
        async with sess.post('http://httpbin.org/post', data=data) as resp:
            print(resp.status)


# 复杂的 post 请求
async def post_v2():
    payload = {'key': 'value'}  # 传递 pyload
    async with aiohttp.ClientSession() as sess:
        async with sess.post('http://httpbin.org/post', json=payload) as resp:
            print(resp.status)


if __name__ == '__main__':
    event_loop = asyncio.get_event_loop()
    result = event_loop.run_until_complete(asyncio.gather(main()))
    event_loop.close()

1.3. 向 url 中传递参数

有些场景是需要拼接请求url 在这个时候可以使用本 case 来做处理

# -*- encoding: utf-8 -*-
import asyncio
import aiohttp


async def main():
    """ 以下三种方式均可以 """
    params = {'key1': 'value1', 'key2': 'value2'}
    params = [('key', 'value1'), ('key', 'value2')]
    params = 'key=value+1'
    async with aiohttp.ClientSession() as sess:
        async with sess.get('http://httpbin.org/get', params=params) as resp:
            print(resp.status)


if __name__ == '__main__':
    event_loop = asyncio.get_event_loop()
    result = event_loop.run_until_complete(asyncio.gather(main()))
    event_loop.close()

1.4. 向目标服务器上传文件

有时候,确实是有向服务器传文件的需求,eg.上传回执单;上传图片...... 100张 10000张的量级的时候通常会想用多线程去处理,但量再大,再使用 多线程+requests 的方式就会发现有大量的报错,若有类似的使用场景,可以用以下 case 处理

import aiohttp


async def main():
    """ 传递文件 """
    files = {'file': open('report.xls', 'rb')}
    async with aiohttp.ClientSession() as sess:
        async with sess.post('http://httpbin.org/post', data=files) as resp:
            print(resp.status)
            print(await resp.text())


async def main2():
    """ 实例化 FormData 可以指定 filename 和 content_type """
    data = aiohttp.FormData()
    data.add_field('file',
                   open('report.xls', 'rb'),
                   filename='report.xls',
                   content_type='application/vnd.ms-excel')
    async with aiohttp.ClientSession() as sess:
        async with sess.post('http://httpbin.org/post', data=data) as resp:
            print(resp.status)
            print(await resp.text())


async def main3():
    """ 流式上传文件 """
    async with aiohttp.ClientSession() as sess:
        with open('report.xls', 'rb') as f:
            async with sess.post('http://httpbin.org/post', data=f) as resp:
                print(resp.status)
                print(await resp.text())


async def main4():
    """因为 content属性是 StreamReader(提供异步迭代器协议),
    所以可以将 get 和 post 请求链接在一起。python3.6+能使用"""
    async with aiohttp.ClientSession() as sess:
        async with sess.get('http://python.org') as resp:
            async with sess.post('http://httpbin.org/post', data=resp.content) as r:
                print(r.status)
                print(await r.text())

1.5. 设置请求超时

有时候,向服务器发送请求,若没有设置超时时间,此请求就会一直阻塞直到系统报错,这对于系统是无法容忍的,所以发请求的时候千万要记得加上超时时间。

import aiohttp

timeout = aiohttp.ClientTimeout(total=60)


async def main():
    async with aiohttp.ClientSession(timeout=timeout) as sess:
        async with sess.get('http://httpbin.org/get') as resp:
            print(resp.status)
            print(await resp.text())

2. aiohttp 连接池

2.1.使用连接器

想要调整请求的传输层,可以为ClientSession及其同类组件传递自定义的连接器。例如:

conn = aiohttp.TCPConnector()
session = aiohttp.ClientSession(connector=conn)

注:不要给多个会话对象使用同一个连接器,某一会话对象拥有其所有权。

2.2. 限制连接池的容量

限制同一时间打开的连接数可以传递limit参数:

#将总数限制在30,默认情况下是100.
conn = aiohttp.TCPConnector(limit=30)

#如果不想有限制,传递0即可:
conn = aiohttp.TCPConnector(limit=0)

3. aiohttp 性能测试

使用 aiohttp、requests 作为客户端 模拟多任务请求 做一下两者的性能测试。 请求地址为: url = "http://www.baidu.com" 分别模拟请求: 1.50次调用 2.300次调用.

# -*- encoding: utf-8 -*-
# requests 方式
import random
import time
import datetime
import requests


def request_task():
    res = requests.get(url="http://www.baidu.com",verify=False)
    print(res.status_code)


def request_main():
    start = time.time()
    for _ in range(300):
        request_task()
    end = time.time()
    print("发送300次,耗时: %s" % (end - start))  # 发送300次,耗时: 7.497658014297485


if __name__ == "__main__":
    request_main()
# -*- encoding: utf-8 -*-
# aiohttp 方式
import aiohttp
import time
import asyncio


async def aoi_main():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://www.baidu.com') as resp:
            print(resp.status)


start = time.time()
scrape_index_tasks = [asyncio.ensure_future(aoi_main()) for _ in range(300)]
loop = asyncio.get_event_loop()
tasks = asyncio.gather(*scrape_index_tasks)
loop.run_until_complete(tasks)
end = time.time()
print("发送300次,耗时: %s" % (end - start))  # 发送300次,耗时: 2.5207901000976562

通过简单的测试我们可以得出一些结论:

  1. 并不是说使用异步请求就比同步请求性能高
  2. 在并发任务少的情况下建议使用同步的方式做请求,反之在并发任务量大的情况下建议使用异步的方式做请求
Last modification:September 4th, 2021 at 04:11 pm