Sample source: dubbo-go-samples/streaming.
Streaming communication is a new RPC data transfer mode offered by Dubbo3, suitable for the following scenarios:
There are three types of Streaming communication:
This document demonstrates how to use streaming communication in Dubbo-go.
In the proto file, add stream before the request or response type of the methods that require streaming communication,
and generate the corresponding files using protoc-gen-go-triple.
service GreetService {
rpc Greet(GreetRequest) returns (GreetResponse) {}
rpc GreetStream(stream GreetStreamRequest) returns (stream GreetStreamResponse) {}
rpc GreetClientStream(stream GreetClientStreamRequest) returns (GreetClientStreamResponse) {}
rpc GreetServerStream(GreetServerStreamRequest) returns (stream GreetServerStreamResponse) {}
}
Write the server handler file.
Source file path: dubbo-go-sample/streaming/go-server/cmd/server.go
type GreetTripleServer struct {
}
func (srv *GreetTripleServer) Greet(ctx context.Context, req *greet.GreetRequest) (*greet.GreetResponse, error) {
resp := &greet.GreetResponse{Greeting: req.Name}
return resp, nil
}
func (srv *GreetTripleServer) GreetStream(ctx context.Context, stream greet.GreetService_GreetStreamServer) error {
for {
req, err := stream.Recv()
if err != nil {
if triple.IsEnded(err) {
break
}
return fmt.Errorf("triple BidiStream recv error: %s", err)
}
if err := stream.Send(&greet.GreetStreamResponse{Greeting: req.Name}); err != nil {
return fmt.Errorf("triple BidiStream send error: %s", err)
}
}
return nil
}
func (srv *GreetTripleServer) GreetClientStream(ctx context.Context, stream greet.GreetService_GreetClientStreamServer) (*greet.GreetClientStreamResponse, error) {
var reqs []string
for stream.Recv() {
reqs = append(reqs, stream.Msg().Name)
}
if stream.Err() != nil && !triple.IsEnded(stream.Err()) {
return nil, fmt.Errorf("triple ClientStream recv err: %s", stream.Err())
}
resp := &greet.GreetClientStreamResponse{
Greeting: strings.Join(reqs, ","),
}
return resp, nil
}
func (srv *GreetTripleServer) GreetServerStream(ctx context.Context, req *greet.GreetServerStreamRequest, stream greet.GreetService_GreetServerStreamServer) error {
for i := 0; i < 5; i++ {
if err := stream.Send(&greet.GreetServerStreamResponse{Greeting: req.Name}); err != nil {
return fmt.Errorf("triple ServerStream send err: %s", err)
}
}
return nil
}
Write the client file.
Source file path: dubbo-go-sample/streaming/go-client/cmd/client.go
func main() {
cli, err := client.NewClient(
client.WithClientURL("tri://127.0.0.1:20000"),
)
if err != nil {
panic(err)
}
svc, err := greet.NewGreetService(cli)
if err != nil {
panic(err)
}
TestClient(svc)
}
func TestClient(cli greet.GreetService) {
if err := testUnary(cli); err != nil {
logger.Error(err)
}
if err := testBidiStream(cli); err != nil {
logger.Error(err)
}
if err := testClientStream(cli); err != nil {
logger.Error(err)
}
if err := testServerStream(cli); err != nil {
logger.Error(err)
}
}
func testUnary(cli greet.GreetService) error {
logger.Info("start to test TRIPLE unary call")
resp, err := cli.Greet(context.Background(), &greet.GreetRequest{Name: "triple"})
if err != nil {
return err
}
logger.Infof("TRIPLE unary call resp: %s", resp.Greeting)
return nil
}
func testBidiStream(cli greet.GreetService) error {
logger.Info("start to test TRIPLE bidi stream")
stream, err := cli.GreetStream(context.Background())
if err != nil {
return err
}
if sendErr := stream.Send(&greet.GreetStreamRequest{Name: "triple"}); sendErr != nil {
return err
}
resp, err := stream.Recv()
if err != nil {
return err
}
logger.Infof("TRIPLE bidi stream resp: %s", resp.Greeting)
if err := stream.CloseRequest(); err != nil {
return err
}
if err := stream.CloseResponse(); err != nil {
return err
}
return nil
}
func testClientStream(cli greet.GreetService) error {
logger.Info("start to test TRIPLE client stream")
stream, err := cli.GreetClientStream(context.Background())
if err != nil {
return err
}
for i := 0; i < 5; i++ {
if sendErr := stream.Send(&greet.GreetClientStreamRequest{Name: "triple"}); sendErr != nil {
return err
}
}
resp, err := stream.CloseAndRecv()
if err != nil {
return err
}
logger.Infof("TRIPLE client stream resp: %s", resp.Greeting)
return nil
}
func testServerStream(cli greet.GreetService) error {
logger.Info("start to test TRIPLE server stream")
stream, err := cli.GreetServerStream(context.Background(), &greet.GreetServerStreamRequest{Name: "triple"})
if err != nil {
return err
}
for stream.Recv() {
logger.Infof("TRIPLE server stream resp: %s", stream.Msg().Greeting)
}
if stream.Err() != nil {
return err
}
if err := stream.Close(); err != nil {
return err
}
return nil
}
Streaming calls also expose Triple metadata through the generated stream APIs:
| Side | API | Purpose |
|---|---|---|
| Client stream and bidirectional client | RequestHeader() | Add request metadata before the first message is sent |
| Server stream and bidirectional client | ResponseHeader() | Read response headers returned by the server |
| Server stream and bidirectional client | ResponseTrailer() | Read response trailers after the response is completed |
| Provider stream handlers | RequestHeader() | Read request metadata sent by the client |
| Provider stream handlers | ResponseHeader() / ResponseTrailer() | Set response headers and trailers |
Use standard http.Header methods such as Set, Add, Get, and Values when working with these metadata values.
For example, on the client side you can write request metadata before sending the first message:
stream, err := cli.GreetStream(context.Background())
if err != nil {
return err
}
stream.RequestHeader().Set("x-sample-token", "demo-token")
And on the server side you can return response metadata from a stream handler:
func (srv *GreetTripleServer) GreetServerStream(
ctx context.Context,
req *greet.GreetServerStreamRequest,
stream greet.GreetService_GreetServerStreamServer,
) error {
stream.ResponseHeader().Set("x-stream-header", "ready")
stream.ResponseTrailer().Set("x-stream-trailer", "done")
return nil
}
Run the server and client, and you will see the requests return normally.
[start to test TRIPLE unary call]
TRIPLE unary call resp: [triple]
[start to test TRIPLE bidi stream]
TRIPLE bidi stream resp: [triple]
[start to test TRIPLE client stream]
TRIPLE client stream resp: [triple,triple,triple,triple,triple]
[start to test TRIPLE server stream]
TRIPLE server stream resp: [triple]
TRIPLE server stream resp: [triple]
TRIPLE server stream resp: [triple]
TRIPLE server stream resp: [triple]
TRIPLE server stream resp: [triple]