Netty实现单通道并发读写,即多路复用
🧑 博主简介:CSDN博客专家,历代文学网(PC端可以访问:https://literature.sinhy.com/#/?__c=1000,移动端可微信小程序搜索“历代文学”)总架构师,
15年
工作经验,精通Java编程
,高并发设计
,Springboot和微服务
,熟悉Linux
,ESXI虚拟化
以及云原生Docker和K8s
,热衷于探索科技的边界,并将理论知识转化为实际应用。保持对新技术的好奇心,乐于分享所学,希望通过我的实践经历和见解,启发他人的创新思维。在这里,我希望能与志同道合的朋友交流探讨,共同进步,一起在技术的世界里不断学习成长。
技术合作请加本人wx(注明来自csdn):foreast_sea
Netty实现单通道并发读写,即多路复用
引言:Netty共享通道连接池——突破传统连接模型的性能革命
在传统网络编程中,TCP连接常被视为单线程独占资源,这种设计在高并发场景下面临着严峻的性能瓶颈:每个连接只能串行处理请求,导致资源利用率低下,连接数量激增带来巨大开销。Netty共享通道连接池应运而生,它颠覆性地实现了单TCP连接的多线程并行读写,将连接复用提升到全新维度。
下面我们将实现一个高性能的连接池,支持多个线程共享同一个通道(每个通道最大共享线程数可配置),并确保高并发获取和释放连接的效率。
一、共享连接池实现:支持多线程共享同一通道
核心实现
import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.util.concurrent.*;import java.util.Queue;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.locks.LockSupport;public class MultiThreadSharedChannelPool { // 配置参数 private final int maxConnections; private final int maxThreadsPerChannel; private final long acquireTimeoutMillis; // 核心数据结构 private final Queue<SharedChannel> availableChannels = new ConcurrentLinkedQueue<>(); private final ConcurrentHashMap<Channel, SharedChannel> allChannels = new ConcurrentHashMap<>(); private final AtomicInteger totalConnections = new AtomicInteger(0); private final EventLoopGroup eventLoopGroup; private final Bootstrap bootstrap; // 等待队列管理 private final Queue<Promise<SharedChannel>> pendingAcquires = new ConcurrentLinkedQueue<>(); public MultiThreadSharedChannelPool(Bootstrap bootstrap, int maxConnections, int maxThreadsPerChannel, long acquireTimeoutMillis) { this.bootstrap = bootstrap; this.eventLoopGroup = bootstrap.config().group(); this.maxConnections = maxConnections; this.maxThreadsPerChannel = maxThreadsPerChannel; this.acquireTimeoutMillis = acquireTimeoutMillis; } /** * 获取共享通道(异步) */ public Future<SharedChannel> acquire() { Promise<SharedChannel> promise = eventLoopGroup.next().newPromise(); acquireInternal(promise); return promise; } private void acquireInternal(Promise<SharedChannel> promise) { // 尝试从可用通道获取 SharedChannel channel = tryAcquireAvailableChannel(); if (channel != null) { promise.setSuccess(channel); return; } // 尝试创建新连接 if (tryCreateNewConnection()) { // 创建成功后重新尝试获取 eventLoopGroup.schedule(() -> acquireInternal(promise), 10, TimeUnit.MILLISECONDS); return; } // 加入等待队列 if (acquireTimeoutMillis > 0) { scheduleAcquireTimeout(promise); } pendingAcquires.offer(promise); } private SharedChannel tryAcquireAvailableChannel() { for (SharedChannel channel : availableChannels) { if (channel.tryAcquire()) { // 如果通道已满,从可用队列移除 if (!channel.isAvailable()) { availableChannels.remove(channel); } return channel; } } return null; } private boolean tryCreateNewConnection() { if (totalConnections.get() >= maxConnections) { return false; } if (!totalConnections.compareAndSet(totalConnections.get(), totalConnections.get() + 1)) { return false; } // 异步创建连接 bootstrap.connect().addListener((ChannelFuture future) -> { if (future.isSuccess()) { Channel ch = future.channel(); SharedChannel sharedChannel = new SharedChannel(ch, maxThreadsPerChannel); allChannels.put(ch, sharedChannel); // 新连接立即可用 availableChannels.offer(sharedChannel); processPendingAcquires(); // 添加关闭监听 ch.closeFuture().addListener(f -> { allChannels.remove(ch); totalConnections.decrementAndGet(); }); } else { totalConnections.decrementAndGet(); } }); return true; } private void processPendingAcquires() { while (!pendingAcquires.isEmpty()) { Promise<SharedChannel> promise = pendingAcquires.poll(); if (promise == null || promise.isDone()) continue; SharedChannel channel = tryAcquireAvailableChannel(); if (channel != null) { promise.setSuccess(channel); } else { pendingAcquires.offer(promise); break; } } } private void scheduleAcquireTimeout(Promise<SharedChannel> promise) { eventLoopGroup.schedule(() -> { if (!promise.isDone() && pendingAcquires.remove(promise)) { promise.tryFailure(new TimeoutException(\"Acquire timeout\")); } }, acquireTimeoutMillis, TimeUnit.MILLISECONDS); } /** * 释放共享通道 */ public void release(SharedChannel channel) { channel.release(); if (channel.isAvailable()) { // 如果变为可用状态,加入可用队列 availableChannels.offer(channel); // 唤醒等待请求 processPendingAcquires(); } } /** * 共享通道包装类 */ public static class SharedChannel { private final Channel physicalChannel; private final AtomicInteger usageCount = new AtomicInteger(0); private final int maxThreads; public SharedChannel(Channel physicalChannel, int maxThreads) { this.physicalChannel = physicalChannel; this.maxThreads = maxThreads; } public Channel getChannel() { return physicalChannel; } public boolean tryAcquire() { while (true) { int current = usageCount.get(); if (current >= maxThreads) return false; if (usageCount.compareAndSet(current, current + 1)) { return true; } } } public void release() { usageCount.decrementAndGet(); } public boolean isAvailable() { return usageCount.get() < maxThreads && physicalChannel.isActive(); } public int getUsageCount() { return usageCount.get(); } } // 统计信息方法 public int getAvailableChannels() { return availableChannels.size(); } public int getActiveChannels() { return allChannels.size() - availableChannels.size(); } public int getTotalConnections() { return totalConnections.get(); } public int getPendingAcquires() { return pendingAcquires.size(); }}
使用示例
// 1. 创建Netty引导Bootstrap bootstrap = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { // 初始化管道 } });// 2. 创建连接池MultiThreadSharedChannelPool pool = new MultiThreadSharedChannelPool( bootstrap, 20, // 最大物理连接数 8, // 每个连接最大共享线程数 5000 // 获取超时时间(毫秒));// 3. 获取共享通道pool.acquire().addListener((Future<MultiThreadSharedChannelPool.SharedChannel> future) -> { if (future.isSuccess()) { MultiThreadSharedChannelPool.SharedChannel sharedChannel = future.getNow(); try { Channel channel = sharedChannel.getChannel(); // 4. 使用通道 channel.writeAndFlush(request).addListener(writeFuture -> { // 处理响应... }); } finally { // 5. 释放通道 pool.release(sharedChannel); } } else { // 处理获取失败 }});// 6. 关闭连接池Runtime.getRuntime().addShutdownHook(new Thread(() -> { // 实际应用中需要更优雅的关闭 bootstrap.config().group().shutdownGracefully();}));
二、关键性能设计
1. 高性能无锁队列
// 使用并发性能最好的队列private final Queue<SharedChannel> availableChannels = new ConcurrentLinkedQueue<>();// 使用CAS操作更新使用计数public boolean tryAcquire() { while (true) { int current = usageCount.get(); if (current >= maxThreads) return false; if (usageCount.compareAndSet(current, current + 1)) { return true; } }}
2. 智能连接创建策略
private boolean tryCreateNewConnection() { // 双重检查避免过度创建 if (totalConnections.get() >= maxConnections) return false; // CAS保证原子性 if (!totalConnections.compareAndSet(current, current + 1)) { return false; } // 异步创建连接 bootstrap.connect().addListener(future -> { if (future.isSuccess()) { // 添加新通道 } else { // 回滚计数 totalConnections.decrementAndGet(); } }); return true;}
3. 高效等待队列处理
private void processPendingAcquires() { while (!pendingAcquires.isEmpty()) { Promise<SharedChannel> promise = pendingAcquires.poll(); if (promise == null || promise.isDone()) continue; SharedChannel channel = tryAcquireAvailableChannel(); if (channel != null) { promise.setSuccess(channel); } else { // 放回队列并退出循环 pendingAcquires.offer(promise); break; } }}
4. 连接预热机制
public void warmup(int connections) { for (int i = 0; i < Math.min(connections, maxConnections); i++) { tryCreateNewConnection(); }}
三、高级功能扩展
1. 连接健康检查
public void startHealthCheck(long interval, TimeUnit unit) { eventLoopGroup.scheduleAtFixedRate(() -> { for (SharedChannel sc : allChannels.values()) { if (!sc.getChannel().isActive() && sc.getUsageCount() == 0) { // 关闭无效连接 sc.getChannel().close(); } } }, interval, interval, unit);}
2. 负载监控
public void startMonitoring() { eventLoopGroup.scheduleAtFixedRate(() -> { System.out.println(\"=== 连接池状态 ===\"); System.out.println(\"总连接数: \" + totalConnections.get()); System.out.println(\"可用通道: \" + availableChannels.size()); System.out.println(\"等待请求: \" + pendingAcquires.size()); // 打印每个通道的使用情况 allChannels.forEach((ch, sc) -> { System.out.printf(\"通道 %s: 使用数=%d/%d%n\", ch.id(), sc.getUsageCount(), sc.maxThreads); }); }, 5, 5, TimeUnit.SECONDS);}
3. 动态配置
public void updateConfig(int newMaxConnections, int newMaxThreadsPerChannel) { // 注意:需要线程安全地更新 this.maxConnections = newMaxConnections; // 更新现有通道的最大线程数 allChannels.values().forEach(sc -> sc.maxThreads = newMaxThreadsPerChannel);}
四、粘包半包问题解决方案
1. 为什么需要 LengthFieldBasedFrameDecoder
在 TCP 网络通信中,数据是以字节流形式传输的,没有明确的消息边界。这会导致两个核心问题:
-
粘包问题:多个小数据包被合并成一个大数据包
发送端: [包1][包2][包3]接收端: [包1包2包3] (合并接收)
-
拆包问题:大数据包被拆分成多个小数据包
发送端: [大数据包]接收端: [部分1][部分2] (分次接收)
LengthFieldBasedFrameDecoder
正是 Netty 为解决这些问题提供的核心解码器,它通过长度字段来标识消息边界。
2. 工作流程
#mermaid-svg-2bFFT84D8Ro8WAMW {font-family:\"trebuchet ms\",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-2bFFT84D8Ro8WAMW .error-icon{fill:#552222;}#mermaid-svg-2bFFT84D8Ro8WAMW .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-2bFFT84D8Ro8WAMW .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-2bFFT84D8Ro8WAMW .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-2bFFT84D8Ro8WAMW .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-2bFFT84D8Ro8WAMW .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-2bFFT84D8Ro8WAMW .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-2bFFT84D8Ro8WAMW .marker{fill:#333333;stroke:#333333;}#mermaid-svg-2bFFT84D8Ro8WAMW .marker.cross{stroke:#333333;}#mermaid-svg-2bFFT84D8Ro8WAMW svg{font-family:\"trebuchet ms\",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-2bFFT84D8Ro8WAMW .actor{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-2bFFT84D8Ro8WAMW text.actor>tspan{fill:black;stroke:none;}#mermaid-svg-2bFFT84D8Ro8WAMW .actor-line{stroke:grey;}#mermaid-svg-2bFFT84D8Ro8WAMW .messageLine0{stroke-width:1.5;stroke-dasharray:none;stroke:#333;}#mermaid-svg-2bFFT84D8Ro8WAMW .messageLine1{stroke-width:1.5;stroke-dasharray:2,2;stroke:#333;}#mermaid-svg-2bFFT84D8Ro8WAMW #arrowhead path{fill:#333;stroke:#333;}#mermaid-svg-2bFFT84D8Ro8WAMW .sequenceNumber{fill:white;}#mermaid-svg-2bFFT84D8Ro8WAMW #sequencenumber{fill:#333;}#mermaid-svg-2bFFT84D8Ro8WAMW #crosshead path{fill:#333;stroke:#333;}#mermaid-svg-2bFFT84D8Ro8WAMW .messageText{fill:#333;stroke:#333;}#mermaid-svg-2bFFT84D8Ro8WAMW .labelBox{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-2bFFT84D8Ro8WAMW .labelText,#mermaid-svg-2bFFT84D8Ro8WAMW .labelText>tspan{fill:black;stroke:none;}#mermaid-svg-2bFFT84D8Ro8WAMW .loopText,#mermaid-svg-2bFFT84D8Ro8WAMW .loopText>tspan{fill:black;stroke:none;}#mermaid-svg-2bFFT84D8Ro8WAMW .loopLine{stroke-width:2px;stroke-dasharray:2,2;stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);}#mermaid-svg-2bFFT84D8Ro8WAMW .note{stroke:#aaaa33;fill:#fff5ad;}#mermaid-svg-2bFFT84D8Ro8WAMW .noteText,#mermaid-svg-2bFFT84D8Ro8WAMW .noteText>tspan{fill:black;stroke:none;}#mermaid-svg-2bFFT84D8Ro8WAMW .activation0{fill:#f4f4f4;stroke:#666;}#mermaid-svg-2bFFT84D8Ro8WAMW .activation1{fill:#f4f4f4;stroke:#666;}#mermaid-svg-2bFFT84D8Ro8WAMW .activation2{fill:#f4f4f4;stroke:#666;}#mermaid-svg-2bFFT84D8Ro8WAMW .actorPopupMenu{position:absolute;}#mermaid-svg-2bFFT84D8Ro8WAMW .actorPopupMenuPanel{position:absolute;fill:#ECECFF;box-shadow:0px 8px 16px 0px rgba(0,0,0,0.2);filter:drop-shadow(3px 5px 2px rgb(0 0 0 / 0.4));}#mermaid-svg-2bFFT84D8Ro8WAMW .actor-man line{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-2bFFT84D8Ro8WAMW .actor-man circle,#mermaid-svg-2bFFT84D8Ro8WAMW line{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;stroke-width:2px;}#mermaid-svg-2bFFT84D8Ro8WAMW :root{--mermaid-font-family:\"trebuchet ms\",verdana,arial,sans-serif;} Channel Decoder Handler 接收原始字节流 读取长度字段值 (N) 计算帧长度 = N + 调整值 等待直到收到完整帧 (N字节) 转发完整数据帧 (剥离指定头部) Channel Decoder Handler
3. 核心参数详解
3.1 构造函数
public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip)
maxFrameLength
lengthFieldOffset
lengthFieldLength
lengthAdjustment
initialBytesToStrip
3.2 参数关系公式
完整帧长度 = lengthFieldOffset + lengthFieldLength + (长度字段值) + lengthAdjustment
4. 典型使用场景
场景1:简单长度前缀协议
[长度字段(4字节)][消息体]
new LengthFieldBasedFrameDecoder( 1048576, // 1MB最大帧 0, // 长度字段在开头 4, // 长度字段占4字节 0, // 无调整 4 // 剥离长度字段)
场景2:包含固定头部的协议
[魔数(2字节)][版本(1字节)][长度(4字节)][消息体]
new LengthFieldBasedFrameDecoder( 1048576, 2 + 1, // 跳过魔数和版本 (3字节) 4, // 长度字段4字节 0, // 无调整 2 + 1 + 4 // 剥离魔数+版本+长度字段 (7字节))
场景3:长度包含自身的情况
[长度(4字节)][消息体] // 长度字段值 = 4 + 消息体长度
new LengthFieldBasedFrameDecoder( 1048576, 0, 4, -4, // 调整:长度字段包含自身,需减去4 4 // 剥离长度字段)
场景4:复杂调整场景
[头标识(2)][长度(4)][版本(1)][消息体][校验(2)]
new LengthFieldBasedFrameDecoder( 1048576, 2, // 跳过头标识 4, // 长度字段4字节 1 + 2, // 调整:长度字段值 + 版本长度 + 校验长度 2 + 4 // 剥离头标识+长度字段)
5. 完整代码示例
5.1 服务端配置
public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 配置LengthFieldBasedFrameDecoder pipeline.addLast(new LengthFieldBasedFrameDecoder( 1024 * 1024, // 最大帧1MB 0, // 长度字段偏移0 4, // 长度字段4字节 0, // 长度调整值 4 // 剥离前4字节(长度字段) )); // 添加自定义解码器 pipeline.addLast(new MessageDecoder()); // 添加业务处理器 pipeline.addLast(new BusinessHandler()); }}
5.2 自定义消息解码器
public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) { // 此时msg已经是完整帧(已剥离长度字段) int type = msg.readByte(); // 读取消息类型 byte[] payload = new byte[msg.readableBytes()]; msg.readBytes(payload); CustomMessage message = new CustomMessage(type, payload); out.add(message); }}
5.3 自定义消息类
public class CustomMessage { private final int type; private final byte[] payload; public CustomMessage(int type, byte[] payload) { this.type = type; this.payload = payload; } // 编码方法(用于客户端) public ByteBuf encode() { ByteBuf buf = Unpooled.buffer(); buf.writeByte(type); buf.writeBytes(payload); // 添加长度前缀 ByteBuf finalBuf = Unpooled.buffer(); finalBuf.writeInt(buf.readableBytes()); finalBuf.writeBytes(buf); return finalBuf; }}
6.高级配置技巧
6.1 字节序控制
new LengthFieldBasedFrameDecoder( ByteOrder.BIG_ENDIAN, // 大端序 1048576, 0, 4, 0, 4)
6.2 快速失败模式
new LengthFieldBasedFrameDecoder( 1048576, 0, 4, 0, 4) { @Override protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) { long frameLength = super.getUnadjustedFrameLength(buf, offset, length, order); if (frameLength < 0) { throw new CorruptedFrameException(\"负长度: \" + frameLength); } return frameLength; }};
6.3结合其他解码器
pipeline.addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // 转换为字符串pipeline.addLast(new JsonDecoder()); // JSON解析
7. 常见问题解决方案
问题1:长度字段值包含哪些部分?
解决方案:使用 lengthAdjustment
参数精确控制:
- 若长度字段仅包含消息体:lengthAdjustment = 0
- 若长度字段包含自身和消息体:lengthAdjustment = -长度字段字节数
- 若长度字段包含其他头部:lengthAdjustment = 额外部分的长度
问题2:如何处理变长头部?
// 示例:头部包含变长字段pipeline.addLast(new LengthFieldBasedFrameDecoder( 1048576, 0, 4, -4, // 调整长度 4 // 剥离长度字段));pipeline.addLast(new HeaderDecoder()); // 自定义头部解码器public class HeaderDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { int headerLength = in.readByte(); // 读取头部长度 byte[] header = new byte[headerLength]; in.readBytes(header); // 剩余部分为消息体 out.add(new CustomMessage(header, in.retain())); }}
问题3:超大消息处理
// 分块处理超大消息pipeline.addLast(new LengthFieldBasedFrameDecoder( 10 * 1024 * 1024, // 10MB 0, 4, 0, 4));pipeline.addLast(new ChunkedMessageHandler());public class ChunkedMessageHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { while (msg.readableBytes() > 0) { int chunkSize = Math.min(1024 * 64, msg.readableBytes()); ByteBuf chunk = msg.readRetainedSlice(chunkSize); processChunk(chunk); } }}
8. 性能优化
8.1 使用池化ByteBuf
// 在引导中配置Bootstrap b = new Bootstrap();b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
8.2 避免内存复制
// 在解码器中直接使用ByteBufpublic class DirectDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) { // 避免复制:直接使用ByteBuf out.add(msg.retain()); }}
8.3 精确控制最大长度
// 根据业务需求设置合理的最大帧长度new LengthFieldBasedFrameDecoder( getConfig().getMaxFrameSize(), // 从配置读取 0, 4, 0, 4)
通过合理配置 LengthFieldBasedFrameDecoder
,您可以高效解决TCP粘包/拆包问题,构建稳定可靠的网络通信系统。
总结
自实现的 MultiThreadSharedChannelPool
提供了:
- 真正的多线程共享:每个物理连接可被多个线程同时使用
- 智能连接分配:优先使用未饱和的连接
- 高效并发控制:无锁队列+CAS原子操作
- 完整生命周期管理:创建、使用、回收、销毁