Streaming Communication

Streaming communication

Sample source: dubbo-go-samples/streaming.

Streaming communication is a new RPC data transfer mode offered by Dubbo3, suitable for the following scenarios:

  • Interfaces that need to send large amounts of data that cannot be placed in a single RPC request or response, requiring batch sending. However, traditional multiple RPC calls cannot resolve issues of order and performance, and if order is to be guaranteed, they must be sent serially.
  • Streaming scenarios where data needs to be processed in the order sent, and the data itself has no definite boundaries.
  • Push scenarios where multiple messages are sent and processed within the same call context.

There are three types of Streaming communication:

  • SERVER_STREAM (Server Stream)
  • CLIENT_STREAM (Client Stream)
  • BIDIRECTIONAL_STREAM (Bidirectional Stream)

1. Introduction

This document demonstrates how to use streaming communication in Dubbo-go.

2. How to use Dubbo-go streaming communication

In the proto file, add stream before the request or response type of the methods that require streaming communication, and generate the corresponding files using protoc-gen-go-triple.

service GreetService {
  rpc Greet(GreetRequest) returns (GreetResponse) {}
  rpc GreetStream(stream GreetStreamRequest) returns (stream GreetStreamResponse) {}
  rpc GreetClientStream(stream GreetClientStreamRequest) returns (GreetClientStreamResponse) {}
  rpc GreetServerStream(GreetServerStreamRequest) returns (stream GreetServerStreamResponse) {}
}

Write the server handler file.

Source file path: dubbo-go-sample/streaming/go-server/cmd/server.go

type GreetTripleServer struct {
}

func (srv *GreetTripleServer) Greet(ctx context.Context, req *greet.GreetRequest) (*greet.GreetResponse, error) {
	resp := &greet.GreetResponse{Greeting: req.Name}
	return resp, nil
}

func (srv *GreetTripleServer) GreetStream(ctx context.Context, stream greet.GreetService_GreetStreamServer) error {
	for {
		req, err := stream.Recv()
		if err != nil {
			if triple.IsEnded(err) {
				break
			}
			return fmt.Errorf("triple BidiStream recv error: %s", err)
		}
		if err := stream.Send(&greet.GreetStreamResponse{Greeting: req.Name}); err != nil {
			return fmt.Errorf("triple BidiStream send error: %s", err)
		}
	}
	return nil
}

func (srv *GreetTripleServer) GreetClientStream(ctx context.Context, stream greet.GreetService_GreetClientStreamServer) (*greet.GreetClientStreamResponse, error) {
	var reqs []string
	for stream.Recv() {
		reqs = append(reqs, stream.Msg().Name)
	}
	if stream.Err() != nil && !triple.IsEnded(stream.Err()) {
		return nil, fmt.Errorf("triple ClientStream recv err: %s", stream.Err())
	}
	resp := &greet.GreetClientStreamResponse{
		Greeting: strings.Join(reqs, ","),
	}

	return resp, nil
}

func (srv *GreetTripleServer) GreetServerStream(ctx context.Context, req *greet.GreetServerStreamRequest, stream greet.GreetService_GreetServerStreamServer) error {
	for i := 0; i < 5; i++ {
		if err := stream.Send(&greet.GreetServerStreamResponse{Greeting: req.Name}); err != nil {
			return fmt.Errorf("triple ServerStream send err: %s", err)
		}
	}
	return nil
}

Write the client file.

Source file path: dubbo-go-sample/streaming/go-client/cmd/client.go

func main() {
	cli, err := client.NewClient(
		client.WithClientURL("tri://127.0.0.1:20000"),
	)
	if err != nil {
		panic(err)
	}

	svc, err := greet.NewGreetService(cli)
	if err != nil {
		panic(err)
	}
	TestClient(svc)
}

func TestClient(cli greet.GreetService) {
	if err := testUnary(cli); err != nil {
		logger.Error(err)
	}

	if err := testBidiStream(cli); err != nil {
		logger.Error(err)
	}

	if err := testClientStream(cli); err != nil {
		logger.Error(err)
	}

	if err := testServerStream(cli); err != nil {
		logger.Error(err)
	}
}

func testUnary(cli greet.GreetService) error {
	logger.Info("start to test TRIPLE unary call")
	resp, err := cli.Greet(context.Background(), &greet.GreetRequest{Name: "triple"})
	if err != nil {
		return err
	}
	logger.Infof("TRIPLE unary call resp: %s", resp.Greeting)
	return nil
}

func testBidiStream(cli greet.GreetService) error {
	logger.Info("start to test TRIPLE bidi stream")
	stream, err := cli.GreetStream(context.Background())
	if err != nil {
		return err
	}
	if sendErr := stream.Send(&greet.GreetStreamRequest{Name: "triple"}); sendErr != nil {
		return err
	}
	resp, err := stream.Recv()
	if err != nil {
		return err
	}
	logger.Infof("TRIPLE bidi stream resp: %s", resp.Greeting)
	if err := stream.CloseRequest(); err != nil {
		return err
	}
	if err := stream.CloseResponse(); err != nil {
		return err
	}
	return nil
}

func testClientStream(cli greet.GreetService) error {
	logger.Info("start to test TRIPLE client stream")
	stream, err := cli.GreetClientStream(context.Background())
	if err != nil {
		return err
	}
	for i := 0; i < 5; i++ {
		if sendErr := stream.Send(&greet.GreetClientStreamRequest{Name: "triple"}); sendErr != nil {
			return err
		}
	}
	resp, err := stream.CloseAndRecv()
	if err != nil {
		return err
	}
	logger.Infof("TRIPLE client stream resp: %s", resp.Greeting)
	return nil
}

func testServerStream(cli greet.GreetService) error {
	logger.Info("start to test TRIPLE server stream")
	stream, err := cli.GreetServerStream(context.Background(), &greet.GreetServerStreamRequest{Name: "triple"})
	if err != nil {
		return err
	}
	for stream.Recv() {
		logger.Infof("TRIPLE server stream resp: %s", stream.Msg().Greeting)
	}
	if stream.Err() != nil {
		return err
	}
	if err := stream.Close(); err != nil {
		return err
	}
	return nil
}

Streaming calls also expose Triple metadata through the generated stream APIs:

SideAPIPurpose
Client stream and bidirectional clientRequestHeader()Add request metadata before the first message is sent
Server stream and bidirectional clientResponseHeader()Read response headers returned by the server
Server stream and bidirectional clientResponseTrailer()Read response trailers after the response is completed
Provider stream handlersRequestHeader()Read request metadata sent by the client
Provider stream handlersResponseHeader() / ResponseTrailer()Set response headers and trailers

Use standard http.Header methods such as Set, Add, Get, and Values when working with these metadata values.

For example, on the client side you can write request metadata before sending the first message:

stream, err := cli.GreetStream(context.Background())
if err != nil {
	return err
}
stream.RequestHeader().Set("x-sample-token", "demo-token")

And on the server side you can return response metadata from a stream handler:

func (srv *GreetTripleServer) GreetServerStream(
	ctx context.Context,
	req *greet.GreetServerStreamRequest,
	stream greet.GreetService_GreetServerStreamServer,
) error {
	stream.ResponseHeader().Set("x-stream-header", "ready")
	stream.ResponseTrailer().Set("x-stream-trailer", "done")
	return nil
}

3. Running Effect

Run the server and client, and you will see the requests return normally.

[start to test TRIPLE unary call]
TRIPLE unary call resp: [triple]
[start to test TRIPLE bidi stream]
TRIPLE bidi stream resp: [triple]
[start to test TRIPLE client stream]
TRIPLE client stream resp: [triple,triple,triple,triple,triple]
[start to test TRIPLE server stream]
TRIPLE server stream resp: [triple]
TRIPLE server stream resp: [triple]
TRIPLE server stream resp: [triple]
TRIPLE server stream resp: [triple]
TRIPLE server stream resp: [triple]