Spring Boot 中搭建 Netty 框架(亲测有效)
第一步:
在 Maven 的 pom.xml 中添加 Netty 的依赖(jar 包)
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>
第二步
代码
netty 启动类
package com.jeesite.modules.websocket.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;

/**
 * Spring-managed wrapper around a Netty TCP server.
 *
 * <p>{@link #bind(int)} starts listening and returns as soon as the port is bound;
 * the bound server channel is kept in {@code channel} so that callers can inspect it
 * through the Lombok-generated {@code getChannel()} and stop it with {@link #close()}.
 * Callers that want to wait for the server to stop can block on
 * {@code future.channel().closeFuture()} themselves.
 *
 * @Author:lichunhua
 * @Date: 2021-12-17 12:00
 */
@Data
@Slf4j
@Component
public class NettyServer {

    /** Accepts incoming client connections. */
    private EventLoopGroup boosGroup = new NioEventLoopGroup();

    /** Handles I/O events of accepted connections. */
    private EventLoopGroup workerGroup = new NioEventLoopGroup();

    /** The bound server channel; {@code null} until {@link #bind(int)} has succeeded. */
    private Channel channel;

    /** Injected channel initializer. Not used by {@link #bind(int)}, which builds its own pipeline. */
    @Autowired
    private ChannelInitializer initializer;

    /**
     * Alias of {@link #bind(int)}.
     *
     * @param port TCP port to listen on
     * @return the completed bind future
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    public ChannelFuture init(int port) throws InterruptedException {
        return bind(port);
    }

    /**
     * Binds the server to {@code port} and remembers the resulting server channel.
     *
     * <p>The method waits only for the bind operation itself, not for the server to be closed.
     * If the bind fails and no previously bound channel is still open, the event loop groups
     * are shut down so their threads do not outlive the failed start.
     *
     * @param port TCP port to listen on
     * @return the completed bind future; {@code future.channel()} is the server channel
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    public ChannelFuture bind(int port) throws InterruptedException {
        // close() shuts the groups down; recreate them so the server can be started again.
        if (isUnusable(boosGroup)) {
            boosGroup = new NioEventLoopGroup();
        }
        if (isUnusable(workerGroup)) {
            workerGroup = new NioEventLoopGroup();
        }

        boolean bound = false;
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boosGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel arg0) throws Exception {
                            // Handler order is kept exactly as before: MyServerHandler is first,
                            // so inbound data reaches it before the string codecs below.
                            // NOTE(review): encoder (GBK) and decoder (UTF-8) charsets differ - confirm intent.
                            arg0.pipeline().addLast(new MyServerHandler());
                            arg0.pipeline().addLast(new StringEncoder(Charset.forName("GBK")));
                            arg0.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                        }
                    })
                    // Maximum number of pending connections queued on the listening socket.
                    .option(ChannelOption.SO_BACKLOG, 9999);

            // Wait until the port is actually bound.
            ChannelFuture f = b.bind(port).sync();
            channel = f.channel();
            bound = true;
            return f;
        } finally {
            // Do not tear down groups that still serve an earlier, still-open channel.
            if (!bound && (channel == null || !channel.isOpen())) {
                boosGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

    /**
     * Closes the server channel (if one was bound) and shuts down both event loop groups.
     */
    public void close() {
        if (channel != null) {
            channel.close();
        }
        if (boosGroup != null) {
            boosGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    /** Returns true when the group is missing or has already been asked to shut down. */
    private static boolean isUnusable(EventLoopGroup group) {
        return group == null || group.isShuttingDown();
    }
}
第二步:
事件处理类
package com.jeesite.modules.websocket.netty;import io.netty.channel.ChannelInitializer;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.LineBasedFrameDecoder;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.util.CharsetUtil;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;/** * @Author:lichunhua * @Date: 2021-12-17 13:36 */@Slf4j@Componentpublic class MyServerChannelInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { log.info("=========有客户端连接服务器========="); log.info("ip:" + socketChannel.localAddress().getHostString() + "port:" + socketChannel.localAddress().getPort()); // 基于换行符号 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 解码转String,注意调整自己的编码格式GBK、UTF-8 socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); // 解码转String,注意调整自己的编码格式GBK、UTF-8 socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new MyServerHandler()); }}
第三步
事件操作类
package com.jeesite.modules.websocket.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Echo handler: greets a client when it connects, logs every received message and
 * writes it back unchanged.
 *
 * <p>Messages may arrive either as raw {@link ByteBuf} (handler installed before any
 * decoder) or as already decoded objects such as {@code String} (handler installed after
 * a string decoder); both are supported.
 *
 * @Author:lichunhua
 * @Date: 2021-12-17 13:37
 */
@Slf4j
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    /** Charset used for the greeting and for turning received bytes into log text. */
    private static final Charset MESSAGE_CHARSET = CharsetUtil.UTF_8;

    /**
     * Called when the client connection becomes active; sends a greeting to the client.
     *
     * @param ctx handler context of the connection
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String msg = "与服务端建立连接成功" + new Date();
        // Actually send the greeting; previously the buffer was filled but never written.
        ctx.writeAndFlush(getSendByteBuf(msg));
        ctx.pipeline().read();
    }

    /**
     * Called after a batch of reads has completed; flushes pending writes.
     *
     * @param ctx handler context of the connection
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Called for every inbound message; logs its text and echoes it back to the client.
     *
     * @param ctx handler context of the connection
     * @param msg the received message ({@link ByteBuf} or an already decoded object)
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body;
        if (msg instanceof ByteBuf) {
            // Decode without moving the reader index so the full content is still echoed below.
            body = convertByteBufToString((ByteBuf) msg);
        } else {
            body = String.valueOf(msg);
        }
        log.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "接收到消息:");
        log.info(body);
        ctx.writeAndFlush(msg);
    }

    /**
     * Called when the connection is no longer active (e.g. the client disconnected).
     *
     * @param ctx handler context of the connection
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // The client is the remote end of the channel.
        log.info("===================客户端:" + ctx.channel().remoteAddress()
                + " 断开连接===================");
    }

    /**
     * Called when an exception is raised on the connection; logs it and closes the connection.
     *
     * @param ctx   handler context of the connection
     * @param cause the raised exception
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Pass the throwable to the logger so the stack trace is not lost.
        log.error("发现异常:\r\n" + cause, cause);
        ctx.close();
    }

    /** Wraps the text in a new buffer, encoded with {@link #MESSAGE_CHARSET}. */
    private ByteBuf getSendByteBuf(String message) {
        byte[] req = message.getBytes(MESSAGE_CHARSET);
        ByteBuf pingMessage = Unpooled.buffer(req.length);
        pingMessage.writeBytes(req);
        return pingMessage;
    }

    /**
     * Decodes the readable bytes of {@code buf} as text using {@link #MESSAGE_CHARSET}.
     * The buffer's reader index is left untouched.
     *
     * @param buf buffer to decode
     * @return the decoded text
     */
    public String convertByteBufToString(ByteBuf buf) {
        return buf.toString(MESSAGE_CHARSET);
    }
}
第三步
事件启动类
package com.jeesite.modules.websocket.netty;import io.netty.channel.ChannelFuture;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/** * @Author:lichunhua * @Date: 2021-12-17 13:50 */@Slf4j@RestController@RequestMapping("/netty")public class NettyController { @Value("${netty.port}") private int port; @Autowired private NettyServer nettyServer; @RequestMapping("/localAddress") public String localAddress() { return "tcp服务端地址:" + nettyServer.getChannel().localAddress(); } @RequestMapping("/isOpen") public String isOpen() throws InterruptedException { log.info("启动tcp无法开始=================0==========="); log.info("启动tcp无法开始=================端口==========="+port); ChannelFuture init = nettyServer.bind(port); log.info("已经执行这段代码的结束=======1========="); Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.close())); log.info("已经执行这段代码的结束========2========"); init.channel().closeFuture().syncUninterruptibly(); log.info("已经执行这段代码的结束========3========"); if (nettyServer.getChannel().isOpen()){ return "tcp服务已启动"; }else { return "tcp服务未启动"; } } @RequestMapping("/close") public String close() { if (nettyServer.getChannel().isOpen()) { nettyServer.close(); } return "服务端关闭成功"; }}
第四步
在application.yml 添加此配置
netty:
  port: 1100
如果希望在项目启动时自动启动 Netty(而不是通过接口手动启动),可以在启动类中加入如下代码:
@SpringBootApplicationpublic class NettyApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(NettyApplication.class, args); } @Value("${netty.port}") private int port; @Autowired private NettyServer nettyServer; @Override public void run(String... args) throws Exception { ChannelFuture init = nettyServer.init(port); Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.close())); init.channel().closeFuture().syncUninterruptibly(); }}