Python 访问 Sqlite 封装 实体,实现数据访问
- 下载
- 准备工作
-
- DBSupport 封装 根据实体自动生成sql脚本
-
- 初始化
- 创建表
- 增 Insert
- 删 Delete
- 改 Update
- 查 Query
下载
准备工作
创建实体类
class Person: name = '' age = 0 Date = '' id = '' def say_some(self): return f' 我是克隆体:{self.name} 号,\r\n 出生于 {self.Date}'
访问sqlite 增删改查
df __name__ == '__main__': db_person = DBSupport(Person) per = Person() per.name = '李四' per.id = '10001' per.age = 40 per.Date = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") i = db_person.Insert(per) if i > 0: print(f"数据插入:{per.name} 创建成功!") else: print(f"数据插入:{per.name} 创建失败!") ls = db_person.Query(f"select * from Person where id = '{per.id}'") print(f"数据查询:{ls[0].name} 说: {ls[0].say_some()} ") person = ls[0] person.name = '王五' n = db_person.Update(person) if n > 0: print(f"数据更新:{per.name} 改名为 {person.name} 成功!") else: print(f"数据更新:{per.name} 改名为 {person.name} 失败!") mm = db_person.Query(f"select * from Person where id = '{person.id}'") print(f"数据查询:{ls[0].name} 说: {ls[0].say_some()} ") n = db_person.Delete(mm[0]) if n > 0: print(f"数据删除:{mm[0].name} 删除成功!") else: print(f"数据删除:{mm[0].name} 删除失败!")
输出结果
DBSupport 封装 根据实体自动生成sql脚本
初始化
def __init__(self, T): self.T = T self.attr = list(filter(lambda x: not x[0].startswith('__'), inspect.getmembers(T, lambda a: not inspect.isfunction(a)))) self.Create()
创建表
def Create(self): """创建表,有id,就设置id 为主键id""" str_para = [] for x in self.attr: if x[0] == 'id': str_para.append(f" {x[0]} text NOT NULL PRIMARY KEY ") else: if isinstance(x[1], int): str_para.append(f" {x[0]} int ") else: str_para.append(f" {x[0]} text ") create_sql = f"create table if not exists {self.T.__name__} ({','.join(str_para)})" print(create_sql) Sqlite_DbHelper.ExecuteNonQuery(create_sql)
增 Insert
def Insert(self, model): attr = self.get_attr(model) val = ','.join(attr) vals = ','.join(list(map(lambda x: f"'{getattr(model, x)}'", attr))) insert_sql = f'insert into {self.T.__name__} ({val}) values ({vals})' print(insert_sql) return Sqlite_DbHelper.ExecuteNonQuery(insert_sql)
删 Delete
def Delete(self, model, str_where=''): """删除数据,str_where 为空时,默认用 id 删除,类 属性无id,str_where 必填""" if str_where == '': str_where = f" id = '{getattr(model, 'id')}'" str_sql = f" delete from {self.T.__name__} where {str_where} " return Sqlite_DbHelper.ExecuteNonQuery(str_sql)
改 Update
def Update(self, model, str_where=''): """更新数据,str_where 为空时,默认用 id 更新,类 属性无id,str_where 必填""" attr = self.get_attr(model) str_para = [] for x in attr: str_para.append(f"{x} = '{getattr(model, x)}'") if str_where == '': str_where = f" id = '{getattr(model, 'id')}'" str_sql = f" update {self.T.__name__} set {','.join(str_para)} where {str_where} " return Sqlite_DbHelper.ExecuteNonQuery(str_sql)
查 Query
def Query(self, strSql): """查询返回list实体对象""" ls = Sqlite_DbHelper.ExecuteSql(strSql) if len(ls) > 0 and len(ls[0]) > 0: arr = [] for row in list(ls[0]): i = 0 m = self.T() for col in ls[1]: setattr(m, col, row[i]) i += 1 arr.append(m) return arr else: return None