Eino: 公共切面 - Callbacks
切面
Eino 切面(Eino Callback)是 Eino 框架对开发提供的扩展 Eino 框架,丰富横向治理的能力
可以从以下几个方面,认识 Eino 切面:
- 切面位置:即能在哪些点位添加切面逻辑
- 切面注入:可通过哪些方式,注入对应的切面逻辑
- 切面角色
- 切面机制:由 Eino 框架提供了,扩展 Eino 自身功能的机制
- 切面扩展者:基于 Eino 的扩展能力,设计和提供各种各样的独立于 Graph 执行的扩展能力
- 例如:Langfuse Trace 等
- 切面使用者:真正使用切面扩展能力的终端业务
- 例如:针对业务编排的 Graph 图,添加 Langfuse Trace 切面能力。方便对编排产物的执行进行观测
切面位置
- 组件切面(Component Callback):组件自带的切面
- 实现在组件内部的切面执行点位,而不是在加入到 Node 时,由 Node 在组件之外添加的切面点位
- 组件切面既可应用于 Graph 编排场景,也可应用于组件的独立使用场景
- 组件切面、节点切面一般二选一。
- 当组件声明自己提供组件切面时,节点切面会被自动关闭。
- 节点切面(Node Callback):组件加入到节点时,由 Node 在组件之外添加的切面点位
- 由于是在组件之外,只能见到组件的 input/output,无法感知组件运行时的内部状态
- 仅可应用于 Graph 编排场景,在组件独立使用时,无法生效
- 图切面(Graph Callback):把整张图视为一个整体,在图的前后添加的切面点位
切面生效位置有三种:
- 所有图的 Graph Callback 和 Node Callback 生效
- 指定图及其嵌套子图的 Graph Callback 和 Node Callback 生效
- 指定图的指定节点的 Node Callback 生效
切面注入
切面的注入方式,有以下三种:
- 全局注入:对所有图的所有节点生效
- 请求粒度注入:在请求时注入,仅对本次请求所经过的节点均生效
- 组件实例注入:针对实现了 Component Callback 的组件,可针对该组件实例,单独注入切面
注意:不同的注入时机,其对应的 Callback 的生效位置有所不同
全局注入(进程粒度)
- 生效位置: 所有图的所有节点或组件的 Callback 均生效 + 所有图的 Graph Callback 生效
- 针对节点、组件哪一个生效的问题,取决于组件是否声明自己实现了 Component Callback。若声明,则仅 Component Callback 生效;若未声明,则 Node Callback 生效
- 生效时机: Graph 编译产物的任意一次执行
- 注入方式:
- 通过 callbacks.InitCallbackHandlers() 注入。一个进程中,仅最后一次调用生效。建议放在 main 入口函数中进行初始化
import "github.com/cloudwego/eino/callbacks"
func main() {
// 设置全局生效的 Callback Handlers。
// 一个进程内仅最后一次调用的 callbacks.InitCallbackHandlers() 会生效
callbacks.InitCallbackHandlers([]callbacks.Handler{&loggerCallbacks{}})
}
type loggerCallbacks struct{
*callbacks.HandlerBuilder
}
func (l *loggerCallbacks) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
logs.Infof("name: %v, type: %v, component: %v, input: %v", info.Name, info.Type, info.Component, input)
return ctx
}
func (l *loggerCallbacks) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
logs.Infof("name: %v, type: %v, component: %v, output: %v", info.Name, info.Type, info.Component, output)
return ctx
}
请求粒度注入
- 生效的切面位置: 本地调用涉及图及其嵌套子图的 Graph Callback 和 Node Callback
- 生效时机: Graph 编译产物的本次调用
- 注入时机:
- 调用 Graph 编译产物 Runnable 方法时,例如调用 Invoke()、Stream()方法,通过 Graph Call Option 传入。
- GraphCallOption:compose.WithCallbacks(handler)
Graph 示例:
package main
import (
"context"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
func main() {
ctx := context.Background()
// create an instance of your implementation of callbacks.Handler
handler := &callbacks.HandlerBuilder{}
// create an instance of Graph
// input type is 1st Graph Node's input type, that is ChatTemplate's input type: map[string]any
// output type is last Graph Node's output type, that is ToolsNode's output type: []*schema.Message
g := compose.NewGraph[[]*schema.Message, *schema.Message]()
// add node and edge here
// err = g.AddChatModelNode("chat_model", chatTpl)
// if err != nil {
// logs.Errorf("AddChatTemplateNode failed, err=%v", err)
// return
// }
// compile Graph[I, O] to Runnable[I, O]
r, err := g.Compile()
if err != nil {
logs.Errorf("Compile failed, err=%v", err)
return
}
_, err = r.Invoke(ctx,
[]*schema.Message{
schema.SystemMessage("you are a helpful assistant"),
schema.UserMessage("我叫 zhangsan, 邮箱是 zhangsan@bytedance.com, 帮我推荐一处房产"),
},
compose.WithCallbacks(handler),
)
_, err = r.Stream(ctx, []*schema.Message{
schema.SystemMessage("you are a helpful assistant"),
schema.UserMessage("我叫 zhangsan, 邮箱是 zhangsan@bytedance.com, 帮我推荐一处房产"),
},
compose.WithCallbacks(handler),
)
}
Chain 示例:
使用方式和 Graph 是一模一样的
package main
import (
"context"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
func main() {
ctx := context.Background()
// create an instance of your implementation of callbacks.Handler
handler := &callbacks.HandlerBuilder{}
// create an instance of Graph
// input type is 1st Graph Node's input type, that is ChatTemplate's input type: map[string]any
// output type is last Graph Node's output type, that is ToolsNode's output type: []*schema.Message
ch := compose.NewChain[[]*schema.Message, *schema.Message]()
// append node here
// err = ch.AppendChatModel(chatTpl)
// if err != nil {
// logs.Errorf("AddChatTemplateNode failed, err=%v", err)
// return
// }
// compile Graph[I, O] to Runnable[I, O]
r, err := ch.Compile()
if err != nil {
logs.Errorf("Compile failed, err=%v", err)
return
}
_, err = r.Invoke(ctx,
[]*schema.Message{
schema.SystemMessage("you are a helpful assistant"),
schema.UserMessage("我叫 zhangsan, 邮箱是 zhangsan@bytedance.com, 帮我推荐一处房产"),
},
compose.WithCallbacks(handler),
)
_, err = r.Stream(ctx, []*schema.Message{
schema.SystemMessage("you are a helpful assistant"),
schema.UserMessage("我叫 zhangsan, 邮箱是 zhangsan@bytedance.com, 帮我推荐一处房产"),
},
compose.WithCallbacks(handler),
)
}
定制切面
从定制切面的场景出发,会有两种定制需求:
- 切面数据消费:消费各切面点位上报的切面信息,实现业务定制化的横向治理能力
- 组件切面上报:实现一个组件时,定制切面上报的点位,定制上报的切面数据内容
切面数据消费
- 切面数据的消费接口定义
- CallbackInput 在接口定义中是一个 any, 推荐采用组件抽象定义的结构体进行上报。以 model 组件为例,推荐使用 model.CallbackInput{}
- CallbackOutput 同 CallbackInput,推荐采用组件抽象定义的结构体上报。
- 采用推荐的预定义的结构体,有利于切面消费数据时,解析和理解数据内容
// RunInfo is the info of run node.
type RunInfo struct {
Name string
Type string
Component components.Component
}
// CallbackInput is the input of the callback.
// the type of input is defined by the component.
// using type Assert or convert func to convert the input to the right type you want.
// e.g.
//
// CallbackInput in components/model/interface.go is:
// type CallbackInput struct {
// Messages []*schema.Message
// Config *Config
// Extra map[string]any
// }
//
// and provide a func of model.ConvCallbackInput() to convert CallbackInput to *model.CallbackInput
// in callback handler, you can use the following code to get the input:
//
// modelCallbackInput := model.ConvCallbackInput(in)
// if modelCallbackInput == nil {
// // is not a model callback input, just ignore it
// return
// }
type CallbackInput any
type CallbackOutput any
type Handler interface {
OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
OnError(ctx context.Context, info *RunInfo, err error) context.Context
OnStartWithStreamInput(ctx context.Context, info *RunInfo,
input *schema.StreamReader[CallbackInput]) context.Context
OnEndWithStreamOutput(ctx context.Context, info *RunInfo,
output *schema.StreamReader[CallbackOutput]) context.Context
}
- 定义一个结构体,实现上面的 Handler 接口
- WARN**:OnStartWithStreamInput、OnEndWithStreamOutput 中的两个 input/output 流必须要要关闭,否则会导致流无法关闭回收,从而导致 Goroutine 或内存缓慢泄露。 **
- 即在这两个函数中一定要调用:input.Close()、output.Close()
- 考虑到用户可能仅消费部分接口,并且对应的两个流式接口又必须要求 Close(),推荐采用实例化 callbacks.HandlerBuilder{} 的方式,可选实现其中的某几个 OnXXXFn 字段
- WARN**:OnStartWithStreamInput、OnEndWithStreamOutput 中的两个 input/output 流必须要要关闭,否则会导致流无法关闭回收,从而导致 Goroutine 或内存缓慢泄露。 **
import (
"context"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components/model"
)
var callbackHandler1 = &callbacks.HandlerBuilder{
OnErrorFn: func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
return ctx
},
OnStartFn: func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
// 以 ChatModel 为例,将 input 转换为 model.CallbackInput{}
in := model.ConvCallbackInput(info)
_ = in
return ctx
},
OnStartWithStreamInputFn: func(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
// 必须要关闭这个流,否则会导致 Goroutine 溢出
defer input.Close()
// implement
return ctx
},
OnEndWithStreamOutputFn: func(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
// 必须要关闭这个流,否则会导致 Goroutine 溢出
defer output.Close()
// implement
return ctx
},
}
- 按照 切面注入 章节,选择合适的方式,将定制的消费切面注入到 Graph/Chain 即可
WARN:Callback 流切记要 Close
以存在 ChatModel 这种具有真流输出的节点为例,当存在 Callback 切面时,ChatModel 的输出流:
- 既要被下游节点作为输入来消费,又要被 Callback 切面来消费
- 一个流中的一个帧(Chunk),只能被一个消费方消费到,即流不是广播模型
所以此时需要将流进行复制,其复制关系如下:
- 如果其中一个 Callback n 没有 Close 对应的流,因此 Stream Coper 可能一直阻塞生产,无法退出,从而导致 Stream Coper 的资源无法及时释放。
组件切面上报
当用户定制实现一个组件时,可能因为需要 定制切面上报点位、定制切面上报内容等原因,导致不使用 Node Callback,而是选择定制实现 Component Callback。
- Node Callback 仅能上报组件抽象的输入输出信息,无法获取更多的组件内部信息。
组件切面定制上报逻辑的示例,以 ChatModel 为例:
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"runtime/debug"
"time"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/schema"
)
type ChatModel struct {}
func (cm *ChatModel) Generate(ctx context.Context, in []*schema.Message, opts ...model.Option) (
outMsg *schema.Message, err error) {
var (
// 从 ctx 中获取 Node 执行时,由 Eino 框架注入的 CallbackManager(cbm)
cbm, cbmOK = callbacks.ManagerFromCtx(ctx)
)
defer func() {
// 如果 cbm 存在,并且产生错误,则在此处上报错误信息
if err != nil && cbmOK {
_ = cbm.OnError(ctx, err)
}
}()
// TODO: 在这里处理用户请求参数
// 如果 cbm 存在,则在组件逻辑真正执行前,上报请求信息
if cbmOK {
ctx = cbm.OnStart(ctx, &model.CallbackInput{
Messages: in,
Config: reqConf,
})
}
resp, err := cm.cli.CreateChatCompletion(ctx, *req)
if err != nil {
return nil, err
}
// TODO: 在这里处理响应信息,并实现转换
// 在组件逻辑执行结束后,上报组件的处理结果
if cbmOK {
_ = cbm.OnEnd(ctx, &model.CallbackOutput{
Message: outMsg,
Config: reqConf,
TokenUsage: usage,
})
}
return outMsg, nil
}
注:当涉及到流式数据的切面上报时,需要将原来的流,Copy 出两份,一份作为输出,一份留作为 Callback 消费(CallbackManager 会根据 Callback Handler 的数量再次进行 Copy)。 针对流的相关操作,可参考 Stream 流 章节
常见问题
- 调试环境问题
- 采样率问题
- 为什么出现了 goroutine 泄露?
最后修改
January 17, 2025
: fix: broken url and non-proper info (#1212) (eea25ec)