Eino: Flow 集成组件
大模型应用是存在通用场景和模式的,若把这些场景进行抽象,就能提供一些可以帮助开发者快速构建大模型应用的模版。Eino 的 Flow 模块就是在做这件事。
目前 Eino 已经集成了 react agent
、host multi agent
两个常用的 Agent 模式,以及 MultiQueryRetriever, ParentIndexer 等。
- React Agent: Eino: React Agent 使用手册
- Multi Agent: Eino Tutorial: Host Multi-Agent
Flow 进编排
Flow 集成组件自身一般是由一个或多个 graph 编排而成。同时,这些 flow 也可以作为节点进入其他 graph 的编排之中,方式有三种:
如果一个 flow 实现了某个组件的 interface,可用该组件对应的 AddXXXNode 等方法加入编排,如 multiquery retriever:
// instantiate the flow: multiquery.NewRetriever vk, err := newVikingDBRetriever(ctx, vikingDBHost, vikingDBRegion, vikingDBAK, vikingDBSK) if err != nil { logs.Errorf("newVikingDBRetriever failed, err=%v", err) return } llm, err := newChatModel(ctx, openAIBaseURL, openAIAPIKey, openAIModelName) if err != nil { logs.Errorf("newChatModel failed, err=%v", err) return } // rewrite query by llm mqr, err := multiquery.NewRetriever(ctx, &multiquery.Config{ RewriteLLM: llm, RewriteTemplate: nil, // use default QueryVar: "", // use default LLMOutputParser: nil, // use default MaxQueriesNum: 3, OrigRetriever: vk, FusionFunc: nil, // use default fusion, just deduplicate by doc id }) if err != nil { logs.Errorf("NewMultiQueryRetriever failed, err=%v", err) return } // add the flow to graph graph := compose.NewGraph[string, *schema.Message]() _ = graph.AddRetrieverNode("multi_query_retriever", mqr, compose.WithOutputKey("context")) _ = graph.AddEdge(compose._START_, "multi_query_retriever") _ = graph.AddChatTemplateNode("template", prompt.FromMessages(schema._FString_, schema.UserMessage("{context}"))) // ...
如果一个 flow 内部是由单个 graph 编排而成,且 flow 的功能可完全等价于这个 graph 的运行(没有不能转化成 graph run 的定制逻辑),则可以将该 flow 的 graph 导出,通过 AddGraphNode 等方法加入编排,如 ReAct Agent 和 Host Multi-Agent:
// instantiate the host multi-agent hostMA, err := NewMultiAgent(ctx, &MultiAgentConfig{ Host: Host{ ChatModel: mockHostLLM, }, Specialists: []*Specialist{ specialist1, specialist2, }, }) assert.Nil(t, err) // export graph and []GraphAddNodeOption from host multi-agent maGraph, opts := hostMA.ExportGraph() // add to another graph fullGraph, err := compose.NewChain[map[string]any, *schema.Message](). AppendChatTemplate(prompt.FromMessages(schema._FString_, schema.UserMessage("what's the capital city of {country_name}"))). AppendGraph(maGraph, append(opts, compose.WithNodeKey("host_ma_node"))...). Compile(ctx) assert.Nil(t, err) // invoke the other graph // convert the flow's own option to compose.Option if needed // assign options to flow's nodes if needed out, err := fullGraph.Invoke(ctx, map[string]any{"country_name": "China"}, compose.WithCallbacks(ConvertCallbackHandlers(mockCallback)). DesignateNodeWithPath(compose.NewNodePath("host_ma_node", hostMA.HostNodeKey())))
所有 flow 应当都可以封装成 Lambda,通过 AddLambdaNode 等方法加入编排。目前所有的 flow 都可以通过 1 或 2 加入编排,所以不需要降级到使用 Lambda。如果要用,使用姿势是:
// instantiate the flow a, err := NewAgent(ctx, &AgentConfig{ Model: cm, ToolsConfig: compose.ToolsNodeConfig{ Tools: []tool.BaseTool{fakeTool, &fakeStreamToolGreetForTest{}}, }, MaxStep: 40, }) assert.Nil(t, err) chain := compose.NewChain[[]*schema.Message, string]() // convert the flow to Lambda agentLambda, err := compose.AnyLambda(a.Generate, a.Stream, nil, nil) assert.Nil(t, err) // add lambda to another graph chain. AppendLambda(agentLambda). AppendLambda(compose.InvokableLambda(func(ctx context.Context, input *schema.Message) (string, error) { t.Log("got agent response: ", input.Content) return input.Content, nil })) r, err := chain.Compile(ctx) assert.Nil(t, err) // invoke the graph res, err := r.Invoke(ctx, []*schema.Message{{Role: schema._User_, Content: "hello"}}, compose.WithCallbacks(callbackForTest))
三个方法的对比如下:
方式 | 适用场景 | 优势 |
作为组件 | 需实现组件的 interface | 简单直接,语义清晰 |
作为 Graph | 由单个 graph 编排而成,功能不超出这个 graph 的范围 | graph 内节点对外层 graph 暴露,可统一分配运行时 option,相比 Lambda 少一层转化,可通过 GraphCompileCallback 获取上下级 graph 关系 |
作为 Lambda | 所有 | 普适 |
最后修改
February 21, 2025
: doc: add eino english docs (#1255) (4f6a3bd)