Workflow design:修订间差异

来自WHY42
无编辑摘要
 
(未显示同一用户的42个中间版本)
第1行: 第1行:
随着2023年大模型在国内的发展成熟,现在大家逐步开始真正关注到大模型应用的开发上。有人说,2024年是大模型应用的落地元年。的确,看似无所不能的大模型,也并不是直接丢到一个系统就能用的,如何开发大模型应用,实际上还处于一个刚起步的阶段。一些成熟的技术,经历过许多人摸索后,总能给出一个"best practice";对于大模型应用开发来说,却仍还在探索阶段。
随着2023年大模型在国内的发展成熟,现在大家开始真正关注到大模型应用的开发上。有人说,2024年是大模型应用的落地元年。看似无所不能的大模型,却并不能直接集成到一个系统中,如何开发大模型应用,目前仍然处于一个刚起步的阶段。一些成熟的技术,经历过许多人摸索后,总能找到一个现有的"best practice";对于大模型应用开发来说,谁也不敢说他的做法已经是“best practice”。
目前,一个看起来比较靠谱的模型,是这样定义大模型应用开发的几个level的:
 
目前,有一个看起来比较靠谱的模型,是这样定义大模型应用开发的几个Level的:


[[Image: LLM-app-levels.png|600px]]
[[Image: LLM-app-levels.png|600px]]
第193行: 第194行:
对于这种复杂的工作流来说,传统的API文档是无法精确到这样的程度的,不仅如此,它的最大好处是,它是真的可以运行的!
对于这种复杂的工作流来说,传统的API文档是无法精确到这样的程度的,不仅如此,它的最大好处是,它是真的可以运行的!


== 对工作流的抽象 ==
== 工作流的构建==
=== 对工作流的抽象定义 ===
首先我们定义了一个业务无关的单纯DAG实现,仅做了必要的修改来适配概念模型中的槽位:
首先我们定义了一个业务无关的单纯DAG实现,仅做了必要的修改来适配概念模型中的槽位:
<syntaxhighlight lang="go">
<syntaxhighlight lang="go">
第220行: 第222行:
Connect(fromId string, toId string) error
Connect(fromId string, toId string) error
ConnectFromPort(fromId string, toId string, toPort string) error
ConnectFromPort(fromId string, toId string, toPort string) error
Break(fromId string, toId string) error
HasConnection(fromId string, toId string) (bool, error)
GetConnection(fromId string, toId string) (Connection, error)
GetConnections() ([]Connection, error)
DeleteVertex(id string) error
GetVertexes() []Vertex
GetStartVertexes() []Vertex
GetEndVertexes() []Vertex
TopologicalSorting() ([]Vertex, error)
TopologicalSorting() ([]Vertex, error)
TransitiveReduction() (PortawareDAG, error)
TransitiveReduction() (PortawareDAG, error)
GetDependencies(id string) ([]string, error)
    // ...
GetDependencyMap() (map[string][]string, error)
GetChildren(id string) ([]string, error)
GetChildrenMap() (map[string][]string, error)
}
}
</syntaxhighlight>
</syntaxhighlight>
第242行: 第233行:
type Node interface {
type Node interface {
GetId() string
GetId() string
GetName() string
GetType() NodeType
GetType() NodeType
GetNamedPorts() []Port
GetConnectionType() ConnectionType
GetOutputVariables() []VariableDefination
GetTriggerRule() TriggerRule
Execute(ctx context.Context, options NodeExecuteOptions, doneGroup *sync.WaitGroup) ([]InputValue, []OutputValue, error)
Execute(ctx context.Context, options NodeExecuteOptions, doneGroup *sync.WaitGroup) ([]InputValue, []OutputValue, error)
    // ...
}
}
</syntaxhighlight>
</syntaxhighlight>
第265行: 第252行:
</syntaxhighlight>
</syntaxhighlight>


而最终工作流的执行,是一个Executor接口,他提供一个订阅的方法用来订阅事件,以及一个执行的方法来执行工作流。
=== 解析动态类型 ===
 
由于工作流节点的类型不一样,我们在实例化成Node类型的时候,一个问题是需要进行动态类型的转换。
比如,我们JSON中大概是这样定义的:
<syntaxhighlight lang="bash">
{
    "nodes": [
        { "type": "start", ...},
        { "type": "llm", ...}
    ]
}
</syntaxhighlight>
 
而我们需要根据type实例化成不同的类型(StartNode,LlmNode)。不得不吐槽,在go里面没有优雅的实现方式,
不得不将JSON反序列化成any(实际上是一个map),然后再部分序列化成JSON,然后再根据类型再次反序列化成实际的类型:
 
<syntaxhighlight lang="go">
for index, node := range w.Nodes {
jsonData, _ := json.Marshal(node.Properties)
var p any
var err error
t := node.Type
if t == TypeStart {
p, err = loadStartNode(jsonData)
} else if t == TypeLlm {
p, err = loadLlmNode(jsonData)
} else if t == TypeEnd {
p, err = loadHttpNode(jsonData)
}
// ...
</syntaxhighlight>
 
== 异步工作流运行模型 ==
=== Pub/Sub模式 ===
在事件处理架构中,我们通过事件来接偶工作流执行引擎,以及其后面的消费者。
Go语言天然提供了channel来进行消息通信,但不幸的是,它是一个单生产者/消费者的模型。
在我们需求中,我们应该允许多个消费者存在,彼此只关心自己感兴趣的事件。比如,我们可以有一个消费者来打印日志;另一个消费者来返回HTTP消息。
 
这是一个Pub/Sub模式。尽管Go原生不支持,但是我们很容易实现:
 
<syntaxhighlight lang="bash">
type Subscriber[T any] chan T
type TopicFilter func(event any) bool
 
type Publisher[T any] struct {
name        string
mutex      sync.RWMutex
buffer      int
closed      bool
subscribers map[Subscriber[T]]TopicFilter
}
</syntaxhighlight>
 
实现原理也很简单,那就是为每一个消费者创建一个channel,然后发送消息的时候,发送多次:
 
<syntaxhighlight lang="bash">
func (p *Publisher[T]) Publish(event T) {
p.mutex.RLock()
defer p.mutex.RUnlock()
 
var wg sync.WaitGroup
for subscriber, filter := range p.subscribers {
subscriber := subscriber
if filter == nil || filter(event) {
wg.Add(1)
go func(event T, channel chan<- T) {
defer wg.Done()
 
channel <- event
}(event, subscriber)
}
}
wg.Wait()
}
</syntaxhighlight>
 
这样我们就有了一个Pub/Sub实现,我们用它来作为工作流执行的事件订阅/消费系统。核心是一个Executor接口,他提供一个订阅的方法用来订阅事件,以及一个执行的方法来执行工作流。


<syntaxhighlight lang="bash">
<syntaxhighlight lang="bash">
第275行: 第338行:
</syntaxhighlight>
</syntaxhighlight>


== 使用Go routine 实现异步工作流运行模型 ==
=== 使用Go routine实现异步运行 ===
前面我们有提到,尽管拓扑排序
前面我们有提到,尽管拓扑排序可以将工作流节点排序成一个有序的列表,但并不代表节点需要串行执行。比如说,如果有多个入度为0的节点,
那么我们可以同时运行它们。在go语言中,很容易通过go routine来实现。
 
进一步思考,我们会发现,不仅需要做拓扑排序,还需要根据节点的依赖来执行节点。只有当节点的依赖项全部执行完成后,才能开始该节点的执行。
可以通过一个队列来实现,不断执行和删除就绪的节点,直到全部节点执行完成。
其运行原理如下面的伪代码所示:
 
<syntaxhighlight lang="python">
queue <- dag.typological_sort()
while queue.is_not_empty():
    ready_nodes <- queue.pop_zero_degree_nodes()
        for node in ready_nodes start:
            node.execute()
</syntaxhighlight>
 
再进一步思考,如果某一个节点运行失败,我们应该如何处理?一个比较合理的做法是直接停止工作流的运行。这种逻辑可以通过errgroup来实现。
最终,我们执行工作流的方法逻辑如下:
 
<syntaxhighlight lang="go">
func (e *AsyncExecutor) ExecSync(c WorkflowContext) error {
defer e.Close()
e.eventPublisher.Publish(NewWorkflowStartedEvent())
errGroup, ctx := errgroup.WithContext(c.GetContext())
var doneGroup sync.WaitGroup
for _, node := range e.pendingNodes {
node := node
waitChannel := e.notifyChannels[node.Id]
errGroup.Go(func() error {
select {
case <-ctx.Done():
return errCancelledByContext
case inputs := <-waitChannel:
nodeInstance, ok := node.Payload.(Node)
if !ok {
return errFailedToGetNodeInstance
}
return e.Execute(ctx, nodeInstance, inputs, &doneGroup)
}
})
}
 
e.popReadyNodes()
 
err := errGroup.Wait()
if err != nil {
return err
}
doneGroup.Wait()
e.eventPublisher.Publish(NewWorkflowFinishedEvent())
return nil
}
</syntaxhighlight>
 
有意思的是,我们并不是等节点的依赖项执行完成之后,再启动一个go routine,而是一开始就为每一个节点启动了一个go routine,
但是通过一个信号来控制何时开始运行。我们可以通过context.Context来进行额外的控制,比如设定超时时间之类,
这对于避免程序出现异常情况导致工作流无休止运行非常有帮助。
 
<syntaxhighlight lang="go">
select {
case <-ctx.Done():            // 如果超时或者取消,go routine会从这里直接返回,从而中断执行
return errCancelledByContext
case inputs := <-waitChannel: // 控制节点开始执行的信号channel
// ...
return e.Execute(ctx, nodeInstance, inputs, &doneGroup)
}
</syntaxhighlight>
 
注意在上面的流程中我们通过e.popReadyNodes()这个方法来通知已经就绪的节点开始运行,
这个方法的主要作用是找到当前节点中,所有可以运行的节点。
例如,考虑一个包含A->B->C三个节点的工作流,如果A已经执行结束,
那么B就可以开始运行。
这时候,A的运行结果将作为B的输入信息。
 
<syntaxhighlight lang="bash">
func (e *AsyncExecutor) popReadyNodes() error {
defer e.lock.Unlock()
e.lock.Lock()
 
for _, node := range e.pendingNodes {
inputs, ready, err := e.getInputs(node.Id)
if err != nil {
return err
}
if !ready {
return nil
}
picked := e.pendingNodes[0]
e.pendingNodes = e.pendingNodes[1:]          // 从尚未执行的节点队列中移除队首
notifyChannel := e.notifyChannels[picked.Id] // 通知移除的节点开始执行
notifyChannel <- inputs
}
return nil
}
</syntaxhighlight>
 
这里每一个节点对应一个通知channel,在准备运行的阶段就已经进行了初始化。
整体思路是,每当有节点执行完成,那么就检查是否有新的节点可以开始运行,否则,不可能执行其他的节点,因为剩余的节点都是有依赖关系的。
细心的读者容易想到一个问题,就是当工作流开始执行的时候,
总是有起始节点是没有依赖的,按照这个逻辑怎么触发运行呢?这个问题很容易解决,那就是工作流开始的时候,手动调用一次popReadyNodes。
 
执行的关键在于,一个节点运行的结束,会触发其后继节点的运行。这个“触发”怎么实现,实际上是有很多种办法的。
其实第一版实现并不是这样为每一个节点设置一个通知channel(全局唯一,每一个节点有且仅有一个通知channel);而是通过Pub/Sub模式来等待所有依赖项完成并通知。
 
这是最开始的逻辑:
<syntaxhighlight lang="python">
// 等待所有依赖节点发送通知,
func blockForNodeResults(waitChannels []<-chan NodeResult) []NodeResult {
results := []NodeResult{}
for _, ch := range waitChannels {
result := <-ch
results = append(results, result)
}
return results
}
</syntaxhighlight>
 
仔细思考其实这样实现问题挺多的:
 
* 需要为每一个依赖创建一个channel来发送和接收通知,且运行的时候还需要等待所有channel都接收到结果,相当于把“我是否能够执行”的逻辑交给节点自己去判断
* 由于channel发送和接收需要对应,需要至少为channel设置一个大小为1的buffer,否则接受channel的动作有可能导致节点运行阻塞
 
==条件分支控制 ==
在上述的工作流执行模型中,一个节点运行完成后会触发后继节点的运行。这在大多数情况下都有效,
但是,如果考虑到条件节点这一种特殊的节点,
就会存在问题。
 
[[Image:Cond-node.png|600px]]
 
如图所示,条件节点有两个分支(if/else),这两个分支有且仅能命中一个,
我们期望它的后继分支有且仅有一个能够运行。为此一种特殊的节点状态被引入:Skipped(跳过执行),
对于不需要被实际执行的节点,例如没有命中的分支,那么它的运行状态将是Skipped。
 
而在具体的实现上,我们为节点定义了一些属性:
 
* TriggerRule:触发条件,包括AllSuccess(所有依赖节点全部执行成功,不包含Skipped), NoneFailed(没有失败的情况,包含Skipped)等
* ConnectionType:连接方式,包含Selective(选择执行,如条件节点、分流)、Direct(普通情况)
* OutputValue:节点的输出值,对于条件节点,我们会输入特殊的变量来表示分支的选择结果,比如条件节点会生成$<port>=true/fase来表示某一个Port是否被命中
 
通过这些信息的组合,可以对节点如何执行进行有效的控制。这里的设计部分参考了Apache Airflow的做法 <ref>https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/index.html</ref>,但尽力避免不必要的复杂性。
 
==引用和表达式解析==
在工作流中,我们需要在一个节点中使用其他节点的结果。
比如,在条件节点中引用开始节点的某个变量作为判断的依据。这部分涉及到一些较为复杂的逻辑:
 
* 在构建工作流的时候,前端页面需要知道有哪些变量能够选择。这不仅跟节点类型有关,跟输出变量的类型、以及节点之间是否有连接也有关系。举个例子,如果两个节点之间没有连线,那么它们之间是不能相互引用变量的;条件节点中的比较操作是需要判断类型的,不能拿一个数字跟字符串去比较是否相等
* 在上一步获取可用变量列表的时候,工作流尚处于一个不完善的状态,可能是不合法的
* 变量可能涉及到复杂的嵌套结构,比如在开始节点定义一个嵌套的输入A.B.C,然后在后面的节点中引用


== Pub/Sub 模式 ==
在Workflow的定义中,我们通过一个简单的表达式来指代引用:
 
<syntaxhighlight lang="json">
"expressions": [
  {
    "variable": "$start1.input", // 获取开始节点(start1)中的input输出变量
    "operator": "contains",
    "target": {
      "type": "fixed",
      "value": "kingsoft"
    }
  }
]
</syntaxhighlight>
 
这一部分的实现较为麻烦,涉及到一些递归解析算法,比如上述的$start1.input,我们首先要从节点的输出参数中找到节点start1,然后再从start1的结果中找到input变量。
但由于start1这个节点不一定是该节点的直接依赖,很可能会是依赖的依赖,这样在我们的实现中输入是一个树形结构,
需要进行递归寻址:
 
<syntaxhighlight lang="go">
func resolveNodeResultRecursively(input NodeResult, nodeId string) (NodeResult, error) {
if input.NodeId == nodeId {
return input, nil
} else {
for _, parent := range input.Inputs {
result, err := resolveNodeResultRecursively(parent, nodeId)
if err == nil {
return result, nil
}
}
return NodeResult{}, errVariableNodeNotAccessable
}
}
</syntaxhighlight>
 
获取到变量之后,还可能需要根据变量类型进行一些自动类型检测和转换:
 
<syntaxhighlight lang="go">
func tryCastValue(v any, expectedType *TypeDescriptor) (any, error) {
if expectedType == nil {
return v, nil
}
if v == nil {
return nil, errors.New("value is nil")
}
switch *expectedType {
case StringType:
return castToString(v)
case IntegerType:
return castToInt32(v)
case FloatType:
return castToFloat32(v)
case ArrayIntegerType:
return castToInt32Array(v)
case ArrayFloatType:
return castToFloat32Array(v)
    // ...
</syntaxhighlight>
 
这时候,go的劣势就被发挥的淋漓尽致,比如把一个any类型转换为字符串,得这样写:
 
<syntaxhighlight lang="go">
func castToString(v any) (string, error) {
if v == nil {
return "", nil
}
switch value := v.(type) {
case string:
return value, nil
case int:
return strconv.FormatInt(int64(value), 10), nil
case int32:
return strconv.FormatInt(int64(value), 10), nil
case int64:
return strconv.FormatInt(value, 10), nil
case float32:
return strconv.FormatFloat(float64(value), 'f', -1, 32), nil
case float64:
return strconv.FormatFloat(value, 'f', -1, 64), nil
case bool:
return strconv.FormatBool(value), nil
default:
return "", errVariableTypeIncompatible
}
}
</syntaxhighlight>
 
== 测试工作流 ==
工作流实际是一个非常复杂的系统,
很难按照传统的排列组合的方式来进行测试——有太多种组合的可能了。
那么怎样来保证代码质量呢?
 
=== 使用TDD ===
曾经在知乎上看到一个很有意思的问题和回答:
 
“如何激怒一个Golang开发者?”
“你写的代码像Java……”
 
其实在实现复杂逻辑的时候,很多方法都是可以借鉴的。尽管Go并不是一个严格意义上的面向对象的语言,
但将OO的思想用到工作流的实现中也未尝不可:如果逻辑过于复杂,那么我们就进行拆分,并遵循单一职责原则来实现。
 
在工作流的实现中,我们充分运用这一思想,
通过将逻辑拆分,并独立进行单元测试,来确保程序代码的正确。
举个例子,
大模型节点的实现又依赖GatewayClient、LlmRequestBuilder,其中GatewayClient用来请求网关;LlmRequestBuilder负责构建大模型请求:
 
 
<syntaxhighlight lang="go">
type LlmNode struct {
Id            string
Name          string
client        GatewayClient
Properties    LlmNodeProperties
requestBuilder LlmRequestBuilder
}
</syntaxhighlight>
 
而大模型请求的构建同样具有一定的复杂性,不仅仅是因为不同模型的参数有差异,还有一个原因是大模型节点支持一种JSON模式,允许用户直接定义大模型的输出类型:
 
[[Image:coze-json.png|600px]]
 
而大模型并不都具有这样的魔法,具体的实现方式是这样的:
 
* 对于OpenAI,它支持json_mode,可以设置该参数并直接在提示词中描述json的结构<ref>https://platform.openai.com/docs/guides/json-mode</ref>
* 对于Minimax,它支持设定返回格式,需要构建一个特殊的参数来控制json结构<ref>https://platform.minimaxi.com/document/ChatCompletion%20Pro?key=66718f6ba427f0c8a57015ff#MKTRq06DJQTye5EdtIf5j3d8</ref>
 
因此,我们进一步对这些逻辑进行拆解,通过Adaptor模式来解决不同厂商的差异:
 
<syntaxhighlight lang="go">
type LlmRequestBuilderImpl struct {
schemaGenerator JsonSchemaGenerator
minimaxAdaptor  MinimaxAdaptor
gptAdaptor      GptAdaptor
}
</syntaxhighlight>
 
对于单元测试来讲,每一个方法或者struct都足够专一,我们只需要测试其相关的逻辑。例如,在上面的例子中,
对于GptAdaptor我们有这些测试用例:
 
<syntaxhighlight lang="go">
// 如果大模型节点的返回格式是JSON且模型支持JSON模式,那么需要设置json_object参数,并在提示词中描述json结构
func TestGptAdaptor_ShouldAddJsonMode_GivenOutputIsJson(t *testing.T) {
// ...
err := adaptor.FixRequest(&req, p)
assert.NilError(t, err)
ext := req.ExtendedLLMArguments["azure_gpt-35-turbo"].(map[string]any)
x := ext["response_format"].(map[string]string)
assert.Equal(t, x["type"], "json_object")
assert.Equal(t, req.Context, "you're a helpful assistant.\n\nPlease reply exactly in the following json schema:\n```json\n"+
`{"type":"object","properties":{"age":{"type":"number","description":"user age"},"name":{"type":"string","description":"user name"}}}`+"\n```\n")
assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?")
}
 
// 如果大模型节点的返回格式是JSON且模型不支持json_object参数,那么只需要在提示词中描述JSON格式即可
func TestGptAdaptor_ShouldAddJsonPrompt_GivenOutputIsJson_JsonModeIsNotSupported(t *testing.T) {
// ...
err := adaptor.FixRequest(&req, p)
assert.NilError(t, err)
assert.Equal(t, req.ExtendedLLMArguments == nil, true)
assert.Equal(t, req.Context, "you're a helpful assistant.\n\nPlease reply exactly in the following json schema:\n```json\n"+
`{"type":"object","properties":{"age":{"type":"number","description":"user age"},"name":{"type":"string","description":"user name"}}}`+"\n```\n")
assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?")
}
 
// 如果输出不是JSON格式,那么什么也不做
func TestGptAdaptor_ShouldDoNothing_GivenOutputIsNotJson(t *testing.T) {
// ...
err := adaptor.FixRequest(&req, p)
assert.NilError(t, err)
assert.Equal(t, req.ExtendedLLMArguments == nil, true)
assert.Equal(t, req.Context, `you're a helpful assistant.`)
assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?")
}
</syntaxhighlight>
 
而在上层的LlmNode的单元测试中,我们不需要关心这些独立的逻辑,采取测试替身进行Mock即可:
 
<syntaxhighlight lang="go">
func TestExecuteLlmNode_ShouldReturnChoice_WhenExecute_GivenIsNotStream(t *testing.T) {
client := NewMockGatewayClient(t)
builder := NewMockLlmRequestBuilder(t)
builder.On("CreateGatewayRequest", mock.Anything, mock.Anything).Return(nil, gateway.LLMChatUniversalReq{}, nil)
node := LlmNode{
Id:    "llm1",
client: client,
Properties: LlmNodeProperties{
Context: "you're helpful assistant",
Prompt:  "give me a name",
Stream:  false,
},
requestBuilder: builder,
}
    // ...
</syntaxhighlight>
 
使用mockery<ref>https://github.com/vektra/mockery</ref>可以为指定的interface自动生成mock对象。
注意不是字节开源的那个Mockey<ref>https://github.com/bytedance/mockey/blob/main/README_cn.md</ref>,这个框架实际上存在一些坑:
 
* 需要添加一些参数来禁用内联和编译优化
* 因为它全局Mock方法,可能存在多个测试执行时的互相干扰问题,比如在测试A中Mock时影响测试B的运行结果
 
=== 自动化集成测试 ===
对于一个工作流执行,如何进行集成测试呢?得益于事件驱动架构,
我们不需要关心工作流执行的Side effect,只需要关心事件即可。
也就是说,对于一个给定的工作流,它是否产生了符合预期的事件。
 
例如,我们有一个用例测试条件节点:
 
<syntaxhighlight lang="go">
func TestExecutor_ShouldReturnLlm1Result_WhenExecute_GivenConditionMatchesLlm1(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"code":"Success","message":"成功","external_id":"a9331df1-2adb-4fad-a154-5d8767196bb2","created":1718264704,"task_id":"llm-chat-azure-gpt-35-turbo-version-0125/ac57f2f54fe1d40be193966e5875b3a8","usage":{"completion_tokens":77,"prompt_tokens":52,"total_tokens":129},"choices":[{"index":0,"finish_reason":"stop","logprobs":0,"text":"Yes"}]}`))
}))
 
data, _ := os.ReadFile("./schema/cases/llm-condition.json")
json, err := workflow.Parse(data)
assert.NilError(t, err)
builder := workflow.NewWorkflowBuilder(json, workflow.GatewayConfig{
BaseUrl: server.URL,
})
dag, errs := builder.ValidateAndToDAG()
assert.Equal(t, len(errs), 0)
executor, err := workflow.NewExecutor(*dag)
assert.NilError(t, err)
channel := executor.Subscribe()
go func() {
w := workflow.NewEmptyWorkflowContext(context.TODO())
w.SetParameters(map[string]any{
"input": "what's the main product of kingsoft?",
})
err := executor.ExecSync(w)
assert.NilError(t, err)
}()
events := []workflow.WorkflowEvent{}
for e := range channel {
events = append(events, e)
}
 
assert.Equal(t, len(events), 12)
 
assertEventsContains(t, events, func(e workflow.WorkflowEvent) bool {
return e.Type() == workflow.NodeSkipped && e.(workflow.NodeSkippedEvent).Node == "llm2"
})
assertEventsContains(t, events, func(e workflow.WorkflowEvent) bool {
return e.Type() == workflow.JsonResponseGenerated && e.(workflow.JsonResponseEvent).Data["output"] == "Yes"
})
}
</syntaxhighlight>
 
在这个集成测试中,除了网关的调用是通过Mock Server替代外,其他的调用方式都与实际一致。
在执行完成后,我们捕获事件,验证产生的事件数目以及是否包含预期的事件,来判断执行结果是否符合预期。
 
当然,集成测试的成本较高,我们需要维护一些典型的测试场景(工作流JSON),因此很难覆盖到逻辑的细节。
这也意味着集成测试的数目理论上应该远小于单元测试,
或者说单元测试应该更充分。
 
=== CLI测试工具 ===
除了单元测试、集成测试等方式来测试工作流的执行之外,
很多时候,我们需要定位分析问题,
是很麻烦的。
 
比如,工作流A运行有问题,但是由于权限问题,排查问题的人员无法在界面上看到A;
或者由于实现的层级嵌套较深,很难判断到底是前端问题,还是后端接口问题,还是工作流执行引擎的问题。
 
为此,我们开发了一个CLI工具用来直接执行工作流JSON,可以比较方便地运行和查看工作流执行结果:
 
[[Image:workflow-cli.gif|600px]]


= 未来展望 =
= 未来展望 =


关于为什么要做Workflow,感兴趣可以从这里了解一些其他的信息<ref>AI Con总结和KPP工作流 方案汇报 https://365.kdocs.cn/l/crXXklzrerKF</ref>
工作流模型是一个高度可扩展的模型,
可以通过添加新的节点类型、插件等,
实现更复杂的功能。
 
一些在计划内的功能有:
 
* RAG:知识库检索
* 代码执行:执行一段python或者Javascript代码, 可以用来定制一些特殊的逻辑
* 图像生成:类似Coze的图像流,包含文生图以及图像处理等
* 自定义插件:允许用户接入已有的API,或者通过FaaS部署一个新的自定义插件
* 嵌套的Workflow:允许在Workflow中嵌套调用
 
尽管Workflow已经具备实现复杂的功能的可能,但在AI应用的层级中尚只处于第二层,
目前来看,
大概率Agent才是AI应用的尽头。
 
在Coze中,Workflow可以作为Agent的一种工具来执行,相对于其他的插件和工具,
我认为其最大的价值在于,通过工作流这种低代码的形式,赋予非技术人员无限的创造力。
 
尽管工作流很灵活,可以根据用户的需求来随意配置,但是我们不禁思考,这真的是“Agentic Workflow”么?
在上面的例子中,我们只是用到了大模型,但大模型在其中并没有辅助或者参与决策。
这个问题留给聪明的读者去思考:p
 
关于我们为什么要做Workflow,感兴趣可以从这里<ref>AI Con总结和KPP工作流 方案汇报 https://365.kdocs.cn/l/crXXklzrerKF</ref>了解更多的背景信息。

2024年7月29日 (一) 07:11的最新版本

随着2023年大模型在国内的发展成熟,现在大家开始真正关注到大模型应用的开发上。有人说,2024年是大模型应用的落地元年。看似无所不能的大模型,却并不能直接集成到一个系统中,如何开发大模型应用,目前仍然处于一个刚起步的阶段。一些成熟的技术,经历过许多人摸索后,总能找到一个现有的"best practice";对于大模型应用开发来说,谁也不敢说他的做法已经是“best practice”。

目前,有一个看起来比较靠谱的模型,是这样定义大模型应用开发的几个Level的:

最简单的Wrapper层级,实际就是把大模型的提示词封装成一个固定功能的”方法“,然后集成到业务系统中。KPP的Function(或者Prompt),就是一个Wrapper。Wrapper能做什么完全取决于大模型的能力,比如生成一段文字,甚至写一段代码,都没有问题。但是如果想做一个复杂的功能,恐怕就不行了,比如,先生成代码再执行得到代码运行结果,这样就没法直接通过大模型请求来做了,原因是大模型本身无法直接执行代码 [1]

显然,对于复杂的功能,我们可以尝试分步骤来解决。比如一键生成PPT就是很好的例子,第一步先生成一个outline;然后再根据outline,去填充内容;最后通过一个生成文档的API生成一个美化后的PPT。这样每一步都可以用最适合的技术来进行,比如生成outline和填充内容用LLM,可以选择最合适的模型;美化PPT则可以直接使用已有的API来处理。

这其实就是Flow了,或者更明确一点,Agentic Workflow[2]。不是普通的工作流,是“智能”的工作流。通过流程的编排,我们可以扩充LLM的能力,通过一些组合来实现复杂的业务功能。相对于Wrapper而言,Workflow可以更好地实现复杂的AI能力。 因此,我们在KPP中也实现了Workflow的能力。这里就技术层面分享一下KPP在构建Workflow时的一些设计和实现。

为什么要自研,搞个开源的不行么?

实际上,在开始设计工作流之前,就已经有不少公司已经在做了。其中最具有参考价值的主要是Dify和扣子。其中,Dify是一个开源的系统,理论上可以直接拿过来用。 但是,考虑到WPS AI的实际情况,有几个比较重要的问题是Dify难以支撑的:

  • 无法直接嵌入到WPS AI的现有体系,包括网关、KPP、AI Server等,一定会存在一个二次开发的成本
  • Dify是用Python写的,如果大规模使用,很可能会遇到性能问题,包括工作流本身的异步多线程执行、以及水平扩展的性价比

实际上,字节的Coze是一个非常适合我们的开发平台。它的产品设计相当专业和完善,Workflow、图像流、Agent、多Agent协作等,是目前所有平台中定义最清晰的。相对而言,一些平台自己都不知道自己在做什么,比如百度的Agent Builder和App Builder,做着做着就成一样的了[3]。 Dify在这方面也有明显的问题,看起来还没有搞清楚Agent和Workflow的关系。

然而很遗憾目前为止没有一个开源的系统可以达到Coze的高度。 正是由于种种原因,最终KPP决定完全自研Workflow系统。

理论基础

即便从零开始实现一个Workflow其实也比较简单,因为从原理上来说,Workflow无非就是一个DAG(有向无环图)而已。 它是工作流的核心。DAG相信大家都比较熟悉,之所以要用到DAG正式因为它有两个重要的特点:

  • 节点之间的连接是有向的,否则我们将无法判断节点之间的依赖关系和执行顺序
  • 整个图是无环的,这也限制了在Workflow中是不可以存在循环的。这点非常重要,如果是一个Agent,那么我们可以一直给Agent发消息聊下去,这是一个循环;但是Workflow一定不能存在这种不确定的循环,或者说,我们期望在Workflow能够在确定有限的时间内运行完成并获取结果,它是Stateless(无状态)的

使用拓扑排序确定执行顺序

执行DAG第一件事情就是确定节点的执行顺序,从哪里开始,又如何结束?

考虑如下的几种场景:

从图中我们可以看出这也一些信息:

  • 严格而言,理论上DAG只要求有向无环,但是不必要“连通”的。也即是说,图中可以存在孤立的点,或者多个分散的DAG构成一个大的DAG
  • 一个节点可能有多个后继节点,意味着某个任务完成后,可能触发多个任务的执行
  • 一个节点也可能有多个依赖,那么只有依赖节点全部执行完成后,才能开始它的执行

我们可以通过拓扑排序(Topological sorting)来确定节点的执行顺序。比较著名的算法是Kahn's algorithm,原理也很简单,从图中选出入度为零的节点,然后删掉这些节点和边,再重复这一过程直到原图为空。这样选出的这些节点就是拓扑排序的结果。

拓扑排序还有一个附带的好处,就是可以检测图中是否有环。如果图中没有入度为零的节点,但是选出的节点数目仍然小于图的节点数,那么拓扑排序失败,证明图中有环存在。

拓扑排序的结果并不一定是唯一的。比如场景一中,由于节点相互之间完全无依赖,因此任意一个组合都是正确的拓扑排序结果。

值得注意的是,尽管拓扑排序得到的是一个有序的列表,这并不意味着你需要“串行”执行。场景三的拓扑排序结果为[A,B,C,D]或者[A,B,D,C],其中,C和D是可以并行执行的。

事件驱动架构

确定了节点的执行顺序之后,我们可以按照顺序来执行节点,看起来工作流的执行就这样完成了:

sorted_nodes <- dag.topological_sort()
for node in sorted_nodes:
    node.wait_for_dependencies()
    node.execute()

但是我们忽略了几个重要的问题:

  • 节点之间是有交互的。考虑场景三中,节点B是可能需要使用A的运行结果的,也就是说,一个节点的运行结果应该对其后继节点可见
  • 我们需要能够实时观测工作流的运行情况,比如当前执行到哪一个节点了?这对于界面调试、API调用等都十分有用。

使用事件驱动架构可以比较容易来解决这个问题。我们将工作流的运行抽象成一系列的事件,节点的执行产生事件(比如节点开始、节点结束),而这些事件具体如何使用则由消费者来决定。 这样,在UI上调试工作流时,我们只需要根据这些事件来更新页面上的节点状态;当使用API调用时,我们则可以过滤掉不感兴趣的事件,只监听结果事件并返回给调用方即可。 这样对于工作流执行引擎将有非常强的扩展性和可测试性。

传递闭包简化

对于使用DAG的系统,一个不可忽视的问题就是DAG的不确定性:用户很可能构建出奇奇怪怪的巨复杂的图出来,其中可能会有不必要的边。 这对于理解和执行工作流都可能造成麻烦。例如,下图中左边a→d, a→e 都是不必要的边。通过传递闭包简化(Transitive reduction)算法可以对无用的连接进行删除,从而得到一个唯一确定的简化工作流。

使用Go语言实现工作流

在有以上的一些理论基础后,实现思路已经非常清晰,尽管仍然有许多技术细节需要确定,就看如何具体设计和实现了。我们使用Go语言来实现,初步支持下面几种节点:

  • 开始、结束
  • 条件判断
  • 大模型调用
  • 插件:包括bing搜索、高召回、分流

概念模型

尽管前面我们认为工作流是一个DAG,但并不是直接搞一个DAG就能实现向Coze这样的工作流系统,这里面最大的问题在于,DAG中边只包含了方向, 但实际工作流产品的设计上,一条边还可能附带一些其他的信息。

例如,ComfyUI中一个节点到另一个节点连接是区分不同的点的[4]

当然它的业务场景跟我们差距比较大。但是,考虑到条件节点的表示,我们发现也存在类似的情况:

注意其中的条件节点,不同的分支连接到下游节点时,起点是标记到分支上的,而不是单纯的两个节点连接。当然我们可以不采用这种方式,可以在节点属性中设置一些字段来表示什么情况下执行,甚至设置到边的信息中。但不可否认的是,这种边直接连接到条件上的方式是最直观的。 经过一些权衡之后,我们提出了如下的一个工作流概念模型:

这个概念模型,通过对边添加属性来区分是从哪个点连接到哪个点。但并未改变DAG的本质, 因此在KPP的工作流中,是无法直接从一个条件节点的两个分支连接到同一个节点的,因为这样相当于生成了两条边。

这里,对于这个“点”叫什么名字好其实有一个小故事。 最开始,我们从Qt的Singal/Slot概念中借用“Slot”的概念叫“槽位”,后来觉得不妥,借用IP端口的概念叫Port。 彼时,我们抓包过Coze的协议,想参考它的定义来设计,但结果发现它表示的也乱七八糟,根本看不懂。 过了很久,等我再次抓包Coze的时候,惊讶的发现他也改名叫Port了。

若合一契,岂不快哉!

定义工作流Schema

在实现之前,首先需要解决的一个问题是,我们如何来表示一个工作流?比如条件判断的条件怎么存储?大模型调用的提示词怎么表示?

这是一个非常重要的问题,因为它是一切的基础,我们需要通过一个标准的协议来统一前端、后端等对于工作流的表示。不难发现,工作流的定义包括:

  • DAG的表示,即节点和边
  • 节点的自身属性,根据节点类型不同,其可用的属性也不尽相同

我们使用JSON Schema定义了一个工作流格式,大概长这样:

{
  "type": "object",
  "properties": {
    "name": {
      "type": "string"
    },
    "nodes": {
      "type": "array",
      "items": {
        "$ref": "#/definitions/node"
      }
    },
    "edges": {
      "type": "array",
      "items": {
        "$ref": "#/definitions/link"
      }
    },
    ...
  }
}

对于不同的节点,我们可以分别定义节点具体有哪些属性,格式是什么,是否必须等,例如:

{
  ...
  "bingSearchPluginProperties": {
    "properties": {
      "pluginType": {
        "type": "string"
      },
      "version": {
        "type": "string"
      },
      "query": {
        "$ref": "#/definitions/variableBinding"
      },
      "count": {
        "type": "number"
      }
    },
    "required": [
      "pluginType",
      "version",
      "query"
    ]
  }
}

完整的定义比较复杂,但通过这样一个Schema,我们可以解决很多问题:

  • 这个工作流是否合法?至少格式上是否正确
  • 前端开发时,如何清楚工作流支持多少个节点?每个节点属性是什么?

对于这种复杂的工作流来说,传统的API文档是无法精确到这样的程度的,不仅如此,它的最大好处是,它是真的可以运行的!

工作流的构建

对工作流的抽象定义

首先我们定义了一个业务无关的单纯DAG实现,仅做了必要的修改来适配概念模型中的槽位:

type Port struct {
	Direction PortDirection `json:"type"`
	Id        string        `json:"id"`
}

type Vertex struct {
	Id      string
	Type    string
	Ports   []Port
	Payload any
}

type Connection struct {
	From       string
	To         string
	SourcePort *Port
	TargetPort *Port
}

type PortawareDAG interface {
	AddVertex(node Vertex) error
	GetVertex(id string) (Vertex, error)
	Connect(fromId string, toId string) error
	ConnectFromPort(fromId string, toId string, toPort string) error
	TopologicalSorting() ([]Vertex, error)
	TransitiveReduction() (PortawareDAG, error)
    // ...
}

而对于具体的不同的节点,是通过另外的Node接口来定义:

type Node interface {
	GetId() string
	GetType() NodeType
	Execute(ctx context.Context, options NodeExecuteOptions, doneGroup *sync.WaitGroup) ([]InputValue, []OutputValue, error)
    // ...
}

其中,最重要的方法就是Execute,它将设计执行节点,比如请求大模型,或者执行条件判断。这样,不同类型的Node会有不同类型的实现, 比如大模型节点的实现有单独的定义:

type LlmNode struct {
	Id             string
	Name           string
	client         GatewayClient
	Properties     LlmNodeProperties
	requestBuilder LlmRequestBuilder
}

解析动态类型

由于工作流节点的类型不一样,我们在实例化成Node类型的时候,一个问题是需要进行动态类型的转换。 比如,我们JSON中大概是这样定义的:

{
    "nodes": [ 
        { "type": "start", ...},
        { "type": "llm", ...}
    ]
}

而我们需要根据type实例化成不同的类型(StartNode,LlmNode)。不得不吐槽,在go里面没有优雅的实现方式, 不得不将JSON反序列化成any(实际上是一个map),然后再部分序列化成JSON,然后再根据类型再次反序列化成实际的类型:

for index, node := range w.Nodes {
	jsonData, _ := json.Marshal(node.Properties)
	var p any
	var err error
	t := node.Type
	if t == TypeStart {
		p, err = loadStartNode(jsonData)
	} else if t == TypeLlm {
		p, err = loadLlmNode(jsonData)
	} else if t == TypeEnd {
		p, err = loadHttpNode(jsonData)
	}
	// ...

异步工作流运行模型

Pub/Sub模式

在事件处理架构中,我们通过事件来接偶工作流执行引擎,以及其后面的消费者。 Go语言天然提供了channel来进行消息通信,但不幸的是,它是一个单生产者/消费者的模型。 在我们需求中,我们应该允许多个消费者存在,彼此只关心自己感兴趣的事件。比如,我们可以有一个消费者来打印日志;另一个消费者来返回HTTP消息。

这是一个Pub/Sub模式。尽管Go原生不支持,但是我们很容易实现:

type Subscriber[T any] chan T
type TopicFilter func(event any) bool

type Publisher[T any] struct {
	name        string
	mutex       sync.RWMutex
	buffer      int
	closed      bool
	subscribers map[Subscriber[T]]TopicFilter
}

实现原理也很简单,那就是为每一个消费者创建一个channel,然后发送消息的时候,发送多次:

func (p *Publisher[T]) Publish(event T) {
	p.mutex.RLock()
	defer p.mutex.RUnlock()

	var wg sync.WaitGroup
	for subscriber, filter := range p.subscribers {
		subscriber := subscriber
		if filter == nil || filter(event) {
			wg.Add(1)
			go func(event T, channel chan<- T) {
				defer wg.Done()

				channel <- event
			}(event, subscriber)
		}
	}
	wg.Wait()
}

这样我们就有了一个Pub/Sub实现,我们用它来作为工作流执行的事件订阅/消费系统。核心是一个Executor接口,他提供一个订阅的方法用来订阅事件,以及一个执行的方法来执行工作流。

type Executor interface {
	Subscribe() Subscriber[WorkflowEvent]
	Close()
	ExecSync(ctx WorkflowContext) error
}

使用Go routine实现异步运行

前面我们有提到,尽管拓扑排序可以将工作流节点排序成一个有序的列表,但并不代表节点需要串行执行。比如说,如果有多个入度为0的节点, 那么我们可以同时运行它们。在go语言中,很容易通过go routine来实现。

进一步思考,我们会发现,不仅需要做拓扑排序,还需要根据节点的依赖来执行节点。只有当节点的依赖项全部执行完成后,才能开始该节点的执行。 可以通过一个队列来实现,不断执行和删除就绪的节点,直到全部节点执行完成。 其运行原理如下面的伪代码所示:

queue <- dag.typological_sort()
while queue.is_not_empty():
    ready_nodes <- queue.pop_zero_degree_nodes()
        for node in ready_nodes start:
            node.execute()

再进一步思考,如果某一个节点运行失败,我们应该如何处理?一个比较合理的做法是直接停止工作流的运行。这种逻辑可以通过errgroup来实现。 最终,我们执行工作流的方法逻辑如下:

func (e *AsyncExecutor) ExecSync(c WorkflowContext) error {
	defer e.Close()
	e.eventPublisher.Publish(NewWorkflowStartedEvent())
	errGroup, ctx := errgroup.WithContext(c.GetContext())
	var doneGroup sync.WaitGroup
	for _, node := range e.pendingNodes {
		node := node
		waitChannel := e.notifyChannels[node.Id]
		errGroup.Go(func() error {
			select {
			case <-ctx.Done():
				return errCancelledByContext
			case inputs := <-waitChannel:
				nodeInstance, ok := node.Payload.(Node)
				if !ok {
					return errFailedToGetNodeInstance
				}
				return e.Execute(ctx, nodeInstance, inputs, &doneGroup)
			}
		})
	}

	e.popReadyNodes()

	err := errGroup.Wait()
	if err != nil {
		return err
	}
	doneGroup.Wait()
	e.eventPublisher.Publish(NewWorkflowFinishedEvent())
	return nil
}

有意思的是,我们并不是等节点的依赖项执行完成之后,再启动一个go routine,而是一开始就为每一个节点启动了一个go routine, 但是通过一个信号来控制何时开始运行。我们可以通过context.Context来进行额外的控制,比如设定超时时间之类, 这对于避免程序出现异常情况导致工作流无休止运行非常有帮助。

select {
case <-ctx.Done():            // 如果超时或者取消,go routine会从这里直接返回,从而中断执行
	return errCancelledByContext
case inputs := <-waitChannel: // 控制节点开始执行的信号channel
	// ... 
	return e.Execute(ctx, nodeInstance, inputs, &doneGroup)
}

注意在上面的流程中我们通过e.popReadyNodes()这个方法来通知已经就绪的节点开始运行, 这个方法的主要作用是找到当前节点中,所有可以运行的节点。 例如,考虑一个包含A->B->C三个节点的工作流,如果A已经执行结束, 那么B就可以开始运行。 这时候,A的运行结果将作为B的输入信息。

func (e *AsyncExecutor) popReadyNodes() error {
	defer e.lock.Unlock()
	e.lock.Lock()

	for _, node := range e.pendingNodes {
		inputs, ready, err := e.getInputs(node.Id)
		if err != nil {
			return err
		}
		if !ready {
			return nil
		}
		picked := e.pendingNodes[0]
		e.pendingNodes = e.pendingNodes[1:]          // 从尚未执行的节点队列中移除队首
		notifyChannel := e.notifyChannels[picked.Id] // 通知移除的节点开始执行
		notifyChannel <- inputs
	}
	return nil
}

这里每一个节点对应一个通知channel,在准备运行的阶段就已经进行了初始化。 整体思路是,每当有节点执行完成,那么就检查是否有新的节点可以开始运行,否则,不可能执行其他的节点,因为剩余的节点都是有依赖关系的。 细心的读者容易想到一个问题,就是当工作流开始执行的时候, 总是有起始节点是没有依赖的,按照这个逻辑怎么触发运行呢?这个问题很容易解决,那就是工作流开始的时候,手动调用一次popReadyNodes。

执行的关键在于,一个节点运行的结束,会触发其后继节点的运行。这个“触发”怎么实现,实际上是有很多种办法的。 其实第一版实现并不是这样为每一个节点设置一个通知channel(全局唯一,每一个节点有且仅有一个通知channel);而是通过Pub/Sub模式来等待所有依赖项完成并通知。

这是最开始的逻辑:

// 等待所有依赖节点发送通知
func blockForNodeResults(waitChannels []<-chan NodeResult) []NodeResult {
	results := []NodeResult{}
	for _, ch := range waitChannels {
		result := <-ch
		results = append(results, result)
	}
	return results
}

仔细思考其实这样实现问题挺多的:

  • 需要为每一个依赖创建一个channel来发送和接收通知,且运行的时候还需要等待所有channel都接收到结果,相当于把“我是否能够执行”的逻辑交给节点自己去判断
  • 由于channel发送和接收需要对应,需要至少为channel设置一个大小为1的buffer,否则接受channel的动作有可能导致节点运行阻塞

条件分支控制

在上述的工作流执行模型中,一个节点运行完成后会触发后继节点的运行。这在大多数情况下都有效, 但是,如果考虑到条件节点这一种特殊的节点, 就会存在问题。

如图所示,条件节点有两个分支(if/else),这两个分支有且仅能命中一个, 我们期望它的后继分支有且仅有一个能够运行。为此一种特殊的节点状态被引入:Skipped(跳过执行), 对于不需要被实际执行的节点,例如没有命中的分支,那么它的运行状态将是Skipped。

而在具体的实现上,我们为节点定义了一些属性:

  • TriggerRule:触发条件,包括AllSuccess(所有依赖节点全部执行成功,不包含Skipped), NoneFailed(没有失败的情况,包含Skipped)等
  • ConnectionType:连接方式,包含Selective(选择执行,如条件节点、分流)、Direct(普通情况)
  • OutputValue:节点的输出值,对于条件节点,我们会输入特殊的变量来表示分支的选择结果,比如条件节点会生成$<port>=true/fase来表示某一个Port是否被命中

通过这些信息的组合,可以对节点如何执行进行有效的控制。这里的设计部分参考了Apache Airflow的做法 [5],但尽力避免不必要的复杂性。

引用和表达式解析

在工作流中,我们需要在一个节点中使用其他节点的结果。 比如,在条件节点中引用开始节点的某个变量作为判断的依据。这部分涉及到一些较为复杂的逻辑:

  • 在构建工作流的时候,前端页面需要知道有哪些变量能够选择。这不仅跟节点类型有关,跟输出变量的类型、以及节点之间是否有连接也有关系。举个例子,如果两个节点之间没有连线,那么它们之间是不能相互引用变量的;条件节点中的比较操作是需要判断类型的,不能拿一个数字跟字符串去比较是否相等
  • 在上一步获取可用变量列表的时候,工作流尚处于一个不完善的状态,可能是不合法的
  • 变量可能涉及到复杂的嵌套结构,比如在开始节点定义一个嵌套的输入A.B.C,然后在后面的节点中引用

在Workflow的定义中,我们通过一个简单的表达式来指代引用:

"expressions": [
  {
    "variable": "$start1.input", // 获取开始节点(start1)中的input输出变量
    "operator": "contains",
    "target": {
      "type": "fixed",
      "value": "kingsoft"
    }
  }
]

这一部分的实现较为麻烦,涉及到一些递归解析算法,比如上述的$start1.input,我们首先要从节点的输出参数中找到节点start1,然后再从start1的结果中找到input变量。 但由于start1这个节点不一定是该节点的直接依赖,很可能会是依赖的依赖,这样在我们的实现中输入是一个树形结构, 需要进行递归寻址:

func resolveNodeResultRecursively(input NodeResult, nodeId string) (NodeResult, error) {
	if input.NodeId == nodeId {
		return input, nil
	} else {
		for _, parent := range input.Inputs {
			result, err := resolveNodeResultRecursively(parent, nodeId)
			if err == nil {
				return result, nil
			}
		}
		return NodeResult{}, errVariableNodeNotAccessable
	}
}

获取到变量之后,还可能需要根据变量类型进行一些自动类型检测和转换:

func tryCastValue(v any, expectedType *TypeDescriptor) (any, error) {
	if expectedType == nil {
		return v, nil
	}
	if v == nil {
		return nil, errors.New("value is nil")
	}
	switch *expectedType {
	case StringType:
		return castToString(v)
	case IntegerType:
		return castToInt32(v)
	case FloatType:
		return castToFloat32(v)
	case ArrayIntegerType:
		return castToInt32Array(v)
	case ArrayFloatType:
		return castToFloat32Array(v)
    // ...

这时候,go的劣势就被发挥的淋漓尽致,比如把一个any类型转换为字符串,得这样写:

func castToString(v any) (string, error) {
	if v == nil {
		return "", nil
	}
	switch value := v.(type) {
	case string:
		return value, nil
	case int:
		return strconv.FormatInt(int64(value), 10), nil
	case int32:
		return strconv.FormatInt(int64(value), 10), nil
	case int64:
		return strconv.FormatInt(value, 10), nil
	case float32:
		return strconv.FormatFloat(float64(value), 'f', -1, 32), nil
	case float64:
		return strconv.FormatFloat(value, 'f', -1, 64), nil
	case bool:
		return strconv.FormatBool(value), nil
	default:
		return "", errVariableTypeIncompatible
	}
}

测试工作流

工作流实际是一个非常复杂的系统, 很难按照传统的排列组合的方式来进行测试——有太多种组合的可能了。 那么怎样来保证代码质量呢?

使用TDD

曾经在知乎上看到一个很有意思的问题和回答:

“如何激怒一个Golang开发者?” “你写的代码像Java……”

其实在实现复杂逻辑的时候,很多方法都是可以借鉴的。尽管Go并不是一个严格意义上的面向对象的语言, 但将OO的思想用到工作流的实现中也未尝不可:如果逻辑过于复杂,那么我们就进行拆分,并遵循单一职责原则来实现。

在工作流的实现中,我们充分运用这一思想, 通过将逻辑拆分,并独立进行单元测试,来确保程序代码的正确。 举个例子, 大模型节点的实现又依赖GatewayClient、LlmRequestBuilder,其中GatewayClient用来请求网关;LlmRequestBuilder负责构建大模型请求:


type LlmNode struct {
	Id             string
	Name           string
	client         GatewayClient
	Properties     LlmNodeProperties
	requestBuilder LlmRequestBuilder
}

而大模型请求的构建同样具有一定的复杂性,不仅仅是因为不同模型的参数有差异,还有一个原因是大模型节点支持一种JSON模式,允许用户直接定义大模型的输出类型:

而大模型并不都具有这样的魔法,具体的实现方式是这样的:

  • 对于OpenAI,它支持json_mode,可以设置该参数并直接在提示词中描述json的结构[6]
  • 对于Minimax,它支持设定返回格式,需要构建一个特殊的参数来控制json结构[7]

因此,我们进一步对这些逻辑进行拆解,通过Adaptor模式来解决不同厂商的差异:

type LlmRequestBuilderImpl struct {
	schemaGenerator JsonSchemaGenerator
	minimaxAdaptor  MinimaxAdaptor
	gptAdaptor      GptAdaptor
}

对于单元测试来讲,每一个方法或者struct都足够专一,我们只需要测试其相关的逻辑。例如,在上面的例子中, 对于GptAdaptor我们有这些测试用例:

// 如果大模型节点的返回格式是JSON且模型支持JSON模式,那么需要设置json_object参数,并在提示词中描述json结构
func TestGptAdaptor_ShouldAddJsonMode_GivenOutputIsJson(t *testing.T) { 
	// ... 
	err := adaptor.FixRequest(&req, p)
	assert.NilError(t, err)
	ext := req.ExtendedLLMArguments["azure_gpt-35-turbo"].(map[string]any)
	x := ext["response_format"].(map[string]string)
	assert.Equal(t, x["type"], "json_object")
	assert.Equal(t, req.Context, "you're a helpful assistant.\n\nPlease reply exactly in the following json schema:\n```json\n"+
		`{"type":"object","properties":{"age":{"type":"number","description":"user age"},"name":{"type":"string","description":"user name"}}}`+"\n```\n")
	assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?")
}

// 如果大模型节点的返回格式是JSON且模型不支持json_object参数,那么只需要在提示词中描述JSON格式即可
func TestGptAdaptor_ShouldAddJsonPrompt_GivenOutputIsJson_JsonModeIsNotSupported(t *testing.T) {
	// ...
	err := adaptor.FixRequest(&req, p)
	assert.NilError(t, err)
	assert.Equal(t, req.ExtendedLLMArguments == nil, true)
	assert.Equal(t, req.Context, "you're a helpful assistant.\n\nPlease reply exactly in the following json schema:\n```json\n"+
		`{"type":"object","properties":{"age":{"type":"number","description":"user age"},"name":{"type":"string","description":"user name"}}}`+"\n```\n")
	assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?")
}

// 如果输出不是JSON格式,那么什么也不做
func TestGptAdaptor_ShouldDoNothing_GivenOutputIsNotJson(t *testing.T) {
	// ...
	err := adaptor.FixRequest(&req, p)
	assert.NilError(t, err)
	assert.Equal(t, req.ExtendedLLMArguments == nil, true)
	assert.Equal(t, req.Context, `you're a helpful assistant.`)
	assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?")
}

而在上层的LlmNode的单元测试中,我们不需要关心这些独立的逻辑,采取测试替身进行Mock即可:

func TestExecuteLlmNode_ShouldReturnChoice_WhenExecute_GivenIsNotStream(t *testing.T) {
	client := NewMockGatewayClient(t)
	builder := NewMockLlmRequestBuilder(t)
	builder.On("CreateGatewayRequest", mock.Anything, mock.Anything).Return(nil, gateway.LLMChatUniversalReq{}, nil)
	node := LlmNode{
		Id:     "llm1",
		client: client,
		Properties: LlmNodeProperties{
			Context: "you're helpful assistant",
			Prompt:  "give me a name",
			Stream:  false,
		},
		requestBuilder: builder,
	}
    // ...

使用mockery[8]可以为指定的interface自动生成mock对象。 注意不是字节开源的那个Mockey[9],这个框架实际上存在一些坑:

  • 需要添加一些参数来禁用内联和编译优化
  • 因为它全局Mock方法,可能存在多个测试执行时的互相干扰问题,比如在测试A中Mock时影响测试B的运行结果

自动化集成测试

对于一个工作流执行,如何进行集成测试呢?得益于事件驱动架构, 我们不需要关心工作流执行的Side effect,只需要关心事件即可。 也就是说,对于一个给定的工作流,它是否产生了符合预期的事件。

例如,我们有一个用例测试条件节点:

func TestExecutor_ShouldReturnLlm1Result_WhenExecute_GivenConditionMatchesLlm1(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		w.Write([]byte(`{"code":"Success","message":"成功","external_id":"a9331df1-2adb-4fad-a154-5d8767196bb2","created":1718264704,"task_id":"llm-chat-azure-gpt-35-turbo-version-0125/ac57f2f54fe1d40be193966e5875b3a8","usage":{"completion_tokens":77,"prompt_tokens":52,"total_tokens":129},"choices":[{"index":0,"finish_reason":"stop","logprobs":0,"text":"Yes"}]}`))
	}))

	data, _ := os.ReadFile("./schema/cases/llm-condition.json")
	json, err := workflow.Parse(data)
	assert.NilError(t, err)
	builder := workflow.NewWorkflowBuilder(json, workflow.GatewayConfig{
		BaseUrl: server.URL,
	})
	dag, errs := builder.ValidateAndToDAG()
	assert.Equal(t, len(errs), 0)
	executor, err := workflow.NewExecutor(*dag)
	assert.NilError(t, err)
	channel := executor.Subscribe()
	go func() {
		w := workflow.NewEmptyWorkflowContext(context.TODO())
		w.SetParameters(map[string]any{
			"input": "what's the main product of kingsoft?",
		})
		err := executor.ExecSync(w)
		assert.NilError(t, err)
	}()
	events := []workflow.WorkflowEvent{}
	for e := range channel {
		events = append(events, e)
	}

	assert.Equal(t, len(events), 12)

	assertEventsContains(t, events, func(e workflow.WorkflowEvent) bool {
		return e.Type() == workflow.NodeSkipped && e.(workflow.NodeSkippedEvent).Node == "llm2"
	})
	assertEventsContains(t, events, func(e workflow.WorkflowEvent) bool {
		return e.Type() == workflow.JsonResponseGenerated && e.(workflow.JsonResponseEvent).Data["output"] == "Yes"
	})
}

在这个集成测试中,除了网关的调用是通过Mock Server替代外,其他的调用方式都与实际一致。 在执行完成后,我们捕获事件,验证产生的事件数目以及是否包含预期的事件,来判断执行结果是否符合预期。

当然,集成测试的成本较高,我们需要维护一些典型的测试场景(工作流JSON),因此很难覆盖到逻辑的细节。 这也意味着集成测试的数目理论上应该远小于单元测试, 或者说单元测试应该更充分。

CLI测试工具

除了单元测试、集成测试等方式来测试工作流的执行之外, 很多时候,我们需要定位分析问题, 是很麻烦的。

比如,工作流A运行有问题,但是由于权限问题,排查问题的人员无法在界面上看到A; 或者由于实现的层级嵌套较深,很难判断到底是前端问题,还是后端接口问题,还是工作流执行引擎的问题。

为此,我们开发了一个CLI工具用来直接执行工作流JSON,可以比较方便地运行和查看工作流执行结果:

未来展望

工作流模型是一个高度可扩展的模型, 可以通过添加新的节点类型、插件等, 实现更复杂的功能。

一些在计划内的功能有:

  • RAG:知识库检索
  • 代码执行:执行一段python或者Javascript代码, 可以用来定制一些特殊的逻辑
  • 图像生成:类似Coze的图像流,包含文生图以及图像处理等
  • 自定义插件:允许用户接入已有的API,或者通过FaaS部署一个新的自定义插件
  • 嵌套的Workflow:允许在Workflow中嵌套调用

尽管Workflow已经具备实现复杂的功能的可能,但在AI应用的层级中尚只处于第二层, 目前来看, 大概率Agent才是AI应用的尽头。

在Coze中,Workflow可以作为Agent的一种工具来执行,相对于其他的插件和工具, 我认为其最大的价值在于,通过工作流这种低代码的形式,赋予非技术人员无限的创造力。

尽管工作流很灵活,可以根据用户的需求来随意配置,但是我们不禁思考,这真的是“Agentic Workflow”么? 在上面的例子中,我们只是用到了大模型,但大模型在其中并没有辅助或者参与决策。 这个问题留给聪明的读者去思考:p

关于我们为什么要做Workflow,感兴趣可以从这里[10]了解更多的背景信息。

  1. 虽然模型本身可以推理,一些情况下是可以推算出结果,但无法做到准确。即使是人,也很难直接计算诸如10988x727664=?这样的问题
  2. https://www.deeplearning.ai/the-batch/issue-242/
  3. https://www.53ai.com/news/qianyanjishu/1317.html
  4. ComfyUI是一个用来配置Stable diffusion图像生成工作流的系统。尽管它一个节点到另一个节点可以有多条边,这看起来不像是一个DAG,但从实现原理上来说,这仍然可以通过DAG来表示。
  5. https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/index.html
  6. https://platform.openai.com/docs/guides/json-mode
  7. https://platform.minimaxi.com/document/ChatCompletion%20Pro?key=66718f6ba427f0c8a57015ff#MKTRq06DJQTye5EdtIf5j3d8
  8. https://github.com/vektra/mockery
  9. https://github.com/bytedance/mockey/blob/main/README_cn.md
  10. AI Con总结和KPP工作流 方案汇报 https://365.kdocs.cn/l/crXXklzrerKF