> 文档中心 > BIO和NIO

BIO和NIO

        开始学习网络编程,刚入门NIO,还有很多概念没清楚,codeing还在提升,有看到我的友友可以给我些学习的建议吗 。。。

目录

一、什么是阻塞和非阻塞

二、BIO

2.1 初始BIO

2.2 多线程 BIO

2.3 线程池

2.4 BIO总结

三、NIO

3.1 初始NIO FIleChannel

3.2 NIO下实现CS通信

3.3 黏包和半包

3.4 小整合(CS通信 + 消息边界问题 + buffer容量)

3.4.1 log4j日志

3.4.2 CS通信

 3.4.3 完整代码

3.5 服务端大数据传输

3.6 多线程环境下的NIO

3.6.1 初始版

3.6.2 方案一

3.6.3 方案二: 使用阻塞队列

3.7 NIO总结


一、什么是阻塞和非阻塞

        阻塞:对于每个线程,只能处理一个请求;在数据接受前和accept前会陷入阻塞状态

如:每个顾客都有一名专门的服务生接待

        非阻塞:对于每个线程,可以同时管理多个客户端的请求;但是在(Buffer + Channel)NIO中,服务端与某一个客户端建立连接后,仍不能去处理其他请求;需要Selector的配合

我在学习NIO后在稍微懂点这两个概念。。。

二、BIO

BIO:阻塞IO, 传统使用的IO流都是BIO, 当使用accept()和read()的时候都会陷入阻塞状态,直到数据准备完成。

2.1 初始BIO

目标:完成初始版的server 和 client 通信

server:

public class Server {    public static void main(String[] args) { try {     System.out.println("我是服务端,我开启了");     // 1. 创建服务端的Socket连接     ServerSocket serverSocket = new ServerSocket(9999);     // 2. 等待接受客户端信息     Socket socket = serverSocket.accept();     // 3.通过socket获取输入流     InputStream is = socket.getInputStream();     // 4.将字节流封装成字符缓冲流     BufferedReader br = new BufferedReader(new InputStreamReader(is));     // 5.获取数据     String res = null;     /*  这里使用if,ServerSocket和Socket在BIO模式下是共死的  当Client的数据发送完,服务端也会自动刷新了解,如果使用  while会一直等待数据,而Client却已经关闭,读取失败      */     if ((res = br.readLine()) != null) {  System.out.println(res);     } } catch (IOException e) {     e.printStackTrace(); }    }}

client:

public class Client {    public static void main(String[] args) { try {     // 1.创建客户端的Socket连接     Socket socket = new Socket("127.0.0.1", 9999);     // 2.获取字符输出流     OutputStream os = socket.getOutputStream();     String res = "我是客户端发送的数据";     // 3.将字节流封装成字符打印流,速度更快     PrintWriter pw = new PrintWriter(os);     pw.println(res);     pw.flush(); } catch (IOException e) {     e.printStackTrace(); }    }}

问题:初始版中,只能处理一个服务端一个客户端,效率低

2.2 多线程 BIO

使用多线程的方式,定义单独的线程处理数据的读写

server:

/** *  实现一台服务器和多台客户端进行通信 *      解决方案:使用多线程的方式 * *  通过多线程的方式能够达到一对多的场景,但是当并发量高的时候,线程 *  数量增多,很容易导致资源泄露 */public class Server {    public static void main(String[] args) { try {     System.out.println("客户端已经启动。。。。");     ServerSocket serverSocket = new ServerSocket(9999);     while (true) {  // 等待接受客户端的连接  Socket socket = serverSocket.accept();  System.out.println("创建连接");  // 将接受的客户端socket交给线程执行  new HandlerSocket(socket).start();     } } catch (IOException e) {     e.printStackTrace(); }    }}class HandlerSocket extends Thread {    private Socket socket;    public HandlerSocket(Socket socket) { this.socket = socket;    }    @Override    public void run() { try {     InputStream is = socket.getInputStream();     // 封装成字符缓冲区     BufferedReader bf = new BufferedReader(new InputStreamReader(is));     String msg;     while ((msg = bf.readLine()) != null) {  // 读取数据  System.out.println("客户端" + socket.getLocalAddress() + ":" + msg);     } } catch (IOException e) {     e.printStackTrace(); }    }}

client:

public class Client {    public static void main(String[] args) { try {     Socket socket = new Socket("127.0.0.1", 9999);     OutputStream os = socket.getOutputStream();     // 将输出流封装成打印流     PrintWriter pw = new PrintWriter(os);     Scanner in = new Scanner(System.in);     while (true) {  System.out.println("请说:");  String msg = in.next();  pw.println(msg);  pw.flush();     } } catch (IOException e) {     e.printStackTrace(); }    }}

问题:每个客户端由单个线程进行执行,当请求过多的时候,需要等量的线程进行处理,资源占有率极高,容易造成服务器宕机。

2.3 线程池

        为了解决2.2线程过多的问题,考虑通过线程池控制创建线程的数量

handlerThreadPool:

/** *  线程池工具类 */public class HandlerSocketPool {    // 创建一个线程池    private static ExecutorService executorService;    /* 线程池参数: ThreadPoolExecutor(int corePoolSize,  // 在线程池中保持活跃线程的最大数量  int maximumPoolSize,  // 线程池中最多创建的数量  long keepAliveTime,  // 线程存活时间  TimeUnit unit, // 单位  BlockingQueue workQueue)  //阻塞队列     */    public static ExecutorService createPool(int maximumPoolSize, int queueSize) { executorService = new ThreadPoolExecutor(3  , maximumPoolSize  , 120, TimeUnit.SECONDS  , new ArrayBlockingQueue(queueSize)); return executorService;    }    public static void executeTask(Runnable myRunnable) { executorService.execute(myRunnable);    }}

server:

public class Server {    public static void main(String[] args) { System.out.println("服务端启动。。。。"); try {     ServerSocket serverSocket = new ServerSocket(9999);     // 创建一个线程池     HandlerSocketPool.createPool(3, 5);     while (true) {  Socket socket = serverSocket.accept();  // 创建Runnable  HandlerSocketPool.executeTask(new MyRunnable(socket));     } } catch (IOException e) {     e.printStackTrace(); }    }}class MyRunnable implements Runnable {    private Socket socket;    public MyRunnable(Socket socket) { this.socket = socket;    }    public void run() { try {     InputStream is = socket.getInputStream();     BufferedReader bf = new BufferedReader(new InputStreamReader(is));     String msg;     while ((msg = bf.readLine()) != null) {  System.out.println("客户端" + socket.getLocalAddress() + ":" + msg);     } } catch (IOException e) {     e.printStackTrace(); }    }}

client:

public class Client {    public static void main(String[] args) { try {     Socket socket = new Socket("127.0.0.1", 9999);     OutputStream os = socket.getOutputStream();     // 将输出流封装成打印流     PrintWriter pw = new PrintWriter(os);     Scanner in = new Scanner(System.in);     while (true) {  System.out.println("请说:");  String msg = in.next();  pw.println(msg);  pw.flush();     } } catch (IOException e) {     e.printStackTrace(); }    }}

2.4 BIO总结

1.BIO模式,是一种阻塞IO的方式,也是最传统的IO;使用Socket进行通信,每一个客户端和服务端正常只能建立一次连接这种方式导致服务端与连接方建立连接后,服务端会一直等待客户端的信息(阻塞),一旦客户端的发送完信息断开连接,服务端立马抛出异常(殉情)2.为了解决一个服务端只能和一个客户端通信,所以可以使用线程的方式,将不同的客户端socket交由不同的线程执行;同时,为了避免线程过多导致资源消耗殆尽,可以使用线程池的方式进行管理2.使用BIO进行文件上传,使用DataXXXStream系列可以实现数据分段发送;在客户端发送信息结束后,应该使用socket.shutdown()通知服务端信息已经发送完成

三、NIO

        非阻塞IO,可以实现一个线程解决多个客户端的请求,核心组件:Buffer, Channel,Selector

Buffer是数据的载体,数据流向是双向的;相比于BIO数据单项流动,使用Buffer在Channel(通道)中进行传递。Channel的类型:FileChannel, DatagramChannel,  ServerSocketChannel, SocketChannel。Selector的作用用于管理多个管道,通过selector()进行阻塞,当某一通道的数据准备好(可连接,可读,可写),那么就执行相应的回调函数

3.1 初始NIO FIleChannel

public class TestReadFIle {    @Test    public void testRead() { try {     // 1.通过传统IO获取得到文件输入流     FileInputStream is = new FileInputStream("D:\\IDEA_Work\\MavenProject\\9_BIO_NIO\\src\\main\\java\\com\\righteye\\nio\\com\\righteye\\demo01\\test.txt");     // 2.获取管道     FileChannel channel = is.getChannel();     // 3.创建缓冲区   allocate:Allocates a new byte buffer.     ByteBuffer buffer = ByteBuffer.allocate(1024);     // 4.将数据存入缓冲区     channel.read(buffer);     // 6.将文件变为读模式  Flips this buffer.     // The limit is set to the current position and then the position is set to zero.     buffer.flip();     // 5.封装成String     String res = new String(buffer.array(), 0, buffer.remaining());     // 打印结果     System.out.println(res); } catch (Exception e) {     e.printStackTrace(); }    }}

1.FileChannel是用于文件读写的管道

2. Buffer中的常见API:   1.hashRemaining, 判断缓冲器是否有数据   2.mark() 设置标记   3.remaining, 返回position和limit之间的元素个数   4.flip(),将当前位置设置为limit, 然后当前下标归零,表示可读状态   5.clear() 进入写模式   操作数据的方法:   get和put

3.2 NIO下实现CS通信

server:

public class Server {    public static void main(String[] args) { try {     System.out.println("服务端等待连接。。。");     // 1.创建服务端的管道,用于接收客户端的请求     // open(): Opens a server-socket channel.     ServerSocketChannel ssChannel = ServerSocketChannel.open();     // 2.将通道转换为非阻塞模式     ssChannel.configureBlocking(false);     // 3.将当前通道绑定一个端口     ssChannel.bind(new InetSocketAddress(9999));     // 4.获取选择器Selector     Selector selector = Selector.open();     // 5.在通道上绑定Selector,监听连接事件     ssChannel.register(selector, SelectionKey.OP_ACCEPT);     // 6.当连接建立成功,说明Selector中会触发相应事件     // select()方法, 阻塞直到某一通道中数据准备好,否则进入轮询状态     while (selector.select() > 0) {  // 7.获取Selector中所有准备好的事件  Iterator it = selector.selectedKeys().iterator();  // 8.遍历准备好的事件  while (it.hasNext()) {      // 9.获取当前要执行的事件      SelectionKey sk = it.next();      // 10. 如果当前事件是建立连接      if (sk.isAcceptable()) {   // 11.获取客户端的管道,用于获取数据   SocketChannel scChannel = ssChannel.accept();   // 12.将管道设置为非阻塞   scChannel.configureBlocking(false);   // 13.监听客户端写数据,在服务端部分用于读数据   scChannel.register(selector, SelectionKey.OP_READ);      } else if (sk.isReadable()) {  // 可以读数据   System.out.println("测试开始。。。");   // 14. 获取客户端的数据管道   SocketChannel scChannel = (SocketChannel) sk.channel();   // 15.创建缓冲区,NIO中数据的读取在缓冲区中   ByteBuffer buffer = ByteBuffer.allocate(1024);   try {int len = 0;while ((len = scChannel.read(buffer)) > 0) {  // 这块不能写 != -1    // 设置读模式    buffer.flip();    System.out.println(new String(buffer.array(), 0, len));    buffer.clear();}   } catch (IOException e) {System.out.println("有人下线了");sk.cancel();scChannel.close();e.printStackTrace();   }      }      // 16.事件处理完进行清除,不去除事件下一次容易发生空指针异常      it.remove();  }     } } catch (IOException e) {     e.printStackTrace(); }    }}
 // 8.遍历准备好的事件while (it.hasNext()) {      // 。。。。      // 16.事件处理完进行清除,不去除事件下一次容易发生空指针异常      it.remove();}

这里的 it.remove不要忘记加,因为所有的事件在注册后会加入到 selectionKey集合中,在NIO中,所有的事件要么被执行,如果是未执行的事件会在下次继续加入到集合中,而在下一次重新进行遍历,此时会进入 isAcceptable()分支:

SocketChannel scChannel = ssChannel.accept();  // 没有接受返回值为NULLscChannel.configureBlocking(false);   // 空指针异常

client:

public class Client {    public static void main(String[] args) { try {     // 1.获取客户端的通道     SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));     // 2.将通道设置为非阻塞     //socketChannel.configureBlocking(false);     // 3.创建缓冲区     ByteBuffer buffer = ByteBuffer.allocate(1024);     Scanner in = new Scanner(System.in);     while (true) {  System.out.println("请说:");  String msg = in.nextLine();  // 4.向缓冲区中写数据  buffer.put(("我:" + msg).getBytes());  // 5.将buffer中的数据变为可读态  buffer.flip();  socketChannel.write(buffer);  buffer.clear();     }     //socketChannel.shutdownOutput(); } catch (IOException e) {     e.printStackTrace(); }    }}

3.3 黏包和半包

        问题描述:在网络传输中,比如有:“Helloworld\nI`m zhangsan\nHo” 和 “w are you\n

由于为了通信的效率,传输的数据报中的内容可能会不同于自己发送的初衷。

如:"Helloworld\nI`m zhangsan\n" 为两段话糅合在一起,称为黏包

而: “Ho” 和 w are you\n” 是完整的一段话,但是在传输的过程被分开传输了,称为半包

解决方案:(消息边界问题)

处理消息边界的方法: -》 黏包 半包的状况    1). 所有客户端统一一个最长的ByteBuffer容量   浪费带宽    2). 在每段消息的后面添加 标志, 然后进行特殊处理   按1bit进行遍历,效率低    3). 类比Http协议,数据包分为两部分,第一部分长度固定表示内容长度,第二部分表示内容 如:Http2.0 使用LTV格式,代表长度,类型,内容

代码实现:

public void split(ByteBuffer source) {   // 进入读模式   source.flip();   for (int i = 0; i < source.remaining(); i++) {    // 遇到标识表示已经接受一段话    if (source.get(i) == '\n') {int len = i + 1 - source.position();ByteBuffer target = ByteBuffer.allocate(len);for (int j = 0; j < len; j++) {// 使用get(),一个字节的进行读取,复制到新的缓冲区target.put(source.get());    }    System.out.println(new String(target.array(), 0, len));}    }    logger.debug("split() end...");    // 每次划分之后可能还有剩余的数据,所以进行压缩,用于下次接受新数据    source.compact();}

3.4 小整合(CS通信 + 消息边界问题 + buffer容量)

问题描述:

        1. 在满足服务端和客户端能通信的前提下,模拟客户端的(非)正常退出

        2.验证当客户端发送的数据超过缓冲区大小的时候发生的黏包,半包现象;

        3. buffer容量问题,客户端发送消息不能保证满足buffer的最大容量,需要考虑扩容问题

3.4.1 log4j日志

使用日志进行控制台的显示,便于观察

pom依赖:

org.slf4jslf4j-api1.6.1org.slf4jslf4j-log4j121.6.1

log4j.properties

log4j.rootLogger=DEBUG,consoleAppender,logfilelog4j.appender.consoleAppender=org.apache.log4j.ConsoleAppenderlog4j.appender.consoleAppender.layout=org.apache.log4j.PatternLayoutlog4j.appender.consoleAppender.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%p] %m%nlog4j.appender.logfile=org.apache.log4j.DailyRollingFileAppenderlog4j.appender.logfile.File=/home/admin/demo/logs/demo.loglog4j.appender.logfile.Append = truelog4j.appender.logfile.DatePattern='.'yyyy-MM-dd#log4j.appender.logfile=org.apache.log4j.RollingFileAppender#log4j.appender.logfile.File=/home/admin/demo/logs/demo.log#log4j.appender.logfile.Append = true#log4j.appender.logfile.MaxFileSize = 10MB#log4j.appender.logfile.MaxBackupIndex = 20log4j.appender.logfile.layout=org.apache.log4j.PatternLayoutlog4j.appender.logfile.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

3.4.2 CS通信

定义成员变量:

private static final Logger logger = LoggerFactory.getLogger(TestSelectorINfoRange.class);private Selector selector;  // 用于管理Channelprivate ServerSocketChannel ssc;  // 服务端通道private static final int PORT = 9999;

构造器进行初始化

// 初始化public TestSelectorINfoRange() throws IOException {selector = Selector.open();ssc = ServerSocketChannel.open();// 设置非阻塞模式ssc.configureBlocking(false);// 为通道绑定端口ssc.bind(new InetSocketAddress(PORT));// 绑定监听连接事件ssc.register(selector, SelectionKey.OP_ACCEPT);}

Acceptable事件监听:

if (sk.isAcceptable()) {SocketChannel sc = ssc.accept();  // 获取客户端连接sc.configureBlocking(false);  // 用于读取客户端中的buffer// 为了实现同一次事件中使用的Buffer是相同的,所以要将buffer与通道绑定, register中的第三个参数,作为attachment附件存在ByteBuffer buffer = ByteBuffer.allocate(16);SelectionKey clientSK = sc.register(selector, 0, buffer);// 为客户端的事件添加监听clientSK.interestOps(SelectionKey.OP_READ);}

register()函数说明:

SelectionKey clientSK = sc.register(selector, 0, buffer);

Registers this channel with the given selector, returning a selectionkey.

返回值是SelectionKey, 表示当前触发的事件

第三个参数表示:参数如果不为空,那么就将这个Object作为附件绑定到Channel中

If the att argument is not null then the key's attachmentwill have been set to that value.

Readable事件监听:

else if (sk.isReadable()) {// ByteBuffer buffer = ByteBuffer.allocate(16);  修改前SocketChannel sc = (SocketChannel) sk.channel();// 从事件中获取附件 bufferByteBuffer buffer = (ByteBuffer) sk.attachment();try {int len = sc.read(buffer);// 客户端连接断开,没有数据了;一次连接close也表示一次读请求if (len == -1) {// 从selectionKey集合中删除事件sk.cancel();} else {// 开启读模式//   buffer.flip();//   System.out.println(new String(buffer.array(), 0, buffer.limit()));split(buffer);// 说明buffer不够,需要进行扩容if (buffer.position() == buffer.limit()) {ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);buffer.flip();newBuffer.put(buffer);// 更新附件sk.attach(newBuffer);}}} catch (IOException e) {sk.cancel();  // 异常删除事件}}
if (buffer.position() == buffer.limit()) {ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);buffer.flip();newBuffer.put(buffer);// 更新附件sk.attach(newBuffer);}

 扩容操作,当客户端发送的数据超过buffer的容量的时候,处理黏包,半包的split()方法因为无法检测到结尾标识所以无法进行输出,从而进行下一次的数据读写;导致数据会产生丢失

 3.4.3 完整代码

client:

public class Client implements AutoCloseable{    private static final Logger logger = LoggerFactory.getLogger(TestSelectorINfoRange.class);    private SocketChannel sc;    private static final int PORT = 9999;    private static final String url = "127.0.0.1";    public Client() throws IOException { sc = SocketChannel.open(new InetSocketAddress(url, PORT)); sc.configureBlocking(false); logger.debug("client initial finish...");    }    public static void main(String[] args) { try (Client client = new Client()){     client.sendInfo(); } catch (Exception e) {     e.printStackTrace(); }    }    private void sendInfo() { try {     while (true) {  Scanner in = new Scanner(System.in);  System.out.println("请说:");  String msg = in.nextLine();  sc.write(Charset.defaultCharset().encode(msg + "\n"));  logger.debug("client info finish...");     } } catch (IOException e) {     e.printStackTrace(); }    }    @Override    public void close() throws Exception { if (sc != null) {     sc.close(); }    }}

server: (CS通信 + Split函数) split的书写在上面有实例

public class TestSelectorINfoRange implements AutoCloseable {    private static final Logger logger = LoggerFactory.getLogger(TestSelectorINfoRange.class);    private Selector selector;  // 用于管理Channel    private ServerSocketChannel ssc;  // 服务端通道    private static final int PORT = 9999;    // 初始化    public TestSelectorINfoRange() throws IOException { selector = Selector.open(); ssc = ServerSocketChannel.open(); // 设置非阻塞模式 ssc.configureBlocking(false); // 为通道绑定端口 ssc.bind(new InetSocketAddress(PORT)); // 绑定监听连接事件 ssc.register(selector, SelectionKey.OP_ACCEPT); logger.debug("初始化服务端完成。。。。");    }    public void close() { if (selector != null) {     try {  selector.close();     } catch (IOException e) {  e.printStackTrace();     } } if (ssc != null) {     try {  ssc.close();     } catch (IOException e) {  e.printStackTrace();     } }    }    public static void main(String[] args) { try (TestSelectorINfoRange testSelectorINfoRange = new TestSelectorINfoRange()){     // 开启监听事件     testSelectorINfoRange.listen(); } catch (IOException e) {     e.printStackTrace(); }    }    private void listen() { while (true) {     try {  logger.debug("select before ...");  selector.select();  // 阻塞等待事件发生  // 获取事件集并进行迭代  Iterator it = selector.selectedKeys().iterator();  while (it.hasNext()) {      SelectionKey sk = it.next();      // 处理事件      if (sk.isAcceptable()) {   SocketChannel sc = ssc.accept();  // 获取客户端连接   sc.configureBlocking(false);  // 用于读取客户端中的buffer   // 为了实现同一次事件中使用的Buffer是相同的,所以要将buffer与通道绑定, register中的第三个参数,作为attachment附件存在   ByteBuffer buffer = ByteBuffer.allocate(16);   SelectionKey clientSK = sc.register(selector, 0, buffer);   // 为客户端的事件添加监听   clientSK.interestOps(SelectionKey.OP_READ);   logger.debug("client socket: {}", clientSK);      } else if (sk.isReadable()) {   logger.debug("read listener ...");   // ByteBuffer buffer = ByteBuffer.allocate(16);  修改前   SocketChannel sc = (SocketChannel) sk.channel();   // 从事件中获取附件 buffer   ByteBuffer buffer = (ByteBuffer) sk.attachment();   try {int len = sc.read(buffer);// 客户端连接断开,没有数据了;一次连接close也表示一次读请求if (len == -1) {    // 从selectionKey集合中删除事件    sk.cancel();    logger.debug("client socket formal exit: {}", sk);} else {    // 开启读模式//    buffer.flip();//    System.out.println(new String(buffer.array(), 0, buffer.limit()));    split(buffer);    // 说明buffer不够,需要进行扩容    if (buffer.position() == buffer.limit()) { logger.debug("buffer Expansion..."); ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); buffer.flip(); newBuffer.put(buffer); // 更新附件 sk.attach(newBuffer);    }}   } catch (IOException e) {logger.error("client socket informal exit: {}", sk);sk.cancel();   }      }      // 每一次迭代都带将执行完的事件从集合中移除,避免下次再次分配进入集合      it.remove();  }     } catch (IOException e) {  e.printStackTrace();  logger.error("连接终止。。。");     } }    }    // 解决消息边界问题    /* 当客户端发送的数据超过buffer, 进行二倍扩容     */    public void split(ByteBuffer source) { // 进入读模式 source.flip(); for (int i = 0; i < source.remaining(); i++) {     // 遇到标识表示已经接受一段话     if (source.get(i) == '\n') {  int len = i + 1 - source.position();  ByteBuffer target = ByteBuffer.allocate(len);  for (int j = 0; j < len; j++) {      // 使用get(),一个字节的进行读取,复制到新的缓冲区      target.put(source.get());  }  System.out.println(new String(target.array(), 0, len));     } } logger.debug("split() end..."); // 每次划分之后可能还有剩余的数据,所以进行压缩,用于下次接受新数据 source.compact();    }}

运行结果:

server:

client:

3.5 服务端大数据传输

        问题描述:在服务端向数据库写数据的时候,会使用通道中的buffer写入数据,但是buffer也是有最大容量的,当缓冲区满了服务端写入的数据就会是0, 由于轮询操作会导致大量无用的写操作产生。

        解决方案:不在一次性写入所有数据,而是在buffer可以写的情况下触发 写事件,由selector进行监听

public class Server {    private static final Logger logger = LoggerFactory.getLogger(Server.class);    public static void main(String[] args) { try (Selector selector = Selector.open();      ServerSocketChannel ssc = ServerSocketChannel.open()) {     ssc.configureBlocking(false);     ssc.bind(new InetSocketAddress(9999));     ssc.register(selector, SelectionKey.OP_ACCEPT);     while (true) {  selector.select();  Iterator it = selector.selectedKeys().iterator();  while (it.hasNext()) {      SelectionKey sk = it.next();      it.remove();      if (sk.isAcceptable()) {   SocketChannel sc = ssc.accept();   sc.configureBlocking(false);   SelectionKey skClient = sc.register(selector, 0, null);   skClient.interestOps(SelectionKey.OP_READ);   logger.debug("connect finish...");   StringBuilder sb = new StringBuilder();   // 给客户端发送消息   for (int i = 0; i < 250000000; i++) sb.append("a");   ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());   int count = sc.write(buffer);   logger.debug("当前打印数量:" + count);   // 避免缓冲区满导致无用的执行   if (buffer.hasRemaining()) {skClient.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE);skClient.attach(buffer);  // 添加一个附件   }   logger.debug("print finish...");      } else if (sk.isWritable()) {  // 核心操作   SocketChannel sc = (SocketChannel) sk.channel();   ByteBuffer buffer = (ByteBuffer) sk.attachment();   int count = sc.write(buffer);   System.out.println(count);   // 如果没有数据了,删除写事件   while (!buffer.hasRemaining()) {sk.attach(null);sk.interestOps(SelectionKey.OP_READ);   }      }  }     } } catch (IOException e) {     e.printStackTrace(); }    }}

3.6 多线程环境下的NIO

主要处理某一个事件占用时间过长,导致selector监听事件效率下降
解决方案:配置多个线程,由一个boss线程负责连接创建,若干worker线程负责读写操作

3.6.1 初始版

worker线程:

static class Worker implements Runnable{private Thread thread;private Selector selector;private String name;private boolean flag = false;  // 标志当前worker是否是第一次创建public Worker(String name) {this.name = name;}// 初始化public void register() throws IOException {if (!flag) {thread = new Thread(this, name);selector = Selector.open();thread.start();flag = true;}}// 具体worker进行工作@Overridepublic void run() {while (true) {try {selector.select();  // 阻塞事件Iterator it = selector.selectedKeys().iterator();while (it.hasNext()) {SelectionKey sk = it.next();it.remove();if (sk.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) sk.channel();channel.read(buffer); buffer.flip();log.debug("print data: {}", new String(buffer.array(), 0, buffer.remaining()));}}} catch (IOException e) {e.printStackTrace();}}}}

主线程:

public static void main(String[] args) { try (ServerSocketChannel ssc = ServerSocketChannel.open();     Selector boss = Selector.open()) {     Thread.currentThread().setName("boss");     ssc.configureBlocking(false);     ssc.bind(new InetSocketAddress(9999));     ssc.register(boss, SelectionKey.OP_ACCEPT);  // boss线程负责连接创建     /*  创建工作线程,给工作线程中的selector注册写事件  不能再每一次连接创建就创建一个worker节点,如果这样做和传统线程池没什么区别      */     Worker worker = new Worker("worker-0");     while (boss.select() > 0) {  Iterator it = boss.selectedKeys().iterator();  while (it.hasNext()) {      SelectionKey sk = it.next();      it.remove();      if (sk.isAcceptable()) {   log.debug("connect wait ...");   SocketChannel sc = ssc.accept();   sc.configureBlocking(false);   worker.register();      sc.register(worker.selector, SelectionKey.OP_READ);      }  }     } } catch (IOException e) {     e.printStackTrace(); }    }

 启动客户端和服务端后发送消息:

 原因:

主线程中:

 worker.register();    // 执行后启动了 worker线程的run方法,selector阻塞线程                     
 sc.register(worker.selector, SelectionKey.OP_READ);  // 事件没有被注册

3.6.2 方案一

worker.register(sc);worker.selector.wakeup();  // 2.方案一:唤醒一次selector,保证事件注册sc.register(worker.selector, SelectionKey.OP_READ);

3.6.3 方案二: 使用阻塞队列

@Slf4jpublic class MutilThreadServer {    public static void main(String[] args) { try (ServerSocketChannel ssc = ServerSocketChannel.open();     Selector boss = Selector.open()) {     Thread.currentThread().setName("boss");     ssc.configureBlocking(false);     ssc.bind(new InetSocketAddress(9999));     ssc.register(boss, SelectionKey.OP_ACCEPT);  // boss线程负责连接创建     /*  创建工作线程,给工作线程中的selector注册写事件  不能再每一次连接创建就创建一个worker节点,如果这样做和传统线程池没什么区别      */     Worker worker = new Worker("worker-0");     while (boss.select() > 0) {  Iterator it = boss.selectedKeys().iterator();  while (it.hasNext()) {      SelectionKey sk = it.next();      it.remove();      if (sk.isAcceptable()) {   log.debug("connect wait ...");   SocketChannel sc = ssc.accept();   sc.configureBlocking(false);   log.debug("connect finish ...{}", sc.getRemoteAddress());   /* 1.此时客户端发送信息无法接受;因为worker线程和主线程不是同步的,worker线程首先启动然后selector.select()    *   进入阻塞状态,此时事件还没有被注册    *解决方案:如何控制事件注册一定发生在阻塞前 -》 注册时候唤醒一次select    */   worker.register(sc);  // 将通道作为参数传递      }  }     } } catch (IOException e) {     e.printStackTrace(); }    }    /* 对于Worker线程中的成员变量,应该是每个worker独享一份;不能使用static,否则表示整个类创建的实例都共享一份     */    static class Worker implements Runnable{ private Thread thread; private Selector selector; private String name; private boolean flag = false;  // 标志当前worker是否是第一次创建 private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); public Worker(String name) {     this.name = name; } // 初始化 public void register(SocketChannel sc) throws IOException {     log.debug("initial begin...");     if (!flag) {  thread = new Thread(this, name);  selector = Selector.open();  thread.start();  flag = true;     }     log.debug("initial end...");     log.debug("register begin...");     /*  4. 但是此时调用register的还是boss线程,没有实际解决线程同步问题      *///     sc.register(selector, SelectionKey.OP_READ);     // 5.使用一种阻塞队列的方式,实现线程之间的通信     queue.add(() -> {  try {      sc.register(selector, SelectionKey.OP_READ);  } catch (ClosedChannelException e) {      e.printStackTrace();  }     });     log.debug("register end...");     selector.wakeup();  // 唤醒一次,避免阻塞,保证事件一定注册上 } // 具体worker进行工作 @Override public void run() {     while (true) {  try {      selector.select();  // 阻塞事件      // 拿到队列中的事件      Runnable task = queue.poll();      if (task != null) {   task.run();      }      Iterator it = selector.selectedKeys().iterator();      while (it.hasNext()) {   SelectionKey sk = it.next();   it.remove();   if (sk.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) sk.channel();channel.read(buffer); buffer.flip();log.debug("print data: {}", new String(buffer.array(), 0, buffer.remaining()));   }      }  } catch (IOException e) {      e.printStackTrace();  }     } }    }}

3.7 NIO总结

记录一部分笔记:

    8.IO模型:阻塞IO,非阻塞IO,多路复用,信号驱动,异步IO    请求资源(用户态)和等待资源(内核态)阶段    内核态中的读取操作:分为等待数据阶段,最后再到复制数据阶段(实现数据从磁盘到内核内存中) 阻塞IO和多路复用的区别: 1.BIO涉及一次系统调用;多路复用设计两次系统调用(select, read) 2.在请求过多的情况下,多路复用可以通过selector同时监听很多事件,只要获取到需要的数据便可以在相应通道执行操作     但是对于阻塞IO,每个操作都是串行的,只有上一个操作结束才能执行下一项     同步:发生在一个线程中,由一个线程发送请求并等待数据   异步:两个线程进行,发送请求线程等待其他线程(执行对应的回调函数)回复数据   9.零拷贝问题:    传统数据传输问题:java代码: read -> write 切换内核态:磁盘 -> 内核缓冲区 -> 用户态 -> socket缓冲区 -> 网卡   (这个过程:用户态和内核态的切换 3 次: 用户态 -》(read)内核态 -》 用户态 -》(write) -》 内核态      数据拷贝:4次)    NIO优化:      1.使用ByteBuffer.allocateDirect() 分配直接内存,属于OS中的内存,由java和OS共享  2.Linux2.1后提出sendFile, 对应java中使用NIO中的transferTo方法,能直接从内核缓冲区 -》 socket缓冲区  3.Linux2.4后实现优化,不在直接将数据拷贝到socket缓冲区,而是记录一些偏移量,从内核缓冲区直接复制数据到网卡    零拷贝针对的是内核态与用户态之间没有数据的拷贝,不会将数据拷贝到JVM内存