Go语言Grpc Stream的实现

作者:范闲 时间:2023-08-07 06:19:23 

Stream Grpc

在我们单次投递的数据量很大的时候,比如传输一个二进制文件的时候,数据包过大,会造成瞬时传输压力。或者接收方接收到数据后,需要对数据做一系列的处理工作,

比如:数据过滤 -> 数据格式转换 -> 数据求和 ,这种场景非常适合使用stream grpc,

Stream Grpc演示

syntax = "proto3";

package book_stream;

option go_package = "/book_stream";

service HelloStreamService {
 rpc BookListStream(BookListStreamRequest) returns (stream BookListStreamResponse){};
 rpc CreateBookStream(stream CreateBookStreamRequest) returns (CreateBookStreamResponse){}
 rpc FindBookByIdStream(stream FindBookByIdStreamRequest) returns (stream FindBookByIdStreamResponse){}
}

message BookListStreamRequest{
}

message BookListStreamResponse{
 BookPoint book = 1;
}

message CreateBookStreamRequest{
 BookPoint book = 1;
}

message CreateBookStreamResponse{
 repeated BookIdPoint idx = 1;
}

message FindBookByIdStreamRequest{
 BookIdPoint idx = 1;
}
message FindBookByIdStreamResponse{
 BookPoint book = 1;
}

message BookIdPoint{
 uint64 idx = 1;
}

message BookPoint{
 uint64 idx = 1;
 string name = 2;
 float price = 3;
 string author = 4;
}

运行protoc --go_out=plugins=grpc:. *.proto生成脚手架文件

  • BookListStream服务端流式RPC

  • CreateBookStream客户端流式RPC

  • FindBookByIdStream双向流式RPC

注意,这里只是用作方便演示使用,演示方法都不是线程安全的

服务端server

var port = 8888

func main() {
  server := grpc.NewServer()
  book_stream.RegisterHelloStreamServiceServer(server, new(HelloStreamServiceImpl))
  lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
  if err != nil {
     panic(err)
  }
  if err := server.Serve(lis); err != nil {
     panic(err)
  }
}

客户端

func main() {
  var port = 8888
  conn, err := grpc.Dial(fmt.Sprintf(":%d", port), grpc.WithInsecure())
  if err != nil {
     panic(err)
  }
  defer conn.Close()
  client := book_stream.NewHelloStreamServiceClient(conn)

ctx := context.Background()
  if err := createBookStream(ctx, client); err != nil {
     panic(err)
  }
  if err := printBookList(ctx, client); err != nil {
     panic(err)
  }
  if err := getBookListById(ctx, client); err != nil {
     panic(err)
  }
}

BookListStream

服务器端流式 RPC,显然是单向流,并代指 Server 为 Stream 而 Client 为普通 RPC 请求

简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。

server端实现


var bookStore = map[uint64]book_stream.BookPoint{
  1: {
     Idx:    1,
     Author: "程子",
     Price:  9.9,
     Name:   "游戏思维",
  },
  2: {
     Idx:    2,
     Author: "丁锐",
     Price:  9.9,
     Name:   "活出必要的锋芒",
  },
}

type HelloStreamServiceImpl struct{}

func (HelloStreamServiceImpl) BookListStream(_ *book_stream.BookListStreamRequest, streamServer book_stream.HelloStreamService_BookListStreamServer) error {
  for idx, bookPoint := range bookStore {
     err := streamServer.Send(&book_stream.BookListStreamResponse{Book: &book_stream.BookPoint{
        Idx:    idx,
        Name:   bookPoint.Name,
        Price:  bookPoint.GetPrice(),
        Author: bookPoint.Author,
     }})
     if err != nil {
        return err
     }
  }
  return nil
}

客户端实现

func printBookList(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
  req := &book_stream.BookListStreamRequest{}
  listStream, err := client.BookListStream(ctx, req)
  if err != nil {
     return err
  }
  for true {
     resp, err := listStream.Recv()
     if err != nil {
        if err == io.EOF {
           return nil
        }
        return err
     }
     fmt.Printf("%v\n", *resp.Book)
  }
  return nil
}

CreateBookStream

客户端流式 RPC,单向流,客户端通过流式发起多次 RPC 请求给服务端,服务端发起一次响应给客户端

server端实现

func (HelloStreamServiceImpl) CreateBookStream(server book_stream.HelloStreamService_CreateBookStreamServer) error {
  var resList []*book_stream.BookIdPoint
  for {
     resp, err := server.Recv()
     if err == io.EOF {
        return server.SendAndClose(&book_stream.CreateBookStreamResponse{Idx: resList})
     }
     if err != nil {
        return err
     }
     bookStore[resp.Book.Idx] = *resp.Book
     resList = append(resList, &book_stream.BookIdPoint{Idx: resp.Book.Idx})
  }
}

客户端实现

var newBookStore = map[uint64]book_stream.BookPoint{
  3: {
     Idx:    3,
     Author: "程子1",
     Price:  9.9,
     Name:   "游戏思维1",
  },
  4: {
     Idx:    4,
     Author: "丁锐1",
     Price:  9.9,
     Name:   "活出必要的锋芒1",
  },
}

func createBookStream(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
  stream, err := client.CreateBookStream(ctx)
  if err != nil {
     return err
  }
  for _, bookPoint := range newBookStore {
     if err := stream.Send(&book_stream.CreateBookStreamRequest{
        Book: &bookPoint,
     }); err != nil {
        return err
     }
  }
  recv, err := stream.CloseAndRecv()
  if err != nil {
     return err
  }
  fmt.Println(recv.Idx)
  return nil
}

stream.SendAndClose,它是做什么用的呢?

在这段程序中,我们对每一个 Recv 都进行了处理,当发现 io.EOF (流关闭) 后,需要将最终的响应结果发送给客户端,同时关闭正在另外一侧等待的 Recv

stream.CloseAndRecv 和 stream.SendAndClose 是配套使用的流方法,

FindBookByIdStream

服务端实现

func (HelloStreamServiceImpl) FindBookByIdStream(streamServer book_stream.HelloStreamService_FindBookByIdStreamServer) error {
  for {
     resp, err := streamServer.Recv()
     if err == io.EOF {
        return nil
     }
     if err != nil {
        return err
     }
     if book, ok := bookStore[resp.Idx.Idx]; ok {
        if err := streamServer.Send(&book_stream.FindBookByIdStreamResponse{Book: &book}); err != nil {
           return err
        }
     }
  }
}

客户端实现

func getBookListById(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
  stream, err := client.FindBookByIdStream(ctx)
  if err != nil {
     return err
  }
  var findList = []uint64{1, 2}
  for _, idx := range findList {
     err := stream.Send(&book_stream.FindBookByIdStreamRequest{Idx: &book_stream.BookIdPoint{Idx: idx}})
     if err != nil {
        return err
     }
     recv, err := stream.Recv()
     if err != nil {
        return err
     }
     fmt.Printf("%v\n", recv.Book)
  }
  if err := stream.CloseSend(); err != nil {
     return err
  }
  return nil
}

来源:https://juejin.cn/post/7110794610653265933

标签:Go,Grpc Stream
0
投稿

猜你喜欢

  • MySQL 回表,覆盖索引,索引下推

    2024-01-21 12:56:59
  • 定位?浮动?自适应?

    2008-06-30 14:20:00
  • 数据库设计工具MySQL Workbench使用教程(超级详细!)

    2024-01-29 01:26:22
  • python Plotly绘图工具的简单使用

    2023-06-13 01:16:17
  • python tensorflow基于cnn实现手写数字识别

    2023-05-09 06:22:06
  • 浅谈python中真正关闭socket的方法

    2023-11-02 15:41:56
  • Python 列表排序方法reverse、sort、sorted详解

    2021-10-18 10:11:52
  • python 按照sheet合并多个Excel的示例代码(多个sheet)

    2022-07-13 05:20:20
  • 利用Python-iGraph如何绘制贴吧/微博的好友关系图详解

    2022-02-26 07:16:32
  • Django中如何用xlwt生成表格的方法步骤

    2023-07-17 07:47:12
  • Go 实战单队列到优先级队列实现图文示例

    2024-05-22 10:19:03
  • Numpy数组array和矩阵matrix转换方法

    2021-06-25 06:17:26
  • Python-Tkinter Text输入内容在界面显示的实例

    2023-03-21 13:50:58
  • mysql查询慢的原因和解决方案

    2024-01-18 01:30:15
  • mysql 8.0.20 winx64.zip压缩版安装配置方法图文教程

    2024-01-20 04:46:35
  • golang 在windows中设置环境变量的操作

    2024-04-28 09:12:29
  • 视觉注意力—解剖设计的根源

    2010-01-22 15:12:00
  • go语言 全局变量和局部变量实例

    2024-04-26 17:18:56
  • 用 Javascript 验证表单(form)中多选框(checkbox)值

    2024-04-10 10:39:14
  • 详解Anaconda安装tensorflow报错问题解决方法

    2022-04-09 19:49:29
  • asp之家 网络编程 m.aspxhome.com