Two Recommended Ways to Use MongoDB from Python - MongoDB Python Drivers

  • PyMongo is the recommended way to work with MongoDB from Python
  • Motor is the recommended asynchronous Python driver for MongoDB

This post is a brief summary of using Motor, the asynchronous driver, with MongoDB. For documentation, see Motor (Async Driver).

https://github.com/mongodb/motor/

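Motor is an asynchronous MongoDB driver that supports non-blocking reads and writes. It is commonly used in Tornado-based asynchronous web servers, and it also supports asyncio (in the standard library since Python 3.4) as its async model, which makes it convenient to use.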

1. Installing Motor

pip install motor

1.1. Connecting to MongoDB Atlas

Connect to a MongoDB Atlas cluster, for example:

import motor.motor_tornado

client = motor.motor_tornado.MotorClient(
   "mongodb+srv://<username>:<password>@<cluster-url>/test?retryWrites=true&w=majority")
db = client.test
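
If the username or password contains characters such as `@`, `:` or `/`, they need to be percent-encoded before they are placed in the URI. The following is a minimal sketch using only the standard library; `build_atlas_uri` and the example cluster host are hypothetical names chosen for illustration and are not part of Motor or PyMongo.

```python
from urllib.parse import quote_plus


def build_atlas_uri(username, password, cluster_url, database="test"):
    """Build a mongodb+srv URI with percent-encoded credentials.

    build_atlas_uri is a hypothetical helper, not part of Motor or PyMongo.
    """
    user = quote_plus(username)
    pwd = quote_plus(password)
    return (
        f"mongodb+srv://{user}:{pwd}@{cluster_url}/{database}"
        "?retryWrites=true&w=majority"
    )


# Made-up credentials and host; the resulting string can be passed to MotorClient.
uri = build_atlas_uri("app_user", "p@ss/word", "cluster0.example.mongodb.net")
print(uri)
```

The resulting string would then be used as the single argument to `motor.motor_tornado.MotorClient(...)`, exactly like the literal URI above.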

2. Motor With Tornado

https://motor.readthedocs.io/en/stable/tutorial-tornado.html

pip install tornado motor

Like PyMongo, Motor represents data with a four-level object hierarchy:

[1] - MotorClient

[2] - MotorDatabase

[3] - MotorCollection

[4] - MotorCursor

2.1. Client

import motor.motor_tornado

client = motor.motor_tornado.MotorClient()
client = motor.motor_tornado.MotorClient('localhost', 27017)
client = motor.motor_tornado.MotorClient('mongodb://localhost:27017')
# Replica set
client = motor.motor_tornado.MotorClient('mongodb://host1,host2/?replicaSet=my-replicaset-name')

2.2. Database

db = client.test_database
db = client['test_database']

2.3. Starting a Tornado application

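import tornado.ioloop
import tornado.web
import motor.motor_tornado

# Define the handler before building the application that references it.
class MainHandler(tornado.web.RequestHandler):
    def get(self):
        # The database handle is shared through the application settings.
        db = self.settings['db']

# Creating a MotorClient does not actually connect to the server;
# the connection is opened on the first operation.
db = motor.motor_tornado.MotorClient().test_database

application = tornado.web.Application([
    (r'/', MainHandler)
], db=db)

application.listen(8888)
tornado.ioloop.IOLoop.current().start()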

Or, when forking one process per CPU:

import tornado.httpserver
import tornado.web
from tornado.ioloop import IOLoop
from motor.motor_tornado import MotorClient

# Create the application before creating a MotorClient.
application = tornado.web.Application([
    (r'/', MainHandler)
])

server = tornado.httpserver.HTTPServer(application)
server.bind(8888)

# Forks one process per CPU.
server.start(0)

# Now, in each child process, create a MotorClient.
application.settings['db'] = MotorClient().test_database
IOLoop.current().start()

2.4. Collection

collection = db.test_collection
# or
collection = db['test_collection']

2.5. Inserting data

async def do_insert():
  document = {'key': 'value'}
  result = await db.test_collection.insert_one(document)
  print('result %s' % repr(result.inserted_id))

# Run the coroutine on Tornado's IOLoop (requires: from tornado.ioloop import IOLoop)
IOLoop.current().run_sync(do_insert)

Example: inserting documents one at a time in a loop:

async def do_insert():
  for i in range(2000):
    await db.test_collection.insert_one({'i': i})
#
IOLoop.current().run_sync(do_insert)

A more efficient approach is to send them in a single insert_many call:

async def do_insert():
  result = await db.test_collection.insert_many(
    [{'i': i} for i in range(2000)])
  print('inserted %d docs' % (len(result.inserted_ids),))
#
IOLoop.current().run_sync(do_insert)
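
When the documents come from a large or lazy source, building one giant list for `insert_many` may not be practical. One possible middle ground is to send fixed-size batches. This is a sketch under assumptions: `insert_in_batches` and the default batch size of 500 are hypothetical, and `collection` is expected to be a Motor collection (anything with an awaitable `insert_many` works).

```python
async def insert_in_batches(collection, documents, batch_size=500):
    """Insert documents with one insert_many call per batch.

    insert_in_batches is a hypothetical helper; the batch size is an
    arbitrary illustrative default, not a Motor recommendation.
    """
    inserted = 0
    batch = []
    for doc in documents:
        batch.append(doc)
        if len(batch) == batch_size:
            result = await collection.insert_many(batch)
            inserted += len(result.inserted_ids)
            batch = []
    if batch:  # flush the final, possibly smaller, batch
        result = await collection.insert_many(batch)
        inserted += len(result.inserted_ids)
    return inserted
```

With the Tornado setup used in this section, it could be run as `IOLoop.current().run_sync(lambda: insert_in_batches(db.test_collection, ({'i': i} for i in range(2000))))`.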

2.6. Querying a single document: find_one

import pprint

async def do_find_one():
  document = await db.test_collection.find_one({'i': {'$lt': 1}})
  pprint.pprint(document)

#
IOLoop.current().run_sync(do_find_one)

2.7. Querying multiple documents: find

For example:

async def do_find():
  cursor = db.test_collection.find({'i': {'$lt': 5}}).sort('i')
  for document in await cursor.to_list(length=100):
    pprint.pprint(document)

# 
IOLoop.current().run_sync(do_find)
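
`to_list(length=100)` materializes up to 100 documents in a list. A Motor cursor can also be consumed with `async for`, which handles documents one at a time as they arrive. A small sketch, where `collect_field` is a hypothetical helper and `cursor` is expected to be a Motor cursor (any async iterable of dicts works):

```python
async def collect_field(cursor, field):
    """Return the given field from every document the cursor yields.

    collect_field is a hypothetical helper used only for illustration.
    """
    values = []
    async for document in cursor:
        values.append(document[field])
    return values
```

Inside a coroutine this would be called as `await collect_field(db.test_collection.find({'i': {'$lt': 5}}).sort('i'), 'i')`.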

2.8. Counting documents: count

async def do_count():
  n = await db.test_collection.count_documents({})
  print('%s documents in collection' % n)
  n = await db.test_collection.count_documents({'i': {'$gt': 1000}})
  print('%s documents where i > 1000' % n)

# 
IOLoop.current().run_sync(do_count)

2.9. Updating data

Replace:

async def do_replace():
  coll = db.test_collection
  old_document = await coll.find_one({'i': 50})
  print('found document: %s' % pprint.pformat(old_document))
  _id = old_document['_id']
  result = await coll.replace_one({'_id': _id}, {'key': 'value'})
  print('replaced %s document' % result.modified_count)
  new_document = await coll.find_one({'_id': _id})
  print('document is now %s' % pprint.pformat(new_document))

#
IOLoop.current().run_sync(do_replace)

Update:

async def do_update():
  coll = db.test_collection
  result = await coll.update_one({'i': 51}, {'$set': {'key': 'value'}})
  print('updated %s document' % result.modified_count)
  new_document = await coll.find_one({'i': 51})
  print('document is now %s' % pprint.pformat(new_document))

#
IOLoop.current().run_sync(do_update)
#await coll.update_many({'i': {'$gt': 100}},
#                       {'$set': {'key': 'value'}})
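
The commented-out `update_many` call above has to run inside a coroutine. A minimal sketch of what that might look like, assuming `collection` is a Motor collection; `set_key_above` and its `threshold` parameter are hypothetical names introduced here:

```python
async def set_key_above(collection, threshold=100):
    """Set 'key' on every document whose 'i' exceeds threshold.

    set_key_above is a hypothetical wrapper around update_many.
    Returns (matched_count, modified_count) from the update result.
    """
    result = await collection.update_many(
        {'i': {'$gt': threshold}},
        {'$set': {'key': 'value'}})
    return result.matched_count, result.modified_count
```

The two counts can differ: documents that already have `key` set to `'value'` are matched but not modified.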

2.10. Deleting data

async def do_delete_many():
  coll = db.test_collection
  n = await coll.count_documents({})
  print('%s documents before calling delete_many()' % n)
  result = await db.test_collection.delete_many({'i': {'$gte': 1000}})
  print('%s documents after' % (await coll.count_documents({})))
#
IOLoop.current().run_sync(do_delete_many)
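
The example above counts the collection before and after the delete, but the `result` returned by `delete_many` already reports how many documents were removed. A short sketch, assuming `collection` is a Motor collection; `delete_matching` is a hypothetical helper:

```python
async def delete_matching(collection, query):
    """Delete documents matching query and return how many were removed.

    delete_matching is a hypothetical helper used only for illustration.
    """
    result = await collection.delete_many(query)
    return result.deleted_count
```

For instance, `await delete_matching(db.test_collection, {'i': {'$gte': 1000}})` would avoid the second `count_documents` round trip.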

3. Motor With asyncio

https://motor.readthedocs.io/en/stable/tutorial-asyncio.html

Like PyMongo, Motor represents data with a four-level object hierarchy:

[1] - AsyncIOMotorClient

[2] - AsyncIOMotorDatabase

[3] - AsyncIOMotorCollection

[4] - AsyncIOMotorCursor

3.1. Client

import motor.motor_asyncio
client = motor.motor_asyncio.AsyncIOMotorClient()
client = motor.motor_asyncio.AsyncIOMotorClient('localhost', 27017)
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
# Replica set
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://host1,host2/?replicaSet=my-replicaset-name')

3.2. Database

db = client.test_database
db = client['test_database']

3.3. Collection

collection = db.test_collection
collection = db['test_collection']

3.4. Inserting data

Insert a single document:

async def do_insert():
  document = {'key': 'value'}
  result = await db.test_collection.insert_one(document)
  print('result %s' % repr(result.inserted_id))

#
import asyncio
loop = asyncio.get_event_loop()
loop.run_until_complete(do_insert())
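
On Python 3.7 and later, `asyncio.run()` is the usual entry point, and some newer Python versions emit a DeprecationWarning for `asyncio.get_event_loop()` when no loop is running. The sketch below shows one way to structure the same insert around `asyncio.run()`. Creating the client inside the coroutine is an assumption made here so that the client is bound to the loop that `asyncio.run()` starts; `main` and its `make_client` parameter are hypothetical names.

```python
import asyncio


async def do_insert(collection):
    result = await collection.insert_one({'key': 'value'})
    return result.inserted_id


async def main(make_client):
    # Create the client inside the coroutine so it is bound to the loop
    # that asyncio.run() starts (an assumption of this sketch).
    client = make_client()
    inserted_id = await do_insert(client.test_database.test_collection)
    print('result %s' % repr(inserted_id))
    return inserted_id


# With Motor installed and a local mongod running, this would be started as:
#   from motor.motor_asyncio import AsyncIOMotorClient
#   asyncio.run(main(AsyncIOMotorClient))
```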

Insert multiple documents:

async def do_insert():
  result = await db.test_collection.insert_many(
    [{'i': i} for i in range(2000)])
  print('inserted %d docs' % (len(result.inserted_ids),))
#
loop = asyncio.get_event_loop()
loop.run_until_complete(do_insert())

3.5. Querying a single document: find_one

async def do_find_one():
  document = await db.test_collection.find_one({'i': {'$lt': 1}})
  pprint.pprint(document)
#
loop = asyncio.get_event_loop()
loop.run_until_complete(do_find_one())

3.6. Querying multiple documents: find

async def do_find():
  cursor = db.test_collection.find({'i': {'$lt': 5}}).sort('i')
  for document in await cursor.to_list(length=100):
    pprint.pprint(document)
#
loop = asyncio.get_event_loop()
loop.run_until_complete(do_find())

3.7. Counting documents

async def do_count():
  n = await db.test_collection.count_documents({})
  print('%s documents in collection' % n)
  n = await db.test_collection.count_documents({'i': {'$gt': 1000}})
  print('%s documents where i > 1000' % n)
#
loop = asyncio.get_event_loop()
loop.run_until_complete(do_count())

3.8. Updating data

async def do_replace():
  coll = db.test_collection
  old_document = await coll.find_one({'i': 50})
  print('found document: %s' % pprint.pformat(old_document))
  _id = old_document['_id']
  result = await coll.replace_one({'_id': _id}, {'key': 'value'})
  print('replaced %s document' % result.modified_count)
  new_document = await coll.find_one({'_id': _id})
  print('document is now %s' % pprint.pformat(new_document))
#
loop = asyncio.get_event_loop()
loop.run_until_complete(do_replace())
#await coll.update_many({'i': {'$gt': 100}},
#                       {'$set': {'key': 'value'}})

3.9. Deleting data

async def do_delete_many():
  coll = db.test_collection
  n = await coll.count_documents({})
  print('%s documents before calling delete_many()' % n)
  result = await db.test_collection.delete_many({'i': {'$gte': 1000}})
  print('%s documents after' % (await coll.count_documents({})))
#
loop = asyncio.get_event_loop()
loop.run_until_complete(do_delete_many())

3.10. Example: inserting data

import asyncio
import motor.motor_asyncio

#
client = motor.motor_asyncio.AsyncIOMotorClient('localhost', 27017)
# Select the database
db = client.test_database

async def do_insert():
    result = await db.lx.insert_many(
        [{'i': i} for i in range(20)])
    # insert_many can insert one or more documents; they must be passed as a list
    print('inserted %d docs' % (len(result.inserted_ids),))
#
loop = asyncio.get_event_loop()
loop.run_until_complete(do_insert())

4. Comparing PyMongo and Motor with asyncio

4.1. pymongo

import time
from pymongo import MongoClient

start = time.time()
connection = MongoClient('127.0.0.1',27017)
db = connection['test_database']

for doc in db.test_collection.find({}, ['_id', 'start_time', 'end_idx']):
    db.test_collection.update_one({'_id': doc.get('_id')}, {
        '$set': {
            'end_idx': 1
        }
    })

print("[INFO]Timecost:", time.time() - start)

4.2. motor with asyncio

import time
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

start = time.time()
connection = AsyncIOMotorClient('127.0.0.1',27017)
db = connection['test_database']

async def run():
    async for doc in db.LiePin_Analysis1.find({}, ['_id', 'start_time', 'end_idx']):
        # Await each update; without await the loop never waits for the write to finish.
        await db.LiePin_Analysis1.update_one({'_id': doc.get('_id')}, {'$set': {'end_idx': 0}})

#
asyncio.get_event_loop().run_until_complete(run())

print("[INFO]Timecost:", time.time() - start)
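
Awaiting each update in turn keeps the work sequential, so the Motor version has roughly the same shape as the PyMongo loop. One way to let updates overlap is to schedule them as tasks and wait for them together with `asyncio.gather`, using a semaphore to cap how many are in flight. This is a sketch under assumptions: `update_all` and the default `concurrency` of 100 are hypothetical, and no timing result is claimed here.

```python
import asyncio


async def update_all(collection, concurrency=100):
    """Set end_idx on every document, overlapping up to `concurrency` updates.

    update_all and the concurrency default are assumptions for illustration.
    """
    semaphore = asyncio.Semaphore(concurrency)

    async def update(doc_id):
        # The semaphore limits how many update_one calls run at once.
        async with semaphore:
            await collection.update_one(
                {'_id': doc_id}, {'$set': {'end_idx': 0}})

    tasks = []
    async for doc in collection.find({}, ['_id']):
        tasks.append(asyncio.ensure_future(update(doc['_id'])))
    await asyncio.gather(*tasks)
    return len(tasks)
```

It would be driven the same way as `run()` above, for example `asyncio.get_event_loop().run_until_complete(update_all(db.LiePin_Analysis1))`. Note that this sketch creates one task per document, so for very large collections the tasks would need to be created in chunks.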
Last modification: March 28th, 2021 at 04:42 pm