许多业务场景(例如 LLM 的流式响应、大量数据传输)需要 Streaming API:先在 client、server instance 之间建立一个 Stream,然后基于该 Stream 进行消息的单向或双向收发。
该特性是 Kitex v0.9.0 引入的,但有一些 tracing 相关问题是 v0.9.1 修复的,因此推荐使用新版,正文以新版为准、以简化内容。
Kitex 默认的 Thrift API 模式:
从 gRPC 借用的词汇,特指在 Stream 上的 PingPong API。
基于 Stream 的(例如 HTTP2 stream)的 PingPong 请求,Client 发送一个 Message,Server 返回一个 Message,然后关闭 Stream。 因为有性能损失,不建议使用 Unary API,如无特殊需求请直接使用 KitexThrift PingPong API。 如确有需要,可通过 Thrift IDL 里的注解开启(详见后文)。
Streaming API 分成三类(参考 gRPC core concepts)。
Client 发送一个 Message,Server 返回多个 Message,然后关闭 Stream。
Client 发送多个 Message,Server 返回一个 Message,然后关闭 Stream。
Client/Server 的收、发均为独立的流,可根据业务需求,按任意顺序执行 Recv、Send。
该方案是基于 gRPC/HTTP2 实现的,将 Payload 的编码由 Protobuf 替换成 Thrift。
安装支持 Thrift Streaming 的 Kitex (不低于 v0.9.1) 和 Thriftgo (不低于 v0.3.6):
go install github.com/cloudwego/thriftgo@latest
go install github.com/cloudwego/kitex/tool/cmd/kitex@latest
Kitex 通过 streaming.mode
注解判断方法的 Streaming 类型。
取值 | 含义 | 说明 |
---|---|---|
bidirectional | Bidirectional streaming | 收、发是独立的流,业务可以按需处理 |
client | Client Side Streaming | Client 发送多个 Message,Server 返回一个 Message |
server | Server Side Streaming | Client 发送一个 Message,Server 返回多个 Message |
unary | Unary over Streaming | Client 发送一个 Message,Server 返回一个 Message (不建议使用, 性能损失较大) |
(其他值) | 无效,报错 |
注意:
Streaming API 有且只有 一个 request 和一个 response,否则 Kitex 会报错
Kitex 支持 在同一个 Service 里同时定义 PingPong API(非Streaming) 和 Streaming API
不建议使用 Unary over HTTP2(性能损失较大),建议使用 PingPong API(KitexThrift )
streaming.mode 只能出现最多一次(不支持指定多个值),否则 Kitex 会报错
示例 IDL(下文的示例均基于该 IDL):
namespace go echo
struct Request {
1: optional string message,
}
struct Response {
1: optional string message,
}
service TestService {
Response Echo (1: Request req) (streaming.mode="bidirectional"),
Response EchoClient (1: Request req) (streaming.mode="client"),
Response EchoServer (1: Request req) (streaming.mode="server"),
// Response EchoUnary (1: Request req) (streaming.mode="unary"), // not recommended
Response EchoPingPong (1: Request req), // KitexThrift, non-streaming
}
对于新增项目,先初始化项目目录:
mkdir demo-project && cd demo-project
module=demo
go mod init $module
Kitex 的使用与原 KitexThrift 项目一致,例如:
kitex -module $module -service demo-server api.thrift
注意:对于现有项目,也需要重新生成代码,并更新 go.mod 里的 Kitex 版本
然后执行:
go mod tidy
注意:
streamclient.Option
(不是 client.Option)streamcall.Option
(不是 callopt.Option)示例代码:
import "github.com/cloudwego/kitex/client/streamclient"
import "github.com/cloudwego/kitex/client/callopt/streamcall"
var streamClient = testservice.MustNewStreamClient(
"demo-server", // Service Name
streamclient.WithHostPorts("127.0.0.1:8888"), // streamclient.Option...
)
stream, err := streamClient.Echo(ctx)
// business logic
注意:
请求双方应协商好关闭 Stream 的条件,否则可能导致双方都一直等待下去(goroutine 泄漏)
示例展示了全双工模式(Recv 和 Send 完全独立)
注意:
stream.Close()
io.EOF
」表示 client 已发送结束示例代码:kitex-examples:thrift_streaming/handler.go#L34
注意:
新启动的 goroutine 应当自行 recover
Client 发送结束后应及时调用 stream.Close() 告知 server
「Recv 返回 io.EOF
或其他 non-nil error」表示 server 已发送结束(或出错)
示例代码:kitex-examples:thrift_streaming/client/demo_client.go#L119
注意:method handler 结束后,Kitex 会写 Trailer Frame(等同于关闭 stream);业务代码不需要主动调用 stream.Close()
示例代码:kitex-examples:thrift_streaming/handler.go#L94
注意:「Recv 返回 io.EOF
或其他 non-nil error」表示 server 已发送结束(或出错)
示例代码:kitex-examples:thrift_streaming/client/demo_client.go#L185
注意:「Recv 返回 io.EOF
」表示 client 已发送结束
示例代码:kitex-examples:thrift_streaming/handler.go#L82
示例代码:kitex-examples:thrift_streaming/client/demo_client.go#L162
Kitex 在设计上区分了 Client(for KitexThrift PingPong API)和 StreamClient(for Streaming API),并且要求 StreamClient 使用另一套 Option(类型不同),避免用户给 StreamClient 指定了不支持的 Option。
注意:
WithRecvMiddleware
、WithRecvMiddlewareBuilder
:详见 Recv/Send 中间件WithSendMiddleware
、WithSendMiddlewareBuilder
:详见 Recv/Send 中间件示例代码:
import "github.com/cloudwego/kitex/client/streamclient"
var streamClient = testservice.MustNewStreamClient(
"demo-server", // Service Name
streamclient.WithHostPorts("127.0.0.1:8888"), // streamclient.Option...
)
示例代码:
import "github.com/cloudwego/kitex/client/callopt/streamcall"
stream, err := streamClient.Echo(
context.Background(),
streamcall.WithHostPort("127.0.0.1:8888"),
)
由于 Server 支持自动探测协议,可以同时支持 Streaming API 和 KitexThrift API,因此无法像 StreamClient 一样使用不同的 Option 类型。
支持通过 option 指定:
没有对应的 Option。 对于 Streaming API,Kitex 的 Timeout 中间件会直接调用 next。
可通过 context.WithTimeout
或 context.WithDeadline
创建带有 Deadline 的 context,并在创建 Stream 时指定该 context,用于控制 Stream 的整体执行时间:
grpc-timeout
发送给服务端rpc error: code = 4 desc = context deadline exceeded
grpc-timeout
并设置到 request context 中rpc error: code = 4 desc = context deadline exceeded
示例代码:
// inject deadline into context BEFORE creating a stream
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stream, err := cli.Echo(ctx)
可使用 Kitex 提供的 streaming.CallWithTimeout
方法。
注意:
需要 在创建 Stream 之前 给 ctx 注入 cancel(用 WithCancel 或 WithTimeout 都可以,取决于需求)
将 cancel 方法作为 streaming.CallWithTimeout
的第二个参数
Client 端的 stream.Close()
的语义是 CloseSend
,告诉 server 不再会有新消息(server recv 返回 io.EOF
),并不会结束接收消息,因此不能用于 cancel 方法。
示例代码:
import "github.com/cloudwego/kitex/pkg/streaming"
// Add a cancel func to the context BEFORE creating a stream
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := cli.Echo(ctx)
if err != nil {
// ...
}
// Send with timeout
err = streaming.CallWithTimeout(time.Second, cancel, func() error {
return stream.Send(&test.Request{Message: "hello"})
})
// Recv with timeout
var resp *test.Response
err = streaming.CallWithTimeout(time.Second, cancel, func() (err error) {
resp, err = stream.Recv()
return err
})
Server 端可以使用 stream.Close()
作为 cancel 方法。
示例代码:
var cancel context.CancelFunc = func() {
stream.Close() // the cancel func in streamContext will be called internally
}
var req *test.EchoRequest
err = streaming.CallWithTimeout(time.Second, cancel, func() (errRecv error) {
req, errRecv = stream.Recv()
return errRecv
})
只支持 创建连接(Stream)时 的错误率熔断。 不支持 Recv/Send 的熔断。
不支持重试。
Streaming API 不支持 fallback。
注意:由于在中间件中获取的 Request 总是 nil,因此 keyFunc 不能直接读取 request
参考方案:
示例代码:
streamClient := testservice.MustNewStreamClient(
"demo-server",
streamclient.WithLoadBalancer(loadbalance.NewConsistBalancer(
loadbalance.NewConsistentHashOption(
func(ctx context.Context, request interface{}) string {
hashKey, _ := ctx.Value("MY_HASH_KEY")
if hashKey == "" {
// if necessary, return a random string, but never an empty string
panic("invalid hashKey for consistent hash")
}
return hashKey
},
),
))
)
request := &echo.Request{Message: "hello"}
ctx := context.WithKey(context.background(), "MY_HASH_KEY", keyFunc(request))
stream, err := streamClient.Echo(ctx, request)
说明:
注意:
request | response | |
---|---|---|
Bidirectional | interface{} = nil | *streaming.Result |
Client Streaming | interface{} = nil | *streaming.Result |
Server Streaming | interface{} = nil | *streaming.Result |
注:
Client middleware 可以通过 response 的类型来判断是否是 streaming 请求:
func clientMW(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
if _, ok := resp.(*streaming.Result); ok {
// it's a streaming request
return next(ctx, req, resp)
} else {
// it's a non-streaming request
return next(ctx, req, resp)
}
}
}
说明:
request | response | |
---|---|---|
Bidirectional | *streaming.Args | interface{} = nil |
Client Streaming | *streaming.Args | interface{} = nil |
Server Streaming | *streaming.Args | interface{} = nil |
注意:
Server middleware 应通过 request 参数的类型来判断是否是 Streaming 请求:
func serverMW(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
if _, ok := req.(*streaming.Args); ok {
// it's a streaming request
return next(ctx, req, resp)
} else {
// it's a non-streaming request
return next(ctx, req, resp)
}
}
}
Recv/Send 中间件提供了一种简便的方式,可以在消息的收发之上应用 decorator 模式,增加自定义逻辑。
注意:在 Recv Middleware 中,需要先调用 next,才能读到 message。
注意:Client Recv 的是 API 的 response 类型。
import (
"github.com/cloudwego/kitex/client/streamclient"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/streaming"
"github.com/cloudwego/kitex/pkg/utils/kitexutil"
)
var streamClient = testservice.MustNewStreamClient(
"demo-server",
streamclient.WithRecvMiddleware(func(next endpoint.RecvEndpoint) endpoint.RecvEndpoint {
return func(stream streaming.Stream, resp interface{}) (err error) {
method, _ := kitexutil.GetMethod(stream.Context())
err = next(stream, resp)
klog.Infof("[%s] client recv middleware, err = %v, resp = %v", method, err, resp)
return err
}
}),
)
注意:Server Recv 的是 API 的 request 类型。
svr := test.NewServer(new(TestServiceImpl),
server.WithRecvMiddleware(func(next endpoint.RecvEndpoint) endpoint.RecvEndpoint {
return func(stream streaming.Stream, req interface{}) (err error) {
method, _ := kitexutil.GetMethod(stream.Context())
err = next(stream, req)
klog.Infof("[%s] server recv middleware: <= req = %v, err = %v", method, req, err)
return err
}
}),
)
注意:Client Send 的是 API 的 request 类型。
import (
"github.com/cloudwego/kitex/client/streamclient"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/streaming"
"github.com/cloudwego/kitex/pkg/utils/kitexutil"
)
var streamClient = testservice.MustNewStreamClient(
"demo-server",
streamclient.WithSendMiddleware(func(next endpoint.SendEndpoint) endpoint.SendEndpoint {
return func(stream streaming.Stream, req interface{}) (err error) {
method, _ := kitexutil.GetMethod(stream.Context())
err = next(stream, req)
klog.Infof("[%s] client send middleware, err = %v, req = %v", method, err, req)
return err
}
}),
)
注意:Server Send 的是 API 的 response 类型。
svr := test.NewServer(new(TestServiceImpl),
server.WithSendMiddleware(func(next endpoint.SendEndpoint) endpoint.SendEndpoint {
return func(stream streaming.Stream, resp interface{}) (err error) {
method, _ := kitexutil.GetMethod(stream.Context())
err = next(stream, resp)
klog.Infof("[%s] server send middleware: => resp = %v, err = %v", method, resp, err)
return err
}
}),
)
我们可以通过给用于创建 Stream 的 ctx 注入 key,实现在 middleware 之间共享数据的能力。
Kitex 提供了一组简单的工具方法,通过给 ctx 注入一个 sync.Map ,以便在各 middleware 之间交换数据:
注意:因为 Kitex 内部经常需要从 ctx 读取信息(例如 RPCInfo),每注入一个 key 就增加读取链表的深度,会有一点性能损失,因此 Kitex 默认不注入该 key
在 Client Middleware 里,调用 next
之前尚未创建 Stream,因此可以通过往 ctx 里注入 map,再调用 next
,就可以在 Recv/Send middleware 里从 stream.Context()
获取,用于读写:
import "github.com/cloudwego/kitex/pkg/utils/contextmap"
streamClient = testservice.MustNewStreamClient(
"server",
streamclient.WithHostPorts("127.0.0.1:8888"),
streamclient.WithMiddleware(func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
// inject sync.Map in client middleware
ctx = contextmap.WithContextMap(ctx)
if m, ok := contextmap.GetContextMap(ctx); ok {
m.Store("hello", "world")
}
return next(ctx, req, resp)
}
}),
streamclient.WithRecvMiddleware(func(next endpoint.RecvEndpoint) endpoint.RecvEndpoint {
return func(stream streaming.Stream, resp interface{}) (err error) {
// get the map in Recv/Send middleware for read/write
if m, ok := contextmap.GetContextMap(stream.Context()); ok {
if value, ok := m.Load("hello"); ok {
klog.Infof("[Recv Middleware] hello = %v", value)
}
}
return next(stream, resp)
}
}),
)
Server 端与 Client 端不同,进入 Middleware 时已经创建好了 Stream,因此不能在 Server Middleware 里注入,但可以通过如下方式实现:
server.WithMetaHandler
Option 指定一个 MetaHandler
MetaHandler
需要实现 StreamingMetaHandler
接口OnReadStream
里给 ctx 注入 sync.Map
,返回新的 ctxKitex 提供了一个 customMetaHandler,以便在创建 stream 之前给 ctx 增加一个 key。你只需要在 server 初始化时指定如下 option:
server.WithMetaHandler(remote.NewCustomMetaHandler(remote.WithOnReadStream(
func(ctx context.Context) (context.Context, error) {
return contextmap.WithContextMap(ctx), nil
},
)))
注:
请参考:CloudWeGo 官网 - Kitex - Metainfo 的「gRPC Metadata」一节。
注意:
github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata
Recv()
收到 non-nil error(io.EOF
或其他错误)作为流结束的标志,此时才会记录 RPCFinish 事件streaming.FinishStream(stream, err)
来产生 RPCFinish 事件Kitex 用户可以通过添加自己的 Tracer,在 Finish() 方法里处理该事件,详见 Kitex - 可观测性 - 链路追踪 - 自定义 Tracer
如果自定义 Tracer 实现了 rpcinfo.StreamEventReporter 接口,Kitex 会注入 Recv、Send 中间件,在每次 Recv、Send 执行完后调用 tracer 的 ReportStreamEvent 方法; 在该方法里,可以获取到本次 Recv、Send 的消息大小:(注意不要另起 goroutine,否则可能读取到的不是本次调用)
具体示例可参考:kitex-tests: testTracer
计划实现,但暂无明确时间表。
gRPC/HTTP2 的实现基于「本地缓冲区」,Send 和 Recv 操作是直接在缓冲区上操作的。 因此需注意以下几点:
「Send 返回 nil」只表明消息放入了本地缓冲区,不等于「消息已发送到对端」
Send 和 Recv 操作的「耗时」和 PingPong API 的「Latency」含义有显著差别:
设计如此。 对于 Client Streaming/Server Streaming,虽然形式上和 PingPong API 类似(有 Request/Response),但是底层实现完全不同,在中间件中是读不到的。 请使用 Recv/Send 中间件处理流上的消息。
io.EOF
或其他错误)作为流结束的标志,此时才会记录 RPCFinish 事件业务异常是自定义业务状态码,在框架的链路上不会认为是错误请求,会被封装到 rpcinfo 内部。 因此如果有业务异常,在 Client 的 RecvMiddleware 里 next 返回的 err 为 nil,可用如下代码读取业务异常:
bizErr := rpcinfo.GetRPCInfo(stream.Context()).Invocation().BizStatusErr()
预计相比 HTTP2 能显著提高性能。
支持,详见 CloudWeGo 官网文档:Kitex - 单 Server 多 Service。
不能。 不过如果该 gRPC library 支持二进制泛化调用,可以结合 thrift 编解码器,发送thrift payload 实现互通。
注意: Kitex client 发送的 “content-type” header 值为 “application/grpc+thrift”
通过在发起请求前给 client 的 ctx 注入一个 cancel 方法,可以在需要的时候提前结束请求:
ctx, cancel := context.WithCancel(origCtx) // or WithTimeout/WithDeadline, as needed
defer cancel()
st, err := streamClient.someServerStreamingAPI(ctx, req)
// your business code
// normally you should call st.Recv() until it returns a non-nil error (e.g. io.EOF)
// end the receiving stream prematurely
if someCondition {
cancel()
streaming.FinishStream(st, err) // necessary for generating the RPCFinish event here
}
补充说明:
st.Recv()
收到 non-nil error,因此需要手动调用 streaming.FinishStream(st, err)
来产生该 stream 对应的 RPCFinish
事件st.Close()
或向 server 发送先前约定好的消息。无论哪种情况,都不要忘记调用 streaming.FinishStream