Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ description: ""

### Generic Streaming Client Initialization

#### protobuf
#### Protobuf

Take the following Protobuf IDL as an example:

Expand Down Expand Up @@ -42,19 +42,26 @@ service Echo {

The four methods included in the example IDL correspond to four scenarios:

1. Client streaming: the client sends multiple messages, the server returns one message, and then closes the stream.
2. Server streaming: The client sends one message, the server returns multiple messages, and then closes the stream. It's suitable for LLM scenarios.
3. Bidirectional streaming: The sending and receiving of client/server are independent, which can be organized in arbitrary order.
4. Unary: non streaming.
1. Client Streaming: the client sends multiple messages, the server returns one message, and then closes the stream.
2. Server Streaming: The client sends one message, the server returns multiple messages, and then closes the stream. It's suitable for LLM scenarios.
3. Bidirectional Streaming: The sending and receiving of client/server are independent, which can be organized in arbitrary order.
4. Unary: In gRPC, this is a single call mode without using streams, similar to the Ping Pong mode in Thrift.

First of all, please initialize the streaming client. Here is an example of streaming client initialization.

```go
import (
dproto "github.com/cloudwego/dynamicgo/proto"
"context"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/client/genericclient"
"github.com/cloudwego/kitex/pkg/generic"
"github.com/cloudwego/kitex/pkg/generic/proto"
"github.com/cloudwego/kitex/pkg/transmeta"
"github.com/cloudwego/kitex/transport"
)

dOpts := dproto.Options{} // you can specify parsing options as you want
dOpts := proto.Options{} // you can specify parsing options as you want
p, err := generic.NewPbFileProviderWithDynamicGo(your_idl, ctx, dOpts)
// create json pb generic
g, err := generic.JSONPbGeneric(p)
Expand Down Expand Up @@ -93,15 +100,25 @@ service TestService {

The four methods included in the example IDL correspond to four scenarios:

1. Client streaming: the client sends multiple messages, the server returns one message, and then closes the stream.
2. Server streaming: The client sends one message, the server returns multiple messages, and then closes the stream. It's suitable for LLM scenarios.
3. Bidirectional streaming: The sending and receiving of client/server are independent, which can be organized in arbitrary order.
4. Unary (gRPC): Non-streaming. With `streaming.mode` annotation. Not recommended due to performance loss.
5. Unary (KitexThrift): Non-streaming. Recommended.
1. Client Streaming: the client sends multiple messages, the server returns one message, and then closes the stream.
2. Server Streaming: The client sends one message, the server returns multiple messages, and then closes the stream. It's suitable for LLM scenarios.
3. Bidirectional Streaming: The sending and receiving of client/server are independent, which can be organized in arbitrary order.
4. Unary (gRPC): Non-streaming with `streaming.mode` annotation. Not recommended due to performance loss.
5. Ping Pong mode (KitexThrift): Traditional Thrift request-response pattern without using streams. Better performance, recommended.

Here is an example of streaming client initialization.

```go
import (
"context"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/client/genericclient"
"github.com/cloudwego/kitex/pkg/generic"
"github.com/cloudwego/kitex/pkg/transmeta"
"github.com/cloudwego/kitex/transport"
)

p, err := generic.NewThriftFileProvider(your_idl_path)
/*
// if you use dynamicgo
Expand All @@ -122,6 +139,14 @@ cli, err := genericclient.NewStreamingClient("destService", g,
Example:

```go
import (
"context"
"fmt"
"time"

"github.com/cloudwego/kitex/client/genericclient"
)

// initialize client streaming client using the streaming client you created
streamCli, err := genericclient.NewClientStreaming(ctx, cli, "StreamRequestEcho")
for i := 0; i < 3; i++ {
Expand All @@ -137,11 +162,20 @@ strResp, ok := resp.(string) // response is json string

### Server Streaming

Note: A non-nil error (including `io.EOF`) returned by `Recv` indicates that the server has finished sending (or encountered an error)
Note: An `io.EOF` error returned by `Recv` indicates that the server has finished sending and normally closed the stream, while other non-nil errors indicate actual errors.

Example:

```go
import (
"context"
"fmt"
"io"

"github.com/cloudwego/kitex/client/genericclient"
"github.com/cloudwego/kitex/pkg/klog"
)

// initialize server streaming client using the streaming client you created, and send a message
streamCli, err := genericclient.NewServerStreaming(ctx, cli, "StreamResponseEcho", `{"message": "grpc server streaming generic request"}`)
for {
Expand All @@ -164,6 +198,16 @@ for {
Example:

```go
import (
"context"
"fmt"
"io"
"sync"

"github.com/cloudwego/kitex/client/genericclient"
"github.com/cloudwego/kitex/pkg/klog"
)

// initialize bidirectional streaming client using the streaming client you created
streamCli, err := genericclient.NewBidirectionalStreaming(ctx, cli, "BidirectionalEcho")

Expand Down Expand Up @@ -222,6 +266,12 @@ The usage of unary call is similar to normal (non-streaming) generic call.
Example:

```go
import (
"context"

"github.com/cloudwego/kitex/client/genericclient"
)

resp, err := cli.GenericCall(ctx, "UnaryEcho", `{"message": "unary request"}`)
strResp, ok := resp.(string) // response is json string
```
Expand All @@ -230,24 +280,24 @@ strResp, ok := resp.(string) // response is json string

### Recv() got err: rpc error: code = 12 desc = Method not found!

This error occurs when calling with Kitex **protobuf** generic streaming when the downstream is **gRPC-python** (gRPC libraries for other languages may also have this problem).
This error occurs when calling with Kitex **Protobuf** generic streaming when the downstream is **gRPC-python** (gRPC libraries for other languages may also have this problem).

The root cause is that Kitex does not parse the package in the protobuf idl, so the package part of `:path` in the gPRC request is missing, and gRPC-python can't find the corresponding method.
The root cause is that Kitex does not parse the package in the Protobuf IDL, so the package part of `:path` in the gPRC request is missing, and gRPC-python can't find the corresponding method.

e.g.

- normal client

`:path` - /search.gpt_engine.GPTStreamService/GPTGeneration

- protobuf generic client
- Protobuf generic client

`:path` - /GPTStreamService/GPTGeneration

#### Solution

Use the following branch to solve it and wait for the official release of Kitex v1.18.1 to fix this issue.
Use Kitex v0.13.1 or higher version to fix this issue. Kitex v0.13.1 was released in April 2025 ([See release notes](https://github.com/cloudwego/kitex/releases/tag/v0.13.1)):

```shell
go get -u github.com/cloudwego/kitex@v0.12.1-0.20241220085925-b5894d2f9e0c
go get -u github.com/cloudwego/kitex@v0.13.1
```
Loading