Two recommended ways to drive MongoDB from Python - MongoDB Python Drivers

  • PyMongo is the recommended way to work with MongoDB from Python
  • Motor is the recommended asynchronous Python driver for MongoDB

This post briefly summarizes how to use Motor, the asynchronous driver, with MongoDB. For the full documentation, see Motor (Async Driver).

https://github.com/mongodb/motor/

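Motor is an asynchronous MongoDB driver that supports non-blocking reads and writes. It is commonly used in Tornado-based asynchronous web servers, and it also supports asyncio (part of the standard library since Python 3.4) as its asynchronous model, which makes it convenient to use.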

1. Installing Motor

pip install motor

1.1. Connecting to MongoDB Atlas

To connect to a MongoDB Atlas cluster, for example,

    import motor.motor_tornado

    client = motor.motor_tornado.MotorClient(
        "mongodb+srv://<username>:<password>@<cluster-url>/test?retryWrites=true&w=majority")
    db = client.test
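Usernames and passwords that contain characters such as @ or / must be percent-escaped before they go into a MongoDB URI. A minimal sketch of assembling the Atlas URI shown above follows; the helper name build_atlas_uri, the sample credentials, and the cluster host are all hypothetical placeholders.

```python
from urllib.parse import quote_plus


def build_atlas_uri(username, password, cluster_url, database="test"):
    """Hypothetical helper: build a mongodb+srv URI with escaped credentials."""
    # quote_plus percent-escapes reserved characters such as '@' and '/'.
    return (
        f"mongodb+srv://{quote_plus(username)}:{quote_plus(password)}"
        f"@{cluster_url}/{database}?retryWrites=true&w=majority"
    )


# Placeholder values for illustration only.
uri = build_atlas_uri("app_user", "p@ss/word", "cluster0.example.mongodb.net")
print(uri)
```

The resulting string can be passed to MotorClient or AsyncIOMotorClient in place of a hand-written URI.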

2. Motor With Tornado

https://motor.readthedocs.io/en/stable/tutorial-tornado.html

pip install tornado motor

Like PyMongo, Motor represents data with a four-level object hierarchy:

[1] - MotorClient

[2] - MotorDatabase

[3] - MotorCollection

[4] - MotorCursor

2.1. Client

    import motor.motor_tornado

    # Default host and port
    client = motor.motor_tornado.MotorClient()
    # Explicit host and port
    client = motor.motor_tornado.MotorClient('localhost', 27017)
    # MongoDB URI
    client = motor.motor_tornado.MotorClient('mongodb://localhost:27017')
    # Replica set
    client = motor.motor_tornado.MotorClient('mongodb://host1,host2/?replicaSet=my-replicaset-name')

2.2. Database

    db = client.test_database
    # or
    db = client['test_database']

2.3. Starting a Tornado application

    import motor.motor_tornado
    import tornado.ioloop
    import tornado.web

    class MainHandler(tornado.web.RequestHandler):
        def get(self):
            db = self.settings['db']

    # MotorClient does not actually connect to the server when it is created;
    # it opens a connection on demand, at the first operation.
    db = motor.motor_tornado.MotorClient().test_database

    application = tornado.web.Application([
        (r'/', MainHandler)
    ], db=db)

    application.listen(8888)
    tornado.ioloop.IOLoop.current().start()

Or, for a multi-process server,

    import tornado.httpserver
    import tornado.web
    from motor.motor_tornado import MotorClient
    from tornado.ioloop import IOLoop

    # Create the application before creating a MotorClient.
    application = tornado.web.Application([
        (r'/', MainHandler)
    ])

    server = tornado.httpserver.HTTPServer(application)
    server.bind(8888)

    # Forks one process per CPU.
    server.start(0)

    # Now, in each child process, create a MotorClient.
    application.settings['db'] = MotorClient().test_database
    IOLoop.current().start()

2.4. Collection

    collection = db.test_collection
    # or
    collection = db['test_collection']

2.5. Inserting documents

    from tornado.ioloop import IOLoop

    async def do_insert():
        document = {'key': 'value'}
        result = await db.test_collection.insert_one(document)
        print('result %s' % repr(result.inserted_id))

    IOLoop.current().run_sync(do_insert)

For example, inserting 2000 documents one at a time,

    async def do_insert():
        for i in range(2000):
            await db.test_collection.insert_one({'i': i})

    IOLoop.current().run_sync(do_insert)

A more efficient approach is a single insert_many call,

    async def do_insert():
        result = await db.test_collection.insert_many(
            [{'i': i} for i in range(2000)])
        print('inserted %d docs' % (len(result.inserted_ids),))

    IOLoop.current().run_sync(do_insert)
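When the input is much larger than a few thousand documents, one option is to call insert_many once per fixed-size batch rather than building a single huge list. The sketch below assumes collection is a Motor collection (anything with an awaitable insert_many works); the helper name insert_in_batches and the default batch_size are hypothetical choices, not values from the Motor documentation.

```python
async def insert_in_batches(collection, documents, batch_size=500):
    """Hypothetical helper: insert documents with one insert_many call per batch."""
    total = 0
    for start in range(0, len(documents), batch_size):
        batch = documents[start:start + batch_size]
        result = await collection.insert_many(batch)
        # inserted_ids holds one _id per document written in this batch.
        total += len(result.inserted_ids)
    return total
```

Inside a coroutine it could be called as: await insert_in_batches(db.test_collection, [{'i': i} for i in range(2000)]).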

2.6. Querying a single document: find_one

    import pprint

    async def do_find_one():
        document = await db.test_collection.find_one({'i': {'$lt': 1}})
        pprint.pprint(document)

    IOLoop.current().run_sync(do_find_one)

2.7. Querying multiple documents: find

For example,

    async def do_find():
        # find() only builds a cursor; to_list() performs the actual fetch
        cursor = db.test_collection.find({'i': {'$lt': 5}}).sort('i')
        for document in await cursor.to_list(length=100):
            pprint.pprint(document)

    IOLoop.current().run_sync(do_find)
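Besides to_list, a Motor cursor can also be consumed with async for, which avoids choosing a length up front. A minimal sketch, assuming collection is a Motor collection; the helper name collect_values and its field parameter are hypothetical.

```python
async def collect_values(collection, query, field='i'):
    """Hypothetical helper: stream matching documents and collect one field."""
    values = []
    # find() builds the cursor without I/O; `async for` fetches documents
    # from the server as the loop advances.
    async for document in collection.find(query).sort(field):
        values.append(document[field])
    return values
```

Inside a coroutine it could be called as: await collect_values(db.test_collection, {'i': {'$lt': 5}}).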

2.8. Counting documents: count_documents

    async def do_count():
        n = await db.test_collection.count_documents({})
        print('%s documents in collection' % n)
        n = await db.test_collection.count_documents({'i': {'$gt': 1000}})
        print('%s documents where i > 1000' % n)

    IOLoop.current().run_sync(do_count)

2.9. Updating documents

Replace (replace_one replaces the whole document except its _id):

    async def do_replace():
        coll = db.test_collection
        old_document = await coll.find_one({'i': 50})
        print('found document: %s' % pprint.pformat(old_document))
        _id = old_document['_id']
        result = await coll.replace_one({'_id': _id}, {'key': 'value'})
        print('replaced %s document' % result.modified_count)
        new_document = await coll.find_one({'_id': _id})
        print('document is now %s' % pprint.pformat(new_document))

    IOLoop.current().run_sync(do_replace)
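Note that find_one returns None when no document matches, in which case indexing the result with ['_id'] raises a TypeError. A minimal guarded sketch, assuming collection is a Motor collection; the helper name replace_if_present is hypothetical.

```python
async def replace_if_present(collection, query, replacement):
    """Hypothetical helper: replace the first matching document, if there is one."""
    old_document = await collection.find_one(query)
    if old_document is None:
        # Nothing matched the query, so there is nothing to replace.
        return 0
    result = await collection.replace_one({'_id': old_document['_id']}, replacement)
    return result.modified_count
```

Inside a coroutine it could be called as: await replace_if_present(db.test_collection, {'i': 50}, {'key': 'value'}).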

Update (update_one with $set changes only the named fields):

    async def do_update():
        coll = db.test_collection
        result = await coll.update_one({'i': 51}, {'$set': {'key': 'value'}})
        print('updated %s document' % result.modified_count)
        new_document = await coll.find_one({'i': 51})
        print('document is now %s' % pprint.pformat(new_document))

    IOLoop.current().run_sync(do_update)

    # To update every matching document, use update_many inside a coroutine:
    # await coll.update_many({'i': {'$gt': 100}},
    #                        {'$set': {'key': 'value'}})

2.10. Deleting documents

    async def do_delete_many():
        coll = db.test_collection
        n = await coll.count_documents({})
        print('%s documents before calling delete_many()' % n)
        result = await db.test_collection.delete_many({'i': {'$gte': 1000}})
        print('%s documents after' % (await coll.count_documents({})))

    IOLoop.current().run_sync(do_delete_many)
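The result returned by delete_many also reports how many documents were removed, so counting the collection before and after is not strictly necessary. A minimal sketch, assuming collection is a Motor collection; the helper name delete_and_report is hypothetical.

```python
async def delete_and_report(collection, query):
    """Hypothetical helper: delete matching documents and return how many were removed."""
    result = await collection.delete_many(query)
    # deleted_count is the number of documents the server reported as deleted.
    return result.deleted_count
```

Inside a coroutine it could be called as: await delete_and_report(db.test_collection, {'i': {'$gte': 1000}}).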

3. Motor With asyncio

https://motor.readthedocs.io/en/stable/tutorial-asyncio.html

Like PyMongo, Motor represents data with a four-level object hierarchy:

[1] - AsyncIOMotorClient

[2] - AsyncIOMotorDatabase

[3] - AsyncIOMotorCollection

[4] - AsyncIOMotorCursor

3.1. Client

    import motor.motor_asyncio

    # Default host and port
    client = motor.motor_asyncio.AsyncIOMotorClient()
    # Explicit host and port
    client = motor.motor_asyncio.AsyncIOMotorClient('localhost', 27017)
    # MongoDB URI
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    # Replica set
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://host1,host2/?replicaSet=my-replicaset-name')

3.2. Database

    db = client.test_database
    # or
    db = client['test_database']

3.3. Collection

    collection = db.test_collection
    # or
    collection = db['test_collection']

3.4. Inserting documents

Inserting a single document,

    import asyncio

    async def do_insert():
        document = {'key': 'value'}
        result = await db.test_collection.insert_one(document)
        print('result %s' % repr(result.inserted_id))

    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_insert())
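On Python 3.7 and later, asyncio.run is a common alternative to fetching the loop by hand. Because asyncio.run starts a fresh event loop, the sketch below creates the client inside the coroutine, so the client is constructed while that loop is running. The helper name insert_one_document and the make_client parameter are hypothetical; make_client is assumed to be a zero-argument callable such as motor.motor_asyncio.AsyncIOMotorClient.

```python
import asyncio


async def insert_one_document(make_client, document):
    """Hypothetical helper: create the client inside the running loop, then insert."""
    # Assumption: make_client is e.g. motor.motor_asyncio.AsyncIOMotorClient.
    client = make_client()
    result = await client.test_database.test_collection.insert_one(document)
    return result.inserted_id


# Usage with a running MongoDB server (not executed here):
# from motor.motor_asyncio import AsyncIOMotorClient
# print(asyncio.run(insert_one_document(AsyncIOMotorClient, {'key': 'value'})))
```

Passing the client factory as an argument is only a convenience for this sketch; creating AsyncIOMotorClient directly inside the coroutine works the same way.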

Inserting multiple documents,

    async def do_insert():
        result = await db.test_collection.insert_many(
            [{'i': i} for i in range(2000)])
        print('inserted %d docs' % (len(result.inserted_ids),))

    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_insert())

3.5. Querying a single document: find_one

    import pprint

    async def do_find_one():
        document = await db.test_collection.find_one({'i': {'$lt': 1}})
        pprint.pprint(document)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_find_one())

3.6. Querying multiple documents: find

    async def do_find():
        cursor = db.test_collection.find({'i': {'$lt': 5}}).sort('i')
        for document in await cursor.to_list(length=100):
            pprint.pprint(document)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_find())

3.7. Counting documents

    async def do_count():
        n = await db.test_collection.count_documents({})
        print('%s documents in collection' % n)
        n = await db.test_collection.count_documents({'i': {'$gt': 1000}})
        print('%s documents where i > 1000' % n)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_count())

3.8. Updating documents

    async def do_replace():
        coll = db.test_collection
        old_document = await coll.find_one({'i': 50})
        print('found document: %s' % pprint.pformat(old_document))
        _id = old_document['_id']
        result = await coll.replace_one({'_id': _id}, {'key': 'value'})
        print('replaced %s document' % result.modified_count)
        new_document = await coll.find_one({'_id': _id})
        print('document is now %s' % pprint.pformat(new_document))

    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_replace())

    # To update every matching document, use update_many inside a coroutine:
    # await coll.update_many({'i': {'$gt': 100}},
    #                        {'$set': {'key': 'value'}})

3.9. Deleting documents

    async def do_delete_many():
        coll = db.test_collection
        n = await coll.count_documents({})
        print('%s documents before calling delete_many()' % n)
        result = await db.test_collection.delete_many({'i': {'$gte': 1000}})
        print('%s documents after' % (await coll.count_documents({})))

    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_delete_many())

3.10. Example - inserting documents

    import asyncio
    import motor.motor_asyncio

    # Connect to the database
    client = motor.motor_asyncio.AsyncIOMotorClient('localhost', 27017)
    db = client.test_database

    async def do_insert():
        # insert_many inserts one or more documents, which must be passed as a list
        result = await db.lx.insert_many(
            [{'i': i} for i in range(20)])
        print('inserted %d docs' % (len(result.inserted_ids),))

    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_insert())

4. Comparing pymongo and motor with asyncio

4.1. pymongo

    import time
    from pymongo import MongoClient

    start = time.time()
    connection = MongoClient('127.0.0.1', 27017)
    db = connection['test_database']

    for doc in db.test_collection.find({}, ['_id', 'start_time', 'end_idx']):
        db.test_collection.update_one({'_id': doc.get('_id')}, {'$set': {'end_idx': 1}})

    print("[INFO]Timecost:", time.time() - start)

4.2. motor with asyncio

    import time
    import asyncio
    from motor.motor_asyncio import AsyncIOMotorClient

    start = time.time()
    connection = AsyncIOMotorClient('127.0.0.1', 27017)
    db = connection['test_database']

    async def run():
        async for doc in db.LiePin_Analysis1.find({}, ['_id', 'start_time', 'end_idx']):
            # Await each update so it has finished before the timer is read
            await db.LiePin_Analysis1.update_one({'_id': doc.get('_id')}, {'$set': {'end_idx': 0}})

    asyncio.get_event_loop().run_until_complete(run())
    print("[INFO]Timecost:", time.time() - start)
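An asynchronous driver tends to pay off when several operations are in flight at once rather than issued strictly one after another. One possible way to do that is asyncio.gather with a semaphore that caps the number of concurrent updates. The sketch below assumes collection is a Motor collection; the helper name update_all and the concurrency limit of 50 are hypothetical, and no timing result is implied.

```python
import asyncio


async def update_all(collection, ids, update, concurrency=50):
    """Hypothetical helper: run update_one for many _ids with bounded concurrency."""
    semaphore = asyncio.Semaphore(concurrency)

    async def update_document(_id):
        # At most `concurrency` updates are awaiting the server at any time.
        async with semaphore:
            result = await collection.update_one({'_id': _id}, update)
            return result.modified_count

    counts = await asyncio.gather(*(update_document(_id) for _id in ids))
    return sum(counts)
```

Inside a coroutine it could be called as: await update_all(db.test_collection, ids, {'$set': {'end_idx': 0}}), where ids is a list of _id values collected beforehand.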
Last modification: March 28th, 2021 at 04:42 pm