> 技术文档 > Python 操作 MongoDB 的 10 个高效技巧(PyMongo 详解)_python使用mogodb

Python 操作 MongoDB 的 10 个高效技巧(PyMongo 详解)_python使用mogodb


Python 操作 MongoDB 的 10 个高效技巧(PyMongo 详解)

    • 第一章:PyMongo 概述与基础概念
      • 1.1 MongoDB 与 PyMongo 简介
        • 1.1.1 PyMongo 的核心优势
        • 1.1.2 安装与环境配置
      • 1.2 MongoDB 基础概念
        • 1.2.1 数据结构层次
        • 1.2.2 BSON 数据类型
      • 1.3 基本连接操作
        • 1.3.1 连接字符串格式
        • 1.3.2 连接示例
        • 1.3.3 数据库和集合操作
    • 第二章:高效连接管理与配置优化
      • 2.1 连接池深度解析
        • 2.1.1 连接池的工作原理
        • 2.1.2 连接池配置参数
        • 2.2.3 连接状态监控
      • 2.2 高级连接管理技巧
        • 2.2.1 多主机连接配置
        • 2.2.2 连接故障转移策略
        • 2.2.3 连接池监控和维护
    • 第三章:高效数据操作与查询优化
      • 3.1 批量操作最佳实践
        • 3.1.1 批量插入性能对比
        • 3.1.2 批量写入操作
      • 3.2 查询优化技巧
        • 3.2.1 索引优化实战
        • 3.2.2 查询性能分析工具
      • 3.3 聚合管道高级技巧
        • 3.3.1 复杂聚合操作
        • 3.3.2 聚合管道优化技巧

第一章:PyMongo 概述与基础概念

1.1 MongoDB 与 PyMongo 简介

MongoDB 是一个基于分布式文件存储的开源文档数据库,由 C++ 语言编写,旨在为 WEB 应用提供可扩展的高性能数据存储解决方案。它介于关系数据库和非关系数据库之间,是非关系数据库当中功能最丰富,最像关系数据库的产品。
PyMongo 是 MongoDB 的官方 Python 驱动程序,提供了丰富而直观的 API,让 Python 开发者能够轻松地与 MongoDB 数据库进行交互。作为官方维护的驱动程序,PyMongo 保持了与 MongoDB 新特性的同步更新,并提供了最佳的性能和稳定性。

1.1.1 PyMongo 的核心优势
  1. 原生支持 BSON 格式
    PyMongo 自动在 Python 字典和 BSON(Binary JSON)之间进行转换,使得数据存储和检索变得极其简单。BSON 是 MongoDB 使用的二进制编码格式,支持更丰富的数据类型,包括日期、二进制数据等。
  2. 丰富的查询接口
    提供了与 MongoDB 查询语言几乎一对一的映射,支持复杂的查询、聚合操作、地理空间查询等。
  3. 连接池管理
    内置智能连接池机制,自动管理数据库连接的创建和销毁,显著提升应用程序性能。
  4. 异步支持
    支持异步 I/O 操作,可以与 asyncio 框架无缝集成,适合高并发场景。
  5. 完善的错误处理
    提供了详细的错误信息和异常处理机制,帮助开发者快速定位和解决问题。
1.1.2 安装与环境配置

安装 PyMongo

# 基础安装pip install pymongo# 安装带有额外功能的版本pip install pymongo[srv] # 支持连接字符串的SRV记录pip install pymongo[tls] # 支持TLS/SSL加密pip install pymongo[aws] # 支持AWS认证# 或者安装所有额外功能pip install pymongo[all]

验证安装

import pymongoprint(f\"PyMongo 版本: {pymongo.version}\")

1.2 MongoDB 基础概念

1.2.1 数据结构层次

文档(Document)
MongoDB 中的基本数据单元,类似于 JSON 对象,但支持更丰富的数据类型。文档采用 BSON 格式存储。

# 一个示例文档document = { \"_id\": ObjectId(\"507f1f77bcf86cd799439011\"), \"name\": \"张三\", \"age\": 30, \"email\": \"zhangsan@example.com\", \"address\": { \"street\": \"人民路123号\", \"city\": \"北京\", \"zip\": \"100000\" }, \"hobbies\": [\"读书\", \"游泳\", \"摄影\"], \"created_at\": datetime.datetime.utcnow()}

集合(Collection)
一组文档的容器,类似于关系型数据库中的表。集合不需要预定义结构,不同文档可以有不同的字段。
数据库(Database)
多个集合的物理容器,一个 MongoDB 实例可以承载多个数据库。

1.2.2 BSON 数据类型

MongoDB 支持丰富的 BSON 数据类型:

类型 描述 Python 对应类型 ObjectId 12字节的唯一标识符 bson.ObjectId String UTF-8 字符串 str Boolean 布尔值 bool Integer 32位或64位整数 int Double 双精度浮点数 float Array 数组/列表 list Object 嵌入式文档 dict Date 日期时间 datetime.datetime Null 空值 None Binary 二进制数据 bytes Timestamp 时间戳 bson.Timestamp Regular Expression 正则表达式 bson.Regex

1.3 基本连接操作

1.3.1 连接字符串格式

基本格式

mongodb://用户名:密码@主机:端口/数据库名?选项=值

选项参数示例

  • authSource=admin:认证数据库
  • replicaSet=mySet:副本集名称
  • ssl=true:启用 SSL
  • connectTimeoutMS=30000:连接超时时间(毫秒)
  • socketTimeoutMS=30000:套接字超时时间
1.3.2 连接示例
from pymongo import MongoClientfrom urllib.parse import quote_plusimport datetime# 方式1:简单连接client = MongoClient(\'localhost\', 27017)# 方式2:使用连接字符串client = MongoClient(\'mongodb://localhost:27017/\')# 方式3:带认证的连接username = quote_plus(\'admin\')password = quote_plus(\'secret\')client = MongoClient(f\'mongodb://{username}:{password}@localhost:27017/\')# 方式4:连接副本集client = MongoClient( \'mongodb://host1:27017,host2:27017,host3:27017/\', replicaSet=\'myReplicaSet\')# 方式5:连接 MongoDB Atlas(云数据库)atlas_uri = \"mongodb+srv://username:password@cluster0.mongodb.net/test?retryWrites=true&w=majority\"client = MongoClient(atlas_uri)
1.3.3 数据库和集合操作
# 获取数据库(如果不存在会自动创建)db = client[\'mydatabase\']# 或者db = client.mydatabase# 获取集合collection = db[\'mycollection\']# 或者collection = db.mycollection# 列出所有数据库名称database_names = client.list_database_names()print(\"数据库列表:\", database_names)# 列出当前数据库的所有集合collection_names = db.list_collection_names()print(\"集合列表:\", collection_names)

第二章:高效连接管理与配置优化

2.1 连接池深度解析

2.1.1 连接池的工作原理

PyMongo 使用连接池来管理数据库连接,这是一个重要的性能优化机制。连接池在应用程序启动时创建一组数据库连接,并在整个应用程序生命周期中重用这些连接。
连接池的优势:

  1. 减少连接建立开销:TCP 连接和 MongoDB 认证只需要在连接创建时进行一次
  2. 提高响应速度:直接从池中获取可用连接,避免新建连接的延迟
  3. 资源控制:防止连接数无限增长,保护数据库资源
  4. 健康检查:自动检测和淘汰不可用的连接
2.1.2 连接池配置参数
from pymongo import MongoClientfrom pymongo.errors import ConnectionFailure# 详细的连接池配置client = MongoClient( \'mongodb://localhost:27017/\', # 连接池大小配置 maxPoolSize=100, # 连接池最大连接数 minPoolSize=10, # 连接池最小保持的连接数 # 超时配置 connectTimeoutMS=20000, # 连接建立超时(毫秒) socketTimeoutMS=30000, # 套接字操作超时(毫秒) serverSelectionTimeoutMS=30000, # 服务器选择超时 # 连接等待配置 waitQueueTimeoutMS=10000, # 获取连接的最大等待时间 # 连接生命周期 maxIdleTimeMS=60000, # 连接最大空闲时间 maxLifeTimeMS=3600000, # 连接最大生命周期 # 读写偏好 readPreference=\'primary\', # 读取偏好 w=\'majority\',  # 写确认级别 # 重试配置 retryWrites=True, # 自动重试写操作 retryReads=True # 自动重试读操作)
2.2.3 连接状态监控
def monitor_connection_pool(client): \"\"\"监控连接池状态\"\"\" try: # 获取服务器状态信息 server_info = client.server_info() print(f\"MongoDB 版本: {server_info[\'version\']}\") # 获取数据库命令统计(包含连接信息) db = client.admin server_status = db.command(\'serverStatus\') # 连接池统计信息 connections = server_status.get(\'connections\', {}) print(f\"当前连接数: {connections.get(\'current\', \'N/A\')}\") print(f可用连接数: {connections.get(\'available\', \'N/A\')}\") print(f\"总连接数: {connections.get(\'totalCreated\', \'N/A\')}\") # 网络统计 network = server_status.get(\'network\', {}) print(f\"网络请求数: {network.get(\'numRequests\', \'N/A\')}\") return True except ConnectionFailure as e: print(f\"连接失败: {e}\") return False# 执行监控monitor_connection_pool(client)

2.2 高级连接管理技巧

2.2.1 多主机连接配置
# 配置多主机连接,实现高可用client = MongoClient( \'mongodb://host1:27017,host2:27017,host3:27017/\', replicaSet=\'myReplicaSet\', # 读取偏好配置 readPreference=\'secondaryPreferred\', # 优先从副本读取 # 连接策略 connect=True,  # 立即连接 directConnection=False, # 不直接连接到特定主机 # 心跳配置 heartbeatFrequencyMS=10000, # 心跳检测频率 appname=\'MyApp\' # 应用程序标识)# 检查连接状态def check_replica_set_status(client): try: # 获取副本集状态 admin_db = client.admin status = admin_db.command(\'replSetGetStatus\') print(\"副本集状态:\") for member in status[\'members\']: state_str = { 1: \'PRIMARY\', 2: \'SECONDARY\', 3: \'RECOVERING\', 4: \'STARTUP\', 5: \'UNKNOWN\', 6: \'ARBITER\', 7: \'DOWN\', 8: \'ROLLBACK\', 9: \'REMOVED\' }.get(member[\'state\'], \'UNKNOWN\') print(f\" {member[\'name\']}: {state_str} (延迟: {member.get(\'lag\', \'N/A\')}s)\") return True except Exception as e: print(f\"获取副本集状态失败: {e}\") return Falsecheck_replica_set_status(client)
2.2.2 连接故障转移策略
from pymongo import MongoClientfrom pymongo.errors import AutoReconnect, NetworkTimeout, ServerSelectionTimeoutErrorimport timeclass RobustMongoClient: def __init__(self, connection_string, max_retries=3, retry_delay=1): self.connection_string = connection_string self.max_retries = max_retries self.retry_delay = retry_delay self.client = None self.connect() def connect(self): \"\"\"建立连接,支持重试机制\"\"\" for attempt in range(self.max_retries): try: self.client = MongoClient(  self.connection_string,  serverSelectionTimeoutMS=5000,  connectTimeoutMS=5000,  socketTimeoutMS=30000 ) # 测试连接 self.client.admin.command(\'ping\') print(\"MongoDB 连接成功\") return True except (ServerSelectionTimeoutError, ConnectionFailure) as e: print(f\"连接尝试 {attempt + 1}/{self.max_retries} 失败: {e}\") if attempt < self.max_retries - 1:  time.sleep(self.retry_delay * (2 ** attempt)) # 指数退避 else:  print(\"达到最大重试次数,连接失败\")  raise def execute_with_retry(self, operation, *args, **kwargs): \"\"\"带重试机制的执行操作\"\"\" for attempt in range(self.max_retries): try: return operation(*args, **kwargs) except (AutoReconnect, NetworkTimeout) as e: print(f\"操作失败,尝试重连 ({attempt + 1}/{self.max_retries}): {e}\") if attempt < self.max_retries - 1:  time.sleep(self.retry_delay)  self.connect() # 重新连接 else:  raise def close(self): \"\"\"关闭连接\"\"\" if self.client: self.client.close() print(\"MongoDB 连接已关闭\")# 使用示例robust_client = RobustMongoClient(\'mongodb://localhost:27017/\')try: db = robust_client.client[\'testdb\'] collection = db[\'testcollection\'] # 执行查询(自动重试) result = robust_client.execute_with_retry( collection.find_one, {\'name\': \'test\'} ) print(\"查询结果:\", result) finally: robust_client.close()
2.2.3 连接池监控和维护
import threadingimport timefrom datetime import datetimeclass ConnectionPoolMonitor: def __init__(self, client, interval=60): self.client = client self.interval = interval self.monitoring = False self.thread = None def start_monitoring(self): \"\"\"启动连接池监控\"\"\" self.monitoring = True self.thread = threading.Thread(target=self._monitor_loop) self.thread.daemon = True self.thread.start() print(\"连接池监控已启动\") def stop_monitoring(self): \"\"\"停止监控\"\"\" self.monitoring = False if self.thread: self.thread.join() print(\"连接池监控已停止\") def _monitor_loop(self): \"\"\"监控循环\"\"\" while self.monitoring: try: self._check_pool_status() time.sleep(self.interval) except Exception as e: print(f\"监控错误: {e}\") time.sleep(self.interval) def _check_pool_status(self): \"\"\"检查连接池状态\"\"\" try: # 获取数据库统计信息 db = self.client.admin server_status = db.command(\'serverStatus\') connections = server_status.get(\'connections\', {}) current_time = datetime.now().strftime(\'%Y-%m-%d %H:%M:%S\') print(f\"[{current_time}] 连接池状态:\") print(f\" 当前连接: {connections.get(\'current\', \'N/A\')}\") print(f\" 可用连接: {connections.get(\'available\', \'N/A\')}\") print(f\" 总创建数: {connections.get(\'totalCreated\', \'N/A\')}\") # 检查连接健康状态 try: # 简单查询测试连接健康 db.command(\'ping\') print(\" 连接状态: 健康\") except Exception as e: print(f\" 连接状态: 异常 - {e}\") except Exception as e: print(f\"获取连接状态失败: {e}\")# 使用监控monitor = ConnectionPoolMonitor(client)monitor.start_monitoring()# 在应用程序退出时# monitor.stop_monitoring()

第三章:高效数据操作与查询优化

3.1 批量操作最佳实践

3.1.1 批量插入性能对比
import timeimport randomfrom pymongo import InsertOne, UpdateOne, DeleteOnedef performance_comparison(collection, num_documents=10000): \"\"\"对比单条插入和批量插入的性能差异\"\"\" # 生成测试数据 documents = [ { \"name\": f\"user_{i}\", \"age\": random.randint(18, 60), \"email\": f\"user_{i}@example.com\", \"created_at\": datetime.datetime.utcnow(), \"tags\": [f\"tag_{random.randint(1, 10)}\" for _ in range(3)] } for i in range(num_documents) ] # 测试单条插入 start_time = time.time() for doc in documents: collection.insert_one(doc) single_time = time.time() - start_time # 清空集合重新测试 collection.delete_many({}) # 测试批量插入 start_time = time.time() collection.insert_many(documents, ordered=False) # ordered=False 并行处理 batch_time = time.time() - start_time # 输出结果 print(f\"单条插入 {num_documents} 文档: {single_time:.2f} 秒\") print(f\"批量插入 {num_documents} 文档: {batch_time:.2f} 秒\") print(f\"性能提升: {(single_time / batch_time):.1f} 倍\") return single_time, batch_time# 执行性能测试single_time, batch_time = performance_comparison(db[\'performance_test\'])
3.1.2 批量写入操作
def bulk_operations_example(collection): \"\"\"批量操作示例\"\"\" # 准备批量操作列表 operations = [] # 1. 批量插入 for i in range(100): operations.append( InsertOne({ \"name\": f\"bulk_user_{i}\", \"age\": random.randint(20, 50), \"group\": \"A\" if i % 2 == 0 else \"B\" }) ) # 2. 批量更新 for i in range(50): operations.append( UpdateOne( {\"name\": f\"bulk_user_{i}\"}, {\"$set\": {\"status\": \"active\", \"updated_at\": datetime.datetime.utcnow()}} ) ) # 3. 批量删除 for i in range(10): operations.append( DeleteOne({\"name\": f\"bulk_user_{i}\"}) ) # 执行批量操作 if operations: try: result = collection.bulk_write(operations, ordered=False) print(\"批量操作结果:\") print(f\" 插入: {result.inserted_count}\") print(f\" 更新: {result.modified_count}\") print(f\" 删除: {result.deleted_count}\") print(f\" 匹配: {result.matched_count}\") print(f\" 上行插入: {result.upserted_count}\") return result  except Exception as e: print(f\"批量操作失败: {e}\") return None# 执行批量操作bulk_result = bulk_operations_example(db[\'bulk_test\'])

3.2 查询优化技巧

3.2.1 索引优化实战
def index_optimization_demo(collection): \"\"\"索引优化演示\"\"\" # 创建测试数据 test_data = [ { \"username\": f\"user_{i}\", \"email\": f\"user_{i}@example.com\", \"age\": random.randint(18, 65), \"city\": random.choice([\"北京\", \"上海\", \"广州\", \"深圳\", \"杭州\"]), \"created_date\": datetime.datetime.utcnow() - datetime.timedelta(days=random.randint(0, 365)), \"salary\": random.randint(3000, 30000), \"department\": random.choice([\"技术\", \"销售\", \"市场\", \"人事\", \"财务\"]) } for i in range(10000) ] collection.insert_many(test_data) # 1. 创建单字段索引 collection.create_index(\"username\", unique=True, name=\"idx_username_unique\") collection.create_index(\"email\", unique=True, name=\"idx_email_unique\") collection.create_index(\"age\", name=\"idx_age\") collection.create_index(\"city\", name=\"idx_city\") # 2. 创建复合索引 collection.create_index([ (\"department\", 1), (\"salary\", -1) ], name=\"idx_department_salary\") collection.create_index([ (\"city\", 1), (\"created_date\", -1) ], name=\"idx_city_created_date\") # 3. 创建多键索引(数组字段) # 假设有tags数组字段 collection.create_index(\"tags\", name=\"idx_tags\") # 4. 查看索引信息 indexes = list(collection.list_indexes()) print(\"当前索引列表:\") for idx in indexes: print(f\" - {idx[\'name\']}: {idx.get(\'key\', \'N/A\')}\") # 5. 分析查询性能 def analyze_query(query, projection=None, sort=None): \"\"\"分析查询性能\"\"\" print(f\"\\n分析查询: {query}\") # 不使用索引的查询计划 explain_no_hint = collection.find(query, projection).sort(sort).explain() winning_plan_no_hint = explain_no_hint[\'queryPlanner\'][\'winningPlan\'] # 使用特定索引的查询计划 if sort: sort_fields = [k for k, v in sort] index_name = f\"idx_{\'_\'.join(sort_fields)}\" explain_with_hint = collection.find(query, projection).sort(sort).hint(index_name).explain() winning_plan_with_hint = explain_with_hint[\'queryPlanner\'][\'winningPlan\'] else: winning_plan_with_hint = {\"stage\": \"NO_HINT_APPLIED\"} print(f\" 默认执行计划: {winning_plan_no_hint.get(\'stage\', \'UNKNOWN\')}\") print(f\" 索引提示执行计划: {winning_plan_with_hint.get(\'stage\', \'UNKNOWN\')}\") # 检查是否使用了索引 if winning_plan_no_hint.get(\'inputStage\', {}).get(\'stage\') == \'IXSCAN\': print(\" ✓ 查询使用了索引\") else: print(\" ✗ 查询未使用索引,可能需要进行优化\") # 测试不同查询 analyze_query({\"city\": \"北京\", \"age\": {\"$gt\": 30}}) analyze_query({\"department\": \"技术\"}, sort=[(\"salary\", -1)]) analyze_query({\"city\": \"上海\", \"created_date\": {\"$gt\": datetime.datetime.utcnow() - datetime.timedelta(days=30)}})# 执行索引优化演示index_optimization_demo(db[\'employee\'])
3.2.2 查询性能分析工具
class QueryAnalyzer: def __init__(self, collection): self.collection = collection def analyze_performance(self, query, projection=None, sort=None, hint=None, sample_size=1000, max_time_ms=5000): \"\"\" 全面分析查询性能 \"\"\" # 构建查询选项 find_args = {\'filter\': query} if projection: find_args[\'projection\'] = projection if sort: find_args[\'sort\'] = sort if hint: find_args[\'hint\'] = hint # 执行解释操作 explain_args = {\'verbosity\': \'executionStats\'} explanation = self.collection.find(**find_args).limit(sample_size).explain(**explain_args) # 提取关键指标 execution_stats = explanation.get(\'executionStats\', {}) query_planner = explanation.get(\'queryPlanner\', {}) # 分析结果 analysis = { \'query\': query, \'execution_time_ms\': execution_stats.get(\'executionTimeMillis\', 0), \'n_returned\': execution_stats.get(\'nReturned\', 0), \'n_examined\': execution_stats.get(\'totalDocsExamined\', 0), \'index_used\': query_planner.get(\'winningPlan\', {}).get(\'inputStage\', {}).get(\'stage\') == \'IXSCAN\', \'stage\': query_planner.get(\'winningPlan\', {}).get(\'stage\', \'UNKNOWN\'), \'scan_efficiency\': self._calculate_efficiency( execution_stats.get(\'nReturned\', 0), execution_stats.get(\'totalDocsExamined\', 0) ) } return analysis def _calculate_efficiency(self, returned, examined): \"\"\"计算查询效率\"\"\" if examined == 0: return 1.0 return returned / examined def generate_report(self, analyses): \"\"\"生成性能分析报告\"\"\" print(\"=\" * 80) print(\"查询性能分析报告\") print(\"=\" * 80) for i, analysis in enumerate(analyses, 1): print(f\"\\n查询 #{i}:\") print(f\" 查询条件: {analysis[\'query\']}\") print(f\" 执行时间: {analysis[\'execution_time_ms\']}ms\") print(f\" 返回文档: {analysis[\'n_returned\']}\") print(f\" 扫描文档: {analysis[\'n_examined\']}\") print(f\" 扫描效率: {analysis[\'scan_efficiency\']:.2%}\") print(f\" 使用索引: {\'是\' if analysis[\'index_used\'] else \'否\'}\") print(f\" 执行阶段: {analysis[\'stage\']}\") # 提供优化建议 if not analysis[\'index_used\']: print(\" 💡 优化建议: 考虑添加合适的索引\") elif analysis[\'scan_efficiency\'] < 0.1: print(\" 💡 优化建议: 索引选择性可能不足,考虑优化查询条件或索引\") elif analysis[\'execution_time_ms\'] > 100: print(\" 💡 优化建议: 查询较慢,考虑添加覆盖索引或优化数据模型\")# 使用查询分析器analyzer = QueryAnalyzer(db[\'employee\'])# 分析多个查询queries_to_analyze = [ {\"city\": \"北京\", \"age\": {\"$gt\": 30}}, {\"department\": \"技术\", \"salary\": {\"$gt\": 10000}}, {\"created_date\": {\"$gt\": datetime.datetime(2023, 1, 1)}}]analyses = []for query in queries_to_analyze: analysis = analyzer.analyze_performance(query) analyses.append(analysis)analyzer.generate_report(analyses)

3.3 聚合管道高级技巧

3.3.1 复杂聚合操作
def advanced_aggregation_examples(collection): \"\"\"高级聚合管道示例\"\"\" # 示例1: 多阶段聚合 - 用户行为分析 pipeline_1 = [ # 阶段1: 匹配条件 {\"$match\": { \"status\": \"active\", \"created_date\": {\"$gte\": datetime.datetime(2023, 1, 1)} }}, # 阶段2: 按城市分组并计算统计 {\"$group\": { \"_id\": \"$city\", \"total_users\": {\"$sum\": 1}, \"avg_age\": {\"$avg\": \"$age\"}, \"max_salary\": {\"$max\": \"$salary\"}, \"min_salary\": {\"$min\": \"$salary\"}, \"user_list\": {\"$push\": \"$username\"} }}, # 阶段3: 投影重命名字段 {\"$project\": { \"city\": \"$_id\", \"user_count\": \"$total_users\", \"average_age\": {\"$round\": [\"$avg_age\", 1]}, \"salary_range\": { \"max\": \"$max_salary\", \"min\": \"$min_salary\", \"difference\": {\"$subtract\": [\"$max_salary\", \"$min_salary\"]} }, \"sample_users\": {\"$slice\": [\"$user_list\", 5]} # 取前5个用户作为样本 }}, # 阶段4: 按用户数排序 {\"$sort\": {\"user_count\": -1}}, # 阶段5: 限制输出数量 {\"$limit\": 10} ] # 示例2: 时间序列分析 - 按日期统计 pipeline_2 = [ {\"$match\": {\"status\": \"active\"}}, # 按年月分组 {\"$group\": { \"_id\": { \"year\": {\"$year\": \"$created_date\"}, \"month\": {\"$month\": \"$created_date\"} }, \"count\": {\"$sum\": 1}, \"total_salary\": {\"$sum\": \"$salary\"} }}, # 计算平均薪资 {\"$project\": { \"year_month\": \"$_id\", \"user_count\": \"$count\", \"average_salary\": {\"$round\": [{\"$divide\": [\"$total_salary\", \"$count\"]}, 2]} }}, {\"$sort\": {\"year_month.year\": 1, \"year_month.month\": 1}} ] # 示例3: 条件聚合和桶操作 pipeline_3 = [ {\"$match\": {\"age\": {\"$gte\": 18}}}, # 按年龄分段 {\"$bucket\": { \"groupBy\": \"$age\", \"boundaries\": [18, 25, 35, 45, 55, 65], \"default\": \"65+\", \"output\": { \"count\": {\"$sum\": 1}, \"avg_salary\": {\"$avg\": \"$salary\"}, \"cities\": {\"$addToSet\": \"$city\"} } }}, # 计算各年龄段的薪资水平 {\"$project\": { \"age_group\": { \"$switch\": {  \"branches\": [ {\"case\": {\"$eq\": [\"$_id\", 18]}, \"then\": \"18-24\"}, {\"case\": {\"$eq\": [\"$_id\", 25]}, \"then\": \"25-34\"}, {\"case\": {\"$eq\": [\"$_id\", 35]}, \"then\": \"35-44\"}, {\"case\": {\"$eq\": [\"$_id\", 45]}, \"then\": \"45-54\"}, {\"case\": {\"$eq\": [\"$_id\", 55]}, \"then\": \"55-64\"}  ],  \"default\": \"65+\" } }, \"user_count\": \"$count\", \"average_salary\": {\"$round\": [\"$avg_salary\", 2]}, \"city_count\": {\"$size\": \"$cities\"} }}, {\"$sort\": {\"_id\": 1}} ] # 执行聚合查询 print(\"聚合结果1 - 城市用户统计:\") result1 = list(collection.aggregate(pipeline_1)) for item in result1: print(f\" {item[\'city\']}: {item[\'user_count\']} users, avg age: {item[\'average_age\']}\") print(\"\\n聚合结果2 - 时间序列统计:\") result2 = list(collection.aggregate(pipeline_2)) for item in result2: ym = item[\'year_month\'] print(f\" {ym[\'year\']}-{ym[\'month\']:02d}: {item[\'user_count\']} users, avg salary: {item[\'average_salary\']}\") print(\"\\n聚合结果3 - 年龄分组统计:\") result3 = list(collection.aggregate(pipeline_3)) for item in result3: print(f\" {item[\'age_group\']}: {item[\'user_count\']} users, avg salary: {item[\'average_salary\']}\")# 执行高级聚合advanced_aggregation_examples(db[\'employee\'])
3.3.2 聚合管道优化技巧
def optimized_aggregation(collection): \"\"\"聚合管道优化技巧\"\"\" # 技巧1: 尽早使用 $match 过滤数据 pipeline_optimized = [ {\"$match\": { # 尽早过滤,减少后续处理的数据量 \"status\": \"active\", \"salary\": {\"$gt\": 5000}, \"department\": {\"$in\": [\"技术\", \"销售\", \"市场\"]} }}, # 技巧2: 使用 $project 减少字段 {\"$project\": { \"name\": 1, \"department\": 1, \"salary\": 1, \"years_of_service\": { # 计算字段 \"$divide\": [  {\"$subtract\": [datetime.datetime.utcnow(), \"$created_date\"]},  1000 * 60 * 60 * 24 * 365 # 毫秒转年 ] } }}, # 技巧3: 使用 $addFields 而不是多个 $project {\"$addFields\": { \"salary_grade\": { \"$switch\": {  \"branches\": [ {\"case\": {\"$lt\": [\"$salary\", 8000]}, \"then\": \"初级\"}, {\"case\": {\"$lt\": [\"$salary\", 15000]}, \"then\": \"中级\"}, {\"case\": {\"$lt\": [\"$salary\", 25000]}, \"then\": \"高级\"}  ],  \"default\": \"资深\" } }, \"bonus_eligible\": {\"$gte\": [\"$years_of_service\", 2]} }}, # 技巧4: 使用 $facet 进行多维度分析(并行执行) {\"$facet\": { \"by_department\": [ {\"$group\": {  \"_id\": \"$department\",  \"avg_salary\": {\"$avg\": \"$salary\"},  \"count\": {\"$sum\": 1} }} ], \"by_grade\": [ {\"$group\": {  \"_id\": \"$salary_grade\",  \"avg_years\": {\"$avg\": \"$years_of_service\"},  \"count\": {\"$sum\": 1} }} ], \"summary\": [ {\"$group\": {  \"_id\": None,  \"total_employees\": {\"$sum\": 1},  \"total_salary\": {\"$sum\": \"$salary\"},  \"departments\": {\"$addToSet\": \"$department\"} }} ] }}, # 技巧5: 最后进行排序和限制 {\"$unwind\": \"$by_department\"}, {\"$sort\": {\"by_department.avg_salary\": -1}}, {\"$limit\": 10} ] # 执行优化后的聚合 try: result = list(collection.aggregate(pipeline_optimized)) print(\"优化聚合结果:\") print(f\"返回数据量: {len(result)}\") # 分析执行计划 explain_result = collection.aggregate(pipeline_optimized, explain=True) print(\"聚合执行计划分析完成\") return result except Exception as e: print(f\"聚合执行失败: {e}\") return None# 执行优化聚合optimized_result = optimized_aggregation(db[\'employee\'])

由于篇幅限制,本文先展示前三个章节的内容。后续章节将涵盖:
4. 事务处理与数据一致性
5. 变更流与实时数据处理
6. 安全性与权限管理
7. 性能监控与故障排查
8. 分布式架构与分片策略
9. 最佳实践与常见陷阱
10. 实战案例与性能测试
每个章节都会包含详细的代码示例、最佳实践和性能优化建议,帮助您全面掌握 PyMongo 的高效使用技巧。