> 文档中心 > java多线程处理文件处理

Java 多线程处理文件

在项目中，经常会遇到文件处理的场景。单线程处理简单直接、易于实现，但往往效率较低。因此在这类场景下，多线程处理是一个较好的解决方法。需要注意的是，下文的实现中每个线程都会从文件开头读起并跳过前面的行，所以只有当每一行的业务处理（而不是读取文件本身）是瓶颈时，多线程才能带来明显的提升。
本文将文件的多线程处理抽象成一个通用的问题模型，并封装成一个工具类，以便复用。具体业务只需按需实现两部分逻辑：对文件每一行的处理，以及每个线程处理完自己那部分文件后的收尾。

工具类:

package com.example.service;import lombok.extern.slf4j.Slf4j;import java.io.*;import java.nio.file.Files;import java.nio.file.Paths;import java.util.concurrent.CountDownLatch;@Slf4jpublic class FileMultiThreadHandler {    private int threadSize;    private CountDownLatch countDownLatch;    private String sourcePath;    private int fileLines;    private LineHandler lineHandler;    public FileMultiThreadHandler(int threadSize, String sourceFile,      LineHandler lineHandler) { this.threadSize = threadSize; this.sourcePath = sourceFile; this.fileLines = (int)getFileLineNum(sourceFile);; this.lineHandler = lineHandler; this.countDownLatch = new CountDownLatch(threadSize);    }    private  long getFileLineNum(String filePath) { try {     return Files.lines(Paths.get(filePath)).count(); } catch (IOException e) {     return -1; }    }    public void waitcdl() throws InterruptedException { countDownLatch.await();    }    public void excute() { int lineSize = fileLines / threadSize; for (int i = 1; i <= threadSize; i++) {     int startIndex = (i - 1) * lineSize;     int endIndex = startIndex + lineSize;     if (i == threadSize) {  endIndex = fileLines;     }     new HandlerThread(i-1, startIndex, endIndex).start(); }    }    public class HandlerThread extends Thread { private int i; private int startIndex; private int endIndex; public HandlerThread(int i, int startIndex, int endIndex) {     this.i = i;     this.startIndex = startIndex;     this.endIndex = endIndex; } @Override public void run() {     long start = System.currentTimeMillis();     BufferedReader br = null;     try {  br = new BufferedReader(new FileReader(sourcePath));  int lineNum = startIndex;  while (lineNum-- > 0 ) {      br.readLine();  }  lineNum = startIndex;  String line;  while ((line = br.readLine()) != null && (lineNum < endIndex)) {      lineHandler.handleLine(line, this.i);      lineNum++;  }     } catch (FileNotFoundException e) {  e.printStackTrace();     } catch (IOException e) {  
e.printStackTrace();     } finally {  try {      br.close();      lineHandler.fileCompleted(this.i);      countDownLatch.countDown();      log.info("{} cost {}", Thread.currentThread().getName(),System.currentTimeMillis()-start);  } catch (IOException e) {      e.printStackTrace();  }     } }    }    public interface LineHandler { public abstract void handleLine(String string, int threadId); public abstract void fileCompleted(int threadId);    }}

使用方法:

   // 1. 创建FileMultiThreadHandler, 指定线程数,源文件,实现文件处理业务逻辑   FileMultiThreadHandler fileMultiThreadHandler = new FileMultiThreadHandler(threadSize, sourceFile, new FileMultiThreadHandler.LineHandler() {     @Override     // 2. 在此实现每一行的处理逻辑,threadId指明当前处理的线程号,从0开始     public void handleLine(String line, int threadId) {     }     @Override     // 3. 在此分区文件处理完后,实现业务处理逻辑,threadId指明当前处理的线程号,从0开始     public void fileCompleted(int threadId) {     } }); // 4. 开始处理 fileMultiThreadHandler.excute();// 5. 阻塞等待各个线程处理完毕 fileMultiThreadHandler.waitcdl();

示例代码:

  // Example: write partition j of sourceFile to its own file targetPath + j + ".txt".
  BufferedWriter[] bws = new BufferedWriter[threadSize];
  // One slot per worker thread. Each worker writes only its own slot; waitcdl() makes the
  // writes visible to this thread once all workers have counted down.
  IOException[] failures = new IOException[threadSize];
  for (int j = 0; j < threadSize; j++) {
      try {
          bws[j] = new BufferedWriter(new FileWriter(targetPath + j + ".txt"));
      } catch (IOException e) {
          // Don't leak the writers that were already opened.
          for (int k = 0; k < j; k++) {
              try {
                  bws[k].close();
              } catch (IOException suppressed) {
                  e.addSuppressed(suppressed);
              }
          }
          throw e;
      }
  }
  FileMultiThreadHandler fileMultiThreadHandler = new FileMultiThreadHandler(threadSize, sourceFile, new FileMultiThreadHandler.LineHandler() {
      @Override
      public void handleLine(String line, int threadId) {
          // Business-specific per-line logic (e.g. the number lookup) goes here.
          if (failures[threadId] != null) {
              return;  // this partition's output already failed; skip its remaining lines
          }
          try {
              bws[threadId].write(line);
              bws[threadId].newLine();
          } catch (IOException e) {
              failures[threadId] = e;
          }
      }

      @Override
      public void fileCompleted(int threadId) {
          // Record the error rather than throw it, so it is reported once all threads are done.
          try {
              bws[threadId].close();  // close() flushes the buffer first
          } catch (IOException e) {
              if (failures[threadId] == null) {
                  failures[threadId] = e;
              }
          }
      }
  });
  // NOTE(review): unused in this snippet; presumably used for timing by code that follows.
  long startTime = System.currentTimeMillis();
  try {
      fileMultiThreadHandler.excute();
      fileMultiThreadHandler.waitcdl();
      // Every worker has finished: report the first write failure, if any.
      for (IOException failure : failures) {
          if (failure != null) {
              throw failure;
          }
      }
  } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe it.
      Thread.currentThread().interrupt();
      e.printStackTrace();
  }