Netty中future和promise用法和区别
定义与概念
- Future:表示一个异步操作的结果。它是只读的,意味着你只能查看操作是否完成、是否成功、获取结果或者异常等信息,但不能主动设置操作的结果。
- Promise:是
Future
的可写扩展。它不仅可以像Future
一样查看操作结果,还能主动设置操作的成功、失败或者取消状态,并且通知所有的监听器。
用法示例
Future 的用法
Future
通常用于获取异步操作的结果,并且可以添加监听器来处理操作完成后的逻辑。以下是一个简单的示例,展示了如何使用 Future
来处理 DNS 解析结果:
dnsNameResolver.resolve(host).addListener(new FutureListener<InetAddress>() { @Override public void operationComplete(Future<InetAddress> future) throws Exception { if (future.isSuccess()) { InetAddress hostAddress = future.get(); // 处理解析成功的结果 } else { // 处理解析失败的情况 } }});
在这个示例中,dnsNameResolver.resolve(host)
方法返回一个 Future
对象,我们通过添加 FutureListener
来监听解析操作的完成状态。当操作完成后,会调用 operationComplete
方法,我们可以在这个方法中处理解析结果。
Promise 的用法
Promise
主要用于主动设置异步操作的结果,并且可以通知所有的监听器。以下是一个示例,展示了如何使用 Promise
来处理 OCSP 查询结果:
final Promise<OCSPResp> responsePromise = eventLoop.newPromise();// 异步操作dnsNameResolver.resolve(host).addListener(new FutureListener<InetAddress>() { @Override public void operationComplete(Future<InetAddress> future) throws Exception { if (future.isSuccess()) { // 处理解析成功的结果 InetAddress hostAddress = future.get(); final ChannelFuture channelFuture = bootstrap.connect(hostAddress, port); channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (future.isSuccess()) { // 处理连接成功的结果 responsePromise.trySuccess(result); // 设置操作成功的结果 } else { responsePromise.tryFailure(new IllegalStateException( \"Connection to OCSP Responder Failed\", future.cause())); // 设置操作失败的结果 } } }); } else { responsePromise.tryFailure(future.cause()); // 设置操作失败的结果 } }});// 添加监听器来处理操作结果responsePromise.addListener(new FutureListener<OCSPResp>() { @Override public void operationComplete(Future<OCSPResp> future) throws Exception { if (future.isSuccess()) { OCSPResp resp = future.get(); // 处理操作成功的结果 } else { // 处理操作失败的情况 } }});
在这个示例中,我们首先创建了一个 Promise
对象 responsePromise
,然后在异步操作完成后,根据操作结果调用 trySuccess
或 tryFailure
方法来设置 Promise
的状态。最后,我们添加了一个 FutureListener
来监听 Promise
的完成状态,并处理操作结果。
区别总结
- 可写性:
Future
是只读的,只能查看异步操作的结果,不能主动设置操作的状态。Promise
是可写的,可以主动设置操作的成功、失败或者取消状态。
- 用途:
Future
主要用于获取异步操作的结果,并且可以添加监听器来处理操作完成后的逻辑。Promise
主要用于在异步操作完成后,主动设置操作的结果,并且通知所有的监听器。
- 方法差异:
Future
提供了一些方法来查看操作的状态,如isDone()
、isSuccess()
、cause()
等。Promise
除了继承了Future
的方法外,还提供了一些方法来设置操作的结果,如setSuccess()
、trySuccess()
、setFailure()
、tryFailure()
等。
代码中的体现
在提供的代码片段中,InflightNameResolver
类的 resolve
方法使用了 Promise
来处理 DNS 解析结果:
private <U> Promise<U> resolve( final ConcurrentMap<String, Promise<U>> resolveMap, final String inetHost, final Promise<U> promise, boolean resolveAll) { // ... if (resolveAll) { @SuppressWarnings(\"unchecked\") final Promise<List<T>> castPromise = (Promise<List<T>>) promise; // U is List delegate.resolveAll(inetHost, castPromise); } else { @SuppressWarnings(\"unchecked\") final Promise<T> castPromise = (Promise<T>) promise; // U is T delegate.resolve(inetHost, castPromise); } // ... return promise;}
在这个方法中,我们可以看到 Promise
被用于传递异步操作的结果,并且可以在操作完成后主动设置操作的状态。
另外,PromiseNotifier
类展示了如何使用 Promise
来通知多个监听器:
public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureListener<F> { private final Promise<? super V>[] promises; public PromiseNotifier(Promise<? super V>... promises) { this.promises = promises; } @Override public void operationComplete(F future) throws Exception { if (future.isSuccess()) { V result = future.get(); for (Promise<? super V> p : promises) { PromiseNotificationUtil.trySuccess(p, result, null); } } else if (future.isCancelled()) { for (Promise<? super V> p : promises) { PromiseNotificationUtil.tryCancel(p, null); } } else { Throwable cause = future.cause(); for (Promise<? super V> p : promises) { PromiseNotificationUtil.tryFailure(p, cause, null); } } }}
在这个类中,我们可以看到 Promise
被用于通知多个监听器操作的结果,并且可以根据操作的状态调用不同的方法来设置 Promise
的状态。
综上所述,Future
和 Promise
在 Netty 中都是非常重要的组件,它们分别用于处理异步操作的不同方面。通过合理使用 Future
和 Promise
,可以有效地处理异步操作的结果,提高代码的可读性和可维护性。
处理多个顺序依赖的异步操作
假设我们需要完成一个包含三个步骤的操作流程:
- 连接到服务器
- 发送认证请求并等待认证成功
- 发送业务数据并接收响应
这三个步骤必须按顺序执行,后一个步骤依赖于前一个步骤的成功完成。以下是实现这种依赖关系的代码示例:
import io.netty.bootstrap.Bootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;public class ChannelPromiseDependencyExample { private static final String SERVER_HOST = \"localhost\"; private static final int SERVER_PORT = 8080; public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new StringDecoder(), new StringEncoder(), new ClientHandler() ); } }); // 创建主 Promise,用于跟踪整个操作流程的完成状态 ChannelPromise mainPromise = bootstrap.config().group().next().newPromise(); // 开始执行依赖操作链 connectAndProcess(bootstrap, mainPromise); // 等待整个操作流程完成 mainPromise.await(); if (mainPromise.isSuccess()) { System.out.println(\"所有操作成功完成\"); } else { System.out.println(\"操作失败: \" + mainPromise.cause()); } } finally { group.shutdownGracefully(); } } private static void connectAndProcess(Bootstrap bootstrap, ChannelPromise mainPromise) { // 步骤 1: 连接到服务器 ChannelFuture connectFuture = bootstrap.connect(SERVER_HOST, SERVER_PORT); // 为连接操作添加监听器 connectFuture.addListener((ChannelFuture future) -> { if (future.isSuccess()) { Channel channel = future.channel(); System.out.println(\"成功连接到服务器\"); // 步骤 2: 发送认证请求 ChannelPromise authPromise = channel.newPromise(); sendAuthRequest(channel, authPromise); // 为认证操作添加监听器 authPromise.addListener((ChannelFuture authFuture) -> { if (authFuture.isSuccess()) { System.out.println(\"认证成功\"); // 步骤 3: 发送业务数据 ChannelPromise businessPromise = channel.newPromise(); sendBusinessData(channel, businessPromise); // 为业务操作添加监听器 businessPromise.addListener((ChannelFuture businessFuture) -> { if (businessFuture.isSuccess()) { System.out.println(\"业务数据处理成功\"); mainPromise.setSuccess(); // 标记整个操作成功 } else { mainPromise.setFailure(businessFuture.cause()); // 标记整个操作失败 } channel.close(); // 关闭连接 }); } else { mainPromise.setFailure(authFuture.cause()); // 标记整个操作失败 channel.close(); // 关闭连接 } }); } else { mainPromise.setFailure(future.cause()); // 标记整个操作失败 } }); } private static void sendAuthRequest(Channel channel, ChannelPromise authPromise) { // 发送认证请求 channel.writeAndFlush(\"AUTH username password\").addListener(future -> { if (future.isSuccess()) { System.out.println(\"认证请求已发送\"); // 认证结果将在 ChannelHandler 中处理 } else { authPromise.setFailure(future.cause()); // 认证请求发送失败 } }); } private static void sendBusinessData(Channel channel, ChannelPromise businessPromise) { // 发送业务数据 channel.writeAndFlush(\"DATA some_business_data\").addListener(future -> { if (future.isSuccess()) { System.out.println(\"业务数据已发送\"); // 业务响应将在 ChannelHandler 中处理 } else { businessPromise.setFailure(future.cause()); // 业务数据发送失败 } }); } static class ClientHandler extends SimpleChannelInboundHandler<String> { private ChannelPromise authPromise; private ChannelPromise businessPromise; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 通道激活时,可以获取外部的 Promise 实例 // 实际应用中可能需要通过构造函数或其他方式传递 } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(\"收到服务器响应: \" + msg); // 根据响应内容判断操作结果 if (msg.startsWith(\"AUTH_SUCCESS\")) { if (authPromise != null) { authPromise.setSuccess(); // 认证成功 } } else if (msg.startsWith(\"AUTH_FAILURE\")) { if (authPromise != null) { authPromise.setFailure(new Exception(\"认证失败: \" + msg)); // 认证失败 } } else if (msg.startsWith(\"DATA_SUCCESS\")) { if (businessPromise != null) { businessPromise.setSuccess(); // 业务数据处理成功 } } else if (msg.startsWith(\"DATA_FAILURE\")) { if (businessPromise != null) { businessPromise.setFailure(new Exception(\"业务数据处理失败: \" + msg)); // 业务数据处理失败 } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 设置所有未完成的 Promise 为失败状态 if (authPromise != null && !authPromise.isDone()) { authPromise.setFailure(cause); } if (businessPromise != null && !businessPromise.isDone()) { businessPromise.setFailure(cause); } ctx.close(); } }}
关键点解析
- 创建和使用 ChannelPromise:
- 通过
EventLoop.newPromise()
或Channel.newPromise()
创建ChannelPromise
实例。 mainPromise
用于跟踪整个操作流程的完成状态。
- 通过
- 处理依赖关系:
- 使用
addListener()
方法为每个异步操作添加监听器。 - 在前一个操作的监听器中检查操作结果,只有成功时才继续执行下一个操作。
- 如果某个操作失败,立即设置主
Promise
为失败状态并终止后续操作。
- 使用
- 在 ChannelHandler 中处理响应:
- 在
ClientHandler
中接收服务器响应,并根据响应内容设置相应的Promise
状态。 - 这样可以将异步响应与对应的操作关联起来。
- 在
- 异常处理:
- 在
exceptionCaught()
方法中捕获异常,并设置所有未完成的Promise
为失败状态。
- 在
更复杂的依赖关系处理
对于更复杂的依赖关系,可以使用 PromiseCombiner
来组合多个 Promise
,并在所有 Promise
都成功完成后执行后续操作。以下是一个使用 PromiseCombiner
的示例:
import io.netty.bootstrap.Bootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;public class PromiseCombinerExample { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new StringDecoder(), new StringEncoder(), new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(\"收到消息: \" + msg); } } ); } }); // 连接到多个服务器 ChannelFuture future1 = bootstrap.connect(\"server1.example.com\", 8080); ChannelFuture future2 = bootstrap.connect(\"server2.example.com\", 8080); ChannelFuture future3 = bootstrap.connect(\"server3.example.com\", 8080); // 创建 PromiseCombiner 来组合多个 Future PromiseCombiner combiner = new PromiseCombiner(group.next()); combiner.add(future1); combiner.add(future2); combiner.add(future3); // 创建一个 Promise 来接收组合结果 ChannelPromise allConnectedPromise = group.next().newPromise(); combiner.finish(allConnectedPromise); // 为组合结果添加监听器 allConnectedPromise.addListener(future -> { if (future.isSuccess()) { System.out.println(\"所有连接都已成功建立\"); // 执行后续操作 } else { System.out.println(\"至少有一个连接失败: \" + future.cause()); } }); // 等待所有操作完成 allConnectedPromise.await(); } finally { group.shutdownGracefully(); } }}
通过 ChannelPromise
和相关工具,我们可以在 Netty 中灵活处理多个异步操作的依赖关系:
- 顺序依赖:通过在前一个操作的监听器中启动下一个操作,实现顺序执行。
- 并行依赖:使用
PromiseCombiner
等工具组合多个并行操作,等待所有操作完成后执行后续逻辑。 - 异常处理:在每个步骤中正确处理异常,并传播给主
Promise
。 - 状态管理:使用
Promise
跟踪每个操作的状态,确保操作按预期完成。
这种方式使得异步代码更加清晰和易于维护,避免了回调地狱,提高了代码的可读性和可维护性。