用Java Swing+NIO实现了一个C/S聊天室程序 支持群聊私聊功能
特别说明
我实现的这个C/S模式的聊天室项目,主要是为了方便大家学习Java NIO的。由于NIO的优势在于单机能处理的并发连接数高,因此特别适合用于聊天程序的服务端。
为什么使用Java Swing来做图形界面呢?我们大家都知道,Java Swing现在已经过时了,并且Java的优势不在于图形界面,但我的需求并不需要漂亮美观的界面,并且Java语言实现C端界面的首选就是Java Swing了,只不过我只用了相对简单的图形组件及组件交互,但这也足够了。
阅读本文需要一点NIO的前置知识,大家可以看一下与本文同专栏的的这篇文章: Java NIO三大组件Buffer、Channel、Selector保姆级教程 附聊天室核心代码
项目代码已上传至CSDN,链接地址为:https://download.csdn.net/download/xl_1803/85162646
本文只粘贴了项目的核心代码,如希望了解项目全貌,请点击上述链接下载项目,零积分便可下载。
如遇到项目启动问题或bug,可在评论区留言。
效果演示
话不多说,先来演示一下我做的聊天程序吧!虽然界面比较简洁,并且只实现了核心功能,但相较而言这些都不是重点!重点在于服务端与客户端之间的NIO通信流程,稍后我会为大家分析讲解代码。
CS模式聊天程序演示
项目结构
├─nio-chat-client 客户端项目│ │ pom.xml│ └─src│ └─main│ ├─java│ │ └─com│ │ └─bobo│ │ │ ChatClientBootstrap.java 客户端启动类│ │ ├─entity│ │ │ FriendMsg.java 存放好友信息│ │ ├─handler 放各种处理器│ │ │ EventHandler.java 图形组件事件处理器│ │ │ IOHandler.java 客户端IO处理器│ │ │ UIHandler.java 图形组件处理器│ │ │ │ │ └─ui│ │ MainUI.java 继承自JFrame,程序主界面│ └─resources│ trumpet.jpeg 小喇叭图标│ ├─nio-chat-common 放服务端与客户端公共代码│ │ pom.xml│ └─src│ └─main│ └─java│└─com│ └─bobo│ ├─constant│ │ CommonConstant.java 公共常量│ ├─entity│ │ ChatMsg.java 服务端与客户端通信的实体类│ └─io 下面两个类分别用于读数据与写数据,用分隔符解决TCP粘拆包问题│ ChatBufferReader.java│ ChatMsgWrapper.java│ └─nio-chat-server 服务端项目 │ pom.xml └─src └─main └─java └─com └─bobo │ ChatServerBootstrap.java 服务端启动类 └─handler ServiceHandler.java 服务端处理器,NIO核心代码都在这个类里
主界面结构
说明:
- localhost:6666为默认的服务端地址,可以修改成你自己的
客户端核心代码分析
入口ChatClientBootstrap
即main方法所在处。
public class ChatClientBootstrap { public static void main(String[] args) { // new一个客户端主界面实例 MainUI mainUI = new MainUI(); // 注册事件 EventHandler.doRegister(mainUI); }}
主界面MainUI
主要用于渲染客户端UI组件,并设定好布局、大小等等。
public class MainUI extends JFrame{ private JLabel id; private JButton connect; private JTextField nickname; private JTextField server; private JButton edit; private JTextArea friends; private JScrollPane friendsScroll; private JTextArea groupArea; private JScrollPane groupScroll; private JTextArea privateArea; private JScrollPane privateScroll; private JLabel privateTitle; private JTextField groupInput; private JTextField privateInput; private JButton groupSend; private JButton privateSend; private JLabel bottomIcon; private JLabel bottom ; public MainUI(){ setLayout(null); // UI组件 id = new JLabel(); connect = new JButton("连接服务器"); nickname = new JTextField(); server = new JTextField("localhost:6666"); edit = new JButton("修改昵称"); friends = new JTextArea(); friendsScroll = new JScrollPane(friends); groupArea = new JTextArea(); groupScroll = new JScrollPane(groupArea); privateArea = new JTextArea(); privateScroll = new JScrollPane(privateArea); privateTitle = new JLabel("私聊窗口"); groupInput = new JTextField(); privateInput = new JTextField(); groupSend = new JButton("发送"); privateSend = new JButton("发送"); bottomIcon = new JLabel(new ImageIcon( new ImageIcon(MainUI.class.getResource("/trumpet.jpeg")).getImage() .getScaledInstance(40,40, Image.SCALE_SMOOTH))); bottom = new JLabel(); // 设置额外属性 nickname.setEditable(false); friends.setEditable(false); server.setToolTipText("服务端地址"); // 设置界面布局 id.setBounds(0,0,120,20); connect.setBounds(0,0,120,20); nickname.setBounds(0,20,120,20); server.setBounds(120,0,100,20); edit.setBounds(120,20,100,20); friendsScroll.setBounds(0,40,221,380); groupScroll.setBounds(225,0,355,180); groupInput.setBounds(225,180,290,20); groupSend.setBounds(515,180,65,20); privateTitle.setBounds(225,200,355,20); privateScroll.setBounds(225,220,355,180); privateInput.setBounds(225,400,290,20); privateSend.setBounds(515,400,65,20); bottomIcon.setBounds(0,425,40,40); bottom.setBounds(40,425,560,40); // 添加到主界面 add(id); add(connect); add(server); add(nickname); add(edit); add(friendsScroll); add(groupScroll); add(groupInput); add(groupSend); add(privateTitle); add(privateScroll); add(privateInput); add(privateSend); add(bottomIcon); add(bottom); // 主界面设置 setSize(600,500); setResizable(false); setLocationByPlatform(true); setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); setVisible(true); }// 省略所有UI组件的getter方法}
图形组件事件处理EventHandler
主要用于对图形组件注册各种事件处理器。
public class EventHandler { /** * 注册事件 * @param mainUI */ public static void doRegister(MainUI mainUI){ // 给好友列表添加鼠标双击事件 mainUI.getFriends().addMouseListener(new MouseAdapter() { @Override public void mouseClicked(MouseEvent e) { if(e.getClickCount() != 2){ return; } try { Rectangle rec = mainUI.getFriends().modelToView(mainUI.getFriends().getCaretPosition()); //height为当前鼠标所点击的行数(第几行) int height=rec.y / rec.height + 1; //获取点击行起始处的偏移量。 int start=mainUI.getFriends().getLineStartOffset(height-1); //获取点击行结尾处的偏移量。 int end = mainUI.getFriends().getLineEndOffset(height-1); //获取偏移量之间的文本,即为JTextarea上该行的文本 String text=mainUI.getFriends().getText(start, end-start-2); // 修改私聊窗口标题 UIHandler.setText(mainUI.getPrivateTitle(),"私聊窗口["+text+"]"); // 获取好友ID String subject = text.substring(0, text.indexOf("(")); // 获取好友对应的FriendMsg对象 FriendMsg friendMsg = IOHandler.getHandler().getFriendsData().get(subject); // 回显与该好友的历史消息 UIHandler.setText(mainUI.getPrivateArea(),friendMsg.getMsgList().stream().map(item->item+"\n").collect(Collectors.joining())); // 重置未读消息个数 friendMsg.setUnreadCount(0); // 重新加载好友列表 UIHandler.reloadFriends(mainUI); }catch (Exception ex){ ex.printStackTrace(); } } }); // 给连接服务器按钮注册点击事件 mainUI.getConnect().addActionListener(e->{ new Thread(()->{ try { // 连接服务端 IOHandler.getHandler().doConnect(mainUI); } catch (IOException ex) { ex.printStackTrace(); } }).start(); }); // 给修改昵称按钮注册点击事件 mainUI.getEdit().addActionListener(e->{ String text = ((JButton) e.getSource()).getText(); if("修改昵称".equals(text)){ UIHandler.setText(e.getSource(),"保存"); UIHandler.setEditable(mainUI.getNickname(),true); }else if("保存".equals(text)){ UIHandler.setText(e.getSource(),"修改昵称"); UIHandler.setEditable(mainUI.getNickname(),false); // 成功修改昵称,发送消息到服务端,由服务端转发给其它客户端 IOHandler.getHandler().writeMsg(CommonConstant.MSG_TYPE_MODIFY_NAME,null,mainUI.getNickname().getText()); } }); // 给群聊消息发送按钮添加单机事件 mainUI.getGroupSend().addActionListener(e->{ if(null == IOHandler.getHandler().getSelfId()){ UIHandler.alert(mainUI,"当前未连接服务器,不能发送消息","提示",JOptionPane.WARNING_MESSAGE); return; } if(mainUI.getGroupInput().getText().length()<1){ UIHandler.alert(mainUI,"不能发送空白消息","提示",JOptionPane.WARNING_MESSAGE); return; } // 将群聊消息发给服务器,由服务器转发给其它客户端 IOHandler.getHandler().writeMsg(CommonConstant.MSG_TYPE_SEND_GROUP,null,mainUI.getGroupInput().getText()); // 群聊界面追加刚刚发出的消息 UIHandler.append(mainUI.getGroupArea(),"自己:"+mainUI.getGroupInput().getText()); // 输入框重置 UIHandler.setText(mainUI.getGroupInput(),""); }); // 给私聊消息发送按钮添加单机事件 mainUI.getPrivateSend().addActionListener(e->{ if(null == IOHandler.getHandler().getSelfId()){ UIHandler.alert(mainUI,"当前未连接服务器,不能发送消息","提示",JOptionPane.WARNING_MESSAGE); return; } if(mainUI.getPrivateInput().getText().length()<1){ UIHandler.alert(mainUI,"不能发送空白消息","提示",JOptionPane.WARNING_MESSAGE); return; } if("私聊窗口".equals(mainUI.getPrivateTitle().getText())){ UIHandler.alert(mainUI,"请双击一个好友后再发送私聊消息","提示",JOptionPane.WARNING_MESSAGE); return; } if(mainUI.getPrivateTitle().getText().contains("已下线")){ UIHandler.alert(mainUI,"好友已下线","提示",JOptionPane.WARNING_MESSAGE); return; } // 获取好友ID String subject = mainUI.getPrivateTitle().getText().substring(mainUI.getPrivateTitle().getText().indexOf("[")+1, mainUI.getPrivateTitle().getText().indexOf("(")); // 将群聊消息发给服务器,由服务器转发给指定客户端 IOHandler.getHandler().writeMsg(CommonConstant.MSG_TYPE_SEND_PRIVATE,subject,mainUI.getPrivateInput().getText()); // 私聊界面追加刚刚发送的消息 UIHandler.append(mainUI.getPrivateArea(),"自己:"+mainUI.getPrivateInput().getText()); // 将消息存储到FriendMsg对象中 IOHandler.getHandler().getFriendsData().get(subject).getMsgList().add("自己:"+mainUI.getPrivateInput().getText()); // 重置私聊输入框 UIHandler.setText(mainUI.getPrivateInput(),""); }); }}
NIO通信处理IOHandler
用于处理客户端发送消息至服务端,以及接收从服务端发来的消息等。客户端这边也是用了NIO模式,将客户端对应的SocketChannel注册到Selector上,并在单独的一个线程里无线循环处理客户端的读事件,并通过UIHandler类的各种方法将处理结果渲染到UI界面,实现界面与数据的联动。
public class IOHandler { private static IOHandler ioHandler = new IOHandler(); private IOHandler(){} public static IOHandler getHandler(){ return ioHandler; } // 存储好友的数据 private Map<String,FriendMsg> friendsData = new LinkedHashMap<>(); // 当前客户端的ID private String selfId; // 当前客户端的SocketChannel private SocketChannel socketChannel; // 读缓冲,以\n为分隔符读取消息,用于解决TCP粘包拆包问题 private ChatBufferReader chatBufferReader = new ChatBufferReader(); // 给消息的最后添加一个分隔符\n private ChatMsgWrapper chatMsgWrapper = new ChatMsgWrapper(); public void doConnect(MainUI mainUI) throws IOException { Selector selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); // 获取服务端ip:port try { String[] arr = mainUI.getServer().getText().split(":"); if(arr.length != 2){ throw new RuntimeException("服务端地址格式错误"); } socketChannel.connect(new InetSocketAddress(arr[0], Integer.valueOf(arr[1]))); } catch (Exception e) { e.printStackTrace(); UIHandler.alert(mainUI,e.getMessage(),"提示",JOptionPane.ERROR_MESSAGE); return; } socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, ByteBuffer.allocate(CommonConstant.BUFFER_SIZE)); for (;;){ int num = selector.select(); if(num <= 0){ continue; } Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); if(key.isConnectable()){ while(!socketChannel.finishConnect()){ UIHandler.setText(mainUI.getBottom(),"客户端连接中..."); } selfId = socketChannel.getLocalAddress().toString().substring(1); // 连接成功后,渲染界面 UIHandler.setText(mainUI.getBottom(),"已成功连接服务器"); UIHandler.alert(mainUI,"成功连接服务器","提示",JOptionPane.INFORMATION_MESSAGE); UIHandler.append(mainUI.getGroupArea(),"成功连接服务器,您现在可以发送群聊消息"); UIHandler.setText(mainUI.getId(),selfId); UIHandler.setVisible(mainUI.getConnect(),false); UIHandler.setEditable(mainUI.getServer(),false); }else if (key.isReadable()){ // 缓冲读,以\n为分隔符读取消息,可解决TCP粘包拆包问题 String msg = chatBufferReader.readMsg(key); ChatMsg chatMsg = JSONObject.parseObject(msg, ChatMsg.class); if(CommonConstant.MSG_TYPE_SYNC_TO_NEW_CLIENT == chatMsg.getType()){ // 新客户端(自己)上线,获取服务端发过来的所有好友信息 Map<String,String> map = JSONObject.parseObject(chatMsg.getMsg(), Map.class); map.forEach((subject,name)->{// 初始化FriendMsgfriendsData.put(subject,new FriendMsg(subject,name,0,new ArrayList())); }); // 重新加载好友列表 UIHandler.reloadFriends(mainUI); }else if(CommonConstant.MSG_TYPE_NOTICE_HAS_NEW_CLIENT == chatMsg.getType()){ // 有新的好友上线 UIHandler.setText(mainUI.getBottom(),String.format("有新的好友[%s]已上线",chatMsg.getMsg())); // 初始化FriendMsg friendsData.put(chatMsg.getMsg(),new FriendMsg(chatMsg.getMsg(),"",0,new ArrayList())); // 重新加载好友列表 UIHandler.reloadFriends(mainUI); }else if(CommonConstant.MSG_TYPE_NOTICE_OTHER_CLIENT_MODIFY_NAME == chatMsg.getType()){ // 好友修改了昵称 UIHandler.setText(mainUI.getBottom(), String.format("好友[%s]修改了昵称[%s]",chatMsg.getSubject(),chatMsg.getMsg())); // 修改FriendMsg friendsData.get(chatMsg.getSubject()).setName(chatMsg.getMsg()); // 重新加载好友列表 UIHandler.reloadFriends(mainUI); // 如果正在与该好友聊天,则更新私聊标题 UIHandler.updatePrivateTitle(mainUI,chatMsg.getSubject(),false); }else if(CommonConstant.MSG_TYPE_RECV_GROUP == chatMsg.getType()){ // 接收群聊消息 UIHandler.append(mainUI.getGroupArea(),UIHandler.buildSessionMsg(chatMsg.getSubject(),chatMsg.getMsg())); }else if(CommonConstant.MSG_TYPE_RECV_PRIVATE == chatMsg.getType()){ // 接收私聊消息 FriendMsg friendMsg = friendsData.get(chatMsg.getSubject()); // 将消息记录到friendMsg的msgList中 friendMsg.getMsgList().add(UIHandler.buildSessionMsg(chatMsg.getSubject(),chatMsg.getMsg())); if(mainUI.getPrivateTitle().getText().contains(chatMsg.getSubject())){// 当前正在与该好友聊天,则还要将消息追加到私聊窗口UIHandler.append(mainUI.getPrivateArea(),UIHandler.buildSessionMsg(chatMsg.getSubject(),chatMsg.getMsg())); }else{// 当前没有在于该好友聊天,则未读消息个数加1friendMsg.setUnreadCount(friendMsg.getUnreadCount()+1);// 底部通知栏添加通知UIHandler.setText(mainUI.getBottom(),UIHandler.buildSessionMsg(chatMsg.getSubject(),"发来了一条新消息"));// 重新加载好友列表UIHandler.reloadFriends(mainUI); } }else if(CommonConstant.MSG_TYPE_NOTICE_OTHER_CLIENT_OFFLINE == chatMsg.getType()){ // 好友下线,底部通知栏添加通知 UIHandler.setText(mainUI.getBottom(),UIHandler.buildSessionMsg(chatMsg.getSubject(),"下线")); // 如果正在与该好友聊天,则更新私聊标题 UIHandler.updatePrivateTitle(mainUI,chatMsg.getSubject(),true); // 移除 friendsData.remove(chatMsg.getSubject()); // 重新加载好友列表 UIHandler.reloadFriends(mainUI); } } // 最后移除此次发生处理的selectionKey,防止事件重复处理 iterator.remove(); } } } /** * 写数据,chatMsgWrapper实现了自定义消息格式(\n) */ public void writeMsg(int type,String subject,String text){ try { socketChannel.write(chatMsgWrapper.wrap(new ChatMsg(type,subject,text))); } catch (IOException e) { e.printStackTrace(); } } public String getSelfId() { return selfId; } public Map<String, FriendMsg> getFriendsData() { return friendsData; }}
服务端核心代码分析
入口ChatServerBootstrap
即main方法所在处,可随意修改绑定的地址与端口号,但也要对应修改客户端连接时指定的服务端地址。
public class ChatServerBootstrap { public static void main(String[] args) throws IOException { new ServiceHandler().handle("localhost",6666); }}
NIO通信处理ServiceHandler
public class ServiceHandler { /** * 存储客户端昵称 */ private Map<String,String> clientNameMap = new HashMap(); // 读缓冲,以\n为分隔符读取消息,用于解决TCP粘包拆包问题 private ChatBufferReader chatBufferReader = new ChatBufferReader(); // 给消息的最后添加一个分隔符\n private ChatMsgWrapper chatMsgWrapper = new ChatMsgWrapper(); public void handle(String ip,int port) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(ip,port)); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("聊天服务器已就绪..."); for(;;){ selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); if(selectionKey.isAcceptable()){ SocketChannel newSocketChannel = serverSocketChannel.accept(); newSocketChannel.configureBlocking(false); // 获取新客户端id String id = newSocketChannel.getRemoteAddress().toString().substring(1); System.out.println(String.format("客户端上线[%s]",id)); // 先通知其他所有人,有新的客户端上线 for (SelectionKey otherKey : selector.keys()) { SelectableChannel selectableChannel = otherKey.channel(); if(selectableChannel instanceof SocketChannel && selectableChannel != newSocketChannel){ByteBuffer otherAtt = (ByteBuffer) otherKey.attachment();otherAtt.clear();otherAtt.put(chatMsgWrapper.wrap(new ChatMsg(CommonConstant.MSG_TYPE_NOTICE_HAS_NEW_CLIENT,null,id)));// 触发write事件otherKey.interestOps(otherKey.interestOps() | SelectionKey.OP_WRITE); } } // 向新客户端同步在线好友 ByteBuffer att = ByteBuffer.allocate(CommonConstant.BUFFER_SIZE); att.put(chatMsgWrapper.wrap(new ChatMsg(CommonConstant.MSG_TYPE_SYNC_TO_NEW_CLIENT,null,JSONObject.toJSONString(clientNameMap)))); newSocketChannel.register(selector,SelectionKey.OP_READ | SelectionKey.OP_WRITE,att); // 初始化新客户端的昵称 clientNameMap.put(id,""); }else if(selectionKey.isReadable()){ SocketChannel channel = (SocketChannel)selectionKey.channel(); String msg = null; try { msg = chatBufferReader.readMsg(selectionKey); } catch (IOException e) { // 通知所有客户端下线 clientNameMap.remove(e.getMessage()); for (SelectionKey otherKey : selector.keys()) {SelectableChannel selectableChannel = otherKey.channel();if(selectableChannel instanceof SocketChannel && selectableChannel != channel){ ByteBuffer otherAtt = (ByteBuffer) otherKey.attachment(); otherAtt.clear(); otherAtt.put(chatMsgWrapper.wrap(new ChatMsg(CommonConstant.MSG_TYPE_NOTICE_OTHER_CLIENT_OFFLINE,e.getMessage(),e.getMessage()))); // 触发write事件 otherKey.interestOps(otherKey.interestOps() | SelectionKey.OP_WRITE);} } iterator.remove(); continue; } ChatMsg chatMsg = JSONObject.parseObject(msg, ChatMsg.class); String id = channel.getRemoteAddress().toString().substring(1); if(CommonConstant.MSG_TYPE_MODIFY_NAME == chatMsg.getType()){ // 通知其他人有好友修改了昵称 clientNameMap.put(id,chatMsg.getMsg()); for (SelectionKey otherKey : selector.keys()) {SelectableChannel selectableChannel = otherKey.channel();if(selectableChannel instanceof SocketChannel && selectableChannel != channel){ ByteBuffer otherAtt = (ByteBuffer) otherKey.attachment(); otherAtt.clear(); otherAtt.put(chatMsgWrapper.wrap(new ChatMsg(CommonConstant.MSG_TYPE_NOTICE_OTHER_CLIENT_MODIFY_NAME,id,chatMsg.getMsg()))); // 触发write事件 otherKey.interestOps(otherKey.interestOps() | SelectionKey.OP_WRITE);} } }else if(CommonConstant.MSG_TYPE_SEND_GROUP == chatMsg.getType()){ // 转发群聊消息 for (SelectionKey otherKey : selector.keys()) {SelectableChannel selectableChannel = otherKey.channel();if(selectableChannel instanceof SocketChannel && selectableChannel != channel){ ByteBuffer otherAtt = (ByteBuffer) otherKey.attachment(); otherAtt.clear(); otherAtt.put(chatMsgWrapper.wrap(new ChatMsg(CommonConstant.MSG_TYPE_RECV_GROUP,id,chatMsg.getMsg()))); // 触发write事件 otherKey.interestOps(otherKey.interestOps() | SelectionKey.OP_WRITE);} } }else if(CommonConstant.MSG_TYPE_SEND_PRIVATE == chatMsg.getType()){ // 转发私聊消息 for (SelectionKey otherKey : selector.keys()) {SelectableChannel selectableChannel = otherKey.channel();if(selectableChannel instanceof SocketChannel && selectableChannel != channel && ((SocketChannel) selectableChannel).getRemoteAddress().toString().substring(1).equals(chatMsg.getSubject())){ ByteBuffer otherAtt = (ByteBuffer) otherKey.attachment(); otherAtt.clear(); otherAtt.put(chatMsgWrapper.wrap(new ChatMsg(CommonConstant.MSG_TYPE_RECV_PRIVATE,id,chatMsg.getMsg()))); // 触发write事件 otherKey.interestOps(otherKey.interestOps() | SelectionKey.OP_WRITE); break;} } } }else if(selectionKey.isWritable()){ // 写事件 SocketChannel channel = (SocketChannel)selectionKey.channel(); ByteBuffer att = (ByteBuffer) selectionKey.attachment(); att.flip(); channel.write(att); selectionKey.interestOps(SelectionKey.OP_READ); } // 最后移除此次发生处理的selectionKey,防止事件重复处理 iterator.remove(); } } }}
专题
如何解决TCP粘拆包问题?
解决此类问题,有许多的方法可以使用,如消息定长(Long类型消息固定为8个字节、Int类型消息固定为4个字节、随机大小如200字节不够则空格)、分隔符(\n、\r\n、其它自定义分隔符)等。
本项目基于分隔符\n的方案,实现消息的读取与写入。
消息的写入在ChatMsgWrapper.java类中,如下代码所示。
public class ChatMsgWrapper { public ByteBuffer wrap(ChatMsg chatMsg){ String jsonString = JSONObject.toJSONString(chatMsg); return ByteBuffer.wrap((jsonString+"\n").getBytes(StandardCharsets.UTF_8)); }}
由此可见,通过wrap方法,会在每个原生消息chatMsg的后面追加上一个换行符。
那怎么读取呢?别着急,消息的读取在ChatBufferReader.java类中,如下代码所示。
public class ChatBufferReader { /** * 缓冲区 */ private Map<SocketChannel,String> msgBuffer = new HashMap(); public String readMsg(SelectionKey selectionKey) throws IOException { SocketChannel channel = (SocketChannel)selectionKey.channel(); ByteBuffer buffer = (ByteBuffer)selectionKey.attachment(); StringBuilder sb = new StringBuilder(); try { String msg = ""; // 循环读,读到有\n截止 while (!msg.contains("\n")){ buffer.clear(); channel.read(buffer); msg = new String(buffer.array(), 0, buffer.position(), StandardCharsets.UTF_8); sb.append(msg); } } catch (IOException e) { String host = channel.getRemoteAddress().toString().substring(1); System.out.println(String.format("远程机器下线[%s]",host)); selectionKey.cancel(); channel.close(); throw new IOException(host); } // 此次读到的消息 String message = sb.toString(); // 将缓冲区中上次读到的消息放到前面来 if (msgBuffer.containsKey(channel) && null != msgBuffer.get(channel)){ message = msgBuffer.get(channel)+message; } // 这里取第一个\n前面的数据作为本次读取的消息,后面的按原样放到缓冲区 // 放到缓冲区 msgBuffer.put(channel,message.length()<=message.indexOf("\n")+2?"":message.substring(message.indexOf("\n")+2)); // 将本次读取的消息返回出去 return message.substring(0,message.indexOf("\n")); }}
通过while (!msg.contains("\n"))循环,我们可能会读好几次,直到读到的数据有换行符为止,为什么呢?因为换行符代表一个消息的结束,如果没有换行符就代表还没有读到消息的末尾,那就肯定还要继续读啊。
读完了之后,怎么处理呢?由于TCP粘包机制,因此读到的数据可能会长这样:abcdefg\nhijk,\n代表了一个消息的结束,那么abcdefg就是一条完整的消息,但hijk是属于下一条消息的,只不过被粘在一起了,我们需要做的是将hijk放到缓冲区缓存起来,然后将abcdefg返回出去(代表本次读到的消息)。
缓存起来怎么办呢?难道就不管了吗?当然不是!当我们尝试读下一条消息时,读到的数据可能是这样的:lmn\nopq,通过上面的论述,我们已经知道了opq是属于下一条消息的,要放缓冲区里,但lmn就是一条完整的消息吗?当然不是,lmn是消息的下半部分,那上半部分在哪?在缓冲区呀!
这就是缓冲区的作用,缓冲区可以将消息的上半部分先缓存起来,等到消息的下半部分也读到了,再组合在一块儿,就是一条完整的消息了。
服务端和客户端通信的消息类型有很多种,如何区分?
这个好办,用一个实体类就能搞定。
ChatMsg.java类定义如下。
public class ChatMsg { private int type; private String subject; private String msg; public ChatMsg() { } public ChatMsg(int type, String subject, String msg) { this.type = type; this.subject = subject; this.msg = msg; }// 省略getter/setter方法}
ChatMsg就是封装的服务端与客户端之间通信的消息对象,并且通过type属性区分消息类型,消息类型定义在CommonConstant.java类中。
public interface CommonConstant { /** * 0 向新客户端同步在线好友 * 1 通知有新客户端上线 * 2 修改昵称 * 3 通知其它人修改昵称 * 4 发送群聊消息 * 5 转发群聊消息 * 6 发送私聊消息 * 7 转发私聊消息 * 8 通知有客户端下线 */ int MSG_TYPE_SYNC_TO_NEW_CLIENT = 0; // server->client int MSG_TYPE_NOTICE_HAS_NEW_CLIENT = 1; // server->client int MSG_TYPE_MODIFY_NAME = 2; // client->server int MSG_TYPE_NOTICE_OTHER_CLIENT_MODIFY_NAME = 3; // server->client int MSG_TYPE_SEND_GROUP = 4; // client->server int MSG_TYPE_RECV_GROUP = 5; // server->client int MSG_TYPE_SEND_PRIVATE = 6; // client->server int MSG_TYPE_RECV_PRIVATE = 7; // server->client int MSG_TYPE_NOTICE_OTHER_CLIENT_OFFLINE = 8; // server->client}
到时候,无论是客户端读到了服务端发来的消息,还是服务端读到了客户端发来的消息,都可以先将消息反序列化为ChatMsg对象,然后拿到type属性做if判断,确定本次要处理的是何种业务。
写事件有什么用?我直接写不行嘛,为啥还需要等到写事件发生才能写呢?
在ServiceHandler.java类中,会处理客户端发来的消息,比如客户端A发来群聊的消息,那么服务端需要将该消息写给除客户端A之外的所有客户端,即消息的转发。
那么在这里,我们服务端不是直接写数据的,而是将数据先放到缓冲区里,然后触发一个写事件,在写事件发生时才将数据写数据。
群聊业务处理代码如下所示。
// 转发群聊消息for (SelectionKey otherKey : selector.keys()) { SelectableChannel selectableChannel = otherKey.channel(); if(selectableChannel instanceof SocketChannel && selectableChannel != channel){ ByteBuffer otherAtt = (ByteBuffer) otherKey.attachment(); otherAtt.clear(); otherAtt.put(chatMsgWrapper.wrap(new ChatMsg(CommonConstant.MSG_TYPE_RECV_GROUP,id,chatMsg.getMsg()))); // 触发write事件 otherKey.interestOps(otherKey.interestOps() | SelectionKey.OP_WRITE); }}
处理写事件代码如下所示。
if(selectionKey.isWritable()){ // 写事件 SocketChannel channel = (SocketChannel)selectionKey.channel(); ByteBuffer att = (ByteBuffer) selectionKey.attachment(); att.flip(); channel.write(att); selectionKey.interestOps(SelectionKey.OP_READ);}
那么不直接写,而是等写事件发生才写,这么做的好处是什么呢?
写事件的作用:没有写事件也可以,直接将用户buffer数据拷贝到socket发送缓冲区,但是高并发情况下(用户buffer频繁往socket buffer拷贝数据)以及网络环境很差的情况下(socket 发送缓冲区将数据发出去的速度很慢),socket发送缓冲区很快就满了,这样最终会导致CPU利用率100%。
因此可以用写事件优化,当socket发送缓冲区没有满时,即有空闲会触发可写事件,此时才去写数据,而当满了的时候,就不会触发可写事件,这样能让CPU歇一歇。
另外,在写事件处理结束之后,一定要调用一下selectionKey.interestOps(SelectionKey.OP_READ)取消写事件,如果不调用则selectionKey.isWritable()一直会返回true,而实际并没有可写的数据,这样会使CPU空转。
私聊记录如何保存?
IOHandler.java中有一个属性,如下所示。
private Map<String,FriendMsg> friendsData = new LinkedHashMap<>();
该属性用于保存客户端的好友信息。Map的key为好友的id(ip:port),而FriendMsg如下所示。
/** * 存储与好友聊天信息 */public class FriendMsg { /** * 好友的id(ip:port) */ private String subject; /** * 好友的昵称 */ private String name; /** * 未读消息个数 */ private int unreadCount; /** * 存储我与该好友的聊天记录 */ private List<String> msgList; public FriendMsg(){} public FriendMsg(String subject, String name, int unreadCount, List<String> msgList) { this.subject = subject; this.name = name; this.unreadCount = unreadCount; this.msgList = msgList; }// 省略getter/setter方法}
本客户端与对应好友的私聊记录就保存在msgList属性中,需要注意的是私聊记录是保存在内存中的,一旦好友下线或本客户端下线(进程终止),私聊记录都会丢失。