import pymongoimport loggingimport datetimeimport syslogging.basicConfig(format='%(asctime)s | %(pathname)s | line:%(lineno)d | %(levelname)s | %(message)s',level=logging.DEBUG, filemode='a',filename='D:\\IVLogs\\' + datetime.datetime.now().strftime('%Y%m%d') + '_PY.log')rf_handler = logging.StreamHandler(sys.stderr)rf_handler.setLevel(logging.DEBUG)rf_handler.setFormatter(logging.Formatter('%(asctime)s | %(pathname)s | line:%(lineno)d | %(levelname)s | %(message)s'))logging.getLogger().addHandler(rf_handler)class mongoDB_Operation: """ 函数功能:初始化加载参数 """ def __init__(self,database,datatable): self.myclient = pymongo.MongoClient('mongodb://localhost:27017/') self.database = database self.mybd = self.myclient[database] self.mycol = self.mybd[datatable] """检查数据库是否存在""" def checkDataBase(self): dblist = self.myclient.list_database_names() if self.database in dblist: logging.info("数据库:%s 存在"%self.database) self.isCheckOK = 1 else: logging.info("数据库:%s 不存在"%self.database) self.isCheckOK = 0"""数据库操作:增(insert)""" """函数功能:插入一条数据""" def insert_one(self,tupstr): try: result = self.mycol.insert_one(tupstr) logging.info("数据插入成功:%s"%result.inserted_id) except Exception as e: logging.info("执行函数:insert_one失败,错误信息%s"%e) """函数功能:插入多条数据""" def insert_many(self,listStr): try: result = self.mycol.insert_many(listStr) logging.info("数据插入成功:%s"%result.inserted_ids) except Exception as e: logging.info("执行函数:insert_many失败,错误信息:%s"%e) """数据库操作;查找(find)""" """函数功能:查找表中第一条元素""" def find_one(self): try: self.checkDataBase() if self.isCheckOK ==0: logging.info("当前数据库不存在,无法执行查询操作") return dit =self.mycol.find_one() logging.info("首数据查询成功") return dit except Exception as e: logging.info("执行函数:find_one失败,错误信息:%s"%e) """函数功能:查找全部数据""" def find_all(self): try: self.checkDataBase() if self.isCheckOK == 0: logging.info("当前数据库不存在,无法执行查询操作") return dits = self.mycol.find() logging.info("全部数据查询成功") return dits except Exception as e: logging.info("执行函数:find_all失败,错误信息%s"%e) """函数功能:查询指定字段显示""" def find_partShow(self,rules): try: self.checkDataBase() if self.isCheckOK == 0: logging.info("当前数据库不存在,无法执行查询操作") return dits = self.mycol.find({},rules) logging.info("按照条件,数据查询成功") return dits except Exception as e: logging.info("执行函数:find_partShow失败,错误信息%s"%e) """函数功能: 查询指定的字段""" def find_rules(self,rules): try: self.checkDataBase() if self.isCheckOK == 0: logging.info("当前数据库不存在,无法执行查询操作") return dits = self.mycol.find(rules) logging.info("按照条件,数据查询成功") return dits except Exception as e: logging.info("执行函数:find_rules失败,错误信息%s"%e) """函数功能:按照查询的条数返回数据""" def find_limit(self,num): try: self.checkDataBase() if self.isCheckOK == 0: logging.info("当前数据库不存在,无法执行查询操作") return dits = self.mycol.find().limit(num) logging.info("按照行数查询数据成功") return dits except Exception as e: logging.info("执行函数:find_limit失败,错误信息%s"%e) """数据库操作:修改(update)""" """函数功能:仅仅修改匹配条件的第一条数据""" def update_one(self,rules,newValue): try: self.checkDataBase() if self.isCheckOK == 0: logging.info("当前数据库不存在,无法执行查询操作") return self.mycol.update_one(rules,newValue) logging.info("首条匹配数据修改成功") except Exception as e: logging.info("执行函数:updata_one失败,错误信息%s"%e) """函数功能:按照条件返回所有满足条件的数据""" def update_many(self,rules,newValue): try: self.checkDataBase() if self.isCheckOK == 0: logging.info("当前数据库不存在,无法执行查询操作") return dits = self.mycol.update_one(rules,newValue) return dits except Exception as e: logging.info("") """数据库操作:排序""" def sort(self,key,order=1): try: self.checkDataBase() if self.isCheckOK == 0: logging.info("当前数据库不存在,无法执行查询操作") return mydoc = self.mycol.find().sort(key,order) return mydoc except Exception as e: logging.info("执行函数:sort 失败:错误信息%s"%e) """数据库操作:删除delete""" """函数功能:删除单条数据""" def delete_one(self,rule): try: self.checkDataBase() if self.isCheckOK == 0: logging.info("当前数据库不存在,无法执行查询操作") return self.mycol.delete_one(rule) logging.info("单条数据删除成功") except Exception as e: logging.info("执行函数delete_one失败,错误信息%s"%e) """删除多条数据""" def delete_many(self,rules): try: self.checkDataBase() if self.isCheckOK == 0: logging.info("当前数据库不存在,无法执行查询操作") return self.mycol.delete_many(rules) logging.info("数据删除成功") except Exception as e: logging.info("执行函数delete_many失败,错误信息%s"%e) """删除所有数据""" def delete_all(self): try: self.checkDataBase() if self.isCheckOK == 0: logging.info("当前数据库不存在,无法执行删除操作") return self.mycol.delete_many({}) logging.info("全部数据删除成功") except Exception as e: logging.info("执行函数delete_all失败,错误信息%s"%e) """删除表""" def drop(self): try: self.checkDataBase() if self.isCheckOK == 0: logging.info("当前数据库不存在,无法执行删除表操作") return self.mycol.drop() logging.info("表删除成功") except Exception as e: logging.info("执行函数drop失败,错误信息%s"%e)def testInsert(): database ="runoobdb" datatable ="sitesTest" db = mongoDB_Operation(database,datatable) mydict = { "name": "RUNOOB", "alexa": "10000", "url": "https://www.runoob.com" } db.insert_one(mydict) mylist = [ { "_id": 1, "name": "RUNOOB", "cn_name": "菜鸟教程"}, { "_id": 2, "name": "Google", "address": "Google 搜索"}, { "_id": 3, "name": "Facebook", "address": "脸书"}, { "_id": 4, "name": "Taobao", "address": "淘宝"}, { "_id": 5, "name": "Zhihu", "address": "知乎"} ] db.insert_many(mylist)def testFind(): database ="runoobdb" datatable ="sitesTest" db = mongoDB_Operation(database,datatable) x=db.find_one() print(x) x1=db.find_all() for data in x1: print(data) myqurey1= {'_id':0,'name':1,'address':1} x2 = db.find_partShow(myqurey1) for data in x2: print(data) myqurey2={'name':1} x3 = db.find_partShow(myqurey2) for data in x3: print(data) myqurey3 ={"name":{"$gt":"H"}} x4 = db.find_rules(myqurey3) for data in x4: print(data) myqurey4={"name":"Taobao"} x5 = db.find_rules(myqurey4) for data in x5: print(data) myqurey5 = {"name":{"$regex":"R"}} x6 = db.find_rules(myqurey5) for data in x6: print(data) x7 = db.find_limit(10) for data in x7: print(data)def testUpdate(): database ="runoobdb" datatable ="sitesTest" db = mongoDB_Operation(database,datatable) myquery ={"name":"RUNOOB"} newValue = {"$set":{"name":"kangxin"}} db.update_one(myquery,newValue) for data in db.find_all(): print(data) myquery1 = {"name":{"$regex":"^G"}} newValue1 = {"$set":{"address":"王丽霞"}} db.update_many(myquery1,newValue1) for x in db.find_all(): print(x)def sort(): database ="runoobdb" datatable ="sitesTest" db = mongoDB_Operation(database,datatable) mydoc= db.sort("_id",-1) for data in mydoc: print(data)def delete(): database ="runoobdb" datatable ="sitesTest" db = mongoDB_Operation(database,datatable) myquery = {"name":"kangxin"} for x in db.find_all(): print(x) myquery1 = {"name":{"$regex":"Z"}} db.delete_many`(`myquery1) for x in db.find_all(): print(x) db.delete_all() for x in db.find_all(): print(x) db.drop() db.checkDataBase()
网赚站