gRPC入门指南 — 服务端流式RPC(二)
前言
前一篇文章,我们学习了简单模式 RPC(Simple RPC),gRPC 服务端和客户端在通信时始终只有一个请求和一个响应。实际的应用场景中,我们会遇到需要不断传输数据的时候,比如当数据量很大时。这个时候我们就可以使用流式 RPC,可以实现边传输、边处理数据。这篇文章先介绍下服务端流式 RPC:客户端一次请求,服务端通过 stream 的方式响应多条数据,如下图所示:
创建并编译proto文件
新建文件 server_stream.proto
syntax = "proto3";
package proto;
// 定义发送请求信息
message SimpleRequest{
// 参数类型 参数名称 标识号
string data = 1;
}
// 定义流式响应信息
message StreamResponse{
int32 code = 1;
string value = 2;
}
// 定义我们的服务(可以定义多个服务,每个服务可以定义多个接口)
service StreamService{
// 服务端流式RPC,需要在响应数据前加stream
rpc List(SimpleRequest) returns (stream StreamResponse){};
}
服务端流式 RPC,定义方法时需要在返回值之前加上 stream。
进入 server_stream.proto 所在的目录,使用如下命令编译文件
protoc --go_out=plugins=grpc:. server_stream.proto
执行完成之后会生成 server_stream.pb.go 文件。
创建server端
package main
import (
pb "go-grpc-example/2-server_stream_rpc/proto"
"google.golang.org/grpc"
"log"
"net"
"time"
)
const (
Address string = ":8000"
Network string = "tcp"
)
// 定义我们的服务
type StreamService struct{}
// 实现List方法
func (s *StreamService) List(req *pb.SimpleRequest, srv pb.StreamService_ListServer) error {
for i := 0; i < 5; i++ {
// 向流中发送消息,默认每次发送消息大小为 math.MaxInt32 byte
err := srv.Send(&pb.StreamResponse{
Code: int32(i),
Value: req.Data,
})
if err != nil {
return err
}
time.Sleep(1 * time.Second)
}
return nil
}
func main() {
// 1.监听端口
listener, err := net.Listen(Network, Address)
if err != nil {
log.Fatalf("listener err: %v", err)
}
log.Println(Address + " net.Listing...")
// 2.实例化gRPC服务端
// 默认单次接受消息大小为 1024*1024*4 字节(4M),发送大小为 math.MaxInt32 字节
grpcServer := grpc.NewServer()
// 3.注册我们实现的服务 StreamService
pb.RegisterStreamServiceServer(grpcServer, &StreamService{})
// 4.启动gRPC服务端
err = grpcServer.Serve(listener)
if err != nil {
log.Fatalf("grpc server err: %v", err)
}
}
通过追源代码可以看到,实例化 gRPC 服务端时,默认情况下默认单次接受消息大小为 1024 * 1024 * 4 字节(4M),发送大小为 math.MaxInt32 字节。
const (
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
defaultServerMaxSendMessageSize = math.MaxInt32
)
运行服务端:
go run server.go
输出:
:8000 net listening...
创建client端
package main
import (
"context"
pb "go-grpc-example/2-server_stream_rpc/proto"
"google.golang.org/grpc"
"io"
"log"
)
const Address string = ":8000"
func main() {
// 1.连接服务端
conn, err := grpc.Dial(Address, grpc.WithInsecure())
if err != nil {
log.Fatalf("grpc conn err: %v", err)
}
defer conn.Close()
// 2.创建grpc客户端
grpcClient := pb.NewStreamServiceClient(conn)
// 3.调用服务端提供的服务
req := &pb.SimpleRequest{
Data: "Hello,Server",
}
stream, err := grpcClient.List(context.Background(), req)
if err != nil {
log.Fatalf("call server err,%v", err)
}
for {
// 4.处理服务端发送过来的流信息
resp, err := stream.Recv()
if err == io.EOF { // 流是否结束
break
}
if err != nil {
log.Fatalf("client get stream err:%v", err)
}
log.Printf("get from stream server,code:%v,value:%v", resp.GetCode(), resp.GetValue())
}
}
从代码可以看出,客户端发送一次请求,之后在 for 循环里面不停地从服务端接受信息,只到服务端结束发送。
运行客户端:
go run client.go
get from stream server,code:0,value:Hello,Server
get from stream server,code:1,value:Hello,Server
get from stream server,code:2,value:Hello,Server
get from stream server,code:3,value:Hello,Server
get from stream server,code:4,value:Hello,Server
总结
这篇文章主要介绍了服务端流式 RPC的简单使用,该模式下客户端发送一次请求,服务端会发送多次响应数据。
下篇文章我们介绍下客户端流式 RPC。
推荐阅读
评论