Go语言Grpc Stream的实现

2022-06-20 11:02:06
目录
Stream GrpcStream Grpc演示BookListStreamCreateBookStreamFindBookByIdStream

Stream>

在我们单次投递的数据量很大的时候,比如传输一个二进制文件的时候,数据包过大,会造成瞬时传输压力。或者接收方接收到数据后,需要对数据做一系列的处理工作,

比如:数据过滤 -> 数据格式转换 -> 数据求和 ,这种场景非常适合使用stream grpc,

Stream>
syntax = "proto3";

package book_stream;

option go_package = "/book_stream";

service HelloStreamService {
  rpc BookListStream(BookListStreamRequest) returns (stream BookListStreamResponse){};
  rpc CreateBookStream(stream CreateBookStreamRequest) returns (CreateBookStreamResponse){}
  rpc FindBookByIdStream(stream FindBookByIdStreamRequest) returns (stream FindBookByIdStreamResponse){}
}

message BookListStreamRequest{
}

message BookListStreamResponse{
  BookPoint book = 1;
}

message CreateBookStreamRequest{
  BookPoint book = 1;
}

message CreateBookStreamResponse{
  repeated BookIdPoint idx = 1;
}

message FindBookByIdStreamRequest{
  BookIdPoint idx = 1;
}
message FindBookByIdStreamResponse{
  BookPoint book = 1;
}

message BookIdPoint{
  uint64 idx = 1;
}

message BookPoint{
  uint64 idx = 1;
  string name = 2;
  float price = 3;
  string author = 4;
}

运行protoc --go_out=plugins=grpc:. *.proto生成脚手架文件

    BookListStream服务端流式RPCCreateBookStream客户端流式RPCFindBookByIdStream双向流式RPC

    注意,这里只是用作方便演示使用,演示方法都不是线程安全的

    服务端server

    var port = 8888
    
    func main() {
       server := grpc.NewServer()
       book_stream.RegisterHelloStreamServiceServer(server, new(HelloStreamServiceImpl))
       lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
       if err != nil {
          panic(err)
       }
       if err := server.Serve(lis); err != nil {
          panic(err)
       }
    }

    客户端

    func main() {
       var port = 8888
       conn, err := grpc.Dial(fmt.Sprintf(":%d", port), grpc.WithInsecure())
       if err != nil {
          panic(err)
       }
       defer conn.Close()
       client := book_stream.NewHelloStreamServiceClient(conn)
    
       ctx := context.Background()
       if err := createBookStream(ctx, client); err != nil {
          panic(err)
       }
       if err := printBookList(ctx, client); err != nil {
          panic(err)
       }
       if err := getBookListById(ctx, client); err != nil {
          panic(err)
       }
    }

    BookListStream

    服务器端流式>

    简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。

    server端实现

    var bookStore = map[uint64]book_stream.BookPoint{
       1: {
          Idx:    1,
          Author: "程子",
          Price:  9.9,
          Name:   "游戏思维",
       },
       2: {
          Idx:    2,
          Author: "丁锐",
          Price:  9.9,
          Name:   "活出必要的锋芒",
       },
    }
    
    
    type HelloStreamServiceImpl struct{}
    
    func (HelloStreamServiceImpl) BookListStream(_ *book_stream.BookListStreamRequest, streamServer book_stream.HelloStreamService_BookListStreamServer) error {
       for idx, bookPoint := range bookStore {
          err := streamServer.Send(&book_stream.BookListStreamResponse{Book: &book_stream.BookPoint{
             Idx:    idx,
             Name:   bookPoint.Name,
             Price:  bookPoint.GetPrice(),
             Author: bookPoint.Author,
          }})
          if err != nil {
             return err
          }
       }
       return nil
    }

    客户端实现

    func printBookList(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
       req := &book_stream.BookListStreamRequest{}
       listStream, err := client.BookListStream(ctx, req)
       if err != nil {
          return err
       }
       for true {
          resp, err := listStream.Recv()
          if err != nil {
             if err == io.EOF {
                return nil
             }
             return err
          }
          fmt.Printf("%v\n", *resp.Book)
       }
       return nil
    }

    CreateBookStream

    客户端流式>

    server端实现

    func (HelloStreamServiceImpl) CreateBookStream(server book_stream.HelloStreamService_CreateBookStreamServer) error {
       var resList []*book_stream.BookIdPoint
       for {
          resp, err := server.Recv()
          if err == io.EOF {
             return server.SendAndClose(&book_stream.CreateBookStreamResponse{Idx: resList})
          }
          if err != nil {
             return err
          }
          bookStore[resp.Book.Idx] = *resp.Book
          resList = append(resList, &book_stream.BookIdPoint{Idx: resp.Book.Idx})
       }
    }

    客户端实现

    var newBookStore = map[uint64]book_stream.BookPoint{
       3: {
          Idx:    3,
          Author: "程子1",
          Price:  9.9,
          Name:   "游戏思维1",
       },
       4: {
          Idx:    4,
          Author: "丁锐1",
          Price:  9.9,
          Name:   "活出必要的锋芒1",
       },
    }
    
    func createBookStream(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
       stream, err := client.CreateBookStream(ctx)
       if err != nil {
          return err
       }
       for _, bookPoint := range newBookStore {
          if err := stream.Send(&book_stream.CreateBookStreamRequest{
             Book: &bookPoint,
          }); err != nil {
             return err
          }
       }
       recv, err := stream.CloseAndRecv()
       if err != nil {
          return err
       }
       fmt.Println(recv.Idx)
       return nil
    }

    stream.SendAndClose,它是做什么用的呢?

    在这段程序中,我们对每一个 Recv 都进行了处理,当发现 io.EOF (流关闭) 后,需要将最终的响应结果发送给客户端,同时关闭正在另外一侧等待的 Recv

    stream.CloseAndRecv 和 stream.SendAndClose 是配套使用的流方法,

    FindBookByIdStream

    服务端实现

    func (HelloStreamServiceImpl) FindBookByIdStream(streamServer book_stream.HelloStreamService_FindBookByIdStreamServer) error {
       for {
          resp, err := streamServer.Recv()
          if err == io.EOF {
             return nil
          }
          if err != nil {
             return err
          }
          if book, ok := bookStore[resp.Idx.Idx]; ok {
             if err := streamServer.Send(&book_stream.FindBookByIdStreamResponse{Book: &book}); err != nil {
                return err
             }
          }
       }
    }

    客户端实现

    func getBookListById(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
       stream, err := client.FindBookByIdStream(ctx)
       if err != nil {
          return err
       }
       var findList = []uint64{1, 2}
       for _, idx := range findList {
          err := stream.Send(&book_stream.FindBookByIdStreamRequest{Idx: &book_stream.BookIdPoint{Idx: idx}})
          if err != nil {
             return err
          }
          recv, err := stream.Recv()
          if err != nil {
             return err
          }
          fmt.Printf("%v\n", recv.Book)
       }
       if err := stream.CloseSend(); err != nil {
          return err
       }
       return nil
    }

    到此这篇关于Go语言Grpc Stream的实现的文章就介绍到这了,更多相关Go语言Grpc Stream 内容请搜索易采站长站以前的文章或继续浏览下面的相关文章希望大家以后多多支持易采站长站!