企业级架构师综合能力项目案例二(项目性能优化方案JVM+数据库+缓存+代码JUC+消息中间件架构+服务熔断降级)
-
确立基线: 使用压测工具(JMeter)对当前系统压测,得到QPS(每秒处理的请求数)、RT(响应时间Response Time)、错误率等基线数据。
系统在保证可接受性能(如 RT < 200ms, 错误率 < 0.01%)的前提下,最大能支撑多少 QPS?
系统在特定压力(如日常峰值的 1.5 倍)下,持续运行 12/24 小时,是否会出现内存泄漏、RT 缓慢增长、错误率上升等问题?
找到系统的性能瓶颈所在(CPU、内存、磁盘 I/O、网络 I/O、数据库、外部依赖等) -
监控分析:
使用APM和jstat等工具定位瓶颈(是CPU、内存、IO还是数据库?)。
-
分层优化:
JVM层: 调整堆大小、选择合适的GC器、分析GC日志。 数据库层: 分析慢SQL、优化索引、调整连接池、考虑批量操作。ES优化、MongoDB优化 缓存层: 根据场景引入本地或分布式缓存,注意缓存穿透、击穿、雪崩问题。 代码层: review代码,避免内存泄漏、使用高效的数据结构和算法。 架构层: 考虑读写分离、分库分表、异步化(MQ)、服务熔断降级等。
-
验证效果: 再次压测,对比优化前后的指标,确认优化是否有效。无效则回到第2步。
JVM调优四步法
java性能优化
核心在于监控->分析->假设->验证的闭环。企业中90%的JVM问题通过分析GC日志和堆转储文件都能找到根源,剩下的10%则需要结合代码、中间件和系统架构进行更深层次的优化。
一、监控与诊断
①、开启GC日志 (强制必须)
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+PrintGCCause -Xloggc:/path/to/your/gc-%t.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=10M
使用 GCeasy (在线分析工具)、GCE Viewer 等工具上传GC日志文件,它们会自动生成可视化报告,指出问题所在。
- YoungGC频率/耗时: 是否过于频繁?平均耗时是否正常(几十毫秒内)?
- FullGC频率/耗时: 绝对重点! 是否有FullGC?FullGC次数越多、耗时越长(秒级),问题越严重。FullGC是STW(Stop-The-World)的,会直接导致服务暂停、RT飙升。
- 堆内存使用率: 各区域(Eden, Survivor, Old Gen)的使用情况是否合理?
②、使用APM工具进行代码级诊断
Arthas (阿里开源,必备神器)、SkyWalking、Pinpoint工具,当发现某个接口RT很长,但不确定是JVM问题还是代码问题
Arthas常用命令:
- dashboard: 整体系统状态,实时查看堆内存、GC、线程。
- thread : 查看某个线程的栈,定位卡顿线程。
- trace : 追踪方法内部调用路径,并输出每个节点的耗时。这是定位“慢方法”的神器。
- jad : 反编译线上代码,确认最新版本已部署。
③、系统级监控
jstat (命令行利器): jstat -gcutil 1s (每秒钟查看一次GC和内存情况)
关注 YGC/YGCT(YoungGC次数/耗时),FGC/FGCT(FullGC次数/耗时),OUE(老年代使用率)。
二、常见问题模式与调优策略
三、参数调整与压测验证
- 编写调优脚本: 将调整的JVM参数写入应用的启动脚本(如 java -server -Xms4g -Xmx4g -XX:+UseG1GC
… -jar your-app.jar)。 - 重启应用: 部署到压测环境。
- 重新压测: 使用完全相同的JMeter脚本和压力策略进行压测。
- 收集数据: 再次收集GC日志、系统指标和性能指标(QPS, RT)。
四、对比分析与迭代
对比基线: 将新的GC日志分析报告和性能数据与第一步的基线进行对比。
- FullGC次数是否减少甚至消除?
- YoungGC耗时和频率是否可接受?
- P90/P99 RT是否下降?
- QPS是否提升?
得出结论:
- 如果有效: 保留参数,将其作为新的基线,并考虑是否进行下一轮优化(如进一步优化代码或数据库)。
- 如果无效甚至恶化: 回退参数,重新分析日志,尝试其他策略。
五、企业级项目实战案例
场景: 一个电商订单核心服务,在每晚高峰期,监控系统发现RT周期性飙高,同时伴随大量GC告警。
①、诊断 (第一步):
查看GC日志 (通过公司统一的日志平台): 使用GCeasy分析发现,每隔5-10分钟就有一次长达2-3秒的Full GC。
使用Arthas连接线上服务(在低峰期): 使用 thread 命令未发现大量阻塞线程。使用 trace 命令追踪订单生成方法,发现内部有一个深度的递归调用比较耗时,但并非主要矛盾。
使用jstat监控: jstat -gcutil 1s 发现老年代使用率在FullGC后从98%降到65%,但之后又缓慢上升,直到下次FullGC。
结论: 存在轻微的内存泄漏,或者生存期较长的缓存对象过多,导致老年代对象缓慢积累,最终触发FullGC。
②、策略与调整 (第二步):
第一步(紧急止血): 由于是G1GC,先尝试更早地启动混合回收(Mixed GC),让G1更积极一些。
将 -XX:InitiatingHeapOccupancyPercent 从45下调到35。
第二步(根因治理): 在凌晨低峰期,使用 jmap -dump 命令dump堆内存,下载到本地。
使用 Eclipse MAT (Memory Analyzer Tool) 分析堆转储文件。
点击 Leak Suspects Report(泄漏嫌疑报告)。
MAT显示有一个自定义的本地缓存 LocalCache 对象持有大量订单查询对象,且这个缓存没有大小限制和过期策略。
③、验证与迭代 (第三、四步):
代码修复: 开发团队修复代码,为 LocalCache 添加了LRU淘汰策略和TTL过期时间。
参数调整: 将 -XX:InitiatingHeapOccupancyPercent 调回45(因为代码修复后,老年代增长变慢)。
部署上线后重新监控: 下一个高峰期,FullGC现象消失,YoungGC频率正常,RT曲线变得平滑。调优成功。
六、推荐的标准G1GC参数模板
对于大多数企业级Java应用(8核CPU,16G内存级别),可以从这个模板开始,然后根据上述步骤微调:
# 必选:堆内存大小,根据机器内存设置,通常设成一样-Xms4g -Xmx4g# 必选:使用G1垃圾收集器 (JDK9+默认)-XX:+UseG1GC# 重要:GC日志记录,便于排查-XX:+PrintGCDetails-XX:+PrintGCDateStamps-XX:+PrintGCTimeStamps-XX:+PrintGCCause-Xloggc:/opt/your_app/logs/gc-%t.log-XX:+UseGCLogFileRotation-XX:NumberOfGCLogFiles=5-XX:GCLogFileSize=10M# 重要:Metaspace大小,避免溢出-XX:MaxMetaspaceSize=256m-XX:+UseCompressedClassPointers-XX:+UseCompressedOops# 可选:目标最大GC暂停时间,根据你的SLA要求设置(毫秒)-XX:MaxGCPauseMillis=200# 可选:并行GC线程数,一般无需设置,默认根据CPU核数计算#-XX:ParallelGCThreads=8# 可选:并发GC线程数,一般为ParallelGCThreads的1/4#-XX:ConcGCThreads=2# 重要:触发Mixed GC的堆占用率阈值-XX:InitiatingHeapOccupancyPercent=45# 重要:启用JVM对外内存泄漏检查(如Netty等NIO框架常用)-XX:NativeMemoryTracking=detail
数据库层调优
①、SQL语句与索引优化 (占比约70%的问题)
- 开启慢查询日志 (Slow Query Log): 抓出执行缓慢的SQL语句。
- 使用 EXPLAIN 分析执行计划: 理解数据库如何执行某条SQL,这是优化的核心技能。
- 避免常见反模式:
SELECT *
在 WHERE 子句中对字段进行函数操作或运算
使用 %keyword% 前导通配符模糊查询
隐式类型转换 - 索引优化策略:
最左前缀匹配原则
避免冗余和重复索引
使用覆盖索引 (Covering Index) 减少回表
对区分度高的字段建索引
场景: 用户订单查询页面缓慢。
Ⅰ、定位慢SQL
首先在MySQL配置中开启慢查询日志(通常在 my.cnf 中)
slow_query_log = 1slow_query_log_file = /var/log/mysql/mysql-slow.loglong_query_time = 2 # 定义超过2秒的查询为“慢查询”log_queries_not_using_indexes = 1 # 记录未使用索引的查询
重启MySQL后,通过工具(如 mysqldumpslow, pt-query-digest)分析慢日志文件。
Ⅱ、分析执行计划(EXPLAIN)
假设分析后找到一条慢SQL:
SELECT * FROM orders WHERE user_id = 123 AND create_time > \'2023-01-01\' ORDER BY total_amount DESC LIMIT 10;
使用 EXPLAIN 分析:
EXPLAIN SELECT * FROM orders WHERE user_id = 123 AND create_time > \'2023-01-01\' ORDER BY total_amount DESC LIMIT 10;
可能的结果与问题:
- type: ALL (最坏的情况,全表扫描)
- key: NULL (没有使用索引)
- Extra: Using filesort (在磁盘上进行排序,非常耗时)
Ⅲ、优化和添加索引(以下两个方案)
问题在于 WHERE 和 ORDER BY 用的字段不同。根据最左前缀原则,一个索引无法同时高效过滤和排序。
方案一:(常用): 创建联合索引 (user_id, create_time)。这样能快速定位到某个用户在某段时间内的订单。但排序 total_amount 依然需要 filesort。
CREATE INDEX idx_userid_createtime ON orders(user_id, create_time);
方案二: 如果业务总是按金额排序,可以创建覆盖索引 (user_id, total_amount, create_time)。这样索引本身就可以完成过滤和排序,无需回表(如果SELECT的字段都在索引中)和 filesort。
-- 删除旧索引(如果需要)DROP INDEX idx_userid_createtime ON orders;-- 创建新索引CREATE INDEX idx_userid_amount_createtime ON orders(user_id, total_amount, create_time);
再次使用 EXPLAIN 验证,会发现:
- type: ref 或 range (索引查找)
- key: idx_userid_amount_createtime
- Extra: Using where; Using index (理想状态,使用了覆盖索引)
Ⅳ、改写SQL
如果查询必须用 SELECT *,但表中有 text/blob 等大字段,回表代价很大。应考虑是否需要所有字段,或者进行分页查询。
-- 分页查询,减少单次数据量SELECT id, order_sn, total_amount, status, create_time -- 只查询需要的字段FROM orders WHERE user_id = 123 AND create_time > \'2023-01-01\' ORDER BY total_amount DESC LIMIT 0, 10; -- 第一页
②、数据库架构优化
当单实例数据库无法满足性能需求时,必须考虑架构扩展。
措施与方案:
- 读写分离: 主库负责写操作,多个从库负责读操作。适用于读多写少的场景。
- 分库分表: 分为垂直分库(按业务模块拆分)和水平分表(将一个大表的数据分到多个物理表中)。
- 引入缓存: 使用Redis等缓存热点数据,减轻数据库压力。
场景: orders 表数据量已达亿级,查询和插入都非常缓慢。
方案: 对 orders 表进行水平分表,按 user_id 取模分到4个表中。
<dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-jdbc-spring-boot-starter</artifactId> <version>5.3.2</version></dependency>
Spring Boot 配置 (application.yml):
spring: shardingsphere: datasource: names: ds0 # 这里先用单个数据源演示分表 ds0: type: com.zaxxer.hikari.HikariDataSource driver-class-name: com.mysql.cj.jdbc.Driver jdbc-url: jdbc:mysql://localhost:3306/order_db?useUnicode=true username: root password: root rules: sharding: tables: orders: # 逻辑表名 actual-data-nodes: ds0.orders_$->{0..3} # 映射的实际物理表 key-generator: # 分布式主键生成策略 column: id type: SNOWFLAKE database-strategy: # 分库策略,这里未配置 table-strategy: # 分表策略 standard: sharding-column: user_id sharding-algorithm-name: orders_table_mod sharding-algorithms: orders_table_mod: type: MOD props: sharding-count: 4 # 分4张表 props: sql-show: true # 开发环境开启,显示实际路由的SQL
应用程序代码:
无需修改! 这是分库分中间件的最大优势。你仍然像操作单表一样编写SQL。
@Mapperpublic interface OrderMapper { // ShardingSphere会根据 user_id = 123 的值,计算并路由到具体的物理表 // 例如 123 % 4 = 3,这条记录会插入到 orders_3 表 @Insert(\"INSERT INTO orders (user_id, amount, ...) VALUES (#{userId}, #{amount}, ...)\") int insert(Order order); // 查询时同样,根据传入的 userId 路由到正确的表 @Select(\"SELECT * FROM orders WHERE user_id = #{userId} AND order_sn = #{orderSn}\") Order selectOneByUserIdAndSn(@Param(\"userId\") Long userId, @Param(\"orderSn\") String orderSn);}
注意: 对于不带 user_id 的查询(如 SELECT * FROM orders WHERE order_sn = ‘ABC’),ShardingSphere会执行广播查询(查询所有分表),效率低下。因此,分表后查询条件应尽量包含分片键。
③、连接池与应用层优化
- 使用高性能连接池: 如 HikariCP,并正确配置参数(maximum-pool-size 不是越大越好!通常等于 (核心数 * 2)
- 磁盘数)。
- 批处理 (Batch Processing): 对于大批量插入/更新操作。
- 避免 N+1 查询问题: 使用MyBatis等框架的联合查询或批量查询功能。
@Service@RequiredArgsConstructorpublic class OrderService { private final OrderMapper orderMapper; private final SqlSessionFactory sqlSessionFactory; // 低效:循环单条插入 public void createOrdersBad(List<Order> orders) { for (Order order : orders) { orderMapper.insert(order); // 每次插入都是一次网络IO+数据库事务 } } // 高效:使用批处理 public void createOrdersBatch(List<Order> orders) { try (SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH)) { OrderMapper mapper = session.getMapper(OrderMapper.class); for (int i = 0; i < orders.size(); i++) { mapper.insert(orders.get(i)); // 每500条提交一次,避免批量过大 if (i % 500 == 0 && i > 0) { session.commit(); session.clearCache(); // 清空缓存,防止OOM } } session.commit(); // 提交剩余的数据 } // try-with-resources 自动关闭session }}
④、数据库服务本身优化
- 参数调优: 调整
innodb_buffer_pool_size(通常设置为机器物理内存的50%-70%),innodb_log_file_size 等。 - 硬件升级: 使用SSD硬盘,增加内存容量。
- 操作系统优化: 调整文件系统的挂载参数、内核参数等。
总结: 数据库调优是一个自上而下、由表及里的过程。优先从SQL和索引入手,这能解决大部分问题。当单实例性能达到极限时,再考虑读写分离、分库分表等架构方案。同时,配合应用层的批处理和缓存,以及数据库本身的参数调优,才能构建一个高性能、高可用的企业级数据存储层。
Elasticsearch (ES) 优化
ES优化的核心目标是:提升查询速度、降低写入延迟、保证集群稳定性和资源利用率
①、合理设置分片 (Shards) 和副本 (Replicas)
分片数: 一旦创建索引,主分片数不可更改。设置需谨慎。
- 过大弊端: 每个分片都有开销(CPU、内存)。分片过多会导致查询性能下降(需要合并更多分片的结果)、集群恢复变慢。
- 过小弊端: 无法水平扩展,单个分片过大影响性能且迁移困难。
- 黄金法则: 建议单个分片大小控制在 10GB - 50GB 之间。可以通过 总数据量(1+副本数) / 50GB
来预估初始分片数。对于时序数据,通常按天/周创建索引,分片数可固定为一个较小值(如3-5)。
副本数: 提供高可用和提升读取吞吐量。可以在创建索引后动态调整。
- 生产环境至少设置为 1。在写入压力大时,可以临时设置为 0,写入完成后再恢复,最后用 _forcemerge 合并段。
案例:创建一个商品索引,预计有1TB数据,设置1个副本
PUT /products{ \"settings\": { \"number_of_shards\": 15, // (1000GB * 2) / 50GB ≈ 40, 但40可能过多。可先试15,预留未来2-3倍增长。 \"number_of_replicas\": 1, \"refresh_interval\": \"30s\" // 写入优化:降低刷新频率,默认1s }, \"mappings\": { ... }}
②、映射 (Mapping) 与查询优化
- 避免动态映射,明确定义字段类型:防止ES自动推断出不合适的类型(如将数字推断为text)。
- 慎用 keyword 和 text:
keyword:用于精确匹配、聚合、排序。无需分词。
text:用于全文检索。会被分词,占用资源更多。 - 禁用不需要索引的字段:对于仅用于存储、从不用于查询的字段,设置 “index”: false,大幅节省磁盘和内存。
- 使用 runtime_fields:对于查询时才计算的字段,避免索引开销。
案例:一个商品映射优化
PUT /products/_mapping{ \"properties\": { \"product_id\": { \"type\": \"keyword\" // 精确查找、Term聚合 }, \"product_name\": { \"type\": \"text\", // 全文搜索 \"fields\": { \"keyword\": { \"type\": \"keyword\" // 同时提供一个用于精确聚合的字段 } } }, \"price\": { \"type\": \"scaled_float\", // 优于float,存储更高效 \"scaling_factor\": 100 }, \"description\": { \"type\": \"text\", \"index\": false // 商品描述很长,只存储,从不参与搜索 }, \"create_time\": { \"type\": \"date\" } }}
③、写入优化
- 使用批量请求 (Bulk API):单条写入开销极大,必须批量。
- 调整刷新间隔 (refresh_interval):写入阶段临时设置为 -1(关闭)或 “30s”,减少Segment生成和合并的压力。
- 调整事务日志 (translog) 策略:对于可容忍少量数据丢失的场景,设置 “index.translog.durability”:
“async” 和 “index.translog.sync_interval”: “5s”,减少磁盘IO。
案例:大批量导入商品数据的最佳实践
# 1. 创建索引时禁用刷新和副本PUT /temp_products{ \"settings\": { \"number_of_shards\": 3, \"number_of_replicas\": 0, // 写入时先禁用副本 \"refresh_interval\": \"-1\" // 写入时先禁用刷新 }}# 2. 使用Bulk API进行大量写入curl -s -H \"Content-Type: application/x-ndjson\" -XPOST \"localhost:9200/temp_products/_bulk\" --data-binary \"@bulk_data.json\"# 3. 写入完成后,恢复设置PUT /temp_products/_settings{ \"number_of_replicas\": 1, \"refresh_interval\": \"1s\"}# 4. (可选) 强制合并段,优化查询性能POST /temp_products/_forcemerge?max_num_segments=1
④、查询优化
- 避免深度分页:from + size 方式超过 10000 会性能骤降。使用 search_after 或 scroll API。
- 使用过滤器上下文 (Filter Context):filter 子句会缓存结果,速度远快于 must(评分查询)。
- 限制返回字段:使用 _source 过滤,避免传输大量无用数据。
- 使用异步搜索 (Async Search):对于非常耗时的查询,避免阻塞。
案例:一个高效的商品搜索查询
GET /products/_search{ \"_source\": [\"product_id\", \"product_name\", \"price\"], // 只返回需要的字段 \"query\": { \"bool\": { \"must\": [ // 必须匹配,参与评分 { \"match\": { \"product_name\": \"手机\" } } ], \"filter\": [ // 过滤,不参与评分,结果可缓存 { \"range\": { \"price\": { \"gte\": 1000, \"lte\": 5000 } } }, { \"term\": { \"category\": \"electronics\" } } ] } }, \"sort\": [ // 按价格排序 { \"price\": { \"order\": \"asc\" } } ], \"from\": 0, \"size\": 20}
MongoDB 优化
MongoDB优化的核心是:正确使用索引、优化数据模型、高效使用硬件资源。
①、数据模型设计优化
嵌入式 vs 引用式
-
嵌入式: 适用于“一对一”或“一对少”且子文档不频繁独立查询的场景。优先选择,因为它能通过一次查询获取所有数据。
案例:用户(User)文档中嵌入地址(Address)文档数组。 -
引用式: 适用于“一对多”或“多对多”且子文档经常被独立查询或更新的场景。
案例:文章(Post) 和 评论(Comment) 用 ObjectId 关联。
避免使用大型数组:数组增长会导致文档移动,影响性能。如果数组可能无限增长,应将其建模为单独的集合。
②、索引优化 (与RDBMS思路类似但更灵活)
- 复合索引顺序:遵循 ESR原则:精确匹配(Equal)字段 -> 排序(Sort)字段 -> 范围(Range)字段。
- 创建适合查询模式的索引:通过 db.collection.explain().find(…)
分析查询计划,确认是否使用索引(IXSCAN)而非全表扫描(COLLSCAN)。 - 覆盖查询 (Covered Query):如果查询只需返回索引中包含的字段,可以无需回表,极快。
- 使用部分索引 (Partial Index):只为满足条件的文档创建索引,节省空间。
- TTL索引:自动过期删除数据,非常适合日志、会话等场景。
案例:优化一个订单查询
// 查询:查找某个用户状态为“已完成”的订单,按创建时间倒序排列db.orders.find({ user_id: 12345, status: \"completed\"}).sort({ create_time: -1 })// 创建复合索引:Equal -> Sort -> Range (这里没有Range,所以ES)db.orders.createIndex({ \"user_id\": 1, // Equal first \"create_time\": -1 // Sort next (注意方向:1为升序,-1为降序,与sort一致最佳)})// 更优的索引:覆盖查询// 假设我们只需要返回 order_id 和 create_timedb.orders.createIndex( { \"user_id\": 1, \"status\": 1, \"create_time\": -1 }, { \"partialFilterExpression\": { \"status\": \"completed\" } } // 部分索引,只索引已完成订单)// 查询改为只投影需要的字段db.orders.find({ user_id: 12345, status: \"completed\"}, { order_id: 1, create_time: 1, _id: 0 }) // _id:0 排除默认返回的_id).sort({ create_time: -1 })// 这个查询可以被我们创建的索引完全覆盖,性能极佳。
③、写入优化
- 有序 vs 无序写入:ordered: false 的批量插入更快,因为可以并行执行,但无法保证顺序。
- 调整写关注 (Write Concern):根据业务对数据安全性的要求调整。
w: 1 (默认):写主节点确认。性能好。
w: majority:写大多数节点确认。数据安全,但延迟高。 - 批量插入:使用 insertMany() 而非循环 insertOne()。
案例:批量插入日志数据(可容忍少量丢失)
db.app_log.insertMany( [ ... ], // 巨大的日志文档数组 { ordered: false, // 无序写入,提升速度 writeConcern: { w: 0 } // 不等待确认,速度最快,但可靠性最低 })
④、分片集群优化 (应对海量数据)
选择合适的分片键 (Shard Key):这是最重要的决策。
- 要求: 基数大、频率高、写分布均匀。避免单调递增的分片键(如时间戳、自增ID),会导致“热分片”和写瓶颈。
- 策略: 使用复合分片键(如 {customer_id: 1, timestamp: -1})或基于哈希的分片({_id:
“hashed”})。
预分裂 (Pre-Splitting):在大规模导入数据前,手动预先分割分片,避免集群自动平衡带来的开销。
缓存层优化(本地缓存+分布式缓存)
企业级项目通常采用多级缓存架构,最大化减少对分布式缓存的访问延迟和压力。
L1: 本地缓存 (Local Cache):
- 特点: 速度极快(内存访问),与应用进程共生,无网络开销。
- 缺点: 容量有限,数据不一致(不同节点间缓存可能不同)。
- 适用场景: 极热点的、数据量小、更新不频繁的数据(如字典数据、配置信息)。
技术选型: Caffeine (高性能,推荐), Guava Cache, Ehcache。
L2: 分布式缓存 (Distributed Cache):
- 特点: 容量大,可扩展,所有应用节点共享同一份视图,保证数据一致性。
- 缺点: 有网络开销,速度慢于本地缓存。
- 适用场景: 共享的、大量的热点数据。
技术选型: Redis (最主流), Memcached。
应用 -> 本地缓存 -> (未命中) -> 分布式缓存 -> (未命中) -> 数据库
缓存模式 (Cache Pattern)
- Cache-Aside (旁路缓存): 最常用模式。应用代码直接负责读写缓存和数据库。
- Read/Write-Through: 缓存提供商负责同步读写缓存和数据库,对应用透明。实现复杂,较少用。
- Write-Behind: 异步写入数据库,性能最好,但有数据丢失风险。
以最经典的 Cache-Aside 模式为例,结合 Spring Boot + Caffeine + Redis 进行说明
①、缓存穿透
查询一个根本不存在的数据。请求会穿过缓存,直接访问数据库。如果有人恶意攻击,大量请求不存在的key,会导致数据库压力巨大。
- 缓存空对象 (Cache Null): 即使从DB没查到,也将一个空值(或特殊标记)写入缓存,并设置一个较短的过期时间(如30秒)
- 布隆过滤器 (Bloom Filter):在缓存之前加一层布隆过滤器。它能够以极小的空间代价判断一个key是否一定不存在于数据库中。对于不存在的key,直接拦截返回。
@Service@RequiredArgsConstructorpublic class ProductServcie{private final ProductMapper productMapper;private final RedisTemplate<String,Object> redisTemplate;private final CacheManager cacheManager;private static final String PRODUCT_CACHE = \"products\";private static final String NULL_PLACEHOLDER = \"NULL\";//空值标记public Product getProductById(){// 1. 构造缓存Key String cacheKey = \"product:\" + id; // 2. 先从缓存查询 (这里直接使用RedisTemplate演示,也可用@Cacheable) Object value = redisTemplate.opsForValue().get(cacheKey); // 3. 如果缓存命中,且不是空标记,则直接返回 if (value != null) { if (NULL_PLACEHOLDER.equals(value)) { // 命中的是空值,防止穿透,直接返回null,无需查库 return null; } return (Product) value; } // 4. 缓存未命中,查询数据库 Product product = productMapper.selectById(id); // 5. 数据库也不存在,缓存空对象(设置短过期时间,如30秒) if (product == null) { redisTemplate.opsForValue().set(cacheKey, NULL_PLACEHOLDER, 30, TimeUnit.SECONDS); return null; } // 6. 数据库存在,写入缓存(设置正常过期时间,如1小时) redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS); return product;}}
②、缓存击穿
某个热点key在缓存过期的瞬间,有大量并发请求进来,导致所有请求都落到数据库上。
- 互斥锁 (Mutex Lock): 只允许一个请求去重建缓存,其他请求等待或返回旧数据。
- 逻辑过期 (Logical Expiration):
不给缓存设置物理TTL,而是定义一个逻辑过期字段。当发现逻辑过期时,另起线程去异步更新缓存,当前请求返回旧数据。
public Product getProductByIdWithLock(Long id) { String cacheKey = \"product:\" + id; String lockKey = \"lock:product:\" + id; // 锁的Key Product product; // 1. 尝试从缓存获取 product = (Product) redisTemplate.opsForValue().get(cacheKey); if (product != null) { return product; } // 2. 缓存未命中,尝试获取分布式锁 // SET lockKey currentTime NX PX 3000 -> 原子操作:不存在时设置,并过期时间3秒 String token = UUID.randomUUID().toString(); Boolean isLock = redisTemplate.opsForValue().setIfAbsent(lockKey, token, 3, TimeUnit.SECONDS); try { if (Boolean.TRUE.equals(isLock)) { // 3. 获取锁成功,再次检查缓存(Double Check),因为可能前面的请求已经重建好了 product = (Product) redisTemplate.opsForValue().get(cacheKey); if (product != null) { return product; } // 4. 查询数据库 product = productMapper.selectById(id); if (product == null) { // 防止穿透 redisTemplate.opsForValue().set(cacheKey, NULL_PLACEHOLDER, 30, TimeUnit.SECONDS); return null; } // 5. 写入缓存 redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS); } else { // 6. 获取锁失败,说明有其他线程在重建缓存,休眠一下再重试(自旋) Thread.sleep(50); return getProductByIdWithLock(id); // 递归重试 } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(\"Interrupted while acquiring lock\", e); } finally { // 7. 释放锁:使用Lua脚本保证原子性,判断token是否还是自己的再删除 // 避免误删其他线程的锁 String luaScript = \"if redis.call(\'get\', KEYS[1]) == ARGV[1] then return redis.call(\'del\', KEYS[1]) else return 0 end\"; redisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class), Collections.singletonList(lockKey), token); } return product;}
③、缓存雪崩
大量缓存key在同一时间点(或时间段)大面积失效,导致所有请求都落到数据库上,造成数据库瞬时压力过大。
- 差异化过期时间: 给缓存设置过期时间时,增加一个随机值,避免同时失效。TTL = baseTime + random.nextInt(0, 300) // 例如基础1小时 + 随机5分钟内
- 缓存永不过期,后台更新: 缓存不设TTL,而是由后台任务或定时任务定期异步更新缓存。
- 构建高可用缓存集群: 如Redis Cluster,防止单点故障导致整个缓存层不可用。
- 服务熔断与降级: 使用Hystrix或Sentinel等组件,当数据库压力过大时,对请求进行熔断,直接返回降级信息(如“服务繁忙,请稍后再试”)。
// 基础过期时间long baseExpireTime = TimeUnit.HOURS.toSeconds(1);// 随机增加0-5分钟的随机时间long randomExpire = ThreadLocalRandom.current().nextLong(0, TimeUnit.MINUTES.toSeconds(5));long finalExpireTime = baseExpireTime + randomExpire;redisTemplate.opsForValue().set(cacheKey, product, finalExpireTime, TimeUnit.SECONDS);
本地缓存与分布式缓存协同 (Caffeine + Redis)
spring: cache: type: caffeine caffeine: spec: maximumSize=10000,expireAfterWrite=60s # 本地缓存:1万条,60秒过期 redis: time-to-live: 3600s # 分布式缓存:1小时过期(基础值,代码中会加随机) use-key-prefix: true cache-null-values: true # 允许缓存空值,解决穿透
@Configuration@EnableCachingpublic class CacheConfig { // Caffeine配置已通过yaml完成}@Servicepublic class ProductService { // 此方法会自动使用两级缓存:先查Caffeine,未命中再查Redis,再未命中才执行方法体查DB。 // 查到的结果会依次写回Redis和Caffeine。 @Cacheable(value = \"products\", key = \"#id\", unless = \"#result == null\") public Product getProductById(Long id) { // 防止击穿:在方法上加分布式锁的逻辑需要自己实现,注解无法自动完成。 // 或者使用逻辑过期方案,对注解返回的结果进行包装。 Product product = productMapper.selectById(id); // 如果查不到,因为配置了`cache-null-values: true`,Spring会缓存null值 return product; } @CachePut(value = \"products\", key = \"#product.id\") public Product updateProduct(Product product) { productMapper.updateById(product); return product; // 更新缓存 } @CacheEvict(value = \"products\", key = \"#id\") public void deleteProduct(Long id) { productMapper.deleteById(id); }}
缓存一致性
策略:
- 先更新数据库,再删除缓存 (推荐): updateDB(); invalidateCache();
这是最常用的模式,出现不一致的概率较低。即使第二步失败,也只是导致一次脏读,下次读取时会纠正。 - 通过 Canal 监听数据库Binlog,异步更新/删除缓存: 解耦应用和缓存更新逻辑,可靠性高,是大型互联网公司的首选方案。
监控与告警
- 监控指标: 缓存命中率、慢查询、内存使用率、网络流量、Key数量。
- 工具: Redis自带的 INFO 命令、RedisExporter + Prometheus + Grafana。
- 设置告警规则:当缓存命中率过低(如低于80%)或内存使用率过高(如超过80%)时触发告警。
总结:
- 架构上: 采用多级缓存(Caffeine + Redis)分担压力。
- 策略上: 严格使用 Cache-Aside 模式,并配套解决三大问题:
穿透: 缓存空对象 + 布隆过滤器。
击穿: 互斥锁(分布式锁)或逻辑过期。
雪崩: 差异化过期时间 + 高可用集群。
缓存工具类封装 - 实现上: 优先使用成熟的框架(Spring Cache)简化开发,但对核心热点数据访问路径要有能力进行手动精细控制(如自己实现锁逻辑)。
- 运维上: 完善的监控和告警是保证缓存层长期稳定运行的基石。
具体内容:缓存具体介绍
代码层的优化(异步+JUC等)
JUC并发包参考:具体JUC内容介绍
监控先行: 使用APM工具和Micrometer监控线程池状态、锁竞争情况、队列长度。
选择合适的工具:
- 计算任务: ForkJoinPool
- IO任务: 自定义 ThreadPoolExecutor
- 缓存: Caffeine / Guava Cache
- 映射表: ConcurrentHashMap
- 计数器: LongAdder
避免常见陷阱:
- 线程池: 使用有界队列和合理的拒绝策略。
- 锁: 尽量缩小锁范围,使用读写锁减少竞争。
- CompletableFuture: 注意异常处理和线程池选择。
性能测试: 使用JMeter等工具对优化前后的代码进行压测,用数据证明优化效果。
一、线程池优化
Java高并发(线程创建以及线程池,异步调用,CompletableFuture)详解
SpringBoot整合定时任务+定时任务启用异步线程池
- CPU密集型任务: 线程数 ≈ CPU核数 + 1
- IO密集型任务: 线程数 ≈ CPU核数 * (1 + 平均等待时间/平均计算时间)
①、使用合适的线程池
@Servicepublic class OrderProcessingService { // IO密集型任务(如调用外部API、数据库操作),假设CPU为8核 // 最佳线程数 = 8 * (1 + 等待时间/计算时间) ≈ 8 * (1 + 10) ≈ 80 private final ThreadPoolExecutor ioBoundExecutor = new ThreadPoolExecutor( 50, // 核心线程数:维持的常驻线程,避免频繁创建销毁 100, // 最大线程数:突发流量的应对能力 60L, TimeUnit.SECONDS, // 空闲线程存活时间 new LinkedBlockingQueue<>(1000), // 任务队列,需设置有界队列防止OOM new ThreadFactoryBuilder().setNameFormat(\"order-io-pool-%d\").build(), // 命名线程,便于监控 new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:由调用者线程执行,是一种降级 ); // CPU密集型任务(如计算、数据处理) private final ExecutorService cpuBoundExecutor = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors()); public CompletableFuture<Order> processOrderAsync(Order order) { return CompletableFuture.supplyAsync(() -> { // 模拟IO操作 enrichOrderDetails(order); // 调用外部服务 calculateOrderPrice(order); // CPU计算 saveOrderToDB(order); // 数据库操作 return order; }, ioBoundExecutor); // 指定使用IO线程池 } // 优雅关闭,在应用关闭时调用 @PreDestroy public void shutdown() { ioBoundExecutor.shutdown(); cpuBoundExecutor.shutdown(); }}
②、监控线程池状态:
使用Micrometer监控线程池指标,便于调优。
@Configurationpublic class ThreadPoolMonitorConfig { @Autowired public void registerMetrics(ThreadPoolExecutor executor, MeterRegistry registry) { Gauge.builder(\"thread.pool.core.size\", executor, ThreadPoolExecutor::getCorePoolSize) .tags(\"poolName\", \"order-io-pool\") .register(registry); Gauge.builder(\"thread.pool.active.count\", executor, ThreadPoolExecutor::getActiveCount) .tags(\"poolName\", \"order-io-pool\") .register(registry); Gauge.builder(\"thread.pool.queue.size\", executor, e -> e.getQueue().size()) .tags(\"poolName\", \"order-io-pool\") .register(registry); }}
二、并发数据结构
使用线程安全的集合类替代 synchronized 包装类,性能提升显著。
①、高并发读、少量写的场景:使用 CopyOnWriteArrayList
@Servicepublic class ProductConfigService{//产品配置列表,读多写少(写只在每天凌晨刷新一次)private final CopyOnWriteArrayList<ProductConfig> configList = CopyOnWriteArrayList<>();//并发读:性能极高,无需加锁public ProductConfig getConfigById(String id){return configList.stream() .filter(config -> config.getId().equals(id)) .findFirst() .orElse(null);}// 写操作:加锁,复制整个数组,成本高。适合低频写。 @Scheduled(cron = \"0 0 2 * * ?\") // 每天凌晨2点刷新 public void refreshConfig() { List<ProductConfig> newConfigs = loadConfigsFromDB(); configList.clear(); configList.addAll(newConfigs); }}
②、高并发计数器:使用 LongAdder (替代 AtomicLong)
@Servicepublic class MetricsService { // 统计API调用次数。LongAdder在高并发下性能远优于AtomicLong。 private final LongAdder apiCallCounter = new LongAdder(); private final LongAdder errorCounter = new LongAdder(); public void recordApiCall() { apiCallCounter.increment(); } public void recordError() { errorCounter.increment(); } public long getApiCallCount() { return apiCallCounter.sum(); } public long getErrorCount() { return errorCounter.sum(); }}
③、高并发映射表:使用 ConcurrentHashMap
@Servicepublic class UserSessionManager { // 存储用户会话信息,key: userId, value: SessionInfo private final ConcurrentHashMap<Long, SessionInfo> sessionMap = new ConcurrentHashMap<>(); // 使用 computeIfAbsent 保证原子性的\"如果不存在则计算\" public SessionInfo getSession(Long userId) { return sessionMap.computeIfAbsent(userId, id -> { // 只有当key不存在时,这个函数才会被执行 SessionInfo newSession = createNewSession(id); return newSession; }); } // 使用 forEach 进行并发遍历(弱一致性) public void expireOldSessions(Duration duration) { long cutoffTime = System.currentTimeMillis() - duration.toMillis(); sessionMap.forEach((userId, session) -> { if (session.getLastAccessTime() < cutoffTime) { sessionMap.remove(userId, session); // 使用 remove(key, value) 避免误删 } }); }}
三、锁优化
减少锁竞争是提升并发性能的核心。
①、读写锁 (ReentrantReadWriteLock):读多写少的场景
@Servicepublic class ProductInventoryService { private final Map<Long, Integer> inventoryMap = new HashMap<>(); private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); // 多个线程可以同时读库存 public Integer getInventory(Long productId) { rwLock.readLock().lock(); try { return inventoryMap.getOrDefault(productId, 0); } finally { rwLock.readLock().unlock(); } } // 写库存时独占锁 public void updateInventory(Long productId, Integer quantity) { rwLock.writeLock().lock(); try { inventoryMap.put(productId, quantity); } finally { rwLock.writeLock().unlock(); } }}
②、分段锁(Striped Lock):优化热点资源竞争
@Servicepublic class StripedLockOrderService { // 按订单ID进行分段锁,减少锁竞争 private static final int STRIPE_COUNT = 16; private final Lock[] stripes = new ReentrantLock[STRIPE_COUNT]; public StripedLockOrderService() { for (int i = 0; i < STRIPE_COUNT; i++) { stripes[i] = new ReentrantLock(); } } private Lock getLock(Long orderId) { // 简单的哈希取模,将不同订单散列到不同的锁上 return stripes[(int) (orderId % STRIPE_COUNT)]; } public void processOrder(Long orderId) { Lock lock = getLock(orderId); lock.lock(); try { // 处理订单业务,同一订单串行,不同订单并行 doProcessOrder(orderId); } finally { lock.unlock(); } }}
③、自旋锁与 Atomic 字段更新器:极致性能场景
public class CompactCounter { private volatile long value; private static final AtomicLongFieldUpdater<CompactCounter> UPDATER = AtomicLongFieldUpdater.newUpdater(CompactCounter.class, \"value\"); public long increment() { return UPDATER.incrementAndGet(this); } // 比 AtomicLong 节省内存,性能相近}
四、异步编排与并发工具
①、CompletableFuture 异步编排:优化串行远程调用
@Servicepublic class UserDetailService { private final ExecutorService taskExecutor = ...; public CompletableFuture<UserProfile> getUserProfileAsync(Long userId) { // 并行调用多个外部服务,最后合并结果 CompletableFuture<UserInfo> userInfoFuture = CompletableFuture.supplyAsync( () -> userService.getUserInfo(userId), taskExecutor); CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync( () -> orderService.getUserOrders(userId), taskExecutor); CompletableFuture<Account> accountFuture = CompletableFuture.supplyAsync( () -> accountService.getAccount(userId), taskExecutor); // 等所有任务完成,然后组合结果 return CompletableFuture.allOf(userInfoFuture, ordersFuture, accountFuture) .thenApplyAsync(v -> { try { UserInfo userInfo = userInfoFuture.get(); List<Order> orders = ordersFuture.get(); Account account = accountFuture.get(); return new UserProfile(userInfo, orders, account); } catch (Exception e) { throw new CompletionException(e); } }, taskExecutor); }}
②、CountDownLatch / CyclicBarrier:并行任务栅栏
@Servicepublic class DataInitializationService { public void initAllData() throws InterruptedException { int taskCount = 3; CountDownLatch latch = new CountDownLatch(taskCount); ExecutorService executor = Executors.newFixedThreadPool(taskCount); executor.submit(() -> { initUserData(); latch.countDown(); }); executor.submit(() -> { initProductData(); latch.countDown(); }); executor.submit(() -> { initOrderData(); latch.countDown(); }); // 等待所有初始化任务完成,最多等10分钟 if (!latch.await(10, TimeUnit.MINUTES)) { throw new RuntimeException(\"Data initialization timeout\"); } executor.shutdown(); }}
③、Semaphore:控制资源并发访问数
@Servicepublic class RateLimitedHttpClient { // 限制对某个外部API的并发调用数不超过10个 private final Semaphore semaphore = new Semaphore(10); public String callExternalApi(String url) { if (!semaphore.tryAcquire(3, TimeUnit.SECONDS)) { throw new RuntimeException(\"API call limit exceeded\"); } try { return httpClient.get(url); } finally { semaphore.release(); } }}
架构层的优化(中间件)
异步化 (Asynchronization) - 消息队列 (MQ)
识别场景: 找出适合异步化的场景,如:
- 耗时操作: 发送短信/邮件、生成报表、上传大文件。
- 非核心业务: 用户行为日志记录、积分更新、消息推送。
- 流量削峰: 秒杀、抢购等场景,将请求先存入MQ,后端服务慢慢消费。
引入MQ依赖: 在项目中引入MQ客户端依赖。
配置MQ连接: 在配置文件中配置MQ的NameServer/Broker地址、生产者组、消费者组等。
编写生产者: 在需要发送消息的地方,注入MQ模板类,发送消息。
编写消费者: 创建监听器类,消费指定Topic的消息,并实现业务逻辑。
考虑事务消息: 对于需要保证业务操作和消息发送一致性的场景(如下单后发券),使用事务消息。
SpringBoot整合消息中间件
RocketMQ多种消费模式
①、添加依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.3</version></dependency>
②、配置文件
rocketmq: name-server: 127.0.0.1:9876 # RocketMQ nameserver 地址 producer: group: my-producer-group # 生产者组名
③、生产者服务
@Servicepublic class OrderService { @Autowired private RocketMQTemplate rocketMQTemplate;public void createOrder(){//1.本地数据库事务,创建订单orderDao.insert(order);// 2. 订单创建成功后,发送一个异步消息(例如:发放优惠券) // 使用异步消息确保即使消息发送失败,也不会回滚订单事务(最终一致性) String topic = \"Topic-OrderCreated\"; String payload = \"OrderID:\" + order.getId();rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(payload).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info(\"消息发送成功:{}\", sendResult); } @Override public void onException(Throwable e) { log.error(\"消息发送失败,订单ID:{}\", order.getId(), e); // 可以在这里加入重试逻辑或记录补偿日志 } }); //3.立即返回结果给用户}}
④、消费者
@Service@RocketMQMessageListener(topic = \"Topic-OrderCreated\", consumerGroup = \"my-consumer-group-coupon\")public class OrderCreatedCouponListener implements RocketMQListener<String> { @Autowired private CouponService couponService; @Override public void onMessage(String message) { // 解析消息,获取订单ID String orderId = message.split(\":\")[1]; log.info(\"收到订单创建消息,开始为订单 {} 发放优惠券\", orderId); try { couponService.grantCouponByOrderId(orderId); } catch (Exception e) { log.error(\"发放优惠券失败,订单ID: {}\", orderId, e); // 注意:RocketMQ消费者默认会自动重试(重试16次后进入死信队列) // 根据业务决定是否需要抛出异常以触发重试 throw e; } }}
服务熔断与降级
防止分布式系统出现“雪崩效应”,当某个服务出现故障或延迟时,提供备用方案,保证核心链路的可用性。
企业级解决方案: Resilience4j, Sentinel (Alibaba), Hystrix (Netflix,已停维护,推荐前两者)。
优化步骤 (以 Resilience4j 为例):
①、添加依赖
<dependency> <groupId>io.github.resilience4j</groupId> <artifactId>resilience4j-spring-boot2</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId></dependency>
②、配置文件application.yml
配置熔断器/降级规则: 在配置文件中为特定方法设置规则(失败率、滑动窗口大小、半开状态等待时间等)
resilience4j: circuitbreaker: instances: userServiceCB: # 熔断器实例名称 registerHealthIndicator: true slidingWindowSize: 10 # 滑动窗口大小 minimumNumberOfCalls: 5 # 最小调用次数,低于此数则不开启熔断计算 waitDurationInOpenState: 5s # 熔断开启后,等待多久进入半开状态 failureRateThreshold: 50 # 失败率阈值,超过50%则熔断 permittedNumberOfCallsInHalfOpenState: 3 # 半开状态下允许的调用次数
③、Service层注解
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.util.concurrent.CompletableFuture;@Servicepublic class OrderService { @Autowired private UserServiceClient userServiceClient; // 一个Feign客户端,用于调用远程用户服务 /** * 获取用户订单详情 * 需要调用用户服务获取用户信息 */ @CircuitBreaker(name = \"userServiceCB\", fallbackMethod = \"getUserOrderDetailFallback\") // 可以组合多个注解,比如再加一个 @TimeLimiter 做超时控制 public UserOrderDetail getUserOrderDetail(String orderId, String userId) { // 1. 本地查询订单 Order order = orderDao.selectById(orderId); // 2. 【远程调用】- 可能失败或高延迟的点 User user = userServiceClient.getUserById(userId); // 3. 组装数据 return new UserOrderDetail(order, user); } /** * Fallback 方法 * 参数和返回值必须与原方法一致,最后可以多加一个 Throwable 参数用来接收异常 */ private UserOrderDetail getUserOrderDetailFallback(String orderId, String userId, Throwable t) { log.warn(\"调用用户服务失败,进入降级逻辑。订单ID: {}, 用户ID: {}\", orderId, userId, t); // 降级策略: // 1. 返回一个兜底数据 Order order = orderDao.selectById(orderId); User dummyUser = new User(); dummyUser.setId(userId); dummyUser.setName(\"用户信息暂不可用\"); return new UserOrderDetail(order, dummyUser); // 2. 也可以返回一个空对象、默认值,或者从本地缓存中获取旧数据 // 3. 根据业务场景选择最合适的策略 } // 【异步熔断示例】- 使用 CompletableFuture @CircuitBreaker(name = \"asyncServiceCB\", fallbackMethod = \"asyncFallback\") @TimeLimiter(name = \"asyncServiceCB\") // 组合使用超时控制 public CompletableFuture<String> asyncCall() { return CompletableFuture.supplyAsync(() -> { // 模拟一个耗时的远程调用 try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return \"Success\"; }); } public CompletableFuture<String> asyncFallback(Throwable t) { return CompletableFuture.completedFuture(\"Fallback result due to: \" + t.getMessage()); }}