Eino: The design concept of orchestration
Among LLM application orchestration solutions, the most popular are langchain and langgraph, which officially provide Python and TypeScript SDKs. These two languages are known for their flexibility, which makes SDK development very convenient but also creates considerable confusion and cognitive burden for SDK users.
Golang is a deliberately simple programming language, and its static typing is one of the key reasons it stays that way. Eino preserves this critical characteristic: explicit types + compile-time type checking.
Basic Principle of Type Alignment in Upstream and Downstream
The most fundamental orchestration method in eino is the graph, along with its simplified wrapper, the chain. Regardless of the orchestration method, it essentially consists of logical nodes + upstream and downstream relationships. At runtime, the compiled product of orchestration starts from one logical node and then runs the next node connected to the current one.
This entails a basic assumption: the output value of the previous node can serve as the input value for the next node.
In Golang, there are two basic approaches to achieving this assumption:
- Approach 1: Convert the input and output of different nodes into a more generalized type, such as any or map[string]any.
  - If everything is generalized into any, the cost is that developers must explicitly convert values back to concrete types when writing code that uses them. This significantly increases the mental burden on developers, so this approach was ultimately abandoned.
  - The langchain approach can be seen as passing map[string]any throughout the process, where each logical node retrieves the value it needs by key. langchaingo adopts this implementation, but in Golang the values still have to be used with type assertions, which likewise imposes a significant mental burden on developers.
- Approach 2: Keep the input and output types of each node as the developer expects them, and ensure type consistency between upstream and downstream during the Compile phase.
Approach 2 is the approach eino ultimately selected. It is the easiest to understand during orchestration: the whole process is like building blocks, where the protruding and recessed parts of each block have their own specifications, and only blocks with matching specifications can form an upstream-downstream relationship.
As shown in the diagram below:
For orchestration, it can only run properly if the downstream can recognize and process the upstream output. This basic assumption is clearly expressed in eino, allowing developers to be confident and clear about how orchestration logic operates and flows when using eino, rather than guessing whether the passed values are correct from a series of any.
Type Alignment in Graph
Edge
In a graph, the output of a node flows to the next node along an edge, so the nodes connected by an edge must be type-aligned.
As shown in the figure below:
This scenario simulates ① direct conversation with an LLM and ② RAG mode, where the final results can be used to compare the effects of the two modes.
The green parts in the figure represent ordinary edge connections, which require that the upstream output can be assigned to the downstream input. The acceptable cases are:
① The same type for both upstream and downstream: For example, the upstream output is *schema.Message, and the downstream input is also *schema.Message.
② The downstream accepts an interface, and the upstream implements that interface: For example, the upstream struct implements a Format() method, and the downstream accepts an interface{ Format() }. A special case is when the downstream is any (the empty interface): every upstream type implements any, so the connection always works.
③ The upstream is an interface, and the downstream is a concrete type: When the downstream concrete type implements the upstream's interface type, the connection may or may not work. This cannot be determined at compile time, only at runtime once the concrete type of the upstream value is known. For a detailed description, see "Runtime Type Alignment Check Scenarios" below.
The yellow part in the figure represents another type conversion mechanism provided by eino: if the downstream expects map[string]any but the upstream output type is not map[string]any, you can pass compose.WithOutputKey("outkey") when calling graph.AddXXXNode(node_key, xxx, ...) to convert the upstream output into map[string]any, where the map's key is the OutputKey specified in the option. This mechanism is especially convenient when multiple edges converge on a single node.
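For illustration, here is a minimal sketch of the OutputKey mechanism; the node keys and the chatModel variable are placeholders rather than part of Eino:

g := compose.NewGraph[[]*schema.Message, string]()

// chatModel outputs *schema.Message; WithOutputKey wraps that output into
// map[string]any{"answer": msg} before it flows to the next node.
_ = g.AddChatModelNode("model", chatModel, compose.WithOutputKey("answer"))

_ = g.AddLambdaNode("render", compose.InvokableLambda(
    func(ctx context.Context, in map[string]any) (string, error) {
        msg := in["answer"].(*schema.Message)
        return msg.Content, nil
    }))

_ = g.AddEdge(compose.START, "model")
_ = g.AddEdge("model", "render")
_ = g.AddEdge("render", compose.END)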
Branch
If a node is connected by multiple outgoing edges, the downstream node of each edge runs once. A branch is a different mechanism: of the n nodes following a branch, only the node whose key is returned by the branch's condition runs. The nodes following the same branch must also be type-aligned.
As shown in the figure below:
This is a scenario simulating the running logic of a react agent
A branch itself carries a condition function, whose input must be type-aligned with the upstream output. Likewise, the nodes following the branch must be able to receive the upstream's output, just as the condition does.
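As a minimal sketch (assuming a graph g whose "model" node is a ChatModel, as in the react agent diagram; node keys and the routing logic are illustrative), the condition receives the chat model's *schema.Message output, and only the node whose key it returns will run:

condition := func(ctx context.Context, msg *schema.Message) (string, error) {
    if len(msg.ToolCalls) > 0 {
        return "tools", nil // route to the tools node
    }
    return compose.END, nil // otherwise finish directly
}

branch := compose.NewGraphBranch(condition, map[string]bool{
    "tools":     true,
    compose.END: true,
})
_ = g.AddBranch("model", branch) // "model" is the upstream chat model node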
Type Alignment in Chains
Chain
From an abstract perspective, a chain is a sequence, as shown below:
The types of logical nodes can be divided into three categories:
- Orchestrable components (e.g., chat model, chat template, retriever, lambda, graph, etc.)
- Branch nodes
- Parallel nodes
As can be seen, from the perspective of a chain, a simple node (e.g., a chat model) and a complex node (e.g., a graph, branch, or parallel) are treated the same. During execution, each step corresponds to the execution of one node.
Therefore, the types of upstream and downstream nodes in a chain must be aligned, as shown below:
func TestChain() {
    chain := compose.NewChain[map[string]any, string]()

    nodeTemplate := &fakeChatTemplate{}   // input: map[string]any, output: []*schema.Message
    nodeHistoryLambda := &fakeLambda{}    // input: []*schema.Message, output: []*schema.Message
    nodeChatModel := &fakeChatModel{}     // input: []*schema.Message, output: *schema.Message
    nodeConvertResLambda := &fakeLambda{} // input: *schema.Message, output: string

    chain.
        AppendChatTemplate(nodeTemplate).
        AppendLambda(nodeHistoryLambda).
        AppendChatModel(nodeChatModel).
        AppendLambda(nodeConvertResLambda)
}
The logic above can be represented by the following diagram:
If the types of upstream and downstream nodes are not aligned, the chain reports an error during chain.Compile(). In contrast, a graph reports the error earlier, when graph.AddEdge() is called.
Parallel
Parallel is a special type of node within a chain. From the perspective of the chain, a parallel node is no different from other nodes. Internally, the basic topology of a parallel node is as follows:
This illustrates a structure formed by multiple edges in the graph. The basic assumption is that each edge within a parallel contains exactly one node; of course, that node can itself be a graph. Note, however, that the framework does not currently provide the ability to nest branch or parallel directly within a parallel node.
Each node within a parallel has the same upstream node, so each of them must align with that upstream node's output type. For example, if the upstream node outputs *schema.Message as shown in the diagram, then every node in the parallel must be able to receive this type. The ways of receiving it are the same as in a graph: typically the same type, an interface, any, or the input key option.
The output of a parallel node is always a map[string]any, where each key is the output_key specified when calling parallel.AddXXX(output_key, xxx, opts...), and the value is the actual output of the corresponding node.
An example of constructing a parallel node is as follows:
func TestParallel() {
    chain := compose.NewChain[map[string]any, map[string]*schema.Message]()

    parallel := compose.NewParallel()
    model01 := &fakeChatModel{} // input: []*schema.Message, output: *schema.Message
    model02 := &fakeChatModel{} // input: []*schema.Message, output: *schema.Message
    model03 := &fakeChatModel{} // input: []*schema.Message, output: *schema.Message

    parallel.
        AddChatModel("outkey_01", model01).
        AddChatModel("outkey_02", model02).
        AddChatModel("outkey_03", model03)

    lambdaNode := &fakeLambdaNode{} // input: map[string]any, output: map[string]*schema.Message

    chain.
        AppendParallel(parallel).
        AppendLambda(lambdaNode)
}
A parallel node from the perspective of a chain looks like this:
The diagram simulates the same question being answered by different LLMs. The results can be used for comparison.
It is important to note that this structure is only a logical view. Since a chain is itself implemented on top of a graph, the parallel node is flattened out in the underlying graph.
Branch
The branch in a chain is similar to the branch in a graph: all candidate nodes in the branch must be type-aligned with the upstream node's output, so this is not elaborated further here. What is special about a chain branch is that all of its candidate nodes either connect to the same next node in the chain or all connect to END. A sketch follows:
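A rough sketch, assuming NewChainBranch/AppendBranch as the chain-side branch API and using illustrative placeholders (chatModel, toolsLambda, directLambda); only the node whose key the condition returns will run, and both candidates must accept the upstream *schema.Message:

cond := func(ctx context.Context, msg *schema.Message) (string, error) {
    if len(msg.ToolCalls) > 0 {
        return "with_tools", nil
    }
    return "direct", nil
}

chain := compose.NewChain[[]*schema.Message, string]()
chain.
    AppendChatModel(chatModel). // output: *schema.Message, consumed by cond and both candidates
    AppendBranch(compose.NewChainBranch(cond).
        AddLambda("with_tools", toolsLambda). // input: *schema.Message, output: string
        AddLambda("direct", directLambda))    // input: *schema.Message, output: string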
Type Alignment in Workflow
In Workflow, type alignment moves from the overall Input & Output down to the field level. Specifically, it can be divided into:
- The overall output of the upstream is type-aligned to a specific field downstream.
- A specific field of upstream output is type-aligned to the overall downstream.
- A specific field of upstream output is type-aligned to a specific field of downstream input.
The principles and rules are the same as for overall type alignment.
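As a rough illustration only: the sketch below assumes workflow field-mapping helpers such as compose.FromField and the AddInput pattern; treat the exact helper names as assumptions rather than verified API. It maps a single field of the overall input to a node's whole input:

type Input struct {
    Question string
    UserID   string
}

wf := compose.NewWorkflow[Input, string]()

wf.AddLambdaNode("answer", compose.InvokableLambda(
    func(ctx context.Context, q string) (string, error) {
        return "answer to: " + q, nil
    })).
    // map only the Question field of the overall input to this node's whole input
    AddInput(compose.START, compose.FromField("Question"))

wf.End().AddInput("answer")

runner, err := wf.Compile(ctx)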
Alignment of Types under invoke and stream
In Eino, the result of orchestration is either a graph or a chain. To execute it, you need to use Compile() to generate a Runnable interface.
One important function of Runnable is to provide four calling methods: “Invoke”, “Stream”, “Collect”, and “Transform”.
You can check the introduction of the above calling methods and detailed runnable introduction in: Eino: Overview
Suppose we have a Graph[[]*schema.Message, string] that contains a ChatModel node and a Lambda node. After compiling, it becomes a Runnable[[]*schema.Message, string].
package main

import (
    "context"
    "io"
    "testing"

    "github.com/cloudwego/eino/compose"
    "github.com/cloudwego/eino/schema"
    "github.com/stretchr/testify/assert"
)

func TestTypeMatch(t *testing.T) {
    ctx := context.Background()

    g1 := compose.NewGraph[[]*schema.Message, string]()

    _ = g1.AddChatModelNode("model", &mockChatModel{})
    _ = g1.AddLambdaNode("lambda", compose.InvokableLambda(func(_ context.Context, msg *schema.Message) (string, error) {
        return msg.Content, nil
    }))

    _ = g1.AddEdge(compose.START, "model")
    _ = g1.AddEdge("model", "lambda")
    _ = g1.AddEdge("lambda", compose.END)

    runner, err := g1.Compile(ctx)
    assert.NoError(t, err)

    c, err := runner.Invoke(ctx, []*schema.Message{
        schema.UserMessage("what's the weather in beijing?"),
    })
    assert.NoError(t, err)
    assert.Equal(t, "the weather is good", c)

    s, err := runner.Stream(ctx, []*schema.Message{
        schema.UserMessage("what's the weather in beijing?"),
    })
    assert.NoError(t, err)

    var fullStr string
    for {
        chunk, err := s.Recv()
        if err != nil {
            if err == io.EOF {
                break
            }
            panic(err)
        }
        fullStr += chunk
    }
    assert.Equal(t, c, fullStr)
}
When we call the compiled Runnable in Stream mode, the model node outputs *schema.StreamReader[*schema.Message], but the lambda node is an InvokableLambda that only accepts a non-streaming *schema.Message as input. This still complies with the type alignment rule, because the Eino framework automatically concatenates the streamed Message chunks into a complete Message.
In stream mode, concatenating frames is a very common operation. During concatenation, all elements are first drained from *StreamReader[T] into a []T, and then an attempt is made to concatenate the []T into a complete T. The framework has built-in support for concatenating the following types:
- *schema.Message: see schema.ConcatMessages() for details.
- string: the implementation logic is equivalent to +=.
- []*schema.Message: see compose.concatMessageArray() for details.
- Map: values with the same key are merged, using the same logic as above; if a value type cannot be merged, the concatenation fails (values are not silently overwritten).
- Struct or Struct pointer: first converted into []map[string]any, then merged according to the map logic; the struct must not contain unexported fields.
- Other slices: can only be merged if exactly one element in the slice is non-zero.
For other scenarios, or when users want to override the default behavior with custom logic, developers can implement their own concat function and register it as a global concatenation function using compose.RegisterStreamChunkConcatFunc().
Here is an example:
// Suppose our own structure is as follows
type tStreamConcatItemForTest struct {
    s string
}

// Implement a concatenation function for this type
func concatTStreamForTest(items []*tStreamConcatItemForTest) (*tStreamConcatItemForTest, error) {
    var s string
    for _, item := range items {
        s += item.s
    }

    return &tStreamConcatItemForTest{s: s}, nil
}

func Init() {
    // Register it as a global concatenation function
    compose.RegisterStreamChunkConcatFunc(concatTStreamForTest)
}
Runtime Type Alignment Check Scenarios
Eino's Graph performs its type alignment check when err = graph.AddEdge("node1", "node2") is called, verifying that the types of the two nodes match. This allows type mismatch errors to be identified either during graph construction or during the Compile process, following rules ①②③ listed earlier in this document.
When the upstream node's output is an interface and the downstream node's type implements that interface, the upstream value can likely be converted to the downstream type (via type assertion). Whether the conversion succeeds, however, can only be determined at runtime, so type checks in this scenario are deferred to runtime.
The structure is shown in the diagram below:
This scenario is suitable for developers who can handle upstream and downstream type alignment on their own and choose the appropriate downstream execution nodes based on different types.

User Manual for Eino
Opinionated Design Choices
Principle of Read-Only External Variables
When data in Eino’s Graph flows among Nodes, Branches, and Handlers, it is always variable assignment, not copying. When the Input is of a reference type, such as a Struct pointer, map, or slice, modifying the Input inside Nodes, Branches, or Handlers will have side effects on the outside and may lead to concurrency issues. Therefore, Eino follows the principle of read-only external variables: do not modify the Input inside Nodes, Branches, or Handlers. If modification is required, make a copy first.
This principle also applies to Chunks in the StreamReader.
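A minimal sketch of the read-only principle (the lambda and the "enriched" key are illustrative): the incoming map is copied before being modified, so the upstream value and any concurrent readers are unaffected.

enrich := compose.InvokableLambda(func(ctx context.Context, in map[string]any) (map[string]any, error) {
    out := make(map[string]any, len(in)+1)
    for k, v := range in {
        out[k] = v // copy first
    }
    out["enriched"] = true // modify the copy, never the original input
    return out, nil
})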
Fan-In and Merging
Fan-in: outputs from multiple predecessor nodes converge on a single successor node and together form its input, so how those outputs are merged must be clearly defined. Eino's choice is as follows. First, the actual output types of all predecessor nodes must be the same Map type, and their keys must not overlap. Second:
- In non-streaming scenarios, after merging, it becomes a single map that contains all key-value pairs from all upstream sources.
- In streaming scenarios, multiple upstream StreamReaders of the same type are merged into one StreamReader. When actually receiving data from the merged StreamReader, the effect is to read fairly from multiple upstream StreamReaders.
Workflow can map the output fields of multiple predecessor nodes to different input fields of the successor node. Eino converts the Struct output from each predecessor to a Map before any merge process, still conforming to the above rules.
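A minimal fan-in sketch (node keys and the toy lambdas are illustrative): both upstream nodes output map[string]any with non-overlapping keys, so the framework can merge them into a single map that becomes the input of the "merge" node.

g := compose.NewGraph[string, string]()

_ = g.AddLambdaNode("a", compose.InvokableLambda(
    func(ctx context.Context, q string) (map[string]any, error) {
        return map[string]any{"from_a": q + " (a)"}, nil
    }))
_ = g.AddLambdaNode("b", compose.InvokableLambda(
    func(ctx context.Context, q string) (map[string]any, error) {
        return map[string]any{"from_b": q + " (b)"}, nil
    }))
_ = g.AddLambdaNode("merge", compose.InvokableLambda(
    func(ctx context.Context, in map[string]any) (string, error) {
        // after the merge, in contains both "from_a" and "from_b"
        return in["from_a"].(string) + " | " + in["from_b"].(string), nil
    }))

_ = g.AddEdge(compose.START, "a")
_ = g.AddEdge(compose.START, "b")
_ = g.AddEdge("a", "merge")
_ = g.AddEdge("b", "merge")
_ = g.AddEdge("merge", compose.END)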
Streaming Processing
Eino takes the view that a component only needs to implement the streaming paradigms that genuinely arise in its business scenario; for example, a ChatModel does not need to implement Collect. In orchestration scenarios, Eino therefore automatically completes the missing streaming paradigms for all nodes.
When running a Graph via Invoke, all internal nodes operate in Invoke mode. When running a Graph via Stream, Collect, or Transform, all internal nodes operate in Transform mode.
Auto Concatenate: In scenarios where Stream chunks are concatenated into complete content, the user-defined concatenation function registered by the user is preferred. Otherwise, default behaviors provided by the framework are performed, including Message, Message arrays, String, Map, and Struct/Struct pointers.
Auto Boxing: In scenarios requiring the conversion of non-stream type T to StreamReader[T], the framework automates the process.
Auto Merge: See the above section on “Fan-In and Merging.”
Auto Copy: In scenarios requiring the replication of streams, the framework automatically handles stream copying, including instances where a single stream fans out to multiple downstream nodes, or a single stream enters one or more callback handlers.
Finally, Eino requires all orchestration elements to be aware of and capable of handling streams. This includes branches, state handlers, callback handlers, passthroughs, lambdas, etc.
For more details on Eino’s streaming capabilities, refer to Eino Points of Streaming Orchestration.
Global State
State: when creating a graph with NewGraph, pass in the State creation function through compose.WithGenLocalState. This request-scoped global state can be read from and written to at various stages within a single request.
Eino recommends using StatePreHandler and StatePostHandler:
- StatePreHandler: Before the execution of each node, read from and write to State. Replace the node’s input as needed.
- StatePostHandler: After the execution of each node, read from and write to State. Replace the node’s output as needed.
For streaming scenarios, use the corresponding StreamStatePreHandler and StreamStatePostHandler.
These state handlers are located outside the nodes and affect the nodes by modifying the Input or Output, thus ensuring the “stateless” property of the nodes.
If you need to read from and write to State inside a node, Eino provides the ProcessState[S any](ctx context.Context, handler func(context.Context, S) error) error function.
The Eino framework will add locks at all positions where the State is read from or written to.
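A rough sketch of request-scoped state (the convState struct, node keys, and the chatModel variable are illustrative; the option names follow the handlers described above): WithGenLocalState creates the state per request, the state handlers rewrite the node's input and output, and ProcessState is the in-node access path.

type convState struct {
    History []*schema.Message
}

g := compose.NewGraph[[]*schema.Message, *schema.Message](
    compose.WithGenLocalState(func(ctx context.Context) *convState {
        return &convState{}
    }))

_ = g.AddChatModelNode("model", chatModel,
    compose.WithStatePreHandler(func(ctx context.Context, in []*schema.Message, s *convState) ([]*schema.Message, error) {
        s.History = append(s.History, in...) // write to state before the node runs
        return s.History, nil                // replace the node input with the accumulated history
    }),
    compose.WithStatePostHandler(func(ctx context.Context, out *schema.Message, s *convState) (*schema.Message, error) {
        s.History = append(s.History, out) // record the model's answer
        return out, nil
    }))

// Inside a node (e.g., within a Lambda), state can be accessed like this:
_ = compose.ProcessState[*convState](ctx, func(ctx context.Context, s *convState) error {
    s.History = nil // for example, reset the history
    return nil
})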
Callback Injection
Eino's orchestration framework accounts for the fact that a component entering orchestration may or may not have Callback aspects embedded internally. This is determined by whether the component implements the Checker interface and by the return value of its IsCallbacksEnabled method.
- When IsCallbacksEnabled returns true, Eino's orchestration framework uses the component's internally implemented Callback aspects.
- Otherwise, it automatically wraps the component implementation with external Callback aspects that (only) report input and output.
In either case, RunInfo will be automatically inferred.
Additionally, for the Graph as a whole, Callback aspects will always be injected, with RunInfo being the Graph itself.
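A minimal sketch (interface shape as described above; myChatModel is illustrative): a component that reports its callbacks as handled internally, so the framework will not wrap it with the external input/output-reporting aspect.

type myChatModel struct {
    // ... component fields ...
}

// IsCallbacksEnabled tells the orchestration framework that this component
// triggers its own Callback aspects internally.
func (m *myChatModel) IsCallbacksEnabled() bool {
    return true
}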
For a complete explanation of Eino’s Callback capabilities, see Eino: Callback Manual.
Option Allocation
Eino supports various dimensions of Call Option allocation methods:
- Default global allocation, i.e., allocated to all nodes, including nested internal graphs.
- An option can be added to a specific component type, in which case it is by default allocated to all nodes of that type, such as AddChatModelOption. Lambdas that have defined unique Option types can also specify the Option for themselves in this way.
- Specific nodes can be designated using DesignateNode(key ...string).
- Graphs nested at any depth, or specific nodes within them, can be designated using DesignateNodeWithPath(path ...*NodePath).
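A rough sketch of option allocation at call time (the handler variables, the node key "model", and the exact option constructors such as compose.WithCallbacks and compose.WithChatModelOption are assumptions for illustration):

out, err := runner.Invoke(ctx, input,
    compose.WithCallbacks(globalHandler),                       // default: allocated to all nodes
    compose.WithChatModelOption(model.WithTemperature(0.7)),    // allocated to every ChatModel node
    compose.WithCallbacks(modelHandler).DesignateNode("model"), // allocated only to the node with key "model"
)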
For a complete explanation of Eino’s Call Option capabilities, see Eino: CallOption capabilities and specification.
Graph Nesting
The outcome of graph orchestration, Runnable, is very similar in interface form to a Lambda. Therefore, a compiled graph can simply be encapsulated as a Lambda and nested into other graphs as a Lambda node.
Another way is that, before compilation, Graph, Chain, Workflow, etc., can be nested directly into other graphs using AddGraph. The differences between the two methods are:
- The Lambda method adds an additional Lambda node in the trace. From the perspective of other Callback handlers, there will also be an additional layer.
- The Lambda method requires using the Lambda’s Option to inherit CallOption and cannot use DesignateNodeWithPath.
- The Lambda method requires the internal graph to be precompiled. Directly using AddGraph allows the internal graph to be compiled together with the parent graph.
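A rough sketch of direct nesting before compilation (node keys are illustrative, and AddGraphNode is assumed to be the AddGraph-style API referred to above): the sub chain is added to the parent graph uncompiled, so both compile together.

sub := compose.NewChain[string, string]()
sub.AppendLambda(compose.InvokableLambda(
    func(ctx context.Context, s string) (string, error) {
        return s + "!", nil
    }))

parent := compose.NewGraph[string, string]()
_ = parent.AddGraphNode("sub", sub) // the sub chain is compiled together with the parent
_ = parent.AddEdge(compose.START, "sub")
_ = parent.AddEdge("sub", compose.END)

runner, err := parent.Compile(ctx)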
Internal Mechanism
Execution Sequence
Take as an example an InvokableLambda (input string, output int) with a StatePreHandler, StatePostHandler, InputKey, and OutputKey configured, and no internally implemented Callback aspect. The complete execution sequence is shown in the diagram below:
In the workflow scenario, field mapping occurs in two places:
- After node execution, following the StatePostHandler and the "stream copy" step, each field required by a downstream node is extracted separately.
- Before node execution, after the "merge" step and before the StatePreHandler, the extracted upstream field values are converted into the current node's input.
Runtime Engine
When NodeTriggerMode == AnyPredecessor, the graph executes using the pregel engine, corresponding to a directed graph that may contain cycles. Its characteristics include:
- All successor nodes of the currently executing node(s) execute together as one SuperStep, and then become the new "current" nodes.
- Supports Branch and supports loops in the graph, but may require manually adding passthrough nodes so that the SuperStep grouping meets expectations, as illustrated below:
In the above image, Node 4 and Node 5 are executed together as per rules, which likely does not meet expectations. It needs to be changed to:
When NodeTriggerMode == AllPredecessor, the graph executes using the dag engine, corresponding to a directed acyclic graph. Its characteristics include:
- Each node has a determinate set of predecessor nodes, and it becomes executable only after all of its predecessors have completed.
- An eager mode can be selected, where there is no SuperStep concept. Each node, upon completion, immediately checks which subsequent nodes can be run and executes them at the earliest time.
- Does not support Branch and does not support cycles in the graph, since these break the "each node has a determinate set of predecessors" assumption.
- No need for manual SuperStep alignment.
In summary, the pregel mode is flexible and powerful but comes with additional mental overhead, while the dag mode is clear and simple but limited in application scenarios. In the Eino framework, Chain uses the pregel mode, Workflow uses the dag mode, and Graph supports both; users can choose between pregel and dag.
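A brief sketch of choosing the engine at compile time (the option and constant names follow the NodeTriggerMode values described above; treat them as assumptions):

// Select the dag engine; using compose.AnyPredecessor instead selects the pregel engine.
runner, err := g.Compile(ctx,
    compose.WithNodeTriggerMode(compose.AllPredecessor),
)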