> 文档中心 > Python 访问 Sqlite 封装 实体,实现数据访问

Python 访问 Sqlite 封装 实体,实现数据访问

Python 访问 Sqlite 封装 实体,实现数据访问

  • 下载
  • 准备工作
    • 创建实体类
    • 访问sqlite 增删改查
    • 输出结果
  • DBSupport 封装 根据实体自动生成sql脚本
    • 初始化
    • 创建表
    • 增 Insert
    • 删 Delete
    • 改 Update
    • 查 Query

下载

准备工作

Python 访问 Sqlite 封装 实体,实现数据访问

创建实体类

class Person:    name = ''    age = 0    Date = ''    id = ''    def say_some(self): return f' 我是克隆体:{self.name} 号,\r\n 出生于 {self.Date}'

访问sqlite 增删改查

df __name__ == '__main__':    db_person = DBSupport(Person)    # 创建    per = Person()    per.name = '李四'    per.id = '10001'    per.age = 40    per.Date = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")    i = db_person.Insert(per)    if i > 0: print(f"数据插入:{per.name} 创建成功!")    else: print(f"数据插入:{per.name} 创建失败!")    # 查询    ls = db_person.Query(f"select * from Person where id = '{per.id}'")    print(f"数据查询:{ls[0].name} 说: {ls[0].say_some()} ")    # 更新    person = ls[0]    person.name = '王五'    n = db_person.Update(person)    if n > 0: print(f"数据更新:{per.name} 改名为 {person.name} 成功!")    else: print(f"数据更新:{per.name} 改名为 {person.name} 失败!")    # 查询    mm = db_person.Query(f"select * from Person where id = '{person.id}'")    print(f"数据查询:{ls[0].name} 说: {ls[0].say_some()} ")    # 删除    n = db_person.Delete(mm[0])    if n > 0: print(f"数据删除:{mm[0].name} 删除成功!")    else: print(f"数据删除:{mm[0].name} 删除失败!")

输出结果

Python 访问 Sqlite 封装 实体,实现数据访问

DBSupport 封装 根据实体自动生成sql脚本

初始化

    def __init__(self, T): self.T = T # 获取类 除 系统属性外的 自定义属性 self.attr = list(filter(lambda x: not x[0].startswith('__'),    inspect.getmembers(T, lambda a: not inspect.isfunction(a)))) self.Create()

创建表

    def Create(self): """创建表,有id,就设置id 为主键id""" str_para = [] for x in self.attr:     if x[0] == 'id':  # 默认id 为主键  str_para.append(f" {x[0]} text NOT NULL PRIMARY KEY ")     else:  if isinstance(x[1], int):      str_para.append(f" {x[0]}  int ")  else:      str_para.append(f" {x[0]}  text ") create_sql = f"create table if not exists {self.T.__name__} ({','.join(str_para)})" print(create_sql) Sqlite_DbHelper.ExecuteNonQuery(create_sql)

增 Insert

    def Insert(self, model): attr = self.get_attr(model)  # 获取类属性 val = ','.join(attr)   # 获取列名 vals = ','.join(list(map(lambda x: f"'{getattr(model, x)}'", attr)))  # 获取值 insert_sql = f'insert into {self.T.__name__} ({val}) values ({vals})' print(insert_sql) return Sqlite_DbHelper.ExecuteNonQuery(insert_sql)

删 Delete

    def Delete(self, model, str_where=''): """删除数据,str_where 为空时,默认用 id 删除,类 属性无id,str_where 必填""" if str_where == '':     str_where = f" id = '{getattr(model, 'id')}'" str_sql = f" delete from {self.T.__name__}  where {str_where} " return Sqlite_DbHelper.ExecuteNonQuery(str_sql)

改 Update

    def Update(self, model, str_where=''): """更新数据,str_where 为空时,默认用 id 更新,类 属性无id,str_where 必填""" attr = self.get_attr(model) str_para = [] for x in attr:     str_para.append(f"{x} = '{getattr(model, x)}'") if str_where == '':     str_where = f" id = '{getattr(model, 'id')}'" str_sql = f" update {self.T.__name__} set {','.join(str_para)} where {str_where} " return Sqlite_DbHelper.ExecuteNonQuery(str_sql)

查 Query

    def Query(self, strSql): """查询返回list实体对象""" ls = Sqlite_DbHelper.ExecuteSql(strSql) if len(ls) > 0 and len(ls[0]) > 0:     arr = []     for row in list(ls[0]):  i = 0  m = self.T()  for col in ls[1]:      setattr(m, col, row[i])      i += 1  arr.append(m)     return arr else:     return None