> 技术文档 > 【橘子分布式】gRPC(番外篇-拦截器)

【橘子分布式】gRPC(番外篇-拦截器)


一、简介

我们之前其实已经完成了关于grpc的一些基础用法,实际上还有一些比较相对进阶的使用方式。比如:

  • 拦截器:包括客户端服务端的拦截器,进而在每一端都可以划分为流式的拦截器和非流式的拦截器。和以前我们在spring web中的拦截器思路是一样的,都可以在请求来之前做一些统一的处理,进而减少代码量,做一些鉴权 ,数据校验 ,限流等等,和业务解耦。
    gRPC的拦截器
    1. 一元请求的 拦截器
      客户端 【请求 响应】
      服务端 【请求 响应】
    2. 流式请求的 拦截器 (Stream Tracer)
      客户端 【请求 响应】
      服务端 【请求 响应】
  • 客户端重试:grpc的客户端还可以发起重试请求,当我们有一些异常并非代码异常的时候,可以通过重试来避免问题。
  • NameResolver :当用于微服务的时候,需要注册中心对服务名的解析等等。
  • 负载均衡:包括(pick-first , 轮训)等轮训方式。
  • 可以在其他微服务框架中整合,比如dubbo中,spring cloud中,用protobuf来序列化数据,用grpc来发起rpc(比如可以替代open fegin)等场合。

下面我们就来从拦截器功能开始学习一下grpc。

二、项目构建

我们为进阶篇搭建一个新的工程。结构还是客户端,服务端,api模块。
其中api模块作为公共内容被其他模块引入做公共的声明使用。

api模块: rpc-grpc-adv-api
服务端模块:rpc-grpc-adv-server
客户端模块: rpc-grpc-adv-client

1、api模块

<dependencies> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty-shaded</artifactId> <version>1.51.0</version> <scope>runtime</scope> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> <version>1.51.0</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> <version>1.51.0</version> </dependency> <dependency>  <groupId>org.apache.tomcat</groupId> <artifactId>annotations-api</artifactId> <version>6.0.53</version> <scope>provided</scope> </dependency></dependencies><build> <extensions> <extension> <groupId>kr.motd.maven</groupId> <artifactId>os-maven-plugin</artifactId> <version>1.7.1</version> </extension> </extensions> <plugins> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.6.1</version> <configuration> <protocArtifact>com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier}</protocArtifact> <pluginId>grpc-java</pluginId> <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.52.1:exe:${os.detected.classifier}</pluginArtifact> <outputDirectory>${basedir}/src/main/java</outputDirectory> <clearOutputDirectory>false</clearOutputDirectory> </configuration> <executions> <execution>  <goals> <goal>compile</goal> <goal>compile-custom</goal>  </goals> </execution> </executions> </plugin> </plugins></build>

在main目录下创建proto目录,下建立Hello.proto文件,声明内容为。

syntax = \"proto3\";package com.levi;option java_multiple_files = false;option java_package = \"com.levi\";option java_outer_classname = \"HelloProto\";message HelloRequest{ string name = 1;}message HelloRespnose{ string result = 1;}service HelloService{ // 普通方法 rpc hello(HelloRequest) returns (HelloRespnose); // 双端流方法 rpc hello1(stream HelloRequest) returns (stream HelloRespnose);}

然后通过编译器编译生成对应的message类HelloProto.java和service类HelloServiceGrpc.java。
然后该模块将会被其他模块引用,使用这些定义的类。

2、server模块

引入api模块。

<dependencies> <dependency> <groupId>com.levi</groupId> <artifactId>rpc-grpc-adv-api</artifactId> <version>1.0-SNAPSHOT</version> </dependency></dependencies>

服务端业务代码为:

@Slf4jpublic class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase { @Override public void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloRespnose> responseObserver) { String name = request.getName(); System.out.println(\"接收到客户端的参数name = \" + name); responseObserver.onNext(HelloProto.HelloRespnose.newBuilder().setResult(\"this is server result\").build()); responseObserver.onCompleted(); }}

服务端启动代码为:

package com.levi;import com.levi.service.HelloServiceImpl;import io.grpc.Server;import io.grpc.ServerBuilder;import java.io.IOException;public class GrpcServer { public static void main(String[] args) throws InterruptedException, IOException { ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000); serverBuilder.addService(new HelloServiceImpl()); Server server = serverBuilder.build(); server.start(); server.awaitTermination(); }}

3、client模块

引入api模块。

<dependencies> <dependency> <groupId>com.levi</groupId> <artifactId>rpc-grpc-adv-api</artifactId> <version>1.0-SNAPSHOT</version> </dependency></dependencies>

ok,至此就搭建完了项目结构。

三、拦截器

1、一元拦截器

其中一元拦截器就是在我们以前的一元通信模式使用的。也就是非流式的通信模式下。
而一元拦截器也分为两种:客户端拦截器和服务端拦截器
每一种下面又能分为两种
1.简单模式:只能拦截请求,不能拦截响应。
2.复杂模式:可以拦截请求和响应两种。
下面我们先来研究客户端拦截器。

1.1、客户端拦截器

我们说客户端拦截器又分为简单模式和复杂模式。

1.1.1、简单客户端拦截器

我们来开发客户端的代码,首先我们来编写一个简单拦截器。

package com.levi.interceptor;import io.grpc.*;import lombok.extern.slf4j.Slf4j;/*** 自定义客户端拦截器,需要实现grpc提供的拦截器接口ClientInterceptor* 该拦截器在客户端发起请求时被调用,* 可以在该拦截器中对请求进行处理,比如添加请求头、修改请求参数等*/@Slf4jpublic class CustomClientInterceptor implements ClientInterceptor {@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { log.debug(\"模拟业务处理,这是一个拦截启动的处理 ,统一的做了一些操作 ....\"); /* * 拦截器在客户端发起stub的rpc调用之前被调用。处理完之后往下传,把本次调用的一些信息继续往下传 * 把调用交给grpc,所以需要传递下去调用方法的元信息和一些选项 * 其实就是拦截器方法的MethodDescriptor method, CallOptions callOptions * 然后往下传是用来发起调用的,底层基于netty,所以需要传递Channel next(这是netty调用的基础连接) * 所以需要返回一个ClientCall,封装元信息,然后交给grpc,用来发起调用 */ return next.newCall(method, callOptions);}

我们定义好拦截器之后就要整合在客户端调用的构建上。

package com.levi;import com.levi.interceptor.CustomClientInterceptor;import io.grpc.ManagedChannel;import io.grpc.ManagedChannelBuilder;import java.util.List;public class GrpcClient { public static void main(String[] args) { ManagedChannel managedChannel = ManagedChannelBuilder .forAddress(\"localhost\", 9000) // .intercept(new CustomClientInterceptor()) // 可以传递多个拦截器,按照传递顺序执行拦截器 .intercept(List.of(new CustomClientInterceptor())) .usePlaintext() .build(); try { HelloServiceGrpc.HelloServiceBlockingStub helloServiceBlockingStub = HelloServiceGrpc.newBlockingStub(managedChannel); HelloProto.HelloRequest helloRequest = HelloProto.HelloRequest.newBuilder()  .setName(\"levi\")  .build(); HelloProto.HelloRespnose helloRespnose = helloServiceBlockingStub.hello(helloRequest); System.out.println(\"接收到的服务端响应为: \" + helloRespnose.getResult()); } catch (Exception e) { e.printStackTrace(); } finally { managedChannel.shutdown(); } }}

启动服务端,启动客户端,日志显示正常输出没毛病。
但是此时我们也看出来这种简单模式存在几个问题。

# 客户端简单拦截器的问题1. 只能拦截请求,不能拦截响应。我们只能在请求的时候发起拦截,但是接收响应的时候无法拦截,也就是类似spring mvc的时候没有后置的拦截能力。2. 即使拦截了请求操作,但是就这个请求拦截上,这个业务粒度也是过于宽泛,不精准。无法在请求的各个阶段发起拦截(1. 开始阶段 2. 设置消息数量 3.发送数据阶段 4.半连接阶段。),其实我们上面的代码可以看出来我们的拦截器是在往下传递ClientCall给grpc,也就是这个调用最后是ClientCall完成的。这里的各个阶段拦截其实就是在ClientCall的各个方法里面插入一些拦截操纵,其实就是在发起请求前,在ClientCall构建的各个阶段拦截一下(这个的底层应该是netty那些阶段性的事件感知实现的。)装饰者模式增强了一下。

【橘子分布式】gRPC(番外篇-拦截器)

1.1.2、复杂客户端拦截器

我们前面操作的简单客户端请求拦截器粒度比较大,无法实现对请求过程的更加细力度的监听和管理。所以我们需要一个更加强大的拦截器。我们说白了就是对原来能正常请求中间加一些增强方法,其实就是装饰者模式,包装一下原始类型。在原始类型的基础上加了一堆方法分别在各个阶段生效,从而来增强原始能力。但是真正的rpc调用实现还是原始类型发起的。

请求拦截

于是我们来写一下代码。

/* 这个类型增强原始类型 适用于控制 拦截 请求发送各个环节 */@Slf4jclass CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> { /** * 构造器模式,需要实现构造函数,传入原始类型,进行增强类型的包装 */ protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) { super(delegate); } /** * 开始调用,目的 看一个这个RPC请求是不是可以被发起。比如加一些鉴权等功能来判断是不是可以调用,如果不可以直接 * 返回responseListener.onClose(Status.INTERNAL, new Metadata()); * 否则就发起请求delegate().start(responseListener, headers); */ protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception { log.debug(\"发送请求数据之前的检查.....\"); //真正的去发起grpc的请求 // 是否真正发送grpc的请求,取决这个start方法的调用,delegate()就是原始类型,可以通过构造函数来看到 // delegate()就是原始类型那个之前简单调用的ClientCall,这就是装饰器模式 delegate().start(responseListener, headers); }}

我们看到这就是一个增强的包装类,他是对原始的简单拦截器的那个ClientCall的包装。我们看到它在后续的动作之前增强了一个检查的实现。
然后你钥匙要继续就一定要delegate().start才会往下走。否则没启动ClientCall会报错。
ok,我们已经完成了增强ClientCall的开发,现在要把原来的拦截器方法里面的简单ClientCall替换为增强ClientCall。
我们来修改拦截器代码。

/** * 自定义客户端拦截器,需要实现grpc提供的拦截器接口ClientInterceptor * 该拦截器在客户端发起请求时被调用, * 可以在该拦截器中对请求进行处理,比如添加请求头、修改请求参数等 */@Slf4jpublic class CustomClientInterceptor implements ClientInterceptor { @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { log.debug(\"模拟业务处理,这是一个拦截启动的处理 ,统一的做了一些操作 ....\"); /* * 拦截器在客户端发起stub的rpc调用之前被调用。处理完之后往下传,把本次调用的一些信息继续往下传 * 把调用交给grpc,所以需要传递下去调用方法的元信息和一些选项 * 其实就是拦截器方法的MethodDescriptor method, CallOptions callOptions * 然后往下传是用来发起调用的,底层基于netty,所以需要传递Channel next(这是netty调用的基础连接) * 所以需要返回一个ClientCall,封装元信息,然后交给grpc,用来发起调用 */ // return next.newCall(method, callOptions); /* * 如果我们需要用复杂客户端拦截器 ,就需要对原始的ClientCall进行包装 * 那么这个时候,就不能返回原始ClientCall对象, * 应该返回 包装的ClientCall ---> CustomForwardingClientClass */ return new CustomForwardingClientClass<>(next.newCall(method, callOptions)); }}

于是,启动服务端代码,客户端代码观察执行结果。
【橘子分布式】gRPC(番外篇-拦截器)
没有问题。
此外这个增强拦截还有更加细粒度的方法增强,我们来实现一下。

package com.levi.interceptor;import io.grpc.*;import lombok.extern.slf4j.Slf4j;import javax.annotation.Nullable;/** * 自定义客户端拦截器,需要实现grpc提供的拦截器接口ClientInterceptor * 该拦截器在客户端发起请求时被调用, * 可以在该拦截器中对请求进行处理,比如添加请求头、修改请求参数等 */@Slf4jpublic class CustomClientInterceptor implements ClientInterceptor { @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { log.debug(\"模拟业务处理,这是一个拦截启动的处理 ,统一的做了一些操作 ....\"); /* * 拦截器在客户端发起stub的rpc调用之前被调用。处理完之后往下传,把本次调用的一些信息继续往下传 * 把调用交给grpc,所以需要传递下去调用方法的元信息和一些选项 * 其实就是拦截器方法的MethodDescriptor method, CallOptions callOptions * 然后往下传是用来发起调用的,底层基于netty,所以需要传递Channel next(这是netty调用的基础连接) * 所以需要返回一个ClientCall,封装元信息,然后交给grpc,用来发起调用 */ // return next.newCall(method, callOptions); /* * 如果我们需要用复杂客户端拦截器 ,就需要对原始的ClientCall进行包装 * 那么这个时候,就不能返回原始ClientCall对象, * 应该返回 包装的ClientCall ---> CustomForwardingClientClass */ return new CustomForwardingClientClass<>(next.newCall(method, callOptions)); }}/* 这个类型增强原始类型 适用于控制 拦截 请求发送各个环节 */@Slf4jclass CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> { /** * 构造器模式,需要实现构造函数,传入原始类型,进行增强类型的包装 */ protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) { super(delegate); } /** * 开始调用,目的 看一个这个RPC请求是不是可以被发起。比如加一些鉴权等功能来判断是不是可以调用,如果不可以直接 * 返回responseListener.onClose(Status.INTERNAL, new Metadata()); * 否则就发起请求delegate().start(responseListener, headers); */ protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception { log.debug(\"发送请求数据之前的检查.....\"); //真正的去发起grpc的请求 // 是否真正发送grpc的请求,取决这个start方法的调用,delegate()就是原始类型,可以通过构造函数来看到 // delegate()就是原始类型那个之前简单调用的ClientCall,这就是装饰器模式 delegate().start(responseListener, headers); } // 真正开始发送消息,netty的发送消息的方法,outBoundBuffer @Override public void sendMessage(ReqT message) { log.info(\"发送请求数据: {}\", message); super.sendMessage(message); } // 指定发送消息的数量,类似批量发送 @Override public void request(int numMessages) { log.info(\"指定发送消息的数量: {}\", numMessages); super.request(numMessages); } // 取消请求的时候回调触发 @Override public void cancel(@Nullable String message, @Nullable Throwable cause) { log.info(\"取消请求: {}\", message); super.cancel(message, cause); } // 链接半关闭的时候回调触发,请求消息无法发送,但是可以接受响应的消息 @Override public void halfClose() { log.info(\"链接半关闭\"); super.halfClose(); } // 消息发送是否启用压缩 @Override public void setMessageCompression(boolean enabled) { log.info(\"消息发送是否启用压缩: {}\", enabled); super.setMessageCompression(enabled); } // 是否可以发送消息,这个在流式里面会调用,一元的不会 @Override public boolean isReady() { log.info(\"是否可以发送消息: {}\", super.isReady()); return super.isReady(); }}

运行程序结果为:
【橘子分布式】gRPC(番外篇-拦截器)
至此我们看到我们在客户端请求的各个阶段都进行了监听回调。这就是客户端的请求增强了。

响应拦截

前面我们完成的是对于请求的拦截,其实我们可以在客户端这里对服务端响应的拦截。我们可以拦截响应数据,这个能力可以让我们在不同的客户端定制自己的拦截需求。服务端不管你的需求,都返回,你不同的客户端可能有不同的要求,自己去做拦截定制。
我们先来看一下我们之前的那个请求增强。

@Slf4jclass CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> { protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) { super(delegate); } protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception { log.debug(\"发送请求数据之前的检查.....\"); delegate().start(responseListener, headers); }}

你有请求拦截才有响应拦截,而且响应拦截我们一般都是通过监听器来实现的,因为客户端你也不知道你啥时候响应,所以就需要监听回调的形式来监听。我们看到在checkedStart这个方法这里。他的参数列表里面有一个responseListener,响应监听器。其实就是这个东西,我们需要重新实现他,然后传进去,他就会在checkedStart调用的时候传递给grpc,grpc就根据你的实现来拦截了。现在他是一个responseListener,啥也没有,你要想拦截功能还是要增强包装。所以我们来实现一下。

/* 用于监听响应,并对响应进行拦截,其中响应头回来onHeaders被调用是服务端的 responseObserver.onNext这个调用触发的。 而服务端调用responseObserver.onCompleted()才会回调onMessage这个。 对应的其实就是netty的write和flush,responseObserver.onCompleted() 才会真的flush,把数据写回来。可以在服务端做修改测试一下。 */@Slf4jclass CustomCallListener<RespT> extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {// 构造器包装原始的Listener,下面的回调实现包装增强。 protected CustomCallListener(ClientCall.Listener<RespT> delegate) { super(delegate); } @Override public void onHeaders(Metadata headers) { log.info(\"响应头信息 回来了......\"); super.onHeaders(headers); } @Override public void onMessage(RespT message) { log.info(\"响应的数据 回来了.....{} \", message); super.onMessage(message); }// 这个在流式里面会调用,一元的不会,可以不实现 @Override public void onReady() { super.onReady(); } //这个在流式里面会调用,一元的不会,可以不实现 @Override public void onClose(Status status, Metadata trailers) { super.onClose(status, trailers); }}

通过构造函数执行包装,然后再包装里面增强。此时我们只需要替代delegate().start(responseListener, headers);中的参数responseListener为我们自己定义的就好了。

@Slf4jclass CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> { /** * 构造器模式,需要实现构造函数,传入原始类型,进行增强类型的包装 */ protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) { super(delegate); } /** * 开始调用,目的 看一个这个RPC请求是不是可以被发起。比如加一些鉴权等功能来判断是不是可以调用,如果不可以直接 * 返回responseListener.onClose(Status.INTERNAL, new Metadata()); * 否则就发起请求delegate().start(responseListener, headers); */ protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception { log.debug(\"发送请求数据之前的检查.....\"); //真正的去发起grpc的请求 // 是否真正发送grpc的请求,取决这个start方法的调用,delegate()就是原始类型,可以通过构造函数来看到 // delegate()就是原始类型那个之前简单调用的ClientCall,这就是装饰器模式 // delegate().start(responseListener, headers); delegate().start(new CustomCallListener<>(responseListener), headers);// 传入增强响应拦截 } ...... 省略其余代码}

执行没有问题。
【橘子分布式】gRPC(番外篇-拦截器)

1.2、服务端拦截器

1.2.1、服务端简单拦截器

对应客户端那边的拦截器,服务端这里其实是对应的,该有的都有。我们来看下服务端的简单拦截器。

/** * 自定义服务端拦截器 */@Slf4jpublic class CustomServerInterceptor implements ServerInterceptor { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { //在服务器端 拦截请求操作的功能 写在这个方法中 log.debug(\"服务器端拦截器生效.....\"); // 返回req请求监听器,用于监听服务端req请求的事件,reqTListener就是原始的拦截监听器 ServerCall.Listener<ReqT> reqTListener = next.startCall(call, headers); return reqTListener; }}

这里的概念我们都可以在客户端那里找到对应的。我们看到这个方法返回了一个ServerCall.Listener 类型的,而且泛型是req,可见是对于请求的监听器。作为服务端,他是被动连接的,所以她的拦截方式就是监听,什么时候来我什么时候拦截,他不知道你啥时候来,就只能监听着。而next.startCall(call, headers);返回的就是一个具有原始能力的拦截器,没有封装增强的。我们把这个拦截器整合到服务端发布代码中使其生效。

package com.levi;import com.levi.interceptor.CustomServerInterceptor;import com.levi.service.HelloServiceImpl;import io.grpc.Server;import io.grpc.ServerBuilder;import java.io.IOException;public class GrpcServer { public static void main(String[] args) throws InterruptedException, IOException { ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000); serverBuilder.addService(new HelloServiceImpl()); // 注册自定义拦截器 serverBuilder.intercept(new CustomServerInterceptor()); Server server = serverBuilder.build(); server.start(); server.awaitTermination(); }}

我们把自定义的拦截器注册进去之后启动服务端和客户端看一下。没有问题。
【橘子分布式】gRPC(番外篇-拦截器)
同样的服务端的简单拦截器也存在像客户端那边的问题,
拦截请求发送过来的数据,无法处理响应的数据。
拦截力度过于宽泛
所以我么需要复杂拦截器,增强原始的拦截器,达到更加细力度的控制拦截。一切都和当初我们在客户端做的一样,重新定义一个增强的。

1.2.2、服务端复杂拦截器

拦截请求,拦截的是客户端过来的消息

/** * 自定义服务端拦截器 */@Slf4jpublic class CustomServerInterceptor implements ServerInterceptor { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { //在服务器端 拦截请求操作的功能 写在这个方法中 log.debug(\"服务器端拦截器生效.....\"); // 返回req请求监听器,用于监听服务端req请求的事件,reqTListener就是原始的拦截监听器 // ServerCall.Listener reqTListener = next.startCall(call, headers); // 包装器设计模式,封装原始的监听器,增强原始监听器的功能,实际的核心调用还是原始的在做 // 只是加了一些额外的增强的方法 return new CustomServerCallListener<>(next.startCall(call, headers)); }}/** * 复杂服务端拦截器,用于监听服务端req请求的事件,reqTListener就是原始的拦截监听器 * 对于reqTListener的事件,我们可以在事件触发时,做一些自定义的操作, * 本质是对于原始监听器的一个包装增强,包装器模式 */@Slf4jclass CustomServerCallListener<ReqT> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> { protected CustomServerCallListener(ServerCall.Listener<ReqT> delegate) { super(delegate); } @Override //准备接受请求数据 public void onReady() { log.debug(\"onRead Method Invoke,准备好接收客户端数据....\"); super.onReady(); } @Override public void onMessage(ReqT message) { log.debug(\"接受到了客户端请求提交的数据,客户端的请求数据是: {} \", message); super.onMessage(message); } @Override public void onHalfClose() { log.debug(\"监听到了 半连接触发这个操作...\"); super.onHalfClose(); } @Override public void onComplete() { log.debug(\"服务端 调用onCompleted()触发...\"); super.onComplete(); } @Override public void onCancel() { log.debug(\"出现异常后 会调用这个方法... 可以在这里做一些关闭资源的操作\"); super.onCancel(); }}

在经历了客户端的开发之后,我们这里其实就很好理解了。调用没有问题。
【橘子分布式】gRPC(番外篇-拦截器)

拦截响应,拦截的是服务端发给客户端的响应

我们能拦截请求,自然也就能拦截响应。我们先来看一下什么是服务端的响应,其实就是服务端回写给客户端的操作。也就是服务端调用客户端的操作,我们上面拦截请求其实是客户端发给服务端,也即是服务端的监听ServerCall.Listener,服务端的监听器做包装增强。
现在你要增强服务端对客户端的调用其实就是ServerCall(这里对应我们客户端那里的ClientCall)。所以我们要对ServerCall做包装。就是你谁干啥就增强啥就实现啥就行。

/** * 目的:通过自定义的ServerCall 包装原始的ServerCall 增加对于响应拦截的功能 */@Slf4jclass CustomServerCall<ReqT, RespT> extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {// 这里包装的是ServerCall protected CustomServerCall(ServerCall<ReqT, RespT> delegate) { super(delegate); } @Override //指定发送消息的数量 【响应消息】 public void request(int numMessages) { log.debug(\"response 指定消息的数量 【request】\"); super.request(numMessages); } @Override //设置响应头 public void sendHeaders(Metadata headers) { log.debug(\"response 设置响应头 【sendHeaders】\"); super.sendHeaders(headers); } @Override //响应数据 public void sendMessage(RespT message) { log.debug(\"response 响应数据 【send Message 】 {} \", message); super.sendMessage(message); } @Override //关闭连接 public void close(Status status, Metadata trailers) { log.debug(\"response 关闭连接 【close】\"); super.close(status, trailers); }}

然后我们再把这个包装增强整合到拦截器里面,交给grpc的体系中才能生效,在interceptCall中进行整合,我们不需要改动服务端发布那里的代码,那里可以直接通过CustomServerInterceptor来处理我们这里整合到的两个拦截器,以下为完整代码。

package com.levi.interceptor;import io.grpc.*;import lombok.extern.slf4j.Slf4j;/** * 自定义服务端拦截器 */@Slf4jpublic class CustomServerInterceptor implements ServerInterceptor { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { //在服务器端 拦截请求操作的功能 写在这个方法中 log.debug(\"服务器端拦截器生效.....\"); //包装ServerCall 处理服务端响应拦截 CustomServerCall<ReqT,RespT> reqTRespTCustomServerCall = new CustomServerCall<>(call); // 包装Listener 处理服务端请求拦截 CustomServerCallListener<ReqT> reqTCustomServerCallListener = new CustomServerCallListener<>(next.startCall(reqTRespTCustomServerCall, headers)); return reqTCustomServerCallListener; }}/** * 复杂服务端拦截器,用于监听服务端req请求的事件,reqTListener就是原始的拦截监听器 * 对于reqTListener的事件,我们可以在事件触发时,做一些自定义的操作, * 本质是对于原始监听器的一个包装增强,包装器模式 */@Slf4jclass CustomServerCallListener<ReqT> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> { protected CustomServerCallListener(ServerCall.Listener<ReqT> delegate) { super(delegate); } @Override //准备接受请求数据 public void onReady() { log.debug(\"onRead Method Invoke,准备好接收客户端数据....\"); super.onReady(); } @Override public void onMessage(ReqT message) { log.debug(\"接受到了客户端请求提交的数据,客户端的请求数据是: {} \", message); super.onMessage(message); } @Override public void onHalfClose() { log.debug(\"监听到了 半连接触发这个操作...\"); super.onHalfClose(); } @Override public void onComplete() { log.debug(\"服务端 调用onCompleted()触发...\"); super.onComplete(); } @Override public void onCancel() { log.debug(\"出现异常后 会调用这个方法... 可以在这里做一些关闭资源的操作\"); super.onCancel(); }}/** * 通过自定义的ServerCall 包装原始的ServerCall 增加对于响应拦截的功能 */@Slf4jclass CustomServerCall<ReqT, RespT> extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> { protected CustomServerCall(ServerCall<ReqT, RespT> delegate) { super(delegate); } @Override //指定发送消息的数量 【响应消息】 public void request(int numMessages) { log.debug(\"response 指定消息的数量 【request】\"); super.request(numMessages); } @Override //设置响应头 public void sendHeaders(Metadata headers) { log.debug(\"response 设置响应头 【sendHeaders】\"); super.sendHeaders(headers); } @Override //响应数据 public void sendMessage(RespT message) { log.debug(\"response 响应数据 【send Message 】 {} \", message); super.sendMessage(message); } @Override //关闭连接 public void close(Status status, Metadata trailers) { log.debug(\"response 关闭连接 【close】\"); super.close(status, trailers); }}

而且我们看到增强类都是要实现构造的,因为要传进去原始类,进行封装,调用核心方法还是走super,走那个原始的操作。你的增强的操作可以加在这些新的方法里面。这些增强方法,你可以酌情看你要的业务方法,需要的就实现,不需要就可以不覆盖实现。其实他就是服务端的响应的各个阶段不同的触发。
我们运行代码没有问题,各个阶段都被触发了。
【橘子分布式】gRPC(番外篇-拦截器)
对于你要是想只拦截响应,不拦截请求可以这么做。

public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { //在服务器端 拦截请求操作的功能 写在这个方法中 log.debug(\"服务器端拦截器生效.....\"); //包装ServerCall 处理服务端响应拦截 CustomServerCall<ReqT,RespT> reqTRespTCustomServerCall = new CustomServerCall<>(call); // 包装Listener 处理服务端请求拦截 CustomServerCallListener<ReqT> reqTCustomServerCallListener = new CustomServerCallListener<>(next.startCall(reqTRespTCustomServerCall, headers)); // return reqTCustomServerCallListener; /** * 只拦截响应,我们就不需要包装Listener,也就是返回原始的Listener即可。原始的Listener我们是通过 * next.startCall(reqTRespTCustomServerCall, headers)获取到的。所以继续用next.startCall不操作包装的 * Listener即可,但是我们要包装响应也就是serverCall,所以返回reqTRespTCustomServerCall。包在原始Listener中 * 你要是包装请求,那就是需要包装的Listener,不需要就直接next.startCall返回startCall即可。 */ return next.startCall(reqTRespTCustomServerCall, headers); }

明白请求是包装的listener,响应是servercall,需要哪个就加强哪个就行,不需要增强拦截就用原始的就行。

四、总结

这就是grpc中比较常见的一元拦截器的使用,他是对于一元rpc的拦截。在各个拦截方法中我们可以定义一些自己的业务方法。进而灵活使用拦截器。而且你要是在某个点拦截之后不想继续往下走,那你就不要调用每个拦截方法的super,不要做后续的调用,直接断开链路即可。而且至于拦截请求还是响应就看你包装啥就完了,他不是耦合在一起的。
后面我们再来分析监听流也就是流式和双向调用的拦截器。

【橘子分布式】gRPC(番外篇-拦截器)