Eino: Indexer 使用说明
基本介绍
Indexer 组件是一个用于存储和索引文档的组件。它的主要作用是将文档及其向量表示存储到后端存储系统中,并提供高效的检索能力。这个组件在以下场景中发挥重要作用:
- 构建向量数据库,以用于语义关联搜索
组件定义
接口定义
type Indexer interface {
Store(ctx context.Context, docs []*schema.Document, opts ...Option) (ids []string, err error)
}
Store 方法
功能:存储文档并建立索引
参数:
- ctx:上下文对象,用于传递请求级别的信息,同时也用于传递 Callback Manager
- docs:待存储的文档列表
- opts:存储选项,用于配置存储行为
返回值:
- ids:存储成功的文档 ID 列表
- error:存储过程中的错误信息
公共 Option
Indexer 组件使用 IndexerOption 来定义可选参数,Indexer 定义了如下的公共 option。另外,每个具体的实现可以定义自己的特定 Option,通过 WrapIndexerImplSpecificOptFn 函数包装成统一的 IndexerOption 类型。
type Options struct {
// SubIndexes 是要建立索引的子索引列表
SubIndexes []string
// Embedding 是用于生成文档向量的组件
Embedding embedding.Embedder
}
可以通过以下方式设置选项:
// 设置子索引
WithSubIndexes(subIndexes []string) Option
// 设置向量生成组件
WithEmbedding(emb embedding.Embedder) Option
使用方式
单独使用
// 初始化 indexer (以 vikingdb 为例)
indexer, err := vikingdb.NewIndexer(ctx, &vikingdb.IndexerConfig{
// 配置参数
})
if err != nil {
return err
}
// 存储文档
ids, err := indexer.Store(ctx, docs,
indexer.WithSubIndexes([]string{"knowledge_base_1"}),
indexer.WithEmbedding(embedder),
)
if err != nil {
return err
}
在编排中使用
// 在 Chain 中使用
chain := compose.NewChain[[]*schema.Document, []string]()
chain.AppendIndexer(indexer)
// 编译并运行
runnable, err := chain.Compile()
if err != nil {
return err
}
result, err := runnable.Invoke(ctx, docs)
// 在 Graph 中使用
graph := compose.NewGraph[[]*schema.Document, []string]()
graph.AddIndexerNode("indexer_node", indexer)
Option 和 Callback 使用
Option 使用示例
// 使用选项 (单独使用时)
ids, err := indexer.Store(ctx, docs,
// 设置子索引
indexer.WithSubIndexes([]string{"kb_1", "kb_2"}),
// 设置向量生成组件
indexer.WithEmbedding(embedder),
)
Callback 使用示例
// 创建 callback handler
handler := &indexer.CallbackHandler{
OnStart: func(ctx context.Context, info *callbacks.RunInfo, input *indexer.CallbackInput) context.Context {
fmt.Printf("开始存储文档,文档数量: %d\n", len(input.Docs))
return ctx
},
OnEnd: func(ctx context.Context, info *callbacks.RunInfo, output *indexer.CallbackOutput) context.Context {
fmt.Printf("文档存储完成,成功存储数量: %d\n", len(output.IDs))
return ctx
},
}
// 使用 callback handler
helper := template.NewHandlerHelper().
Indexer(handler).
Handler()
// 在运行时使用
runnable, err := chain.Compile()
if err != nil {
return err
}
result, err := runnable.Invoke(ctx, docs, compose.WithCallbacks(helper))
已有实现
- Volc VikingDB Indexer: 基于火山引擎 VikingDB 实现的向量数据库索引器 Indexer - VikingDB
- ByteES Indexer: 基于字节内部 ES 服务实现的全文索引器
自行实现参考
实现自定义的 Indexer 组件时,需要注意以下几点:
- 注意对公共 option 的处理以及组件实现级的 option 处理
- 注意对 callback 的处理
Option 机制
自定义 Indexer 可根据需要实现自己的 Option:
// 定义 Option 结构体
type MyIndexerOptions struct {
BatchSize int
MaxRetries int
}
// 定义 Option 函数
func WithBatchSize(size int) indexer.Option {
return indexer.WrapIndexerImplSpecificOptFn(func(o *MyIndexerOptions) {
o.BatchSize = size
})
}
Callback 处理
Indexer 实现需要在适当的时机触发回调。框架已经定义了标准的回调输入输出结构体:
// CallbackInput 是 indexer 回调的输入
type CallbackInput struct {
// Docs 是待索引的文档列表
Docs []*schema.Document
// Extra 是回调的额外信息
Extra map[string]any
}
// CallbackOutput 是 indexer 回调的输出
type CallbackOutput struct {
// IDs 是索引器返回的文档 ID 列表
IDs []string
// Extra 是回调的额外信息
Extra map[string]any
}
完整实现示例
type MyIndexer struct {
batchSize int
embedder embedding.Embedder
}
func NewMyIndexer(config *MyIndexerConfig) (*MyIndexer, error) {
return &MyIndexer{
batchSize: config.DefaultBatchSize,
embedder: config.DefaultEmbedder,
}, nil
}
func (i *MyIndexer) Store(ctx context.Context, docs []*schema.Document, opts ...indexer.Option) ([]string, error) {
// 1. 处理选项
options := &indexer.Options{},
options = indexer.GetCommonOptions(options, opts...)
// 2. 获取 callback manager
cm := callbacks.ManagerFromContext(ctx)
// 3. 开始存储前的回调
ctx = cm.OnStart(ctx, info, &indexer.CallbackInput{
Docs: docs,
})
// 4. 执行存储逻辑
ids, err := i.doStore(ctx, docs, options)
// 5. 处理错误和完成回调
if err != nil {
ctx = cm.OnError(ctx, info, err)
return nil, err
}
ctx = cm.OnEnd(ctx, info, &indexer.CallbackOutput{
IDs: ids,
})
return ids, nil
}
func (i *MyIndexer) doStore(ctx context.Context, docs []*schema.Document, opts *indexer.Options) ([]string, error) {
// 实现文档存储逻辑 (注意处理公共option的参数)
// 1. 如果设置了 Embedding 组件,生成文档的向量表示
if opts.Embedding != nil {
// 提取文档内容
texts := make([]string, len(docs))
for j, doc := range docs {
texts[j] = doc.Content
}
// 生成向量
vectors, err := opts.Embedding.EmbedStrings(ctx, texts)
if err != nil {
return nil, err
}
// 将向量存储到文档的 MetaData 中
for j, doc := range docs {
doc.WithVector(vectors[j])
}
}
// 2. 其他自定义逻辑
return ids, nil
}
最后修改
January 14, 2025
: Update release-v0_12_0.md (#1204) (5230605)