Eino: Callback 用户手册

解决的问题

Component(包括 Lambda)、Graph 编排共同解决“把业务逻辑定义出来”的问题。而 logging, tracing, metrics, 上屏展示等横切面性质的功能,需要有机制把功能注入到 Component(包括 Lambda)、Graph 中。

另一方面,用户可能想拿到某个具体 Component 实现的执行过程中的中间信息,比如 VikingDBRetriever 额外给出查询的 DB Name,ArkChatModel 额外给出请求的 temperature 等参数。需要有机制把中间状态透出。

Callbacks 支持“横切面功能注入”和“中间状态透出”,具体是:用户提供、注册“function”(Callback Handler),Component 和 Graph 在固定的“时机”(或者说切面、位点)回调这些 function,给出对应的信息。

核心概念

核心概念串起来,就是:Eino 中的 Component 和 Graph 等实体,在固定的时机 (Callback Timing),回调用户提供的 function (Callback Handler),并把自己是谁 (RunInfo),以及当时发生了什么 (Callback Input & Output) 传出去。

触发实体

Component(包括官方定义的组件类型和 Lambda),Graph Node(以及 Chain Node),Graph 自身(以及 Chain)。这三类实体,都有横切面功能注入、中间状态透出的需求,因此都会触发 callback。具体见下面的“触发方式”一节。

触发时机

// CallbackTiming enumerates all the timing of callback aspects.
type CallbackTiming = callbacks.CallbackTiming

const (
    TimingOnStart CallbackTiming = iota // 进入并开始执行
    TimingOnEnd // 成功完成即将 return
    TimingOnError // 失败并即将 return err 
    TimingOnStartWithStreamInput // OnStart,但是输入是 StreamReader
    TimingOnEndWithStreamOutput // OnEnd,但是输出是 StreamReader
)

不同的触发实体,在不同场景下,是触发 OnStart 还是 OnStartWithStreamInput (OnEnd/OnEndWithStreamOutput 同理),具体的规则,详见下面的“触发方式”一节。

Callback Handler

type Handler interface {
    OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
    OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
    OnError(ctx context.Context, info *RunInfo, err error) context.Context
    OnStartWithStreamInput(ctx context.Context, info *RunInfo,
       input *schema.StreamReader[CallbackInput]) context.Context
    OnEndWithStreamOutput(ctx context.Context, info *RunInfo,
       output *schema.StreamReader[CallbackOutput]) context.Context
}

一个 Handler 是一个实现了上面 5 个方法(对应 5 个触发时机)的结构体。每个方法都会接收三个信息:

  • Context: 用于接收同一个 Handler 的前序触发时机可能设置的定制信息。
  • RunInfo: 触发回调的实体元信息。
  • Input/Output/InputStream/OutputStream: 触发回调时的业务信息。

并都会返回新的 Context:用于同一个 Handler 的不同触发时机之间传递信息。

如果一个 Handler,不想关注所有的 5 个触发时机,只想关注一部分,比如只关注 OnStart,建议使用 NewHandlerBuilder().OnStartFn(...).Build()。如果不想关注所有的组件类型,只想关注特定组件,比如 ChatModel,建议使用 NewHandlerHelper().ChatModel(...).Handler(),可以只接收 ChatModel 的回调并拿到一个具体类型的 CallbackInput/CallbackOutput。具体见“Handler 实现方式”一节。

不同 Handler 之间,触发顺序没有保证。

RunInfo

描述了触发 Callback 的实体自身的元信息。

// RunInfo contains information about the running component that triggers callbacks.
type RunInfo struct {
    Name      string               // the 'Name' with semantic meaning for the running component, specified by end-user
    Type      string               // the specific implementation 'Type' of the component, e.g. 'OpenAI'
    Component components.Component // the component abstract type, e.g. 'ChatModel'
}
  • Name:有业务含义的名称,需用户指定,不指定就是空字符串。对不同的触发实体:
    • Component:在 Graph 中时,用 Node Name。在 Graph 外单独的使用时,用户手动设置。详见“注入 RunInfo” 和 “单独使用 Component”
    • Graph Node:用 Node Name func WithNodeName(n string) GraphAddNodeOpt
    • Graph 自身:
      • 顶层图用 Graph Name func WithGraphName(graphName string) GraphCompileOption
      • 内部嵌套图,会用加入到上级图时添加的 Node Name
  • Type:组件具体实现来规定:
    • 有接口的 Component:如果实现了 Typer 接口,用 GetType() 方法的结果。否则用反射获取 Struct/Func 名。
    • Lambda:如果用 func WithLambdaType(t string) LambdaOpt 指定了 Type,用这个,否则是空字符串。
    • Graph Node:用内部 Component/Lambda/Graph 的值。
    • Graph 自身:空字符串。
  • Component:
    • 有接口的 Component:是啥接口,就是啥
    • Lambda:固定值 Lambda
    • Graph Node: 用内部的 Component/Lambda/Graph 的值。
    • Graph 自身:固定值 Graph / Chain. (之前曾有 StateGraph / StateChain ,现已整合到 Graph / Chain 中)

Callback Input & Output

本质是任意类型,因为不同的 Component 的输入输出、内部状态完全不同。

type CallbackInput any
type CallbackOutput any

具体到某个组件,有更具体的类型,比如 Chat Model

// CallbackInput is the input for the model callback.
type CallbackInput struct {
    // Messages is the messages to be sent to the model.
    Messages []*schema.Message
    // Tools is the tools to be used in the model.
    Tools []*schema.ToolInfo
    // Config is the config for the model.
    Config *Config
    // Extra is the extra information for the callback.
    Extra map[string]any
}

// CallbackOutput is the output for the model callback.
type CallbackOutput struct {
    // Message is the message generated by the model.
    Message *schema.Message
    // Config is the config for the model.
    Config *Config
    // TokenUsage is the token usage of this request.
    TokenUsage *TokenUsage
    // Extra is the extra information for the callback.
    Extra map[string]any
}

在 Chat Model 的具体实现,比如 OpenAI Chat Model 中,建议组件作者向 Callback Handler 中传入具体的 Input/Output 类型,而不是 Any。这样可以透出更具体的、定制化的中间状态信息。

如果是 Graph Node 来触发 Callback,因为 Node 拿不到组件内部中间状态信息,只能拿到组件接口中规定的输入和输出,所以给 Callback Handler 的只能是这些。对 Chat Model,就是 []*schema.Message 和 *schema.Message。

Graph 自身触发 Callback 时,输入输出就是 Graph 整体的输入和输出。

注入 Handler

Handler 需要注入到 Context 中才能被触发。

全局注入 Handler

通过 callbacks.InitCallbackHandlers 注入全局的 Handler。注入后,所有的触发回调行为,都会自动触发这些全局的 Handler。典型的场景是 tracing,logging 等全局一致、业务场景无关的功能。

不是并发安全的。建议在服务初始化时注入一次。

向 Graph 中注入 Handler

通过 compose.WithCallbacks 在 graph 运行时注入 Handler,这些 Handler 会在 graph 的本次运行整体上生效,包括 Graph 内各 Node 和 Graph 自身(以及各内嵌的 graph)。

通过 compose.WithCallbacks(...).DesignateNode(...) 向顶层 Graph 的某个 Node 注入 Handler。当这个 Node 自身是个内嵌的 Graph 时,会注入到这个内嵌 Graph 自身和其内部的各 Node。

通过 compose.WithCallbacks(...).DesignateNodeForPath(...) 向内部嵌套的 Graph 的某个 Node 注入 Handler。

在 Graph 外注入 Handler

不想使用 Graph,但却想使用 Callback,则:

通过 InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) 获取一个新的 Context 并注入 Handlers 以及 RunInfo。

Handler 继承

与子 Context 继承父 Context 中的所有 Values 相同,子 Context 也会继承父 Context 中的所有 Handlers。举个例子,Graph 运行时传入的 Context 中如果已经有了 Handler,则这些 Handlers 都会被整个 Graph 的这次运行继承和生效。

注入 RunInfo

RunInfo 也需要注入到 Context 中,才会在触发回调时给到 Handler。

Graph 托管 RunInfo

Graph 会为内部所有的 Node 自动注入 RunInfo。机制是每个 Node 的运行,都是一个新的子 Context,Graph 向这个新的 Context 中注入对应 Node 的 RunInfo。

在 Graph 外注入 RunInfo

不想使用 Graph,但却想使用 Callback,则:

通过 InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) 获取一个新的 Context 并注入 Handlers 以及 RunInfo。

通过 ReuseHandlers(ctx context.Context, info *RunInfo) 来获取一个新的 Context,复用之前 Context 中的 Handler,并设置新的 RunInfo。

触发方式

组件实现内部触发(Component Callback)

在组件实现的代码中,调用 callbacks 包中的 OnStart(), OnEnd(), OnError(), OnStartWithStreamInput(), OnEndWithStreamInput()。以 Ark 的 ChatModel 实现为例,在 Generate 方法中:

func (cm *ChatModel) Generate(ctx context.Context, in []*schema.Message, opts ...fmodel.Option) (
    outMsg *schema.Message, err error) {

    defer func() {
       if err != nil {
          _ = callbacks.OnError(ctx, err)
       }
    }()

    // omit multiple lines... instantiate req conf
        
    ctx = callbacks.OnStart(ctx, &fmodel.CallbackInput{
       Messages:   in,
       Tools:      append(cm.rawTools), // join tool info from call options
       ToolChoice: nil,                 // not support in api
       Config:     reqConf,
    })

    // omit multiple lines... invoke Ark chat API and get the response
    
    _ = callbacks.OnEnd(ctx, &fmodel.CallbackOutput{
       Message:    outMsg,
       Config:     reqConf,
       TokenUsage: toModelCallbackUsage(outMsg.ResponseMeta),
    })

    return outMsg, nil
}

在 Stream 方法中:

func (cm *ChatModel) Stream(ctx context.Context, in []*schema.Message, opts ...fmodel.Option) ( // byted_s_too_many_lines_in_func
    outStream *schema.StreamReader[*schema.Message], err error) {

    defer func() {
       if err != nil {
          _ = callbacks.OnError(ctx, err)
       }
    }()
    
    // omit multiple lines... instantiate req conf

    ctx = callbacks.OnStart(ctx, &fmodel.CallbackInput{
       Messages:   in,
       Tools:      append(cm.rawTools), // join tool info from call options
       ToolChoice: nil,                 // not support in api
       Config:     reqConf,
    })
    
    // omit multiple lines... make request to Ark API and convert response stream to StreamReader[model.*CallbackOutput]

    _, sr = callbacks.OnEndWithStreamOutput(ctx, sr)

    return schema.StreamReaderWithConvert(sr,
       func(src *fmodel.CallbackOutput) (*schema.Message, error) {
          if src.Message == nil {
             return nil, schema.ErrNoValue
          }

          return src.Message, nil
       },
    ), nil
}

可以看到 Generate 调用时,触发的是 OnEnd,而 Stream 调用时,触发的是 OneEndWithStreamOutput:

组件实现内部触发 Callbacks 时:

  • 当组件输入为 StreamReader 时,触发 OnStartWithStreamInput,否则触发 OnStart
  • 当组件输出为 StreamReader 时,触发 OnEndWithStreamOutput,否则触发 OnEnd

内部实现了 callback 触发的组件,应当实现 Checker 接口,IsCallbacksEnabled 返回 true,向外部传达“我内部实现了 callback 触发”的信息:

// Checker tells callback aspect status of component's implementation
// When the Checker interface is implemented and returns true, the framework will not start the default aspect.
// Instead, the component will decide the callback execution location and the information to be injected.
type Checker interface {
    IsCallbacksEnabled() bool
}

如果一个组件实现,没有实现 Checker 接口,或者 IsCallbacksEnabled 返回 false,可以认为该组件内部没有触发回调,需要 Graph Node 来负责注入和触发(在 Graph 内使用时)。

Graph Node 触发(Node Callback)

当一个 Component 被编排入 Graph 时,成为一个 Node。这时,如果 Component 自身会触发 callback,Node 就复用 Component 的 callback 处理。否则,Node 会在 Component 外面埋上 callback handler 触发点位。这些点位与 Component 自身的流式范式对应。比如一个 ChatModelNode,会在 Generate 方法外面埋上 OnStart/OnEnd/OnError,同时会在 Stream 方法外面埋上 OnStart/OnEndWithStreamOutput/OnError。

在 Graph 运行时,各组件会以 Invoke 或 Transform 范式运行,又会根据组件具体实现的业务流式范式,调用对应的组件方法。比如 Graph 以 Invoke 运行,Chat Model Node 会以 Invoke 运行,调用 Generate 方法。而当 Graph 以 Stream 运行,Chat Model Node 会以 Transform 运行,但 Chat Model 的业务流式范式中没有 Transform,会自动降级成调用 Stream 方法。因此:

Graph Node 具体触发哪个位点(OnStart 还是 OnStartWithStreamInput),取决于组件实现的业务流式范式和 Graph 运行方式两个因素。

关于 Eino 流式编程的详细介绍,参见 Eino 流式编程要点

Graph 自身触发(Graph Callback)

Graph 在自身的开始、结束、err 的时机触发 Callback Handler。如果 Graph 以 Invoke 形式调用,触发 OnStart/OnEnd/OnError。如果以 Stream/Collect/Transform 形式调用,触发 OnStartWithStreamInput/OnEndWithStreamOutput/OnError。这是因为 Graph 内部会始终以 Invoke 或 Transform 执行。参见 Eino 流式编程要点

值得注意的是:graph 也是 component 的一种,因此 graph callback 也是 component callback 的一种特殊形式。根据 Node Callback 的定义,当 Node 内部的 component 实现了对触发时机的感知和处理时,Node 会直接复用 Component 的实现,不会再实现 Node Callback。这意味着当一个 graph 通过 AddGraphNode 的方式加入到另外一个 Graph 中作为一个 Node 时,这个 Node 会复用内部 graph 的 graph callback。

解析 Callback Input & Output

从上文得知,Callback Input & Output 的底层是 Any,只是不同组件类型在具体触发回调时,可能会传入自己特定的类型。并且 Callback Handler 的接口定义中,各方法的入参也是 Any 类型的 Callback Input & Output。

因此,具体的 Handler 实现中,需要做两个事情:

  1. 根据 RunInfo 判断当前触发回调的是哪个组件类型,比如 RunInfo.Component == “ChatModel”,或者 RunInfo.Type == “xxx Chat Model”。
  2. 把 any 类型的 Callback Input & Output 转成对应的具体类型,以 RunInfo.Component == “ChatModel” 为例:
// ConvCallbackInput converts the callback input to the model callback input.
func ConvCallbackInput(src callbacks.CallbackInput) *CallbackInput {
    switch t := src.(type) {
    case *CallbackInput: // when callback is triggered within component implementation, the input is usually already a typed *model.CallbackInput
       return t
    case []*schema.Message: // when callback is injected by graph node, not the component implementation itself, the input is the input of Chat Model interface, which is []*schema.Message
       return &CallbackInput{
          Messages: t,
       }
    default:
       return nil
    }
}

// ConvCallbackOutput converts the callback output to the model callback output.
func ConvCallbackOutput(src callbacks.CallbackOutput) *CallbackOutput {
    switch t := src.(type) {
    case *CallbackOutput: // when callback is triggered within component implementation, the output is usually already a typed *model.CallbackOutput
       return t
    case *schema.Message: // when callback is injected by graph node, not the component implementation itself, the output is the output of Chat Model interface, which is *schema.Message
       return &CallbackOutput{
          Message: t,
       }
    default:
       return nil
    }
}

如果 Handler 里面需要增加 switch case 来判断 RunInfo.Component,并且对每一个 case,需要调对应的转换函数把 Any 转成具体类型,确实有些复杂。为了减少写胶水代码的重复劳动,我们提供了两种实现 Handler 的便捷工具函数。

Handler 实现方式

除了直接实现 Handler 接口外,Eino 提供了两种 Handler 的便捷实现工具。

HandlerHelper

如果用户的 Handler 只关注特定类型的组件,比如 ReactAgent 的场景,只关注 ChatModel 和 Tool,建议使用 HandlerHelper 来快速创建具体类型的 Callback Handler:

handler := NewHandlerHelper().ChatModel(modelHandler).Tool(toolHandler).Handler()

其中 modelHandler 是 Chat Model 组件对 callback handler 的进一步封装:

// ModelCallbackHandler is the handler for the model callback.
type ModelCallbackHandler struct {
    OnStart               func(ctx context.Context, runInfo *callbacks.RunInfo, input *model.CallbackInput) context.Context
    OnEnd                 func(ctx context.Context, runInfo *callbacks.RunInfo, output *model.CallbackOutput) context.Context
    OnEndWithStreamOutput func(ctx context.Context, runInfo *callbacks.RunInfo, output *schema.StreamReader[*model.CallbackOutput]) context.Context
    OnError               func(ctx context.Context, runInfo *callbacks.RunInfo, err error) context.Context
}

上面的 ModelCallbackHandler,封装了三个操作:

  1. 不再需要判断 RunInfo.Component 来选择属于 ChatModel 触发的回调,而是已经自动做了过滤。
  2. 只要求实现 Chat Model 这个组件支持的触发时机,这里去掉了不支持的 OnStartWithStreamInput。同时,如果用户只关注 Chat Model 支持的四个时机的某几个,比如只有 OnStart,也可以只实现 OnStart。
  3. Input / Output 不再是 Any 类型,而是已经转化好的 model.CallbackInput, model.CallbackOutput。

HandlerHelper 支持全部的官方组件,目前的列表是:ChatModel, ChatTemplate, Retriever, Indexer, Embedding, Document.Loader, Document.Transformer, Tool, ToolsNode.

针对 Lambda,Graph,Chain 这些输入输出类型不确定的“组件”,也可以使用 HandlerHelper,但是只能做到上面的第 1 点,即按照组件类型做自动的过滤,2、3 点依然需要用户自己实现:

handler := NewHandlerHelper().Lambda(callbacks.Handler).Graph(callbacks.Handler)...Handler()

这时,NewHandlerHelper().Lambda() 需要传入 callbacks.Handler 可以用下面的 HandlerBuilder 来实现。

HandlerBuilder

如果用户的 Handler 需要关注多个组件类型,但却只需要关注部分的触发时机,可以使用 HandlerBuilder:

handler := NewHandlerBuilder().OnStartFn(fn)...Build()

最佳实践

在 Graph 中使用

  • 积极使用 Global Handlers,注册始终生效的 Handlers。
  • 通过 WithHandlers option 在运行时注入 Handler,通过 DesignateNode 或 DesignateNodeByPath 指定生效的 Node / 嵌套的内部 Graph / 内部 Graph 的 Node。

在 Graph 外使用

依然可以积极使用 Global Handlers。但需要在调用 InitCallbacks 后 global handlers 才会生效。InitCallbacks 的入参中不需要传入 Global Handlers,会自动注入。

需要注意的是,如果在 Graph 外使用的 Component,内部并没有实现

单个 Component

使用 InitCallbacks 注入 RunInfo 和 Handlers。RunInfo 的各字段需自行设置。

多个 component 并列

在每个并列的 component 执行前,分别调用 InitCallbacks 注入各自的 RunInfo 和 Handlers。注意:

  • 多次调用 InitCallbacks,传入的 Context 应当相同,因为各组件是并列关系
  • 每次调用 InitCallbacks,返回的 Context,应当传入对应的 Component 内,但不应当传入其他的 Component 内。

多个 component 嵌套

在顶层 Component 执行前,调用 InitCallbacks 注入 RunInfo 和 Handlers,并把返回的 Context 传入顶层 Component 内。

在内部 Component 执行前,分情况讨论:

  • 如果 Handlers 与顶层 Component 相同,调用 ReuseHandlers,注入新的 RunInfo,并把返回的 Context 传入内部 Component 中。
  • 如果 Handlers 与顶层 Component 不同,调用 InitCallbacks,注入新的 RunInfo 和新的(全量)Handlers,并把返回的 Context 传入内部 Component 中。

Handler 内读写 input & output

Handler 内不建议修改 input / output。原因是:

  • callbacks 类似单向的信息传递通道,component -> Handler。
  • 对 input / output 的修改,如果多个 handler 同时(异步)进行,有并发问题;如果先 copy 再修改,则失去了修改的意义,因为修改内容 component 内不可见。

在 Handler 内不能写 input / output 的前提下,一般情况下,在 Handler 内读 input / output 是并发安全的。一个特殊情况是上下游节点之间以 StreamReader 形式传递数据时,可能上游的 OnEndWithStreamOutput 还在异步处理流,下游的节点内部已经开始处理业务,这是 Handler 的异步处理与节点内部处理是并发的。这带来一个额外的要求:组件实现内部,也不建议直接修改输入流中的数据,如果有修改后继续向下游输出的需求,需要先 copy,修改 copy 后的数据,再向下传递。

在满足这两个条件后,读就变成并发安全的了,因此 Handler 可以直接同步或异步读取 input / output 内的内容。

总结起来:无论是组件内部还是 Handler 内部,都不建议直接修改输入的业务信息。

Handler 间传递信息

同一个 Handler 的不同时机之间,可通过 ctx 传递信息,如 OnStart 中通过 context.WithValue 返回一个新的 context,在 OnEnd 中从 context 中再取出这个 value。

不同 Handler 之间,没有执行顺序的保证,因此不建议通过上面的机制在不同 Handler 间传递信息。本质上是无法保证某一个 Handler 返回的 context,一定会进入下一个 Handler 的函数执行中。

如果需要在不同 Handler 之间传递信息,建议的方式是在最外层的 context(如 graph 执行时传入的 context)中,设置一个全局的、请求维度的变量作为公共信息的存取空间,在各个 Handler 中按需读取和更新这个公共变量。在有 stream 的情况下,可能需要格外注意和保证这个公共变量的并发安全。

流切记要 Close

以存在 ChatModel 这种具有真流输出的节点为例,当存在 Callback 切面时,ChatModel 的输出流:

  • 既要被下游节点作为输入来消费,又要被 Callback 切面来消费
  • 一个流中的一个帧(Chunk),只能被一个消费方消费到,即流不是广播模型

所以此时需要将流进行复制,其复制关系如下:

  • 如果其中一个 Callback n 没有 Close 对应的流,可能导致原始 Stream 无法 Close 和释放资源。

最后修改 January 22, 2025 : docs: update eino doc (#1215) (805b4b6)