Java 多线程文件处理
在项目中,经常会有文件处理的场景。单线程处理比较简单,直接,易于实现,但是往往效率较低。所以对于这个场景,多线程处理是一个比较好的解决方法。
本文将文件的多线程处理抽象成一个通用问题模型,并封装成一个工具类,以便复用。工具类先统计文件的总行数,再按线程数把文件划分成若干连续的行区间,每个线程各自打开文件并只处理自己的区间。业务方只需按需实现两个回调:对文件每一行的处理,以及某个区间处理完成后的收尾处理。
工具类:
package com.example.service;

import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;

/**
 * Processes a text file with several threads, each handling one contiguous range of lines.
 *
 * <p>The total line count is computed once in the constructor. {@link #excute()} splits the
 * lines into {@code threadSize} consecutive ranges of {@code fileLines / threadSize} lines each
 * (the last range also takes the remainder) and starts one {@link HandlerThread} per range.
 * Every thread opens the file on its own, skips to the start of its range and passes each line
 * to the {@link LineHandler}. {@link #waitcdl()} blocks until all threads have finished.
 *
 * <p>The file is decoded as UTF-8 both when counting and when reading lines.
 */
@Slf4j
public class FileMultiThreadHandler {

    private final int threadSize;
    private final CountDownLatch countDownLatch;
    private final String sourcePath;
    private final int fileLines;
    private final LineHandler lineHandler;

    /**
     * Creates a handler and counts the lines of the source file.
     *
     * <p>If the file cannot be read while counting, the error is logged and the file is treated
     * as empty: the worker threads still start, and {@link LineHandler#fileCompleted(int)} is
     * still called once per thread.
     *
     * @param threadSize  number of worker threads (and partitions); must be positive
     * @param sourceFile  path of the file to process
     * @param lineHandler callbacks invoked for every line and at the end of every partition
     * @throws IllegalArgumentException if {@code threadSize} is not positive
     * @throws NullPointerException     if {@code sourceFile} or {@code lineHandler} is null
     */
    public FileMultiThreadHandler(int threadSize, String sourceFile, LineHandler lineHandler) {
        // Fail fast: a zero thread count would otherwise surface later as a division by zero in excute().
        if (threadSize <= 0) {
            throw new IllegalArgumentException("threadSize must be positive, got " + threadSize);
        }
        this.threadSize = threadSize;
        this.sourcePath = Objects.requireNonNull(sourceFile, "sourceFile");
        this.lineHandler = Objects.requireNonNull(lineHandler, "lineHandler");
        this.fileLines = toLineCount(getFileLineNum(sourceFile));
        this.countDownLatch = new CountDownLatch(threadSize);
    }

    /**
     * Counts the lines of the given file.
     *
     * @param filePath path of the file
     * @return the number of lines, or {@code -1} if the file could not be opened
     */
    private long getFileLineNum(String filePath) {
        // Files.lines keeps the file open until the stream is closed, so it must be closed explicitly.
        try (Stream<String> lines = Files.lines(Paths.get(filePath), StandardCharsets.UTF_8)) {
            return lines.count();
        } catch (IOException e) {
            log.error("Failed to count the lines of {}", filePath, e);
            return -1;
        }
    }

    /** Converts the raw count into a usable, non-negative {@code int} line count. */
    private static int toLineCount(long counted) {
        if (counted < 0) {
            // Counting failed; an empty range for every thread is safer than a negative line count.
            return 0;
        }
        if (counted > Integer.MAX_VALUE) {
            // A plain (int) cast would wrap around silently.
            log.warn("File has {} lines; only the first {} will be processed", counted, Integer.MAX_VALUE);
            return Integer.MAX_VALUE;
        }
        return (int) counted;
    }

    /**
     * Blocks until every worker thread started by {@link #excute()} has finished.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void waitcdl() throws InterruptedException {
        countDownLatch.await();
    }

    /**
     * Splits the file into {@code threadSize} consecutive line ranges and starts one thread per range.
     * Returns immediately; use {@link #waitcdl()} to wait for completion.
     */
    public void excute() {
        int lineSize = fileLines / threadSize;
        for (int i = 1; i <= threadSize; i++) {
            int startIndex = (i - 1) * lineSize;
            int endIndex = startIndex + lineSize;
            if (i == threadSize) {
                // The last partition also takes the remainder of the integer division.
                endIndex = fileLines;
            }
            new HandlerThread(i - 1, startIndex, endIndex).start();
        }
    }

    /** Worker thread that processes the lines in {@code [startIndex, endIndex)} of the source file. */
    public class HandlerThread extends Thread {

        private final int i;
        private final int startIndex;
        private final int endIndex;

        /**
         * @param i          0-based id of this worker, passed to the {@link LineHandler} callbacks
         * @param startIndex 0-based index of the first line to process (inclusive)
         * @param endIndex   0-based index of the line at which to stop (exclusive)
         */
        public HandlerThread(int i, int startIndex, int endIndex) {
            this.i = i;
            this.startIndex = startIndex;
            this.endIndex = endIndex;
        }

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            // try-with-resources closes the reader even when opening or reading fails.
            try (BufferedReader br = Files.newBufferedReader(Paths.get(sourcePath), StandardCharsets.UTF_8)) {
                // Skip the lines that belong to earlier partitions.
                for (int skipped = 0; skipped < startIndex; skipped++) {
                    if (br.readLine() == null) {
                        break;
                    }
                }
                int lineNum = startIndex;
                // Check the range first so no line beyond the partition is read and thrown away.
                while (lineNum < endIndex) {
                    String line = br.readLine();
                    if (line == null) {
                        break;
                    }
                    lineHandler.handleLine(line, this.i);
                    lineNum++;
                }
            } catch (IOException e) {
                log.error("{} failed to process lines [{}, {}) of {}",
                        Thread.currentThread().getName(), startIndex, endIndex, sourcePath, e);
            } finally {
                try {
                    lineHandler.fileCompleted(this.i);
                } finally {
                    // Always release the latch; otherwise one failing worker would block waitcdl() forever.
                    countDownLatch.countDown();
                    log.info("{} cost {}",
                            Thread.currentThread().getName(), System.currentTimeMillis() - start);
                }
            }
        }
    }

    /** Business callbacks invoked by the worker threads. */
    public interface LineHandler {

        /**
         * Called once for every line of a partition, on the worker thread that owns the partition.
         *
         * @param string   the line content, without its line terminator
         * @param threadId 0-based id of the worker thread
         */
        void handleLine(String string, int threadId);

        /**
         * Called once per worker thread after it has stopped reading its partition,
         * including when reading failed.
         *
         * @param threadId 0-based id of the worker thread
         */
        void fileCompleted(int threadId);
    }
}
使用方法:
// 1. 创建FileMultiThreadHandler, 指定线程数,源文件,实现文件处理业务逻辑 FileMultiThreadHandler fileMultiThreadHandler = new FileMultiThreadHandler(threadSize, sourceFile, new FileMultiThreadHandler.LineHandler() { @Override // 2. 在此实现每一行的处理逻辑,threadId指明当前处理的线程号,从0开始 public void handleLine(String line, int threadId) { } @Override // 3. 在此分区文件处理完后,实现业务处理逻辑,threadId指明当前处理的线程号,从0开始 public void fileCompleted(int threadId) { } }); // 4. 开始处理 fileMultiThreadHandler.excute();// 5. 阻塞等待各个线程处理完毕 fileMultiThreadHandler.waitcdl();
示例代码:
// One output writer per worker thread: partition j is written to targetPath + j + ".txt".
BufferedWriter[] bws = new BufferedWriter[threadSize];
for (int j = 0; j < threadSize; j++) {
    bws[j] = new BufferedWriter(new FileWriter(targetPath + j + ".txt"));
}

FileMultiThreadHandler fileMultiThreadHandler = new FileMultiThreadHandler(threadSize, sourceFile, new FileMultiThreadHandler.LineHandler() {
    @Override
    public void handleLine(String line, int threadId) {
        // Add the number-lookup logic here.
        try {
            bws[threadId].write(line);
            bws[threadId].newLine();
        } catch (Exception e) {
            // Report the failure instead of dropping the line silently. The callback still
            // does not throw, so the worker keeps processing the rest of its partition.
            System.err.println("Failed to write a line for partition " + threadId);
            e.printStackTrace();
        }
    }

    @Override
    public void fileCompleted(int threadId) {
        // try-with-resources closes the writer even when the final flush fails.
        try (BufferedWriter writer = bws[threadId]) {
            writer.flush();
        } catch (Exception e) {
            // Kept non-throwing on purpose: this callback runs while the worker is finishing up.
            System.err.println("Failed to flush/close the output of partition " + threadId);
            e.printStackTrace();
        }
    }
});

long startTime = System.currentTimeMillis();
try {
    fileMultiThreadHandler.excute();
    fileMultiThreadHandler.waitcdl();
} catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still see the interruption.
    Thread.currentThread().interrupt();
    e.printStackTrace();
}