> 技术文档 > 四、Python 脚本常用模块(续)

四、Python 脚本常用模块(续)


4.3 Python 脚本与数据库交互

在 Python 脚本开发中,数据库是实现数据持久化存储批量数据处理多脚本数据共享的核心工具。无论是日常办公中的数据统计(如员工信息管理)、运维场景下的日志存储(如服务器运行状态记录),还是数据分析中的数据预处理(如用户行为数据清洗),都需要通过数据库实现高效的数据管理。

Python 对数据库的支持极为完善:一方面,标准库内置了sqlite3模块,可直接操作轻量级 SQLite 数据库,无需额外安装;另一方面,针对 MySQL、PostgreSQL 等主流关系型数据库,以及 MongoDB 等非关系型数据库,都有成熟的第三方库(如pymysql、psycopg2、pymongo),能满足不同场景下的需求。

本节将从数据库基础认知出发,分场景讲解 Python 操作各类数据库的核心知识,每个模块均包含「环境准备 - 连接配置 - 核心操作 - 实战示例 - 常见问题」,确保你能快速将数据库能力融入 Python 脚本开发。

4.3.1 数据库基础认知

在学习 Python 操作数据库前,需先明确数据库的核心分类与适用场景,避免开发时选错工具。

4.3.1.1 数据库的核心分类

数据库主要分为关系型数据库(RDBMS)非关系型数据库(NoSQL) 两大类,二者在数据结构、存储方式、适用场景上差异显著:

类型

核心特点

代表产品

适用场景

数据存储形式

关系型数据库

基于关系模型(二维表),支持 SQL 查询,事务 ACID 特性

MySQL、PostgreSQL、SQLite

数据结构固定(如用户表、订单表)、需事务保证(如支付)

表(行 + 列)

非关系型数据库

无固定 schema,灵活存储,查询效率高

MongoDB、Redis

非结构化数据(如日志、JSON)、高频读写(如缓存)

文档(JSON)、键值对

对 Python 脚本而言:

  • 若需轻量级本地存储(如脚本运行日志、单机数据统计),优先选SQLite(无需服务,文件式存储);
  • 若需多脚本共享数据(如团队协作的业务数据),优先选MySQL/PostgreSQL(支持网络访问,事务可靠);
  • 若需存储非结构化数据(如爬虫获取的 HTML 内容、用户行为日志),优先选MongoDB(文档式存储,灵活适配动态字段)。
4.3.1.2 核心概念辨析

无论哪种数据库,Python 脚本操作时都需理解以下基础概念:

  1. 连接(Connection):脚本与数据库的通信通道,所有操作需建立在连接基础上。使用后需关闭,避免资源泄漏;
  1. 游标(Cursor):关系型数据库中执行 SQL 语句的工具,通过游标执行查询、获取结果;
  1. 事务(Transaction):一组不可分割的操作(如 “扣款 + 下单”),需满足 ACID 特性(原子性、一致性、隔离性、持久性),避免数据异常;
  1. CRUD:数据库核心操作的缩写,即创建(Create)、读取(Read)、更新(Update)、删除(Delete),是脚本与数据库交互的核心场景。

4.3.2 标准库 sqlite3:轻量级本地数据库

sqlite3是 Python 标准库内置模块,无需额外安装,可直接操作 SQLite 数据库。SQLite 是文件式数据库(一个数据库对应一个.db文件),无需部署服务器,适合单机脚本、轻量级数据存储场景(如脚本配置存储、本地日志记录)。

4.3.2.1 环境准备

无需安装任何软件,Python 3.x 默认自带sqlite3模块,直接导入即可使用。

4.3.2.2 核心操作:从连接到 CRUD
步骤 1:建立数据库连接

通过sqlite3.connect()创建连接,参数为数据库文件路径(若文件不存在,会自动在指定路径创建):


import sqlite3

from sqlite3 import Error

def create_sqlite_connection(db_file):

\"\"\"

创建SQLite数据库连接

Args:

db_file (str): 数据库文件路径(如\"script_log.db\")

Returns:

sqlite3.Connection: 数据库连接对象,失败则返回None

\"\"\"

conn = None

try:

# 建立连接,check_same_thread=False允许跨线程使用(脚本开发常用)

conn = sqlite3.connect(db_file, check_same_thread=False)

print(f\"SQLite连接成功(版本:{sqlite3.version})\")

# 创建游标对象(用于执行SQL)

cursor = conn.cursor()

return conn, cursor

except Error as e:

print(f\"SQLite连接失败:{e}\")

return None, None

# 调用函数创建连接(数据库文件将保存在当前工作目录)

conn, cursor = create_sqlite_connection(\"script_log.db\")

步骤 2:创建表(Create Table)

通过游标执行CREATE TABLE语句,定义表结构(如创建 “脚本运行日志表”,记录时间、脚本名、状态、日志内容):


def create_log_table(cursor):

\"\"\"创建脚本运行日志表\"\"\"

try:

# SQL语句:定义表名(script_logs)和字段(id为主键自增)

create_table_sql = \'\'\'

CREATE TABLE IF NOT EXISTS script_logs (

id INTEGER PRIMARY KEY AUTOINCREMENT,

run_time TEXT NOT NULL, -- 运行时间(字符串格式,如\"2025-08-26 16:30:00\")

script_name TEXT NOT NULL, -- 脚本文件名(如\"excel_updater.py\")

status TEXT NOT NULL, -- 运行状态(\"success\"或\"failed\")

log_content TEXT -- 日志内容(如错误信息)

);

\'\'\'

# 执行SQL语句

cursor.execute(create_table_sql)

print(\"脚本运行日志表创建成功(若不存在)\")

except Error as e:

print(f\"创建表失败:{e}\")

# 调用函数创建表

if cursor:

create_log_table(cursor)

步骤 3:核心操作(CRUD)

SQLite 的 CRUD 操作均通过游标执行 SQL 语句实现,需注意参数化查询(避免 SQL 注入,禁止字符串拼接 SQL)。

3.1 插入数据(Create)

通过cursor.execute()执行INSERT语句,用?作为占位符传递参数:


from datetime import datetime

def insert_log(cursor, conn, script_name, status, log_content=\"\"):

\"\"\"

插入一条脚本运行日志

Args:

cursor: 游标对象

conn: 数据库连接对象(用于提交事务)

script_name (str): 脚本文件名

status (str): 运行状态(\"success\"或\"failed\")

log_content (str): 日志内容(可选)

\"\"\"

try:

# 1. 定义插入SQL(?为参数占位符)

insert_sql = \'\'\'

INSERT INTO script_logs (run_time, script_name, status, log_content)

VALUES (?, ?, ?, ?);

\'\'\'

# 2. 准备参数(与占位符顺序一致)

run_time = datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")

params = (run_time, script_name, status, log_content)

# 3. 执行SQL

cursor.execute(insert_sql, params)

# 4. 提交事务(SQLite默认开启事务,需手动提交才会写入数据库)

conn.commit()

print(f\"日志插入成功(ID:{cursor.lastrowid})\") # lastrowid获取插入数据的主键ID

except Error as e:

# 若出错,回滚事务(避免数据部分插入)

conn.rollback()

print(f\"日志插入失败:{e}\")

# 调用函数插入2条日志(1条成功,1条失败)

if conn and cursor:

insert_log(cursor, conn, \"excel_updater.py\", \"success\", \"Excel数据更新完成,共13条记录\")

insert_log(cursor, conn, \"api_fetcher.py\", \"failed\", \"请求超时,错误信息:Connection timed out\")

3.2 查询数据(Read)

通过cursor.execute()执行SELECT语句,用cursor.fetchone()(获取一条)、cursor.fetchall()(获取所有)、cursor.fetchmany(n)(获取 n 条)获取结果:


def query_logs(cursor, status=None, script_name=None):

\"\"\"

查询脚本运行日志

Args:

cursor: 游标对象

status (str): 筛选状态(可选,如\"failed\")

script_name (str): 筛选脚本名(可选,如\"excel_updater.py\")

Returns:

list: 查询结果列表(每条记录为元组,对应字段顺序)

\"\"\"

try:

# 基础查询SQL

query_sql = \"SELECT * FROM script_logs\"

params = []

# 动态添加筛选条件(避免字符串拼接,用参数化查询)

conditions = []

if status:

conditions.append(\"status = ?\")

params.append(status)

if script_name:

conditions.append(\"script_name = ?\")

params.append(script_name)

# 拼接筛选条件

if conditions:

query_sql += \" WHERE \" + \" AND \".join(conditions)

# 按运行时间倒序(最新日志在前)

query_sql += \" ORDER BY run_time DESC;\"

# 执行查询

cursor.execute(query_sql, params)

# 获取所有结果(若数据量大,用fetchmany分批获取,避免内存占用过高)

results = cursor.fetchall()

print(f\"查询到 {len(results)} 条日志:\")

# 遍历结果(每条记录的索引对应表字段顺序:id[0], run_time[1], script_name[2], status[3], log_content[4])

for idx, log in enumerate(results, 1):

print(f\"\\n{idx}. 日志ID:{log[0]}\")

print(f\" 运行时间:{log[1]}\")

print(f\" 脚本名称:{log[2]}\")

print(f\" 运行状态:{log[3]}\")

print(f\" 日志内容:{log[4]}\")

return results

except Error as e:

print(f\"日志查询失败:{e}\")

return []

# 调用函数查询(示例1:查询所有失败日志;示例2:查询指定脚本的日志)

if cursor:

print(\"=== 查询所有失败日志 ===\")

query_logs(cursor, status=\"failed\")

print(\"\\n=== 查询excel_updater.py的所有日志 ===\")

query_logs(cursor, script_name=\"excel_updater.py\")

3.3 更新数据(Update)

通过cursor.execute()执行UPDATE语句,更新已有数据(如修正错误的日志状态):


def update_log_status(cursor, conn, log_id, new_status):

\"\"\"

更新日志的运行状态

Args:

cursor: 游标对象

conn: 数据库连接对象

log_id (int): 日志ID(主键,用于定位数据)

new_status (str): 新状态(\"success\"或\"failed\")

\"\"\"

try:

update_sql = \"UPDATE script_logs SET status = ? WHERE id = ?;\"

params = (new_status, log_id)

cursor.execute(update_sql, params)

conn.commit()

# rowcount获取受影响的行数

if cursor.rowcount > 0:

print(f\"日志ID {log_id} 状态更新成功(新状态:{new_status})\")

else:

print(f\"未找到日志ID {log_id},无数据更新\")

except Error as e:

conn.rollback()

print(f\"日志更新失败:{e}\")

# 调用函数更新日志(假设将ID为2的日志状态从failed改为success)

if conn and cursor:

update_log_status(cursor, conn, log_id=2, new_status=\"success\")

3.4 删除数据(Delete)

通过cursor.execute()执行DELETE语句,删除数据(如清理 30 天前的旧日志):


def delete_old_logs(cursor, conn, days=30):

\"\"\"

删除指定天数前的旧日志(避免数据库文件过大)

Args:

cursor: 游标对象

conn: 数据库连接对象

days (int): 保留天数(默认保留30天内的日志)

\"\"\"

try:

# 计算30天前的日期(SQLite支持日期函数,strftime(\'%Y-%m-%d\', \'now\', \'-? days\'))

delete_sql = \'\'\'

DELETE FROM script_logs

WHERE run_time < strftime(\'%Y-%m-%d\', \'now\', \'-? days\');

\'\'\'

params = (days,)

cursor.execute(delete_sql, params)

conn.commit()

print(f\"成功删除 {cursor.rowcount} 条 {days} 天前的旧日志\")

except Error as e:

conn.rollback()

print(f\"日志删除失败:{e}\")

# 调用函数删除90天前的旧日志

if conn and cursor:

delete_old_logs(cursor, conn, days=90)

步骤 4:关闭连接

操作完成后,需关闭游标和连接,释放资源:


def close_sqlite_connection(conn, cursor):

\"\"\"关闭SQLite连接和游标\"\"\"

try:

if cursor:

cursor.close()

print(\"游标已关闭\")

if conn:

conn.close()

print(\"数据库连接已关闭\")

except Error as e:

print(f\"关闭连接失败:{e}\")

# 调用函数关闭连接

if conn and cursor:

close_sqlite_connection(conn, cursor)

4.3.2.3 实战示例:脚本运行日志管理器

结合上述操作,编写一个完整的 “脚本运行日志管理器” 脚本,实现日志的自动记录、查询、清理:


import sqlite3

from sqlite3 import Error

from datetime import datetime

class SQLiteLogManager:

\"\"\"SQLite日志管理类(封装连接、CRUD、关闭操作)\"\"\"

def __init__(self, db_file=\"script_log.db\"):

self.db_file = db_file

self.conn, self.cursor = self._create_connection()

def _create_connection(self):

\"\"\"内部方法:创建数据库连接\"\"\"

conn = None

try:

conn = sqlite3.connect(self.db_file, check_same_thread=False)

cursor = conn.cursor()

self._create_log_table(cursor) # 初始化时自动创建表

return conn, cursor

except Error as e:

print(f\"连接失败:{e}\")

return None, None

def _create_log_table(self, cursor):

\"\"\"内部方法:创建日志表\"\"\"

create_sql = \'\'\'

CREATE TABLE IF NOT EXISTS script_logs (

id INTEGER PRIMARY KEY AUTOINCREMENT,

run_time TEXT NOT NULL,

script_name TEXT NOT NULL,

status TEXT NOT NULL,

log_content TEXT

);

\'\'\'

try:

cursor.execute(create_sql)

except Error as e:

print(f\"创建表失败:{e}\")

def record_log(self, script_name, status, log_content=\"\"):

\"\"\"记录日志\"\"\"

if not self.conn or not self.cursor:

print(\"未建立数据库连接,无法记录日志\")

return

try:

insert_sql = \'\'\'

INSERT INTO script_logs (run_time, script_name, status, log_content)

VALUES (?, ?, ?, ?);

\'\'\'

run_time = datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")

self.cursor.execute(insert_sql, (run_time, script_name, status, log_content))

self.conn.commit()

print(f\"日志记录成功(ID:{self.cursor.lastrowid})\")

except Error as e:

self.conn.rollback()

print(f\"记录日志失败:{e}\")

def query_logs(self, status=None, script_name=None):

\"\"\"查询日志\"\"\"

if not self.cursor:

print(\"未建立数据库连接,无法查询日志\")

return []

try:

query_sql = \"SELECT * FROM script_logs\"

params = []

conditions = []

if status:

conditions.append(\"status = ?\")

params.append(status)

if script_name:

conditions.append(\"script_name = ?\")

params.append(script_name)

if conditions:

query_sql += \" WHERE \" + \" AND \".join(conditions)

query_sql += \" ORDER BY run_time DESC;\"

self.cursor.execute(query_sql, params)

results = self.cursor.fetchall()

print(f\"查询到 {len(results)} 条日志:\")

for idx, log in enumerate(results, 1):

print(f\"\\n{idx}. ID:{log[0]} | 时间:{log[1]} | 脚本:{log[2]} | 状态:{log[3]}\")

print(f\" 内容:{log[4]}\")

return results

except Error as e:

print(f\"查询日志失败:{e}\")

return []

def clean_old_logs(self, days=30):

\"\"\"清理旧日志\"\"\"

if not self.conn or not self.cursor:

print(\"未建立数据库连接,无法清理日志\")

return

try:

delete_sql = \'\'\'

DELETE FROM script_logs

WHERE run_time < strftime(\'%Y-%m-%d\', \'now\', \'-? days\');

\'\'\'

self.cursor.execute(delete_sql, (days,))

self.conn.commit()

print(f\"清理完成,共删除 {self.cursor.rowcount} 条旧日志\")

except Error as e:

self.conn.rollback()

print(f\"清理日志失败:{e}\")

def close(self):

\"\"\"关闭连接\"\"\"

try:

if self.cursor:

self.cursor.close()

if self.conn:

self.conn.close()

print(\"连接已关闭\")

except Error as e:

print(f\"关闭连接失败:{e}\")

# ------------------- 使用示例 -------------------

if __name__ == \"__main__\":

# 1. 初始化日志管理器

log_manager = SQLiteLogManager()

# 2. 记录2条脚本运行日志

log_manager.record_log(

script_name=\"excel_updater.py\",

status=\"success\",

log_content=\"更新Excel的L列数据完成,共13个unitid\"

)

log_manager.record_log(

script_name=\"api_fetcher.py\",

status=\"failed\",

log_content=\"请求API时超时,错误:Connection timed out after 10s\"

)

# 3. 查询失败日志

print(\"\\n=== 查询所有失败日志 ===\")

log_manager.query_logs(status=\"failed\")

# 4. 清理90天前的旧日志

print(\"\\n=== 清理90天前的旧日志 ===\")

log_manager.clean_old_logs(days=90)

# 5. 关闭连接

log_manager.close()

4.3.2.4 常见问题与解决
  1. “数据库被锁定” 错误

原因:多个脚本同时操作同一个 SQLite 文件,或前一个连接未关闭。

解决:① 确保操作完成后关闭连接;② 对写入操作加锁(用threading.Lock());③ 高并发场景改用 MySQL。

  1. 日期筛选无效

原因:SQLite 的日期以字符串存储,需用strftime函数处理。

解决:参考delete_old_logs方法,用strftime(\'%Y-%m-%d\', run_time)转换日期格式后筛选。

  1. 中文乱码

原因:数据库默认编码不支持中文。

解决:建立连接时指定编码(conn = sqlite3.connect(db_file, check_same_thread=False, detect_types=sqlite3.PARSE_DECLTYPES)),且插入中文时确保字符串为 UTF-8 编码。

4.3.3 第三方库 pymysql:操作 MySQL 数据库

MySQL 是工业级关系型数据库,支持网络访问、多用户协作、高并发,适合多脚本共享数据的场景(如团队的业务数据统计、批量数据处理)。Python 通过pymysql库操作 MySQL,需先安装库并配置 MySQL 环境。

4.3.3.1 环境准备
1. 安装 MySQL 服务
  • Windows/MacOS:下载 MySQL 安装包(https://dev.mysql.com/downloads/mysql/),按向导安装,记住 root 用户密码。
  • Linux(Ubuntu):执行命令sudo apt update && sudo apt install mysql-server,安装后用sudo mysql_secure_installation设置密码。
2. 安装 pymysql 库

用pip安装:


pip install pymysql

3. 准备 MySQL 环境
  • 登录 MySQL:mysql -u root -p(输入密码);
  • 创建数据库(如script_db):CREATE DATABASE IF NOT EXISTS script_db DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_unicode_ci;;
  • 授权用户(允许 Python 脚本访问):GRANT ALL PRIVILEGES ON script_db.* TO \'python_user\'@\'%\' IDENTIFIED BY \'123456\';(python_user为用户名,123456为密码,%允许所有 IP 访问);
  • 刷新权限:FLUSH PRIVILEGES;。
4.3.3.2 核心操作:从连接到 CRUD
步骤 1:建立数据库连接

通过pymysql.connect()创建连接,参数需与 MySQL 配置一致:


import pymysql

from pymysql import Error

from datetime import datetime

def create_mysql_connection(db_name=\"script_db\", host=\"localhost\", user=\"python_user\", password=\"123456\", port=3306):

\"\"\"

创建MySQL数据库连接

Args:

db_name (str): 数据库名

host (str): MySQL服务器地址(本地为localhost)

user (str): MySQL用户名

password (str): MySQL密码

port (int): MySQL端口(默认3306)

Returns:

pymysql.connections.Connection: 连接对象,失败则返回None

\"\"\"

conn = None

try:

conn = pymysql.connect(

host=host,

user=user,

password=password,

database=db_name,

port=port,

charset=\"utf8mb4\", # 支持中文

cursorclass=pymysql.cursors.DictCursor # 游标结果以字典返回(默认元组)

)

print(\"MySQL连接成功\")

return conn

except Error as e:

print(f\"MySQL连接失败:{e}\")

# 常见错误排查:① 用户名/密码错误;② 数据库不存在;③ 端口被占用;④ 远程访问未授权

if \"Unknown database\" in str(e):

print(f\"提示:数据库 {db_name} 不存在,请先创建\")

elif \"Access denied\" in str(e):

print(\"提示:用户名或密码错误,或无访问权限\")

return None

# 调用函数创建连接

conn = create_mysql_connection()

步骤 2:创建表(Create Table)

通过conn.cursor()创建游标,执行CREATE TABLE语句(如创建 “员工信息表”,存储 ID、姓名、部门、入职时间、薪资):


def create_employee_table(conn):

\"\"\"创建员工信息表\"\"\"

if not conn:

print(\"未建立连接,无法创建表\")

return

try:

# 创建游标

with conn.cursor() as cursor: # with语句自动关闭游标

create_table_sql = \'\'\'

CREATE TABLE IF NOT EXISTS employees (

id INT PRIMARY KEY AUTO_INCREMENT COMMENT \'员工ID(主键)\',

name VARCHAR(50) NOT NULL COMMENT \'员工姓名\',

department VARCHAR(50) NOT NULL COMMENT \'部门(如技术部、人事部)\',

hire_date DATE NOT NULL COMMENT \'入职日期(YYYY-MM-DD)\',

salary DECIMAL(10, 2) NOT NULL COMMENT \'薪资(保留2位小数)\',

create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT \'记录创建时间\'

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=\'员工信息表\';

\'\'\'

# 执行SQL

cursor.execute(create_table_sql)

# 提交事务(MySQL默认开启事务,需手动提交)

conn.commit()

print(\"员工信息表创建成功(若不存在)\")

except Error as e:

# 回滚事务

conn.rollback()

print(f\"创建表失败:{e}\")

# 调用函数创建表

if conn:

create_employee_table(conn)

步骤 3:核心操作(CRUD)
3.1 插入数据(Create)

支持单条插入和批量插入,批量插入效率远高于单条循环插入(适合脚本批量处理数据):


def insert_employees(conn, employees):

\"\"\"

批量插入员工数据

Args:

conn: MySQL连接对象

employees (list): 员工数据列表(每个元素为字典,对应一条记录)

\"\"\"

if not conn or not employees:

print(\"无连接或无数据,无法插入\")

return

try:

with conn.cursor() as cursor:

# 批量插入SQL(%s为参数占位符,与pymysql的参数化查询匹配)

insert_sql = \'\'\'

INSERT INTO employees (name, department, hire_date, salary)

VALUES (%s, %s, %s, %s);

\'\'\'

# 准备参数(将列表中的字典转换为元组,顺序与SQL占位符一致)

params = [

(emp[\"name\"], emp[\"department\"], emp[\"hire_date\"], emp[\"salary\"])

for emp in employees

]

# 批量执行(executemany效率高于多次execute)

cursor.executemany(insert_sql, params)

conn.commit()

print(f\"批量插入成功,共 {cursor.rowcount} 条员工数据\")

except Error as e:

conn.rollback()

print(f\"插入失败:{e}\")

# 准备批量插入的员工数据(示例:从Excel读取后整理成此格式)

employee_data = [

{\"name\": \"张三\", \"department\": \"技术部\", \"hire_date\": \"2023-01-15\", \"salary\": 8500.00},

{\"name\": \"李四\", \"department\": \"人事部\", \"hire_date\": \"2023-03-20\", \"salary\": 6800.00},

{\"name\": \"王五\", \"department\": \"技术部\", \"hire_date\": \"2023-05-10\", \"salary\": 9200.00},

{\"name\": \"赵六\", \"department\": \"财务部\", \"hire_date\": \"2023-07-05\", \"salary\": 7500.00}

]

# 调用函数批量插入

if conn:

insert_employees(conn, employee_data)

3.2 查询数据(Read)

通过cursor.fetchone()/fetchall()获取结果,支持条件筛选、排序、分页(适合脚本数据统计):


def query_employees(conn, department=None, min_salary=None, page=1, page_size=10):

\"\"\"

查询员工数据(支持按部门、薪资筛选,分页查询)

Args:

conn: MySQL连接对象

department (str): 部门筛选(可选)

min_salary (float): 最低薪资筛选(可选)

page (int): 页码(默认第1页)

page_size (int): 每页条数(默认10条)

Returns:

dict: 查询结果(total:总条数,data:当前页数据)

\"\"\"

if not conn:

print(\"未建立连接,无法查询\")

return {\"total\": 0, \"data\": []}

try:

with conn.cursor() as cursor:

# 1. 查询总条数(用于分页)

count_sql = \"SELECT COUNT(*) AS total FROM employees\"

params = []

conditions = []

# 动态添加筛选条件

if department:

conditions.append(\"department = %s\")

params.append(department)

if min_salary is not None:

conditions.append(\"salary >= %s\")

params.append(min_salary)

if conditions:

count_sql += \" WHERE \" + \" AND \".join(conditions)

cursor.execute(count_sql, params)

total = cursor.fetchone()[\"total\"] # 因cursorclass是DictCursor,结果为字典

# 2. 查询当前页数据(分页公式:OFFSET (page-1)*page_size)

query_sql = \"SELECT * FROM employees\"

if conditions:

query_sql += \" WHERE \" + \" AND \".join(conditions)

# 按入职时间倒序,分页

query_sql += f\" ORDER BY hire_date DESC LIMIT %s OFFSET %s;\"

# 分页参数(LIMIT后为page_size,OFFSET后为偏移量)

query_params = params + [page_size, (page-1)*page_size]

cursor.execute(query_sql, query_params)

data = cursor.fetchall()

# 打印结果

print(f\"\\n=== 第 {page} 页员工数据(共 {total} 条) ===\")

for emp in data:

print(f\"ID:{emp[\'id\']} | 姓名:{emp[\'name\']} | 部门:{emp[\'department\']}\")

print(f\"入职时间:{emp[\'hire_date\']} | 薪资:{emp[\'salary\']} | 创建时间:{emp[\'create_time\']}\")

return {\"total\": total, \"data\": data}

except Error as e:

print(f\"查询失败:{e}\")

return {\"total\": 0, \"data\": []}

# 调用函数查询(示例1:查询技术部薪资≥8000的员工,第1页,每页2条)

if conn:

query_result = query_employees(

conn,

department=\"技术部\",

min_salary=8000.00,

page=1,

page_size=2

)

print(f\"\\n查询结果:共 {query_result[\'total\']} 条,当前页 {len(query_result[\'data\'])} 条\")

3.3 更新数据(Update)

通过UPDATE语句更新数据,支持批量更新(如调整某部门所有员工的薪资):


def update_employee_salary(conn, department, salary_increase):

\"\"\"

批量调整某部门员工的薪资(加薪)

Args:

conn: MySQL连接对象

department (str): 目标部门

salary_increase (float): 加薪金额(如500.00表示每人加500)

\"\"\"

if not conn or salary_increase <= 0:

print(\"未建立连接或加薪金额无效,无法更新\")

return

try:

with conn.cursor() as cursor:

# SQL:salary = salary + %s(批量加薪)

update_sql = \'\'\'

UPDATE employees

SET salary = salary + %s

WHERE department = %s;

\'\'\'

params = (salary_increase, department)

cursor.execute(update_sql, params)

conn.commit()

if cursor.rowcount > 0:

print(f\"部门 {department} 薪资更新成功,共 {cursor.rowcount} 名员工加薪 {salary_increase} 元\")

else:

print(f\"部门 {department} 无员工数据,未更新\")

except Error as e:

conn.rollback()

print(f\"薪资更新失败:{e}\")

# 调用函数:给技术部员工每人加薪500元

if conn:

update_employee_salary(conn, department=\"技术部\", salary_increase=500.00)

3.4 删除数据(Delete)

通过DELETE语句删除数据,需谨慎操作(建议先查询确认,或添加软删除字段如is_deleted):


def delete_employee_by_id(conn, emp_id):

\"\"\"

按员工ID删除数据(硬删除,谨慎使用)

Args:

conn: MySQL连接对象

emp_id (int): 员工ID

\"\"\"

if not conn:

print(\"未建立连接,无法删除\")

return

# 先查询确认(避免误删)

with conn.cursor() as cursor:

cursor.execute(\"SELECT name FROM employees WHERE id = %s;\", (emp_id,))

emp = cursor.fetchone()

if not emp:

print(f\"未找到ID为 {emp_id} 的员工,无需删除\")

return

print(f\"确认删除员工:{emp[\'name\']}(ID:{emp_id})\")

# 确认后删除

try:

with conn.cursor() as cursor:

delete_sql = \"DELETE FROM employees WHERE id = %s;\"

cursor.execute(delete_sql, (emp_id,))

conn.commit()

print(f\"员工ID {emp_id} 删除成功\")

except Error as e:

conn.rollback()

print(f\"删除失败:{e}\")

# 调用函数删除ID为4的员工(示例,实际需谨慎)

if conn:

delete_employee_by_id(conn, emp_id=4)

步骤 4:关闭连接

def close_mysql_connection(conn):

\"\"\"关闭MySQL连接\"\"\"

if conn:

try:

conn.close()

print(\"MySQL连接已关闭\")

except Error as e:

print(f\"关闭连接失败:{e}\")

# 调用函数关闭连接

if conn:

close_mysql_connection(conn)

4.3.3.3 实战示例:员工薪资统计脚本

结合pandas(之前章节讲解的 Excel 处理库),编写一个 “读取 Excel 员工数据→批量插入 MySQL→统计薪资→导出统计结果到 Excel” 的完整脚本:


import pymysql

from pymysql import Error

import pandas as pd

from datetime import datetime

class EmployeeSalaryManager:

\"\"\"员工薪资管理类(MySQL操作+Excel交互)\"\"\"

def __init__(self, db_config, excel_file=\"员工数据.xlsx\"):

\"\"\"

初始化

Args:

db_config (dict): MySQL配置(host, user, password, db, port)

excel_file (str): Excel文件路径

\"\"\"

self.db_config = db_config

self.excel_file = excel_file

self.conn = self._create_mysql_connection()

self._create_employee_table() # 初始化表

def _create_mysql_connection(self):

\"\"\"创建MySQL连接\"\"\"

try:

conn = pymysql.connect(

host=self.db_config[\"host\"],

user=self.db_config[\"user\"],

password=self.db_config[\"password\"],

database=self.db_config[\"db\"],

port=self.db_config[\"port\"],

charset=\"utf8mb4\",

cursorclass=pymysql.cursors.DictCursor

)

return conn

except Error as e:

print(f\"MySQL连接失败:{e}\")

return None

def _create_employee_table(self):

\"\"\"创建员工表\"\"\"

if not self.conn:

return

create_sql = \'\'\'

CREATE TABLE IF NOT EXISTS employees (

id INT PRIMARY KEY AUTO_INCREMENT,

name VARCHAR(50) NOT NULL,

department VARCHAR(50) NOT NULL,

hire_date DATE NOT NULL,

salary DECIMAL(10,2) NOT NULL,

create_time DATETIME DEFAULT CURRENT_TIMESTAMP

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

\'\'\'

try:

with self.conn.cursor() as cursor:

cursor.execute(create_sql)

self.conn.commit()

except Error as e:

self.conn.rollback()

print(f\"创建表失败:{e}\")

def import_from_excel(self):

\"\"\"从Excel读取员工数据并批量插入MySQL\"\"\"

if not self.conn:

print(\"未建立MySQL连接,无法导入\")

return

try:

# 1. 读取Excel(假设Excel表头为:姓名、部门、入职日期、薪资)

df = pd.read_excel(self.excel_file, sheet_name=\"员工表\")

print(f\"从Excel读取到 {len(df)} 条员工数据\")

# 2. 数据预处理(确保日期格式正确,薪资为数值)

df[\"入职日期\"] = pd.to_datetime(df[\"入职日期\"]).dt.strftime(\"%Y-%m-%d\") # 转换为MySQL日期格式

df[\"薪资\"] = df[\"薪资\"].astype(float)

# 3. 批量插入MySQL

insert_sql = \"INSERT INTO employees (name, department, hire_date, salary) VALUES (%s, %s, %s, %s);\"

params = [

(row[\"姓名\"], row[\"部门\"], row[\"入职日期\"], row[\"薪资\"])

for _, row in df.iterrows()

]

with self.conn.cursor() as cursor:

cursor.executemany(insert_sql, params)

self.conn.commit()

print(f\"成功导入 {cursor.rowcount} 条数据到MySQL\")

except Exception as e:

if self.conn:

self.conn.rollback()

print(f\"从Excel导入失败:{e}\")

def salary_statistics(self, export_excel=\"薪资统计结果.xlsx\"):

\"\"\"统计各部门薪资情况(平均薪资、最高/最低薪资、员工数量),并导出到Excel\"\"\"

if not self.conn:

print(\"未建立MySQL连接,无法统计\")

return

try:

with self.conn.cursor() as cursor:

# 统计SQL:按部门分组,计算薪资指标

stat_sql = \'\'\'

SELECT

department AS 部门,

COUNT(*) AS 员工数量,

AVG(salary) AS 平均薪资,

MAX(salary) AS 最高薪资,

MIN(salary) AS 最低薪资

FROM employees

GROUP BY department

ORDER BY 平均薪资 DESC;

\'\'\'

cursor.execute(stat_sql)

stat_result = cursor.fetchall()

# 转换为DataFrame(便于导出Excel)

df_stat = pd.DataFrame(stat_result)

# 格式化薪资(保留2位小数)

df_stat[\"平均薪资\"] = df_stat[\"平均薪资\"].round(2)

df_stat[\"最高薪资\"] = df_stat[\"最高薪资\"].round(2)

df_stat[\"最低薪资\"] = df_stat[\"最低薪资\"].round(2)

# 打印统计结果

print(\"\\n=== 各部门薪资统计结果 ===\")

print(df_stat.to_string(index=False))

# 导出到Excel

df_stat.to_excel(export_excel, index=False, sheet_name=\"薪资统计\")

print(f\"\\n统计结果已导出到 {export_excel}\")

return df_stat

except Error as e:

print(f\"薪资统计失败:{e}\")

return None

def close(self):

\"\"\"关闭连接\"\"\"

if self.conn:

try:

self.conn.close()

print(\"MySQL连接已关闭\")

except Error as e:

print(f\"关闭连接失败:{e}\")

# ------------------- 使用示例 -------------------

if __name__ == \"__main__\":

# MySQL配置(替换为你的实际配置)

mysql_config = {

\"host\": \"localhost\",

\"user\": \"python_user\",

\"password\": \"123456\",

\"db\": \"script_db\",

\"port\": 3306

}

# 初始化薪资管理器

salary_manager = EmployeeSalaryManager(

db_config=mysql_config,

excel_file=\"员工数据.xlsx\" # 提前准备好此Excel文件

)

# 1. 从Excel导入数据到MySQL

print(\"=== 从Excel导入员工数据 ===\")

salary_manager.import_from_excel()

# 2. 统计薪资并导出结果

print(\"\\n=== 各部门薪资统计 ===\")

salary_manager.salary_statistics(export_excel=\"2025年薪资统计.xlsx\")

# 3. 关闭连接

salary_manager.close()

4.3.3.4 常见问题与解决
  1. “Access denied for user” 错误

原因:用户名 / 密码错误,或无访问目标数据库的权限。

解决:① 核对create_mysql_connection的user和password参数;② 登录 MySQL 执行GRANT语句授权(参考环境准备步骤)。

  1. 中文乱码

原因:数据库 / 表的编码不支持中文,或连接时未指定 charset。

解决:① 建立连接时添加charset=\"utf8mb4\";② 创建数据库时指定DEFAULT CHARSET utf8mb4(参考环境准备步骤)。

  1. 批量插入效率低

原因:未使用executemany,或 MySQL 的autocommit开启(默认关闭)。

解决:① 用cursor.executemany()批量执行;② 确保事务批量提交(避免每条插入都 commit)。

  1. 日期格式错误

原因:Python 传递的日期格式与 MySQL 不匹配(MySQL 要求YYYY-MM-DD)。

解决:用datetime.strftime(\"%Y-%m-%d\")或pandas转换日期格式后再插入。

4.3.4 第三方库 pymongo:操作 MongoDB 数据库

MongoDB 是主流的非关系型数据库(文档型),无固定 schema,适合存储非结构化 / 半结构化数据(如脚本爬取的网页内容、用户行为日志、JSON 格式数据)。Python 通过pymongo库操作 MongoDB,支持灵活的文档存储与查询。

4.3.4.1 环境准备
1. 安装 MongoDB 服务
  • Windows/MacOS:下载 MongoDB Community Server(Download MongoDB Community Server | MongoDB),按向导安装。
  • Linux(Ubuntu):执行命令sudo apt update && sudo apt install mongodb-org,安装后启动服务sudo systemctl start mongod。
2. 安装 pymongo 库

用pip安装:


pip install pymongo

3. 准备 MongoDB 环境
  • 启动 MongoDB 后,默认端口为 27017,无需密码(本地开发);
  • 若需远程访问或密码认证,需修改 MongoDB 配置文件(如/etc/mongod.conf),开启认证并创建用户。
4.3.4.2 核心概念

MongoDB 的核心概念与关系型数据库不同,需先理解对应关系:

关系型数据库

MongoDB

说明

数据库(DB)

数据库(DB)

存储数据的容器,一个 MongoDB 可包含多个数据库

表(Table)

集合(Collection)

存储文档的集合,类似表,但无固定结构

行(Row)

文档(Document)

一条数据,格式为 JSON(MongoDB 中为 BSON)

列(Column)

字段(Field)

文档中的键值对,字段可动态添加

4.3.4.3 核心操作:从连接到 CRUD
步骤 1:建立 MongoDB 连接

通过pymongo.MongoClient()创建连接,获取数据库和集合(若不存在,操作时会自动创建):


from pymongo import MongoClient

from pymongo.errors import ConnectionFailure, PyMongoError

from datetime import datetime

import json

def create_mongo_connection(host=\"localhost\", port=27017, db_name=\"script_db\"):

\"\"\"

创建MongoDB连接,获取数据库对象

Args:

host (str): MongoDB服务器地址

port (int): MongoDB端口(默认27017)

db_name (str): 数据库名

Returns:

pymongo.database.Database: 数据库对象,失败则返回None

\"\"\"

try:

# 创建客户端连接

client = MongoClient(host, port, serverSelectionTimeoutMS=5000) # 5秒超时

# 验证连接(避免假连接)

client.admin.command(\'ping\')

# 获取数据库(不存在则自动创建)

db = client[db_name]

print(\"MongoDB连接成功\")

return db

except ConnectionFailure as e:

print(f\"MongoDB连接失败(服务器不可达):{e}\")

return None

except PyMongoError as e:

print(f\"MongoDB操作错误:{e}\")

return None

# 调用函数创建连接,获取数据库对象

db = create_mongo_connection(db_name=\"script_log_db\")

步骤 2:获取集合(Collection)

集合类似关系型数据库的表,通过数据库对象获取(不存在则自动创建):


# 获取“脚本运行日志”集合(类似表)

log_collection = db[\"script_run_logs\"]

# 获取“API请求日志”集合

api_collection = db[\"api_request_logs\"]

print(f\"当前数据库的所有集合:{db.list_collection_names()}\") # 查看已存在的集合

步骤 3:核心操作(CRUD)

MongoDB 的 CRUD 操作通过集合对象实现,文档格式为 Python 字典(自动转换为 BSON)。

3.1 插入文档(Create)

支持单文档插入(insert_one())和多文档插入(insert_many()):


def insert_script_log(log_collection, script_name, status, log_content=\"\", extra_info=None):

\"\"\"

插入一条脚本运行日志到MongoDB

Args:

log_collection: 集合对象(script_run_logs)

script_name (str): 脚本名

status (str): 运行状态(success/failed)

log_content (str): 日志内容

extra_info (dict): 额外信息(如运行时间、错误码,可选)

Returns:

str: 插入文档的ID(ObjectId),失败则返回None

\"\"\"

try:

# 构造文档(JSON格式,字段可动态添加)

log_document = {

\"script_name\": script_name,

\"status\": status,

\"log_content\": log_content,

\"run_time\": datetime.now(), # MongoDB自动存储为datetime类型

\"created_at\": datetime.now()

}

# 添加额外信息(若有)

if extra_info and isinstance(extra_info, dict):

log_document.update(extra_info)

# 插入单文档

result = log_collection.insert_one(log_document)

print(f\"脚本日志插入成功,文档ID:{result.inserted_id}\")

return str(result.inserted_id) # ObjectId转换为字符串,便于后续使用

except PyMongoError as e:

print(f\"插入脚本日志失败:{e}\")

return None

def insert_api_logs(api_collection, api_logs):

\"\"\"

批量插入API请求日志到MongoDB

Args:

api_collection: 集合对象(api_request_logs)

api_logs (list): API日志列表(每个元素为字典)

Returns:

list: 插入文档的ID列表,失败则返回空列表

\"\"\"

if not api_logs or not isinstance(api_logs, list):

print(\"API日志数据无效,无法插入\")

return []

try:

# 为每条日志添加创建时间

for log in api_logs:

log[\"created_at\"] = datetime.now()

# 批量插入多文档

result = api_collection.insert_many(api_logs)

inserted_ids = [str(_id) for _id in result.inserted_ids]

print(f\"批量插入API日志成功,共 {len(inserted_ids)} 条,IDs:{inserted_ids}\")

return inserted_ids

except PyMongoError as e:

print(f\"批量插入API日志失败:{e}\")

return []

# ------------------- 插入示例 -------------------

if log_collection and api_collection:

# 1. 插入一条脚本运行日志(含额外信息)

insert_script_log(

log_collection,

script_name=\"excel_updater.py\",

status=\"success\",

log_content=\"更新Excel L列完成,13个unitid均成功\",

extra_info={\"execution_time\": 12.5, \"affected_rows\": 13} # 额外添加“执行时间”和“影响行数”

)

# 2. 批量插入API请求日志

api_log_data = [

{

\"api_url\": \"http://111.41.51.214:8088/sqjz/private/sqjz/sxzhcx/jzrylist\",

\"method\": \"POST\",

\"status_code\": 200,

\"response_time\": 0.8,

\"unitid\": \"00020001\",

\"params\": {\"jjlx\": \"02\"}

},

{

\"api_url\": \"http://111.41.51.214:8088/sqjz/private/sqjz/sxzhcx/jzrylist\",

\"method\": \"POST\",

\"status_code\": 200,

\"response_time\": 0.6,

\"unitid\": \"00020002\",

\"params\": {\"jjlx\": \"02\"}

},

{

\"api_url\": \"http://111.41.51.214:8088/sqjz/private/sqjz/sxzhcx/jzrylist\",

\"method\": \"POST\",

\"status_code\": 500,

\"response_time\": 1.2,

\"unitid\": \"00020003\",

\"params\": {\"jjlx\": \"02\"},

\"error\": \"Server internal error\"

}

]

insert_api_logs(api_collection, api_log_data)

3.2 查询文档(Read)

通过find()(查询多条)、find_one()(查询一条)实现,支持条件筛选、排序、分页、字段投影(只返回指定字段):


def query_script_logs(log_collection, status=None, min_exec_time=None, page=1, page_size=10):

\"\"\"

查询脚本运行日志

Args:

log_collection: 集合对象

status (str): 筛选状态(success/failed,可选)

min_exec_time (float): 最小执行时间(可选)

page (int): 页码

page_size (int): 每页条数

Returns:

dict: 查询结果(total:总条数,data:当前页数据)

\"\"\"

try:

# 1. 构建查询条件(字典格式,支持多条件组合)

query_filter = {}

if status:

query_filter[\"status\"] = status # 精确匹配状态

if min_exec_time is not None:

# 大于等于:$gte(MongoDB查询操作符,类似SQL的>=)

query_filter[\"execution_time\"] = {\"$gte\": min_exec_time}

# 2. 计算分页参数(跳过前 (page-1)*page_size 条)

skip = (page - 1) * page_size

# 3. 查询总条数

total = log_collection.count_documents(query_filter)

# 4. 查询当前页数据(按运行时间倒序,只返回指定字段)

# 字段投影:1表示返回,0表示不返回(_id默认返回,需显式设为0)

projection = {

\"_id\": 0, # 不返回_id

\"script_name\": 1,

\"status\": 1,

\"run_time\": 1,

\"execution_time\": 1,

\"log_content\": 1

}

# 执行查询:find(条件, 投影) -> 排序(-1表示倒序) -> 分页(skip+limit)

cursor = log_collection.find(query_filter, projection) \\

.sort(\"run_time\", -1) \\

.skip(skip) \\

.limit(page_size)

# 转换为列表(cursor是迭代器,需遍历获取数据)

data = list(cursor)

# 格式化时间(MongoDB的datetime转换为字符串,便于查看)

for item in data:

if \"run_time\" in item:

item[\"run_time\"] = item[\"run_time\"].strftime(\"%Y-%m-%d %H:%M:%S\")

# 打印结果

print(f\"\\n=== 脚本日志查询结果(第 {page} 页,共 {total} 条) ===\")

for idx, log in enumerate(data, 1):

print(f\"\\n{idx}. 脚本名:{log[\'script_name\']}\")

print(f\" 状态:{log[\'status\']}\")

print(f\" 运行时间:{log[\'run_time\']}\")

print(f\" 执行时间:{log.get(\'execution_time\', \'未知\')}秒\")

print(f\" 日志内容:{log[\'log_content\']}\")

return {\"total\": total, \"data\": data}

except PyMongoError as e:

print(f\"查询脚本日志失败:{e}\")

return {\"total\": 0, \"data\": []}

def query_failed_api_logs(api_collection, unitid=None):

\"\"\"

查询失败的API请求日志(status_code != 200)

Args:

api_collection: 集合对象

unitid (str): 筛选指定unitid(可选)

Returns:

list: 失败日志列表

\"\"\"

try:

# 查询条件:status_code != 200($ne:不等于),可选筛选unitid

query_filter = {\"status_code\": {\"$ne\": 200}}

if unitid:

query_filter[\"unitid\"] = unitid

# 按响应时间倒序,返回所有字段

cursor = api_collection.find(query_filter).sort(\"response_time\", -1)

failed_logs = list(cursor)

# 格式化输出

print(f\"\\n=== 失败的API请求日志(共 {len(failed_logs)} 条) ===\")

for log in failed_logs:

print(f\"\\nAPI URL:{log[\'api_url\']}\")

print(f\"Method:{log[\'method\']}\")

print(f\"UnitID:{log[\'unitid\']}\")

print(f\"状态码:{log[\'status_code\']}\")

print(f\"响应时间:{log[\'response_time\']}秒\")

print(f\"错误信息:{log.get(\'error\', \'无\')}\")

print(f\"请求参数:{json.dumps(log[\'params\'], ensure_ascii=False)}\")

return failed_logs

except PyMongoError as e:

print(f\"查询失败API日志失败:{e}\")

return []

# ------------------- 查询示例 -------------------

if log_collection and api_collection:

# 1. 查询执行时间≥10秒的成功日志(第1页,每页5条)

query_script_logs(

log_collection,

status=\"success\",

min_exec_time=10,

page=1,

page_size=5

)

# 2. 查询所有失败的API请求日志

query_failed_api_logs(api_collection)

3.3 更新文档(Update)

通过update_one()(更新一条)、update_many()(更新多条)实现,支持字段修改、数值增减、数组操作等:


def update_api_log_status(api_collection, unitid, new_status_code, new_error=None):

\"\"\"

更新指定unitid的API日志状态码和错误信息

Args:

api_collection: 集合对象

unitid (str): 目标unitid

new_status_code (int): 新状态码

new_error (str): 新错误信息(可选)

Returns:

int: 受影响的文档数,失败则返回0

\"\"\"

try:

# 查询条件:匹配unitid和状态码非200(只更新失败的日志)

query_filter = {\"unitid\": unitid, \"status_code\": {\"$ne\": 200}}

# 更新操作:$set表示设置字段值

update_operation = {\"$set\": {\"status_code\": new_status_code, \"updated_at\": datetime.now()}}

if new_error:

update_operation[\"$set\"][\"error\"] = new_error

# 更新一条匹配的文档(若需更新所有匹配文档,用update_many())

result = api_collection.update_one(query_filter, update_operation)

if result.matched_count > 0:

print(f\"成功更新 {result.modified_count} 条API日志(unitid:{unitid})\")

return result.modified_count

else:

print(f\"未找到unitid为 {unitid} 的失败API日志,无需更新\")

return 0

except PyMongoError as e:

print(f\"更新API日志失败:{e}\")

return 0

def increase_script_exec_time(log_collection, script_name, increase_seconds):

\"\"\"

增加指定脚本的执行时间(如修正统计误差)

Args:

log_collection: 集合对象

script_name (str): 脚本名

increase_seconds (float): 增加的秒数

\"\"\"

try:

query_filter = {\"script_name\": script_name, \"status\": \"success\"}

# $inc:数值增减(+为增,-为减)

update_operation = {\"$inc\": {\"execution_time\": increase_seconds}, \"$set\": {\"updated_at\": datetime.now()}}

result = log_collection.update_many(query_filter, update_operation)

print(f\"成功为 {result.modified_count} 条 {script_name} 的日志增加 {increase_seconds} 秒执行时间\")

except PyMongoError as e:

print(f\"增加执行时间失败:{e}\")

# ------------------- 更新示例 -------------------

if log_collection and api_collection:

# 1. 更新unitid为00020003的API日志(状态码从500改为503,错误信息更新)

update_api_log_status(

api_collection,

unitid=\"00020003\",

new_status_code=503,

new_error=\"Service unavailable (server maintenance)\"

)

# 2. 为excel_updater.py的所有成功日志增加1.2秒执行时间(修正统计误差)

increase_script_exec_time(log_collection, \"excel_updater.py\", 1.2)

3.4 删除文档(Delete)

通过delete_one()(删除一条)、delete_many()(删除多条)实现,需谨慎操作(建议先查询确认):


def delete_old_logs(collection, days=30, log_type=\"script\"):

\"\"\"

删除指定天数前的旧日志(支持脚本日志和API日志)

Args:

collection: 集合对象

days (int): 保留天数(删除days天前的日志)

log_type (str): 日志类型(script/api,用于确定时间字段)

Returns:

int: 删除的文档数,失败则返回0

\"\"\"

try:

# 计算days天前的时间(MongoDB的datetime类型)

cutoff_time = datetime.now() - pd.Timedelta(days=days) # 用pandas计算时间差(也可手动计算)

# 确定时间字段(脚本日志用run_time,API日志用created_at)

time_field = \"run_time\" if log_type == \"script\" else \"created_at\"

# 查询条件:时间字段 < cutoff_time($lt:小于)

query_filter = {time_field: {\"$lt\": cutoff_time}}

# 删除所有匹配的文档

result = collection.delete_many(query_filter)

print(f\"成功删除 {result.deleted_count} 条 {days} 天前的{log_type}日志\")

return result.deleted_count

except Exception as e:

print(f\"删除旧日志失败:{e}\")

return 0

# ------------------- 删除示例 -------------------

if log_collection and api_collection:

# 1. 删除90天前的脚本日志

delete_old_logs(log_collection, days=90, log_type=\"script\")

# 2. 删除60天前的API日志

delete_old_logs(api_collection, days=60, log_type=\"api\")

步骤 4:索引优化(提升查询效率)

MongoDB 默认无索引(除_id外),若脚本频繁按某字段查询(如unitid、script_name),需创建索引提升效率:


def create_indexes(collection, index_fields):

\"\"\"

为集合创建索引

Args:

collection: 集合对象

index_fields (list): 索引字段列表(如[(\"unitid\", 1), (\"status_code\", -1)])

1表示升序,-1表示降序

\"\"\"

try:

# 创建复合索引(若需单字段索引,传入单元素列表即可)

index_name = collection.create_index(index_fields)

print(f\"成功为集合 {collection.name} 创建索引:{index_name}(字段:{index_fields})\")

except PyMongoError as e:

print(f\"创建索引失败:{e}\")

# ------------------- 索引示例 -------------------

if api_collection:

# 为API日志集合创建复合索引(unitid升序 + status_code降序)

create_indexes(api_collection, [(\"unitid\", 1), (\"status_code\", -1)])

if log_collection:

# 为脚本日志集合创建单字段索引(script_name升序)

create_indexes(log_collection, [(\"script_name\", 1)])

4.3.4.4 实战示例:API 请求日志分析脚本

编写一个 “记录 API 请求日志→存储到 MongoDB→分析请求成功率→生成可视化报告” 的脚本,结合matplotlib实现数据可视化:


from pymongo import MongoClient

from pymongo.errors import PyMongoError

from datetime import datetime

import matplotlib.pyplot as plt

import pandas as pd

import json

# 设置中文字体(避免matplotlib中文乱码)

plt.rcParams[\'font.sans-serif\'] = [\'WenQuanYi Zen Hei\']

plt.rcParams[\'axes.unicode_minus\'] = False

class APILogAnalyzer:

\"\"\"API请求日志分析器(MongoDB存储+可视化分析)\"\"\"

def __init__(self, host=\"localhost\", port=27017, db_name=\"api_analysis_db\"):

self.db = self._create_mongo_connection(host, port, db_name)

self.api_collection = self.db[\"