> 文档中心 > 【Python实战基础】Flask 使用 SQLAlchemy 怎么连接 MySQL 及 增加 / 更新 / 删除 / 查询 基础操作

【Python实战基础】Flask 使用 SQLAlchemy 怎么连接 MySQL 及 增加 / 更新 / 删除 / 查询 基础操作

目录

一、简介

二、主要知识点

三、菜鸟实战

1、创建 python 文件

2、文件目录

3、运行结果


一、简介

SQLAlchemy 是一个常用的数据库抽象层和数据库关系映射包(ORM),可以让我们操作数据库跟操作对象是一样的,非常方便,因为一个表就抽象成一个类,一条数据就抽象成该类的一个对象。

SQLALchemy 让开发者不⽤直接和 SQL 语句打交道,⽽是通过 Python 对象来操作数据库,在舍弃⼀些性能开销的同时,换来的是开发效率的较⼤提升。

二、主要知识点

  • 数据库操作主要包括:

  • 连接数据库
  • 生成数据表
  • 插入单条数据
  • 插入多条数据
  • 查询单条数据
  • 查询多条数据
  • 条件查询数据
  • 更新数据
  • 删除数据

三、菜鸟实战

马上安排!

1、创建 python 文件

# 导入包from flask import Flask, jsonifyfrom flask_sqlalchemy import SQLAlchemyimport pymysqlimport random, string, json# 初始化 APPapp = Flask(__name__)# 初始化数据库类型pymysql.install_as_MySQLdb()class Config(object):    # 基础配置    # 设置连接数据库的URL    # 配置 sqlalchemy  "数据库+数据库驱动://数据库用户名:密码@主机地址:端口/数据库?编码"    # SQLALCHEMY_DATABASE_URI = "mysql://root:root@localhost:5000/flaskdb"    host = '127.0.0.1'    user = 'db_user'    password = 'db_password'    database = 'db_name'    app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://%s:%s@%s:3306/%s" % (user, password, host,database)    # 设置sqlalchemy自动更跟踪数据库    SQLALCHEMY_TRACK_MODIFICATIONS = True    # 查询时会显示原始SQL语句    app.config['SQLALCHEMY_ECHO'] = True# 读取配置app.config.from_object(Config)# 创建数据库 sqlalchemy 工具对象db = SQLAlchemy(app)class User(db.Model):    # 定义表名    __tablename__ = 'users'    # 定义字段    id = db.Column(db.Integer, primary_key=True, autoincrement=True)    name = db.Column(db.String(64), unique=True, index=True)    email = db.Column(db.String(64), unique=True)@app.route('/')def hello_world():  # put application's code here    return 'Flask ORM 使用!'@app.route('/create-item')def create_item():  # put application's code here    # 随机数据    randName = ''.join(random.sample(string.ascii_letters + string.digits, 8))    email = random.randint(100000, 900000)    # 插入一条数据    user = User(name=randName, email=email)    db.session.add(user)    db.session.commit()    return "添加记录成功"@app.route('/create-items')def create_items():  # put application's code here    # 一次插入多条数据    userList = []    for i in range(1, 3): # 随机数据 randName = ''.join(random.sample(string.ascii_letters + string.digits, 8)) email = str(random.randint(100000, 900000))  + "qq.com" user = User(name=randName, email=email) userList.append(user)    db.session.add_all(userList)    db.session.commit()    return "批量添加记录成功"@app.route('/users')def query_all():  # put application's code here    # 查询所有记录    users = User.query.all()    return "批量查询成功"@app.route('/users/cond/')def query_cond(name):  # put application's code here    # 条件查询    # 查询所有记录    users = User.query.filter(User.name==name).all()    return "条件查询成功"@app.route('/users/first')def query_first():  # put application's code here    # 查询第一个    user = User.query.first()    return "查询第一个成功"@app.route('/users/update')def query_for_update():  # put application's code here    # 查询后更新    user = User.query.first()    user.name = 'dong' + ''.join(random.sample(string.ascii_letters + string.digits, 8))    db.session.commit()    return "查询后更新成功"@app.route('/users/delete')def query_for_delete():    # 查询后删除    user = User.query.first()    if user is not None: db.session.delete(user) db.session.commit()    return "查询后删除成功"@app.route('/users/')def query_get(id):  # put application's code here    # 根据主键 id 查询    user = User.query.get(id)    return "根据 id 查询成功"@app.route('/create-table')def create_table():  # put application's code here    # 创建所有表    db.create_all()    return "创建表执行成功"@app.route('/drop-table')def drop_table():  # put application's code here    # 删除所有表    db.drop_all()    return "删除表执行成功"if __name__ == '__main__':    app.run()

2、文件目录

 py-003/

├── app.py
├── static
├── templates
└── venv

3、运行结果

 

 菜鸟实战,持续学习!