> 技术文档 > 【Java 大数据量分批处理与多线程优化】_java 多线程处理大批量数据

【Java 大数据量分批处理与多线程优化】_java 多线程处理大批量数据


Java 大数据量分批处理与多线程优化实践

1. 背景

在实际开发中,我们经常需要处理大批量数据的插入或更新操作。如果一次性处理所有数据,可能会导致以下问题:

  • 数据库压力过大:单次事务处理大量数据,可能导致锁表、连接池耗尽。
  • 内存溢出(OOM):大列表一次性加载到内存,可能触发 OutOfMemoryError
  • 性能瓶颈:单线程处理大数据量时,执行时间过长。

本文将介绍如何通过分批处理 + 多线程优化来解决这些问题,并提供完整的代码示例。


2. 基础分批处理方案

2.1 单线程分批处理

public static final int perSize = 200; // 每批次处理200条数据public void batchProcess(List<Data> dataList) { if (CollectionUtil.isNotEmpty(dataList)) { int batchCount = dataList.size() / perSize + 1; // 计算批次数量 for (int i = 0; i < batchCount; i++) { // 获取当前批次数据 List<Data> batchData; if (i == batchCount - 1) { batchData = dataList.subList(i * perSize, dataList.size()); // 最后一批 } else { batchData = dataList.subList(i * perSize, (i + 1) * perSize); // 常规批次 } // 执行批量操作(如插入或更新) batchUpdate(batchData); } }}

优点

  • 减少单次数据库操作的数据量,避免锁表时间过长。
  • 降低内存占用,防止 OOM

缺点

  • 单线程执行,速度较慢,无法充分利用 CPU 资源。

3. 多线程分批优化

3.1 使用线程池并发处理

// 定义线程池private static final ThreadPoolExecutor updateThreadPool = new ThreadPoolExecutor( 10, // 核心线程数 20, // 最大线程数 60, TimeUnit.SECONDS, // 空闲线程存活时间 new LinkedBlockingQueue<>(50), // 任务队列容量 Executors.defaultThreadFactory(), // 线程工厂 new ThreadPoolExecutor.AbortPolicy() // 拒绝策略:队列满时抛出异常);public void concurrentBatchProcess(List<Data> dataList) throws InterruptedException { if (CollectionUtil.isNotEmpty(dataList)) { int batchCount = dataList.size() / perSize + 1; CountDownLatch latch = new CountDownLatch(batchCount); // 计数器,等待所有任务完成 for (int i = 0; i < batchCount; i++) { final int batchIndex = i; updateThreadPool.submit(() -> { try {  List<Data> batchData;  if (batchIndex == batchCount - 1) { batchData = dataList.subList(batchIndex * perSize, dataList.size());  } else { batchData = dataList.subList(batchIndex * perSize, (batchIndex + 1) * perSize);  }  batchUpdate(batchData); // 执行批量操作 } catch (Exception e) {  log.error(\"批次 {} 处理失败: {}\", batchIndex, e.getMessage(), e); } finally {  latch.countDown(); // 任务完成,计数器减1 } }); } latch.await(); // 等待所有任务完成 }}

优化点

  • 多线程并发处理,提高执行速度。
  • CountDownLatch 控制任务完成,确保所有批次执行完毕后再继续后续逻辑。
  • 异常捕获,防止单批次失败影响整体任务。

3.2 线程池优化建议

参数 推荐值 说明 核心线程数 Runtime.getRuntime().availableProcessors() 根据 CPU 核心数动态调整 最大线程数 CPU核心数 * 2 避免过多线程竞争 空闲线程存活时间 30秒 较短时间回收空闲线程 任务队列容量 100~1000 避免任务堆积导致 OOM 拒绝策略 CallerRunsPolicy 队列满时由提交线程执行,避免丢失任务

优化后的线程池配置

ThreadPoolExecutor optimizedPool = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());

4. 事务一致性考虑

4.1 是否需要事务?

  • 强一致性要求(如金融交易):
    • 使用单线程 + 全局事务(@Transactional)。
    • 或采用 分布式事务(Seata、TCC)
  • 最终一致性可接受(如日志记录):
    • 多线程分批处理 + 失败重试机制。

4.2 失败重试方案

// 记录失败批次List<Integer> failedBatches = new ArrayList<>();for (int i = 0; i < batchCount; i++) { final int batchIndex = i; updateThreadPool.submit(() -> { try { // ... 执行批次任务 } catch (Exception e) { failedBatches.add(batchIndex); // 记录失败批次 } finally { latch.countDown(); } });}latch.await();// 失败重试if (!failedBatches.isEmpty()) { log.warn(\"以下批次处理失败,尝试重试: {}\", failedBatches); for (int batchIndex : failedBatches) { // 重新执行失败批次 }}

5. 完整代码示例

import java.util.*;import java.util.concurrent.*;import org.springframework.util.CollectionUtils;public class BatchProcessor { private static final int perSize = 200; // 每批次大小 private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy() ); public void processBatch(List<Data> dataList) throws InterruptedException { if (CollectionUtils.isEmpty(dataList)) return; int batchCount = dataList.size() / perSize + 1; CountDownLatch latch = new CountDownLatch(batchCount); List<Integer> failedBatches = new ArrayList<>(); for (int i = 0; i < batchCount; i++) { final int batchIndex = i; threadPool.submit(() -> { try {  List<Data> batchData = getBatchData(dataList, batchIndex, batchCount);  batchUpdate(batchData); } catch (Exception e) {  failedBatches.add(batchIndex);  log.error(\"Batch {} failed: {}\", batchIndex, e.getMessage(), e); } finally {  latch.countDown(); } }); } latch.await(); // 等待所有批次完成 // 失败重试 if (!failedBatches.isEmpty()) { retryFailedBatches(dataList, failedBatches); } } private List<Data> getBatchData(List<Data> dataList, int batchIndex, int batchCount) { int fromIndex = batchIndex * perSize; int toIndex = (batchIndex == batchCount - 1) ? dataList.size() : (batchIndex + 1) * perSize; return dataList.subList(fromIndex, toIndex); } private void batchUpdate(List<Data> batchData) { // 执行数据库批量操作 } private void retryFailedBatches(List<Data> dataList, List<Integer> failedBatches) { // 实现重试逻辑 }}

6. 总结

方案 适用场景 优点 缺点 单线程分批 小数据量、强一致性 简单易实现 速度较慢 多线程分批 大数据量、允许最终一致性 速度快,资源利用率高 需处理线程安全、失败重试 分布式事务 严格一致性要求 数据强一致 性能较低,实现复杂

推荐选择

  • 如果数据量较小(< 1万条),使用 单线程分批
  • 如果数据量大且允许短暂不一致,使用 多线程分批 + 失败重试
  • 如果涉及跨服务事务,使用 Seata/TCC 分布式事务

📌 关注我的CSDN博客,获取更多Java和大数据技术干货!

🚀 如果有帮助,欢迎点赞 + 收藏! 🚀