> 文档中心 > python封装mongoDB数据库

python封装mongoDB数据库

import pymongoimport loggingimport datetimeimport sys#日志环境配置logging.basicConfig(format='%(asctime)s | %(pathname)s | line:%(lineno)d | %(levelname)s | %(message)s',level=logging.DEBUG, filemode='a',filename='D:\\IVLogs\\' + datetime.datetime.now().strftime('%Y%m%d') + '_PY.log')rf_handler = logging.StreamHandler(sys.stderr)rf_handler.setLevel(logging.DEBUG)rf_handler.setFormatter(logging.Formatter('%(asctime)s | %(pathname)s | line:%(lineno)d | %(levelname)s | %(message)s'))logging.getLogger().addHandler(rf_handler)class mongoDB_Operation:    """    函数功能:初始化加载参数    """    def __init__(self,database,datatable): self.myclient = pymongo.MongoClient('mongodb://localhost:27017/') #有密码验证时self.myclient = pymongo.MongoClient('mongodb://用户名:密码@ip:端口/'+database)# self.database = database self.mybd = self.myclient[database] self.mycol = self.mybd[datatable] """检查数据库是否存在"""    def checkDataBase(self): dblist = self.myclient.list_database_names() if self.database in dblist:     logging.info("数据库:%s 存在"%self.database)     self.isCheckOK = 1 else:     logging.info("数据库:%s 不存在"%self.database)     self.isCheckOK = 0"""数据库操作:增(insert)"""    """函数功能:插入一条数据"""    def insert_one(self,tupstr): try:     result = self.mycol.insert_one(tupstr)     logging.info("数据插入成功:%s"%result.inserted_id) except Exception as e:     logging.info("执行函数:insert_one失败,错误信息%s"%e)    """函数功能:插入多条数据"""    def insert_many(self,listStr): try:     result = self.mycol.insert_many(listStr)     logging.info("数据插入成功:%s"%result.inserted_ids) except Exception as e:     logging.info("执行函数:insert_many失败,错误信息:%s"%e) """数据库操作;查找(find)"""    """函数功能:查找表中第一条元素"""    def find_one(self): try:     self.checkDataBase()     if self.isCheckOK ==0:  logging.info("当前数据库不存在,无法执行查询操作")  return     dit =self.mycol.find_one()     logging.info("首数据查询成功")     return dit except Exception as e:     logging.info("执行函数:find_one失败,错误信息:%s"%e)     """函数功能:查找全部数据"""    def find_all(self): try:     self.checkDataBase()     if self.isCheckOK == 0:  logging.info("当前数据库不存在,无法执行查询操作")  return     dits = self.mycol.find()     logging.info("全部数据查询成功")     return dits except Exception as e:     logging.info("执行函数:find_all失败,错误信息%s"%e) """函数功能:查询指定字段显示"""    def find_partShow(self,rules): try:     self.checkDataBase()     if self.isCheckOK == 0:  logging.info("当前数据库不存在,无法执行查询操作")  return     dits = self.mycol.find({},rules)     logging.info("按照条件,数据查询成功")     return dits except Exception as e:     logging.info("执行函数:find_partShow失败,错误信息%s"%e)    """函数功能: 查询指定的字段"""    def find_rules(self,rules): try:     self.checkDataBase()     if self.isCheckOK == 0:  logging.info("当前数据库不存在,无法执行查询操作")  return     dits = self.mycol.find(rules)     logging.info("按照条件,数据查询成功")     return dits except Exception as e:     logging.info("执行函数:find_rules失败,错误信息%s"%e)    """函数功能:按照查询的条数返回数据"""    def find_limit(self,num): try:     self.checkDataBase()     if self.isCheckOK == 0:  logging.info("当前数据库不存在,无法执行查询操作")  return     dits = self.mycol.find().limit(num)     logging.info("按照行数查询数据成功")     return dits except Exception as e:     logging.info("执行函数:find_limit失败,错误信息%s"%e)    """数据库操作:修改(update)""" """函数功能:仅仅修改匹配条件的第一条数据"""    def update_one(self,rules,newValue): try:     self.checkDataBase()     if self.isCheckOK == 0:  logging.info("当前数据库不存在,无法执行查询操作")  return     self.mycol.update_one(rules,newValue)     logging.info("首条匹配数据修改成功") except Exception as e:     logging.info("执行函数:updata_one失败,错误信息%s"%e) """函数功能:按照条件返回所有满足条件的数据"""    def update_many(self,rules,newValue): try:     self.checkDataBase()     if self.isCheckOK == 0:  logging.info("当前数据库不存在,无法执行查询操作")  return     dits = self.mycol.update_one(rules,newValue)     return dits except Exception as e:     logging.info("") """数据库操作:排序"""    def sort(self,key,order=1): try:     self.checkDataBase()     if self.isCheckOK == 0:  logging.info("当前数据库不存在,无法执行查询操作")  return   mydoc = self.mycol.find().sort(key,order)     return mydoc except Exception as e:     logging.info("执行函数:sort 失败:错误信息%s"%e) """数据库操作:删除delete"""    """函数功能:删除单条数据"""    def delete_one(self,rule): try:     self.checkDataBase()     if self.isCheckOK == 0:  logging.info("当前数据库不存在,无法执行查询操作")  return   self.mycol.delete_one(rule)     logging.info("单条数据删除成功") except Exception as e:     logging.info("执行函数delete_one失败,错误信息%s"%e) """删除多条数据"""    def delete_many(self,rules): try:     self.checkDataBase()     if self.isCheckOK == 0:  logging.info("当前数据库不存在,无法执行查询操作")  return   self.mycol.delete_many(rules)     logging.info("数据删除成功") except Exception as e:     logging.info("执行函数delete_many失败,错误信息%s"%e)  """删除所有数据"""    def delete_all(self): try:     self.checkDataBase()     if self.isCheckOK == 0:  logging.info("当前数据库不存在,无法执行删除操作")  return   self.mycol.delete_many({})     logging.info("全部数据删除成功") except Exception as e:     logging.info("执行函数delete_all失败,错误信息%s"%e) """删除表"""    def drop(self): try:     self.checkDataBase()     if self.isCheckOK == 0:  logging.info("当前数据库不存在,无法执行删除表操作")  return   self.mycol.drop()     logging.info("表删除成功") except Exception as e:     logging.info("执行函数drop失败,错误信息%s"%e)#测试用例def testInsert():    database ="runoobdb"  #数据库    datatable ="sitesTest"   #数据表    db = mongoDB_Operation(database,datatable)    #测试单条数据    mydict = { "name": "RUNOOB", "alexa": "10000", "url": "https://www.runoob.com" }    db.insert_one(mydict) #测试多条数据    mylist = [      { "_id": 1, "name": "RUNOOB", "cn_name": "菜鸟教程"},      { "_id": 2, "name": "Google", "address": "Google 搜索"},      { "_id": 3, "name": "Facebook", "address": "脸书"},      { "_id": 4, "name": "Taobao", "address": "淘宝"},      { "_id": 5, "name": "Zhihu", "address": "知乎"}    ]    db.insert_many(mylist)def testFind():    database ="runoobdb"  #数据库    datatable ="sitesTest"   #数据表    db = mongoDB_Operation(database,datatable)    #查找单条数据    x=db.find_one()    print(x)    #查找全部数据    x1=db.find_all()    for data in x1: print(data) #按照规则查询    myqurey1= {'_id':0,'name':1,'address':1}    x2 = db.find_partShow(myqurey1)    for data in x2: print(data)    #仅显示一条数据    myqurey2={'name':1}    x3 = db.find_partShow(myqurey2)    for data in x3: print(data) #高级查询    myqurey3 ={"name":{"$gt":"H"}} #查询首字符大于H的数据    x4 = db.find_rules(myqurey3)    for data in x4: print(data) #按照指定数据显示    myqurey4={"name":"Taobao"}    x5 = db.find_rules(myqurey4)    for data in x5: print(data)    #正则表达式查询    myqurey5 = {"name":{"$regex":"R"}} #查询首字母为R的数据    x6 = db.find_rules(myqurey5)    for data in x6: print(data)    #查询指定的数据量    x7 = db.find_limit(10)    for data in x7: print(data)def testUpdate():    database ="runoobdb"  #数据库    datatable ="sitesTest"   #数据表    db = mongoDB_Operation(database,datatable)    #仅修改匹配到的第一条数据    myquery ={"name":"RUNOOB"}    newValue = {"$set":{"name":"kangxin"}}  #修改对应的字段名    db.update_one(myquery,newValue)    for data in db.find_all(): print(data) #条件查找    myquery1 = {"name":{"$regex":"^G"}}    newValue1 = {"$set":{"address":"王丽霞"}}    db.update_many(myquery1,newValue1)    for x in db.find_all(): print(x)def sort():    database ="runoobdb"  #数据库    datatable ="sitesTest"   #数据表    db = mongoDB_Operation(database,datatable)    mydoc= db.sort("_id",-1) #默认正序,-1为逆序    for data in mydoc: print(data)def delete():    database ="runoobdb"  #数据库    datatable ="sitesTest"   #数据表    db = mongoDB_Operation(database,datatable)    #删除单条数据    myquery = {"name":"kangxin"}    #db.delete_one(myquery)    for x in db.find_all(): print(x) #删除多条数据    myquery1 = {"name":{"$regex":"Z"}} #数据中存在Z的name项    db.delete_many`(`myquery1)    for x in db.find_all(): print(x) #删除所有数据    db.delete_all()    for x in db.find_all(): print(x)    #删除表    db.drop()    db.checkDataBase()    

网赚站