gRPC 与 Protobuf 的深度集成 —— 从服务定义到多语言交互(Go + Java 示例)_grpc-gen-java
在前几篇文章中,我们已经掌握了 Protobuf 的基础语法、高级特性和序列化反序列化操作。本篇文章将深入讲解 gRPC 与 Protobuf 的集成,重点介绍如何通过 .proto
文件定义服务接口,并在 Go 和 Java 中实现 gRPC 服务与客户端的完整交互流程。我们将通过详细代码示例和分步解析,帮助你彻底掌握微服务架构中的通信设计。
一、gRPC 简介与核心概念
1. 什么是 gRPC?
gRPC 是一个高性能、开源的远程过程调用(RPC)框架,基于 HTTP/2 协议 和 Protobuf 数据格式 构建。它支持多种语言,并提供了同步/异步调用、流式通信等特性。
2. gRPC 的核心优势
.proto
文件自动生成客户端和服务端代码grpcurl
)、插件系统二、通过 .proto
定义 gRPC 服务
1. 示例 .proto
文件
syntax = \"proto3\";package user;//新版本有了下面的option go_package 这里的pacage就可以去掉了(当然留着也不影响)option go_package = \"/user;user\"; // 指定生成的 Go 包路径(生成源码的路径和包名,前面是路径后面是包名,可以自己定义)//option go_package = \".;user\"; //这个可以生成在当前目录下// 定义服务接口service UserService { // 1. 单向调用(Unary RPC) rpc GetUser (GetUserRequest) returns (UserResponse); // 2. 服务端流式调用(Server Streaming) rpc ListUsers (ListUsersRequest) returns (stream UserResponse); // 3. 客户端流式调用(Client Streaming) rpc CreateUsers (stream CreateUserRequest) returns (CreateUsersResponse); // 4. 双向流式调用(Bidirectional Streaming) rpc UpdateUsers (stream UpdateUserRequest) returns (stream UserResponse);}// 消息定义message GetUserRequest { int32 id = 1;}message UserResponse { int32 id = 1; string name = 2; string email = 3;}message ListUsersRequest { string filter = 1;}message CreateUserRequest { string name = 1; string email = 2;}message CreateUsersResponse { int32 count = 1;}message UpdateUserRequest { int32 id = 1; string name = 2;}
要注意下面这里有了变化(以后会讲解为什么要用option go_package):
package user;//新版本有了下面的option go_package 这里的pacage就可以去掉了(当然留着也不影响)
option go_package = \"/user;user\"; // 指定生成的 Go 包路径(生成源码的路径和包名,前面是路径后面是包名,可以自己定义)
//option go_package = \".;user\"; //这个可以生成在当前目录下
三、生成 gRPC 代码
1. 安装 gRPC 工具
go install google.golang.org/protobuf/cmd/protoc-gen-go@latestgo install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
Java
protoc --java_out=. \\ --grpc-java_out=. \\ --plugin=protoc-gen-grpc-java=protoc-gen-grpc-java \\ user.proto
2. 生成代码命令
Go
protoc --go_out=. --go-grpc_out=. user.proto//protoc --go_out=. --go-grpc_out=. user.proto//这个命令使用了两个输出插件:--go_out=. 和 --go-grpc_out=.。它分别调用了 Go 相关的 Protobuf 插件和 gRPC Go 插件来生成对应的目标文件。其中://--go_out=. 表示使用 Go 的 Protobuf 编译插件生成对应的 Go 文件。//--go-grpc_out=. 表示使用 Go 的 gRPC 编译插件生成 gRPC 服务相关的 Go 文件。
Java
protoc --java_out=. --grpc-java_out=. user.proto
四、Go 实现 gRPC 服务端与客户端
1. 服务端代码详解
package mainimport (\"context\"\"fmt\"\"log\"\"net\"pb \"./user_go_proto\"\"google.golang.org/grpc\")type userService struct {pb.UnimplementedUserServiceServer}// 单向调用func (s *userService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.UserResponse, error) {return &pb.UserResponse{Id: req.Id,Name: \"Alice\",Email: \"alice@example.com\",}, nil}// 服务端流式调用func (s *userService) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {users := []*pb.UserResponse{{Id: 1, Name: \"Alice\", Email: \"alice@example.com\"},{Id: 2, Name: \"Bob\", Email: \"bob@example.com\"},}for _, user := range users {if err := stream.Send(user); err != nil {return err}}return nil}// 客户端流式调用func (s *userService) CreateUsers(stream pb.UserService_CreateUsersServer) error {count := 0for {req, err := stream.Recv()if err == io.EOF {break}if err != nil {return err}count++}return stream.SendAndClose(&pb.CreateUsersResponse{Count: int32(count)})}// 双向流式调用func (s *userService) UpdateUsers(stream pb.UserService_UpdateUsersServer) error {for {req, err := stream.Recv()if err == io.EOF {break}if err != nil {return err}resp := &pb.UserResponse{Id: req.Id,Name: req.Name,}if err := stream.Send(resp); err != nil {return err}}return nil}func main() {lis, err := net.Listen(\"tcp\", \":50051\")if err != nil {log.Fatalf(\"failed to listen: %v\", err)}s := grpc.NewServer()pb.RegisterUserServiceServer(s, &userService{})log.Printf(\"Server listening at %v\", lis.Addr())if err := s.Serve(lis); err != nil {log.Fatalf(\"failed to serve: %v\", err)}}
代码解析
- 服务端实现:通过
pb.RegisterUserServiceServer
注册服务。 - 流式处理:通过
stream
接口处理双向通信。 - 错误处理:捕获
io.EOF
结束流式调用。
2. 客户端代码详解
package mainimport (\"context\"\"fmt\"\"log\"pb \"./user_go_proto\"\"google.golang.org/grpc\")func main() {conn, err := grpc.Dial(\"localhost:50051\", grpc.WithInsecure())if err != nil {log.Fatalf(\"did not connect: %v\", err)}defer conn.Close()client := pb.NewUserServiceClient(conn)// 单向调用resp, err := client.GetUser(context.Background(), &pb.GetUserRequest{Id: 1})if err != nil {log.Fatalf(\"could not get user: %v\", err)}fmt.Printf(\"User: %v\\n\", resp)// 服务端流式调用stream, err := client.ListUsers(context.Background(), &pb.ListUsersRequest{Filter: \"IT\"})if err != nil {log.Fatalf(\"could not list users: %v\", err)}for {user, err := stream.Recv()if err == io.EOF {break}if err != nil {log.Fatalf(\"error receiving user: %v\", err)}fmt.Printf(\"Received: %v\\n\", user)}// 客户端流式调用stream2, err := client.CreateUsers(context.Background())if err != nil {log.Fatalf(\"could not create users: %v\", err)}for i := 0; i < 3; i++ {if err := stream2.Send(&pb.CreateUserRequest{Name: fmt.Sprintf(\"User %d\", i),Email: fmt.Sprintf(\"user%d@example.com\", i),}); err != nil {log.Fatalf(\"error sending user: %v\", err)}}resp2, err := stream2.CloseAndRecv()if err != nil {log.Fatalf(\"error closing stream: %v\", err)}fmt.Printf(\"Created %d users\\n\", resp2.Count)// 双向流式调用stream3, err := client.UpdateUsers(context.Background())if err != nil {log.Fatalf(\"could not update users: %v\", err)}for i := 0; i < 3; i++ {if err := stream3.Send(&pb.UpdateUserRequest{Id: int32(i),Name: fmt.Sprintf(\"Updated User %d\", i),}); err != nil {log.Fatalf(\"error sending update: %v\", err)}resp3, err := stream3.Recv()if err != nil {log.Fatalf(\"error receiving update: %v\", err)}fmt.Printf(\"Updated: %v\\n\", resp3)}}
代码解析
- 连接建立:通过
grpc.Dial
连接服务端。 - 流式调用:通过
stream.Send()
和stream.Recv()
实现双向通信。 - 错误处理:捕获
io.EOF
结束流式调用。
五、Java 实现 gRPC 服务端与客户端
1. 服务端代码详解
import user.UserServiceGrpc;import user.GetUserRequest;import user.UserResponse;import user.ListUsersRequest;import user.CreateUserRequest;import user.CreateUsersResponse;import user.UpdateUserRequest;import io.grpc.Server;import io.grpc.ServerBuilder;import io.grpc.stub.StreamObserver;import java.io.IOException;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;public class UserServiceServer { public static void main(String[] args) throws IOException, InterruptedException { final Server server = ServerBuilder.forPort(50051) .addService(new UserServiceImpl()) .build(); server.start(); System.out.println(\"Server started at port 50051\"); final CountDownLatch latch = new CountDownLatch(1); server.awaitTermination(); } static class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase { // 单向调用 @Override public void getUser(GetUserRequest request, StreamObserver responseObserver) { UserResponse response = UserResponse.newBuilder() .setId(request.getId()) .setName(\"Alice\") .setEmail(\"alice@example.com\") .build(); responseObserver.onNext(response); responseObserver.onCompleted(); } // 服务端流式调用 @Override public void listUsers(ListUsersRequest request, StreamObserver responseObserver) { UserResponse user1 = UserResponse.newBuilder() .setId(1) .setName(\"Alice\") .setEmail(\"alice@example.com\") .build(); UserResponse user2 = UserResponse.newBuilder() .setId(2) .setName(\"Bob\") .setEmail(\"bob@example.com\") .build(); responseObserver.onNext(user1); responseObserver.onNext(user2); responseObserver.onCompleted(); } // 客户端流式调用 @Override public StreamObserver createUsers(StreamObserver responseObserver) { return new StreamObserver() { int count = 0; @Override public void onNext(CreateUserRequest request) { count++; } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onCompleted() { CreateUsersResponse response = CreateUsersResponse.newBuilder() .setCount(count) .build(); responseObserver.onNext(response); responseObserver.onCompleted(); } }; } // 双向流式调用 @Override public StreamObserver updateUsers(StreamObserver responseObserver) { return new StreamObserver() { @Override public void onNext(UpdateUserRequest request) { UserResponse response = UserResponse.newBuilder() .setId(request.getId()) .setName(request.getName()) .build(); responseObserver.onNext(response); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onCompleted() { responseObserver.onCompleted(); } }; } }}
代码解析
- 服务端实现:通过继承
UserServiceGrpc.UserServiceImplBase
实现接口。 - 流式处理:通过
StreamObserver
处理双向通信。 - 错误处理:通过
onError
捕获异常。
2. 客户端代码详解
import user.UserServiceGrpc;import user.GetUserRequest;import user.UserResponse;import user.ListUsersRequest;import user.CreateUserRequest;import user.CreateUsersResponse;import user.UpdateUserRequest;import io.grpc.ManagedChannel;import io.grpc.ManagedChannelBuilder;import io.grpc.StatusRuntimeException;import java.util.concurrent.TimeUnit;public class UserServiceClient { public static void main(String[] args) { ManagedChannel channel = ManagedChannelBuilder.forAddress(\"localhost\", 50051) .usePlaintext() .build(); UserServiceGrpc.UserServiceBlockingStub blockingStub = UserServiceGrpc.newBlockingStub(channel); // 单向调用 GetUserRequest request = GetUserRequest.newBuilder().setId(1).build(); try { UserResponse response = blockingStub.getUser(request); System.out.println(\"User: \" + response.getName()); } catch (StatusRuntimeException e) { e.printStackTrace(); } // 服务端流式调用 ListUsersRequest listRequest = ListUsersRequest.newBuilder().setFilter(\"IT\").build(); UserServiceGrpc.UserServiceStub asyncStub = UserServiceGrpc.newStub(channel); asyncStub.listUsers(listRequest, new StreamObserver() { @Override public void onNext(UserResponse user) { System.out.println(\"Received: \" + user.getName()); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onCompleted() { System.out.println(\"Stream completed.\"); } }); // 客户端流式调用 UserServiceGrpc.UserServiceStub createStub = UserServiceGrpc.newStub(channel); createStub.createUsers(new StreamObserver() { @Override public void onNext(CreateUsersResponse response) { System.out.println(\"Created \" + response.getCount() + \" users\"); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onCompleted() { System.out.println(\"Create stream completed.\"); } }).forEachRemaining(user -> { if (user != null) { System.out.println(\"Sending: \" + user.getName()); } }); // 双向流式调用 UserServiceGrpc.UserServiceStub updateStub = UserServiceGrpc.newStub(channel); StreamObserver requestStream = updateStub.updateUsers(new StreamObserver() { @Override public void onNext(UserResponse response) { System.out.println(\"Updated: \" + response.getName()); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onCompleted() { System.out.println(\"Update stream completed.\"); } }); for (int i = 0; i < 3; i++) { UpdateUserRequest updateRequest = UpdateUserRequest.newBuilder() .setId(i) .setName(\"Updated User \" + i) .build(); requestStream.onNext(updateRequest); } requestStream.onCompleted(); try { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } }}
代码解析
- 连接建立:通过
ManagedChannelBuilder
连接服务端。 - 流式调用:通过
StreamObserver
实现双向通信。 - 错误处理:通过
onError
捕获异常。
六、多语言交互的最佳实践
1. 保持 .proto
文件统一
- 所有语言共享同一个
.proto
文件,确保接口定义一致。 - 使用
protoc
生成对应语言的代码。
2. 版本控制
- 在
.proto
文件中添加版本注释:// Version 1.0.0message User { string name = 1;}
3. 依赖管理
- 使用
go mod
或Maven
管理依赖,确保不同语言的代码版本一致。
注意:
这篇文章中使用的Go和Java 实现 gRPC 服务端与客户端的例子是二者分开用的,而不是混合语言,其实在这里我更想做的是Go和Java放在一起使用,比如Go做服务端,Java做客户端。原因是我觉得Go更适合grpc,所以大家着重看Go的讲解即可。如果要混合的话也是以Go为主,Java为辅。
这次没有使用多语言的原因是,突然混合在一起的话怕大家不好理解,我在其他文章中也有讲解跨语言使用的例子,大家有兴趣的可以去看看。
七、总结
在本文中,我们详细讲解了 gRPC 与 Protobuf 的深度集成,包括:
- 通过
.proto
文件定义服务接口 - 在 Go 和 Java 中实现服务端与客户端
- 单向、流式通信的完整代码示例
- 多语言交互的最佳实践
通过这些内容,你已经能够构建高性能、可扩展的微服务系统,并在不同语言之间实现无缝通信。gRPC 与 Protobuf 的结合是现代分布式系统的基石,希望这篇文章能帮助你更自信地在项目中应用这些技术。
如果这篇文章对大家有帮助可以点赞关注,你的支持就是我的动力😊!