Locust,非常简单易用、分布式、python 开发的压力测试工具。有图形化界面,支持将压测数据导出.

官网:https://locust.io/

文档:https://docs.locust.io/

GitHub:https://github.com/locustio/locust

安装:

pip3 install locust locust -V

测试步骤:

[1] - 编写Python用户脚本

[2] - 使用locust命令执行性能测试

[3] - (可选)通过Web界面监测结果

测试步骤示例

1. 编写压测脚本

test.py

from locust import HttpUser, task class HelloWorldUser(HttpUser): @task def hello_world(self): self.client.get("/hello") #先访问 /hello self.client.get("/world") #再访问 /word

from locust import HttpUser, TaskSet, task # 定义用户行为 class UserBehavior(TaskSet): @task def baidu_index(self): self.client.get("/") class WebsiteUser(HttpUser): task = [UserBehavior] # 指向一个定义的用户行为类 min_wait = 3000 # 执行事务之间用户等待时间的下界(单位:毫秒) max_wait = 6000 # 执行事务之间用户等待时间的上界(单位:毫秒)

2. 启动压测

locust -f test.py --host=https://www.baidu.com

3. 压测监控

http://localhost:8089

Number of users to simulate 模拟用户数

Hatch rate (users spawned/second) 每秒钟增加用户数

点击 "Start swarming" 进入压测页面

压测界面右上角有:被压测的地址、当前状态、RPS、失败率、开始或重启按钮

性能测试参数

  • Type 请求的类型,例如GET/POST
  • Name 请求的路径
  • Requests 当前请求的数量
  • Fails 当前请求失败的数量
  • Median 中间值,单位毫秒,请求响应时间的中间值
  • Average 平均值,单位毫秒,请求的平均响应时间
  • Min 请求的最小服务器响应时间,单位毫秒
  • Max 请求的最大服务器响应时间,单位毫秒
  • Average size 单个请求的大小,单位字节
  • Current RPS 代表吞吐量(Requests Per Second的缩写),指的是某个并发用户数下单位时间内处理的请求数。等效于QPS,其实可以看作同一个统计方式,只是叫法不同而已。

自定义压测代码

qps_test.py

#!/usr/bin/python3 #!--*-- coding: utf-8 --*-- import time import threading import requests NQ = 1 COUNT = 100 api = "http://test.com/?picurl=" def qps_tc(): time_start = time.time() req = requests.get(api) assert req.status_code == 200 time_cost = time.time() - time_start # print('time cost', time_cost, 'sec', req.status_code) if __name__ == "__main__": #single thread print('Single thread perform search action one by one', COUNT, 'times', 'nq=', NQ) time_start = time.time() for k in range(COUNT): qps_tc() time_cost = time.time() - time_start print('Totally time cost', time_cost, 'sec') print('QPS', COUNT/time_cost) print('\n=================================================\n') #multi thread print('Multi-thread perform search action parallelly', COUNT, 'times', 'nq=', NQ) threads = [] time_start = time.time() for k in range(COUNT): x = threading.Thread(target=qps_tc, args=()) threads.append(x) x.start() for th in threads: th.join() time_cost = time.time() - time_start print('Totally time cost', time_cost, 'sec') print('QPS', COUNT / time_cost)

go 压测工具

Last modification:December 9th, 2021 at 10:32 am