Workflow design:修订间差异

来自WHY42
Riguz留言 | 贡献
Riguz留言 | 贡献
无编辑摘要
 
(未显示同一用户的62个中间版本)
第1行: 第1行:
随着2023年大模型在国内的发展成熟,现在大家逐步开始真正关注到大模型应用的开发上。有人说,2024年是大模型应用的落地元年。的确,看似无所不能的大模型,也并不是直接丢到一个系统就能用的,如何开发大模型应用,实际上还处于一个刚起步的阶段。一些成熟的技术,经历过许多人摸索后,总能给出一个"best practice";对于大模型应用开发来说,却仍还在探索阶段。
随着2023年大模型在国内的发展成熟,现在大家开始真正关注到大模型应用的开发上。有人说,2024年是大模型应用的落地元年。看似无所不能的大模型,却并不能直接集成到一个系统中,如何开发大模型应用,目前仍然处于一个刚起步的阶段。一些成熟的技术,经历过许多人摸索后,总能找到一个现有的"best practice";对于大模型应用开发来说,谁也不敢说他的做法已经是“best practice”。
目前,一个看起来比较靠谱的模型,是这样定义大模型应用开发的几个level的:
 
目前,有一个看起来比较靠谱的模型,是这样定义大模型应用开发的几个Level的:


[[Image: LLM-app-levels.png|600px]]
[[Image: LLM-app-levels.png|600px]]
第81行: 第82行:
== 传递闭包简化 ==
== 传递闭包简化 ==
对于使用DAG的系统,一个不可忽视的问题就是DAG的不确定性:用户很可能构建出奇奇怪怪的巨复杂的图出来,其中可能会有不必要的边。
对于使用DAG的系统,一个不可忽视的问题就是DAG的不确定性:用户很可能构建出奇奇怪怪的巨复杂的图出来,其中可能会有不必要的边。
这对于理解和执行工作流都可能造成麻烦。例如,下图中a→d, a→e 都是不必要的边。通过传递闭包简化(Transitive reduction)算法可以对无用的连接进行删除,从而得到一个唯一确定的简化工作流。
这对于理解和执行工作流都可能造成麻烦。例如,下图中左边a→d, a→e 都是不必要的边。通过传递闭包简化(Transitive reduction)算法可以对无用的连接进行删除,从而得到一个唯一确定的简化工作流。
 
[[Image:transitive-reduction.png|400px]]
 
= 使用Go语言实现工作流 =
 
在有以上的一些理论基础后,实现思路已经非常清晰,尽管仍然有许多技术细节需要确定,就看如何具体设计和实现了。我们使用Go语言来实现,初步支持下面几种节点:
 
* 开始、结束
* 条件判断
* 大模型调用
* 插件:包括bing搜索、高召回、分流
 
== 概念模型 ==
 
尽管前面我们认为工作流是一个DAG,但并不是直接搞一个DAG就能实现向Coze这样的工作流系统,这里面最大的问题在于,DAG中边只包含了方向,
但实际工作流产品的设计上,一条边还可能附带一些其他的信息。
 
例如,ComfyUI中一个节点到另一个节点连接是区分不同的点的<ref>ComfyUI是一个用来配置Stable diffusion图像生成工作流的系统。尽管它一个节点到另一个节点可以有多条边,这看起来不像是一个DAG,但从实现原理上来说,这仍然可以通过DAG来表示。</ref>:
 
[[Image:sdxl-flow.png|600px]]
 
当然它的业务场景跟我们差距比较大。但是,考虑到条件节点的表示,我们发现也存在类似的情况:
 
[[Image:coze-workflow.png|600px]]
 
注意其中的条件节点,不同的分支连接到下游节点时,起点是标记到分支上的,而不是单纯的两个节点连接。当然我们可以不采用这种方式,可以在节点属性中设置一些字段来表示什么情况下执行,甚至设置到边的信息中。但不可否认的是,这种边直接连接到条件上的方式是最直观的。
经过一些权衡之后,我们提出了如下的一个工作流概念模型:
 
[[Image:Workflow-concept-model.png|600px]]
 
这个概念模型,通过对边添加属性来区分是从哪个点连接到哪个点。但并未改变DAG的本质,
因此在KPP的工作流中,是无法直接从一个条件节点的两个分支连接到同一个节点的,因为这样相当于生成了两条边。
 
这里,对于这个“点”叫什么名字好其实有一个小故事。
最开始,我们从Qt的Singal/Slot概念中借用“Slot”的概念叫“槽位”,后来觉得不妥,借用IP端口的概念叫Port。
彼时,我们抓包过Coze的协议,想参考它的定义来设计,但结果发现它表示的也乱七八糟,根本看不懂。
过了很久,等我再次抓包Coze的时候,惊讶的发现他也改名叫Port了。
 
若合一契,岂不快哉!
 
== 定义工作流Schema ==
 
在实现之前,首先需要解决的一个问题是,我们如何来表示一个工作流?比如条件判断的条件怎么存储?大模型调用的提示词怎么表示?
 
这是一个非常重要的问题,因为它是一切的基础,我们需要通过一个标准的协议来统一前端、后端等对于工作流的表示。不难发现,工作流的定义包括:
 
* DAG的表示,即节点和边
* 节点的自身属性,根据节点类型不同,其可用的属性也不尽相同
 
我们使用JSON Schema定义了一个工作流格式,大概长这样:
 
<syntaxhighlight lang="json">
{
  "type": "object",
  "properties": {
    "name": {
      "type": "string"
    },
    "nodes": {
      "type": "array",
      "items": {
        "$ref": "#/definitions/node"
      }
    },
    "edges": {
      "type": "array",
      "items": {
        "$ref": "#/definitions/link"
      }
    },
    ...
  }
}
</syntaxhighlight>
 
对于不同的节点,我们可以分别定义节点具体有哪些属性,格式是什么,是否必须等,例如:
 
<syntaxhighlight lang="json">
{
  ...
  "bingSearchPluginProperties": {
    "properties": {
      "pluginType": {
        "type": "string"
      },
      "version": {
        "type": "string"
      },
      "query": {
        "$ref": "#/definitions/variableBinding"
      },
      "count": {
        "type": "number"
      }
    },
    "required": [
      "pluginType",
      "version",
      "query"
    ]
  }
}
</syntaxhighlight>
 
完整的定义比较复杂,但通过这样一个Schema,我们可以解决很多问题:
 
* 这个工作流是否合法?至少格式上是否正确
* 前端开发时,如何清楚工作流支持多少个节点?每个节点属性是什么?
 
对于这种复杂的工作流来说,传统的API文档是无法精确到这样的程度的,不仅如此,它的最大好处是,它是真的可以运行的!
 
== 工作流的构建==
=== 对工作流的抽象定义 ===
首先我们定义了一个业务无关的单纯DAG实现,仅做了必要的修改来适配概念模型中的槽位:
<syntaxhighlight lang="go">
type Port struct {
Direction PortDirection `json:"type"`
Id        string        `json:"id"`
}
 
type Vertex struct {
Id      string
Type    string
Ports  []Port
Payload any
}
 
type Connection struct {
From      string
To        string
SourcePort *Port
TargetPort *Port
}
 
type PortawareDAG interface {
AddVertex(node Vertex) error
GetVertex(id string) (Vertex, error)
Connect(fromId string, toId string) error
ConnectFromPort(fromId string, toId string, toPort string) error
TopologicalSorting() ([]Vertex, error)
TransitiveReduction() (PortawareDAG, error)
    // ...
}
</syntaxhighlight>
 
而对于具体的不同的节点,是通过另外的Node接口来定义:
 
<syntaxhighlight lang="bash">
type Node interface {
GetId() string
GetType() NodeType
Execute(ctx context.Context, options NodeExecuteOptions, doneGroup *sync.WaitGroup) ([]InputValue, []OutputValue, error)
    // ...
}
</syntaxhighlight>
 
其中,最重要的方法就是Execute,它将设计执行节点,比如请求大模型,或者执行条件判断。这样,不同类型的Node会有不同类型的实现,
比如大模型节点的实现有单独的定义:
 
<syntaxhighlight lang="bash">
type LlmNode struct {
Id            string
Name          string
client        GatewayClient
Properties    LlmNodeProperties
requestBuilder LlmRequestBuilder
}
</syntaxhighlight>
 
=== 解析动态类型 ===
 
由于工作流节点的类型不一样,我们在实例化成Node类型的时候,一个问题是需要进行动态类型的转换。
比如,我们JSON中大概是这样定义的:
<syntaxhighlight lang="bash">
{
    "nodes": [
        { "type": "start", ...},
        { "type": "llm", ...}
    ]
}
</syntaxhighlight>
 
而我们需要根据type实例化成不同的类型(StartNode,LlmNode)。不得不吐槽,在go里面没有优雅的实现方式,
不得不将JSON反序列化成any(实际上是一个map),然后再部分序列化成JSON,然后再根据类型再次反序列化成实际的类型:
 
<syntaxhighlight lang="go">
for index, node := range w.Nodes {
jsonData, _ := json.Marshal(node.Properties)
var p any
var err error
t := node.Type
if t == TypeStart {
p, err = loadStartNode(jsonData)
} else if t == TypeLlm {
p, err = loadLlmNode(jsonData)
} else if t == TypeEnd {
p, err = loadHttpNode(jsonData)
}
// ...
</syntaxhighlight>
 
== 异步工作流运行模型 ==
=== Pub/Sub模式 ===
在事件处理架构中,我们通过事件来接偶工作流执行引擎,以及其后面的消费者。
Go语言天然提供了channel来进行消息通信,但不幸的是,它是一个单生产者/消费者的模型。
在我们需求中,我们应该允许多个消费者存在,彼此只关心自己感兴趣的事件。比如,我们可以有一个消费者来打印日志;另一个消费者来返回HTTP消息。
 
这是一个Pub/Sub模式。尽管Go原生不支持,但是我们很容易实现:
 
<syntaxhighlight lang="bash">
type Subscriber[T any] chan T
type TopicFilter func(event any) bool
 
type Publisher[T any] struct {
name        string
mutex      sync.RWMutex
buffer      int
closed      bool
subscribers map[Subscriber[T]]TopicFilter
}
</syntaxhighlight>
 
实现原理也很简单,那就是为每一个消费者创建一个channel,然后发送消息的时候,发送多次:
 
<syntaxhighlight lang="bash">
func (p *Publisher[T]) Publish(event T) {
p.mutex.RLock()
defer p.mutex.RUnlock()
 
var wg sync.WaitGroup
for subscriber, filter := range p.subscribers {
subscriber := subscriber
if filter == nil || filter(event) {
wg.Add(1)
go func(event T, channel chan<- T) {
defer wg.Done()
 
channel <- event
}(event, subscriber)
}
}
wg.Wait()
}
</syntaxhighlight>
 
这样我们就有了一个Pub/Sub实现,我们用它来作为工作流执行的事件订阅/消费系统。核心是一个Executor接口,他提供一个订阅的方法用来订阅事件,以及一个执行的方法来执行工作流。
 
<syntaxhighlight lang="bash">
type Executor interface {
Subscribe() Subscriber[WorkflowEvent]
Close()
ExecSync(ctx WorkflowContext) error
}
</syntaxhighlight>
 
=== 使用Go routine实现异步运行 ===
前面我们有提到,尽管拓扑排序可以将工作流节点排序成一个有序的列表,但并不代表节点需要串行执行。比如说,如果有多个入度为0的节点,
那么我们可以同时运行它们。在go语言中,很容易通过go routine来实现。
 
进一步思考,我们会发现,不仅需要做拓扑排序,还需要根据节点的依赖来执行节点。只有当节点的依赖项全部执行完成后,才能开始该节点的执行。
可以通过一个队列来实现,不断执行和删除就绪的节点,直到全部节点执行完成。
其运行原理如下面的伪代码所示:
 
<syntaxhighlight lang="python">
queue <- dag.typological_sort()
while queue.is_not_empty():
    ready_nodes <- queue.pop_zero_degree_nodes()
        for node in ready_nodes start:
            node.execute()
</syntaxhighlight>
 
再进一步思考,如果某一个节点运行失败,我们应该如何处理?一个比较合理的做法是直接停止工作流的运行。这种逻辑可以通过errgroup来实现。
最终,我们执行工作流的方法逻辑如下:
 
<syntaxhighlight lang="go">
func (e *AsyncExecutor) ExecSync(c WorkflowContext) error {
defer e.Close()
e.eventPublisher.Publish(NewWorkflowStartedEvent())
errGroup, ctx := errgroup.WithContext(c.GetContext())
var doneGroup sync.WaitGroup
for _, node := range e.pendingNodes {
node := node
waitChannel := e.notifyChannels[node.Id]
errGroup.Go(func() error {
select {
case <-ctx.Done():
return errCancelledByContext
case inputs := <-waitChannel:
nodeInstance, ok := node.Payload.(Node)
if !ok {
return errFailedToGetNodeInstance
}
return e.Execute(ctx, nodeInstance, inputs, &doneGroup)
}
})
}
 
e.popReadyNodes()
 
err := errGroup.Wait()
if err != nil {
return err
}
doneGroup.Wait()
e.eventPublisher.Publish(NewWorkflowFinishedEvent())
return nil
}
</syntaxhighlight>
 
有意思的是,我们并不是等节点的依赖项执行完成之后,再启动一个go routine,而是一开始就为每一个节点启动了一个go routine,
但是通过一个信号来控制何时开始运行。我们可以通过context.Context来进行额外的控制,比如设定超时时间之类,
这对于避免程序出现异常情况导致工作流无休止运行非常有帮助。
 
<syntaxhighlight lang="go">
select {
case <-ctx.Done():            // 如果超时或者取消,go routine会从这里直接返回,从而中断执行
return errCancelledByContext
case inputs := <-waitChannel: // 控制节点开始执行的信号channel
// ...
return e.Execute(ctx, nodeInstance, inputs, &doneGroup)
}
</syntaxhighlight>
 
注意在上面的流程中我们通过e.popReadyNodes()这个方法来通知已经就绪的节点开始运行,
这个方法的主要作用是找到当前节点中,所有可以运行的节点。
例如,考虑一个包含A->B->C三个节点的工作流,如果A已经执行结束,
那么B就可以开始运行。
这时候,A的运行结果将作为B的输入信息。
 
<syntaxhighlight lang="bash">
func (e *AsyncExecutor) popReadyNodes() error {
defer e.lock.Unlock()
e.lock.Lock()
 
for _, node := range e.pendingNodes {
inputs, ready, err := e.getInputs(node.Id)
if err != nil {
return err
}
if !ready {
return nil
}
picked := e.pendingNodes[0]
e.pendingNodes = e.pendingNodes[1:]          // 从尚未执行的节点队列中移除队首
notifyChannel := e.notifyChannels[picked.Id] // 通知移除的节点开始执行
notifyChannel <- inputs
}
return nil
}
</syntaxhighlight>
 
这里每一个节点对应一个通知channel,在准备运行的阶段就已经进行了初始化。
整体思路是,每当有节点执行完成,那么就检查是否有新的节点可以开始运行,否则,不可能执行其他的节点,因为剩余的节点都是有依赖关系的。
细心的读者容易想到一个问题,就是当工作流开始执行的时候,
总是有起始节点是没有依赖的,按照这个逻辑怎么触发运行呢?这个问题很容易解决,那就是工作流开始的时候,手动调用一次popReadyNodes。
 
执行的关键在于,一个节点运行的结束,会触发其后继节点的运行。这个“触发”怎么实现,实际上是有很多种办法的。
其实第一版实现并不是这样为每一个节点设置一个通知channel(全局唯一,每一个节点有且仅有一个通知channel);而是通过Pub/Sub模式来等待所有依赖项完成并通知。


[[Image:transitive-reduction.png|600px]]
这是最开始的逻辑:
<syntaxhighlight lang="python">
// 等待所有依赖节点发送通知,
func blockForNodeResults(waitChannels []<-chan NodeResult) []NodeResult {
results := []NodeResult{}
for _, ch := range waitChannels {
result := <-ch
results = append(results, result)
}
return results
}
</syntaxhighlight>
 
仔细思考其实这样实现问题挺多的:
 
* 需要为每一个依赖创建一个channel来发送和接收通知,且运行的时候还需要等待所有channel都接收到结果,相当于把“我是否能够执行”的逻辑交给节点自己去判断
* 由于channel发送和接收需要对应,需要至少为channel设置一个大小为1的buffer,否则接受channel的动作有可能导致节点运行阻塞
 
==条件分支控制 ==
在上述的工作流执行模型中,一个节点运行完成后会触发后继节点的运行。这在大多数情况下都有效,
但是,如果考虑到条件节点这一种特殊的节点,
就会存在问题。
 
[[Image:Cond-node.png|600px]]
 
如图所示,条件节点有两个分支(if/else),这两个分支有且仅能命中一个,
我们期望它的后继分支有且仅有一个能够运行。为此一种特殊的节点状态被引入:Skipped(跳过执行),
对于不需要被实际执行的节点,例如没有命中的分支,那么它的运行状态将是Skipped。
 
而在具体的实现上,我们为节点定义了一些属性:
 
* TriggerRule:触发条件,包括AllSuccess(所有依赖节点全部执行成功,不包含Skipped), NoneFailed(没有失败的情况,包含Skipped)等
* ConnectionType:连接方式,包含Selective(选择执行,如条件节点、分流)、Direct(普通情况)
* OutputValue:节点的输出值,对于条件节点,我们会输入特殊的变量来表示分支的选择结果,比如条件节点会生成$<port>=true/fase来表示某一个Port是否被命中
 
通过这些信息的组合,可以对节点如何执行进行有效的控制。这里的设计部分参考了Apache Airflow的做法 <ref>https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/index.html</ref>,但尽力避免不必要的复杂性。
 
==引用和表达式解析==
在工作流中,我们需要在一个节点中使用其他节点的结果。
比如,在条件节点中引用开始节点的某个变量作为判断的依据。这部分涉及到一些较为复杂的逻辑:
 
* 在构建工作流的时候,前端页面需要知道有哪些变量能够选择。这不仅跟节点类型有关,跟输出变量的类型、以及节点之间是否有连接也有关系。举个例子,如果两个节点之间没有连线,那么它们之间是不能相互引用变量的;条件节点中的比较操作是需要判断类型的,不能拿一个数字跟字符串去比较是否相等
* 在上一步获取可用变量列表的时候,工作流尚处于一个不完善的状态,可能是不合法的
* 变量可能涉及到复杂的嵌套结构,比如在开始节点定义一个嵌套的输入A.B.C,然后在后面的节点中引用
 
在Workflow的定义中,我们通过一个简单的表达式来指代引用:
 
<syntaxhighlight lang="json">
"expressions": [
  {
    "variable": "$start1.input", // 获取开始节点(start1)中的input输出变量
    "operator": "contains",
    "target": {
      "type": "fixed",
      "value": "kingsoft"
    }
  }
]
</syntaxhighlight>
 
这一部分的实现较为麻烦,涉及到一些递归解析算法,比如上述的$start1.input,我们首先要从节点的输出参数中找到节点start1,然后再从start1的结果中找到input变量。
但由于start1这个节点不一定是该节点的直接依赖,很可能会是依赖的依赖,这样在我们的实现中输入是一个树形结构,
需要进行递归寻址:
 
<syntaxhighlight lang="go">
func resolveNodeResultRecursively(input NodeResult, nodeId string) (NodeResult, error) {
if input.NodeId == nodeId {
return input, nil
} else {
for _, parent := range input.Inputs {
result, err := resolveNodeResultRecursively(parent, nodeId)
if err == nil {
return result, nil
}
}
return NodeResult{}, errVariableNodeNotAccessable
}
}
</syntaxhighlight>
 
获取到变量之后,还可能需要根据变量类型进行一些自动类型检测和转换:
 
<syntaxhighlight lang="go">
func tryCastValue(v any, expectedType *TypeDescriptor) (any, error) {
if expectedType == nil {
return v, nil
}
if v == nil {
return nil, errors.New("value is nil")
}
switch *expectedType {
case StringType:
return castToString(v)
case IntegerType:
return castToInt32(v)
case FloatType:
return castToFloat32(v)
case ArrayIntegerType:
return castToInt32Array(v)
case ArrayFloatType:
return castToFloat32Array(v)
    // ...
</syntaxhighlight>
 
这时候,go的劣势就被发挥的淋漓尽致,比如把一个any类型转换为字符串,得这样写:
 
<syntaxhighlight lang="go">
func castToString(v any) (string, error) {
if v == nil {
return "", nil
}
switch value := v.(type) {
case string:
return value, nil
case int:
return strconv.FormatInt(int64(value), 10), nil
case int32:
return strconv.FormatInt(int64(value), 10), nil
case int64:
return strconv.FormatInt(value, 10), nil
case float32:
return strconv.FormatFloat(float64(value), 'f', -1, 32), nil
case float64:
return strconv.FormatFloat(value, 'f', -1, 64), nil
case bool:
return strconv.FormatBool(value), nil
default:
return "", errVariableTypeIncompatible
}
}
</syntaxhighlight>
 
== 测试工作流 ==
工作流实际是一个非常复杂的系统,
很难按照传统的排列组合的方式来进行测试——有太多种组合的可能了。
那么怎样来保证代码质量呢?
 
=== 使用TDD ===
曾经在知乎上看到一个很有意思的问题和回答:
 
“如何激怒一个Golang开发者?”
“你写的代码像Java……”
 
其实在实现复杂逻辑的时候,很多方法都是可以借鉴的。尽管Go并不是一个严格意义上的面向对象的语言,
但将OO的思想用到工作流的实现中也未尝不可:如果逻辑过于复杂,那么我们就进行拆分,并遵循单一职责原则来实现。
 
在工作流的实现中,我们充分运用这一思想,
通过将逻辑拆分,并独立进行单元测试,来确保程序代码的正确。
举个例子,
大模型节点的实现又依赖GatewayClient、LlmRequestBuilder,其中GatewayClient用来请求网关;LlmRequestBuilder负责构建大模型请求:
 
 
<syntaxhighlight lang="go">
type LlmNode struct {
Id            string
Name          string
client        GatewayClient
Properties    LlmNodeProperties
requestBuilder LlmRequestBuilder
}
</syntaxhighlight>
 
而大模型请求的构建同样具有一定的复杂性,不仅仅是因为不同模型的参数有差异,还有一个原因是大模型节点支持一种JSON模式,允许用户直接定义大模型的输出类型:
 
[[Image:coze-json.png|600px]]
 
而大模型并不都具有这样的魔法,具体的实现方式是这样的:
 
* 对于OpenAI,它支持json_mode,可以设置该参数并直接在提示词中描述json的结构<ref>https://platform.openai.com/docs/guides/json-mode</ref>
* 对于Minimax,它支持设定返回格式,需要构建一个特殊的参数来控制json结构<ref>https://platform.minimaxi.com/document/ChatCompletion%20Pro?key=66718f6ba427f0c8a57015ff#MKTRq06DJQTye5EdtIf5j3d8</ref>
 
因此,我们进一步对这些逻辑进行拆解,通过Adaptor模式来解决不同厂商的差异:
 
<syntaxhighlight lang="go">
type LlmRequestBuilderImpl struct {
schemaGenerator JsonSchemaGenerator
minimaxAdaptor  MinimaxAdaptor
gptAdaptor      GptAdaptor
}
</syntaxhighlight>
 
对于单元测试来讲,每一个方法或者struct都足够专一,我们只需要测试其相关的逻辑。例如,在上面的例子中,
对于GptAdaptor我们有这些测试用例:
 
<syntaxhighlight lang="go">
// 如果大模型节点的返回格式是JSON且模型支持JSON模式,那么需要设置json_object参数,并在提示词中描述json结构
func TestGptAdaptor_ShouldAddJsonMode_GivenOutputIsJson(t *testing.T) {
// ...
err := adaptor.FixRequest(&req, p)
assert.NilError(t, err)
ext := req.ExtendedLLMArguments["azure_gpt-35-turbo"].(map[string]any)
x := ext["response_format"].(map[string]string)
assert.Equal(t, x["type"], "json_object")
assert.Equal(t, req.Context, "you're a helpful assistant.\n\nPlease reply exactly in the following json schema:\n```json\n"+
`{"type":"object","properties":{"age":{"type":"number","description":"user age"},"name":{"type":"string","description":"user name"}}}`+"\n```\n")
assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?")
}
 
// 如果大模型节点的返回格式是JSON且模型不支持json_object参数,那么只需要在提示词中描述JSON格式即可
func TestGptAdaptor_ShouldAddJsonPrompt_GivenOutputIsJson_JsonModeIsNotSupported(t *testing.T) {
// ...
err := adaptor.FixRequest(&req, p)
assert.NilError(t, err)
assert.Equal(t, req.ExtendedLLMArguments == nil, true)
assert.Equal(t, req.Context, "you're a helpful assistant.\n\nPlease reply exactly in the following json schema:\n```json\n"+
`{"type":"object","properties":{"age":{"type":"number","description":"user age"},"name":{"type":"string","description":"user name"}}}`+"\n```\n")
assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?")
}
 
// 如果输出不是JSON格式,那么什么也不做
func TestGptAdaptor_ShouldDoNothing_GivenOutputIsNotJson(t *testing.T) {
// ...
err := adaptor.FixRequest(&req, p)
assert.NilError(t, err)
assert.Equal(t, req.ExtendedLLMArguments == nil, true)
assert.Equal(t, req.Context, `you're a helpful assistant.`)
assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?")
}
</syntaxhighlight>
 
而在上层的LlmNode的单元测试中,我们不需要关心这些独立的逻辑,采取测试替身进行Mock即可:
 
<syntaxhighlight lang="go">
func TestExecuteLlmNode_ShouldReturnChoice_WhenExecute_GivenIsNotStream(t *testing.T) {
client := NewMockGatewayClient(t)
builder := NewMockLlmRequestBuilder(t)
builder.On("CreateGatewayRequest", mock.Anything, mock.Anything).Return(nil, gateway.LLMChatUniversalReq{}, nil)
node := LlmNode{
Id:    "llm1",
client: client,
Properties: LlmNodeProperties{
Context: "you're helpful assistant",
Prompt:  "give me a name",
Stream:  false,
},
requestBuilder: builder,
}
    // ...
</syntaxhighlight>
 
使用mockery<ref>https://github.com/vektra/mockery</ref>可以为指定的interface自动生成mock对象。
注意不是字节开源的那个Mockey<ref>https://github.com/bytedance/mockey/blob/main/README_cn.md</ref>,这个框架实际上存在一些坑:
 
* 需要添加一些参数来禁用内联和编译优化
* 因为它全局Mock方法,可能存在多个测试执行时的互相干扰问题,比如在测试A中Mock时影响测试B的运行结果
 
=== 自动化集成测试 ===
对于一个工作流执行,如何进行集成测试呢?得益于事件驱动架构,
我们不需要关心工作流执行的Side effect,只需要关心事件即可。
也就是说,对于一个给定的工作流,它是否产生了符合预期的事件。
 
例如,我们有一个用例测试条件节点:
 
<syntaxhighlight lang="go">
func TestExecutor_ShouldReturnLlm1Result_WhenExecute_GivenConditionMatchesLlm1(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"code":"Success","message":"成功","external_id":"a9331df1-2adb-4fad-a154-5d8767196bb2","created":1718264704,"task_id":"llm-chat-azure-gpt-35-turbo-version-0125/ac57f2f54fe1d40be193966e5875b3a8","usage":{"completion_tokens":77,"prompt_tokens":52,"total_tokens":129},"choices":[{"index":0,"finish_reason":"stop","logprobs":0,"text":"Yes"}]}`))
}))
 
data, _ := os.ReadFile("./schema/cases/llm-condition.json")
json, err := workflow.Parse(data)
assert.NilError(t, err)
builder := workflow.NewWorkflowBuilder(json, workflow.GatewayConfig{
BaseUrl: server.URL,
})
dag, errs := builder.ValidateAndToDAG()
assert.Equal(t, len(errs), 0)
executor, err := workflow.NewExecutor(*dag)
assert.NilError(t, err)
channel := executor.Subscribe()
go func() {
w := workflow.NewEmptyWorkflowContext(context.TODO())
w.SetParameters(map[string]any{
"input": "what's the main product of kingsoft?",
})
err := executor.ExecSync(w)
assert.NilError(t, err)
}()
events := []workflow.WorkflowEvent{}
for e := range channel {
events = append(events, e)
}
 
assert.Equal(t, len(events), 12)
 
assertEventsContains(t, events, func(e workflow.WorkflowEvent) bool {
return e.Type() == workflow.NodeSkipped && e.(workflow.NodeSkippedEvent).Node == "llm2"
})
assertEventsContains(t, events, func(e workflow.WorkflowEvent) bool {
return e.Type() == workflow.JsonResponseGenerated && e.(workflow.JsonResponseEvent).Data["output"] == "Yes"
})
}
</syntaxhighlight>
 
在这个集成测试中,除了网关的调用是通过Mock Server替代外,其他的调用方式都与实际一致。
在执行完成后,我们捕获事件,验证产生的事件数目以及是否包含预期的事件,来判断执行结果是否符合预期。
 
当然,集成测试的成本较高,我们需要维护一些典型的测试场景(工作流JSON),因此很难覆盖到逻辑的细节。
这也意味着集成测试的数目理论上应该远小于单元测试,
或者说单元测试应该更充分。
 
=== CLI测试工具 ===
除了单元测试、集成测试等方式来测试工作流的执行之外,
很多时候,我们需要定位分析问题,
是很麻烦的。
 
比如,工作流A运行有问题,但是由于权限问题,排查问题的人员无法在界面上看到A;
或者由于实现的层级嵌套较深,很难判断到底是前端问题,还是后端接口问题,还是工作流执行引擎的问题。
 
为此,我们开发了一个CLI工具用来直接执行工作流JSON,可以比较方便地运行和查看工作流执行结果:
 
[[Image:workflow-cli.gif|600px]]


= 未来展望 =
= 未来展望 =


关于为什么要做Workflow,感兴趣可以从这里了解一些其他的信息<ref>AI Con总结和KPP工作流 方案汇报 https://365.kdocs.cn/l/crXXklzrerKF</ref>
工作流模型是一个高度可扩展的模型,
可以通过添加新的节点类型、插件等,
实现更复杂的功能。
 
一些在计划内的功能有:
 
* RAG:知识库检索
* 代码执行:执行一段python或者Javascript代码, 可以用来定制一些特殊的逻辑
* 图像生成:类似Coze的图像流,包含文生图以及图像处理等
* 自定义插件:允许用户接入已有的API,或者通过FaaS部署一个新的自定义插件
* 嵌套的Workflow:允许在Workflow中嵌套调用
 
尽管Workflow已经具备实现复杂的功能的可能,但在AI应用的层级中尚只处于第二层,
目前来看,
大概率Agent才是AI应用的尽头。
 
在Coze中,Workflow可以作为Agent的一种工具来执行,相对于其他的插件和工具,
我认为其最大的价值在于,通过工作流这种低代码的形式,赋予非技术人员无限的创造力。
 
尽管工作流很灵活,可以根据用户的需求来随意配置,但是我们不禁思考,这真的是“Agentic Workflow”么?
在上面的例子中,我们只是用到了大模型,但大模型在其中并没有辅助或者参与决策。
这个问题留给聪明的读者去思考:p
 
关于我们为什么要做Workflow,感兴趣可以从这里<ref>AI Con总结和KPP工作流 方案汇报 https://365.kdocs.cn/l/crXXklzrerKF</ref>了解更多的背景信息。

2024年7月29日 (一) 07:11的最新版本

随着2023年大模型在国内的发展成熟,现在大家开始真正关注到大模型应用的开发上。有人说,2024年是大模型应用的落地元年。看似无所不能的大模型,却并不能直接集成到一个系统中,如何开发大模型应用,目前仍然处于一个刚起步的阶段。一些成熟的技术,经历过许多人摸索后,总能找到一个现有的"best practice";对于大模型应用开发来说,谁也不敢说他的做法已经是“best practice”。

目前,有一个看起来比较靠谱的模型,是这样定义大模型应用开发的几个Level的:

最简单的Wrapper层级,实际就是把大模型的提示词封装成一个固定功能的”方法“,然后集成到业务系统中。KPP的Function(或者Prompt),就是一个Wrapper。Wrapper能做什么完全取决于大模型的能力,比如生成一段文字,甚至写一段代码,都没有问题。但是如果想做一个复杂的功能,恐怕就不行了,比如,先生成代码再执行得到代码运行结果,这样就没法直接通过大模型请求来做了,原因是大模型本身无法直接执行代码 [1]

显然,对于复杂的功能,我们可以尝试分步骤来解决。比如一键生成PPT就是很好的例子,第一步先生成一个outline;然后再根据outline,去填充内容;最后通过一个生成文档的API生成一个美化后的PPT。这样每一步都可以用最适合的技术来进行,比如生成outline和填充内容用LLM,可以选择最合适的模型;美化PPT则可以直接使用已有的API来处理。

这其实就是Flow了,或者更明确一点,Agentic Workflow[2]。不是普通的工作流,是“智能”的工作流。通过流程的编排,我们可以扩充LLM的能力,通过一些组合来实现复杂的业务功能。相对于Wrapper而言,Workflow可以更好地实现复杂的AI能力。 因此,我们在KPP中也实现了Workflow的能力。这里就技术层面分享一下KPP在构建Workflow时的一些设计和实现。

为什么要自研,搞个开源的不行么?

实际上,在开始设计工作流之前,就已经有不少公司已经在做了。其中最具有参考价值的主要是Dify和扣子。其中,Dify是一个开源的系统,理论上可以直接拿过来用。 但是,考虑到WPS AI的实际情况,有几个比较重要的问题是Dify难以支撑的:

  • 无法直接嵌入到WPS AI的现有体系,包括网关、KPP、AI Server等,一定会存在一个二次开发的成本
  • Dify是用Python写的,如果大规模使用,很可能会遇到性能问题,包括工作流本身的异步多线程执行、以及水平扩展的性价比

实际上,字节的Coze是一个非常适合我们的开发平台。它的产品设计相当专业和完善,Workflow、图像流、Agent、多Agent协作等,是目前所有平台中定义最清晰的。相对而言,一些平台自己都不知道自己在做什么,比如百度的Agent Builder和App Builder,做着做着就成一样的了[3]。 Dify在这方面也有明显的问题,看起来还没有搞清楚Agent和Workflow的关系。

然而很遗憾目前为止没有一个开源的系统可以达到Coze的高度。 正是由于种种原因,最终KPP决定完全自研Workflow系统。

理论基础

即便从零开始实现一个Workflow其实也比较简单,因为从原理上来说,Workflow无非就是一个DAG(有向无环图)而已。 它是工作流的核心。DAG相信大家都比较熟悉,之所以要用到DAG正式因为它有两个重要的特点:

  • 节点之间的连接是有向的,否则我们将无法判断节点之间的依赖关系和执行顺序
  • 整个图是无环的,这也限制了在Workflow中是不可以存在循环的。这点非常重要,如果是一个Agent,那么我们可以一直给Agent发消息聊下去,这是一个循环;但是Workflow一定不能存在这种不确定的循环,或者说,我们期望在Workflow能够在确定有限的时间内运行完成并获取结果,它是Stateless(无状态)的

使用拓扑排序确定执行顺序

执行DAG第一件事情就是确定节点的执行顺序,从哪里开始,又如何结束?

考虑如下的几种场景:

从图中我们可以看出这也一些信息:

  • 严格而言,理论上DAG只要求有向无环,但是不必要“连通”的。也即是说,图中可以存在孤立的点,或者多个分散的DAG构成一个大的DAG
  • 一个节点可能有多个后继节点,意味着某个任务完成后,可能触发多个任务的执行
  • 一个节点也可能有多个依赖,那么只有依赖节点全部执行完成后,才能开始它的执行

我们可以通过拓扑排序(Topological sorting)来确定节点的执行顺序。比较著名的算法是Kahn's algorithm,原理也很简单,从图中选出入度为零的节点,然后删掉这些节点和边,再重复这一过程直到原图为空。这样选出的这些节点就是拓扑排序的结果。

拓扑排序还有一个附带的好处,就是可以检测图中是否有环。如果图中没有入度为零的节点,但是选出的节点数目仍然小于图的节点数,那么拓扑排序失败,证明图中有环存在。

拓扑排序的结果并不一定是唯一的。比如场景一中,由于节点相互之间完全无依赖,因此任意一个组合都是正确的拓扑排序结果。

值得注意的是,尽管拓扑排序得到的是一个有序的列表,这并不意味着你需要“串行”执行。场景三的拓扑排序结果为[A,B,C,D]或者[A,B,D,C],其中,C和D是可以并行执行的。

事件驱动架构

确定了节点的执行顺序之后,我们可以按照顺序来执行节点,看起来工作流的执行就这样完成了:

sorted_nodes <- dag.topological_sort()
for node in sorted_nodes:
    node.wait_for_dependencies()
    node.execute()

但是我们忽略了几个重要的问题:

  • 节点之间是有交互的。考虑场景三中,节点B是可能需要使用A的运行结果的,也就是说,一个节点的运行结果应该对其后继节点可见
  • 我们需要能够实时观测工作流的运行情况,比如当前执行到哪一个节点了?这对于界面调试、API调用等都十分有用。

使用事件驱动架构可以比较容易来解决这个问题。我们将工作流的运行抽象成一系列的事件,节点的执行产生事件(比如节点开始、节点结束),而这些事件具体如何使用则由消费者来决定。 这样,在UI上调试工作流时,我们只需要根据这些事件来更新页面上的节点状态;当使用API调用时,我们则可以过滤掉不感兴趣的事件,只监听结果事件并返回给调用方即可。 这样对于工作流执行引擎将有非常强的扩展性和可测试性。

传递闭包简化

对于使用DAG的系统,一个不可忽视的问题就是DAG的不确定性:用户很可能构建出奇奇怪怪的巨复杂的图出来,其中可能会有不必要的边。 这对于理解和执行工作流都可能造成麻烦。例如,下图中左边a→d, a→e 都是不必要的边。通过传递闭包简化(Transitive reduction)算法可以对无用的连接进行删除,从而得到一个唯一确定的简化工作流。

使用Go语言实现工作流

在有以上的一些理论基础后,实现思路已经非常清晰,尽管仍然有许多技术细节需要确定,就看如何具体设计和实现了。我们使用Go语言来实现,初步支持下面几种节点:

  • 开始、结束
  • 条件判断
  • 大模型调用
  • 插件:包括bing搜索、高召回、分流

概念模型

尽管前面我们认为工作流是一个DAG,但并不是直接搞一个DAG就能实现向Coze这样的工作流系统,这里面最大的问题在于,DAG中边只包含了方向, 但实际工作流产品的设计上,一条边还可能附带一些其他的信息。

例如,ComfyUI中一个节点到另一个节点连接是区分不同的点的[4]

当然它的业务场景跟我们差距比较大。但是,考虑到条件节点的表示,我们发现也存在类似的情况:

注意其中的条件节点,不同的分支连接到下游节点时,起点是标记到分支上的,而不是单纯的两个节点连接。当然我们可以不采用这种方式,可以在节点属性中设置一些字段来表示什么情况下执行,甚至设置到边的信息中。但不可否认的是,这种边直接连接到条件上的方式是最直观的。 经过一些权衡之后,我们提出了如下的一个工作流概念模型:

这个概念模型,通过对边添加属性来区分是从哪个点连接到哪个点。但并未改变DAG的本质, 因此在KPP的工作流中,是无法直接从一个条件节点的两个分支连接到同一个节点的,因为这样相当于生成了两条边。

这里,对于这个“点”叫什么名字好其实有一个小故事。 最开始,我们从Qt的Singal/Slot概念中借用“Slot”的概念叫“槽位”,后来觉得不妥,借用IP端口的概念叫Port。 彼时,我们抓包过Coze的协议,想参考它的定义来设计,但结果发现它表示的也乱七八糟,根本看不懂。 过了很久,等我再次抓包Coze的时候,惊讶的发现他也改名叫Port了。

若合一契,岂不快哉!

定义工作流Schema

在实现之前,首先需要解决的一个问题是,我们如何来表示一个工作流?比如条件判断的条件怎么存储?大模型调用的提示词怎么表示?

这是一个非常重要的问题,因为它是一切的基础,我们需要通过一个标准的协议来统一前端、后端等对于工作流的表示。不难发现,工作流的定义包括:

  • DAG的表示,即节点和边
  • 节点的自身属性,根据节点类型不同,其可用的属性也不尽相同

我们使用JSON Schema定义了一个工作流格式,大概长这样:

{
  "type": "object",
  "properties": {
    "name": {
      "type": "string"
    },
    "nodes": {
      "type": "array",
      "items": {
        "$ref": "#/definitions/node"
      }
    },
    "edges": {
      "type": "array",
      "items": {
        "$ref": "#/definitions/link"
      }
    },
    ...
  }
}

对于不同的节点,我们可以分别定义节点具体有哪些属性,格式是什么,是否必须等,例如:

{
  ...
  "bingSearchPluginProperties": {
    "properties": {
      "pluginType": {
        "type": "string"
      },
      "version": {
        "type": "string"
      },
      "query": {
        "$ref": "#/definitions/variableBinding"
      },
      "count": {
        "type": "number"
      }
    },
    "required": [
      "pluginType",
      "version",
      "query"
    ]
  }
}

完整的定义比较复杂,但通过这样一个Schema,我们可以解决很多问题:

  • 这个工作流是否合法?至少格式上是否正确
  • 前端开发时,如何清楚工作流支持多少个节点?每个节点属性是什么?

对于这种复杂的工作流来说,传统的API文档是无法精确到这样的程度的,不仅如此,它的最大好处是,它是真的可以运行的!

工作流的构建

对工作流的抽象定义

首先我们定义了一个业务无关的单纯DAG实现,仅做了必要的修改来适配概念模型中的槽位:

type Port struct {
	Direction PortDirection `json:"type"`
	Id        string        `json:"id"`
}

type Vertex struct {
	Id      string
	Type    string
	Ports   []Port
	Payload any
}

type Connection struct {
	From       string
	To         string
	SourcePort *Port
	TargetPort *Port
}

type PortawareDAG interface {
	AddVertex(node Vertex) error
	GetVertex(id string) (Vertex, error)
	Connect(fromId string, toId string) error
	ConnectFromPort(fromId string, toId string, toPort string) error
	TopologicalSorting() ([]Vertex, error)
	TransitiveReduction() (PortawareDAG, error)
    // ...
}

而对于具体的不同的节点,是通过另外的Node接口来定义:

type Node interface {
	GetId() string
	GetType() NodeType
	Execute(ctx context.Context, options NodeExecuteOptions, doneGroup *sync.WaitGroup) ([]InputValue, []OutputValue, error)
    // ...
}

其中,最重要的方法就是Execute,它将设计执行节点,比如请求大模型,或者执行条件判断。这样,不同类型的Node会有不同类型的实现, 比如大模型节点的实现有单独的定义:

type LlmNode struct {
	Id             string
	Name           string
	client         GatewayClient
	Properties     LlmNodeProperties
	requestBuilder LlmRequestBuilder
}

解析动态类型

由于工作流节点的类型不一样,我们在实例化成Node类型的时候,一个问题是需要进行动态类型的转换。 比如,我们JSON中大概是这样定义的:

{
    "nodes": [ 
        { "type": "start", ...},
        { "type": "llm", ...}
    ]
}

而我们需要根据type实例化成不同的类型(StartNode,LlmNode)。不得不吐槽,在go里面没有优雅的实现方式, 不得不将JSON反序列化成any(实际上是一个map),然后再部分序列化成JSON,然后再根据类型再次反序列化成实际的类型:

for index, node := range w.Nodes {
	jsonData, _ := json.Marshal(node.Properties)
	var p any
	var err error
	t := node.Type
	if t == TypeStart {
		p, err = loadStartNode(jsonData)
	} else if t == TypeLlm {
		p, err = loadLlmNode(jsonData)
	} else if t == TypeEnd {
		p, err = loadHttpNode(jsonData)
	}
	// ...

异步工作流运行模型

Pub/Sub模式

在事件处理架构中,我们通过事件来接偶工作流执行引擎,以及其后面的消费者。 Go语言天然提供了channel来进行消息通信,但不幸的是,它是一个单生产者/消费者的模型。 在我们需求中,我们应该允许多个消费者存在,彼此只关心自己感兴趣的事件。比如,我们可以有一个消费者来打印日志;另一个消费者来返回HTTP消息。

这是一个Pub/Sub模式。尽管Go原生不支持,但是我们很容易实现:

type Subscriber[T any] chan T
type TopicFilter func(event any) bool

type Publisher[T any] struct {
	name        string
	mutex       sync.RWMutex
	buffer      int
	closed      bool
	subscribers map[Subscriber[T]]TopicFilter
}

实现原理也很简单,那就是为每一个消费者创建一个channel,然后发送消息的时候,发送多次:

func (p *Publisher[T]) Publish(event T) {
	p.mutex.RLock()
	defer p.mutex.RUnlock()

	var wg sync.WaitGroup
	for subscriber, filter := range p.subscribers {
		subscriber := subscriber
		if filter == nil || filter(event) {
			wg.Add(1)
			go func(event T, channel chan<- T) {
				defer wg.Done()

				channel <- event
			}(event, subscriber)
		}
	}
	wg.Wait()
}

这样我们就有了一个Pub/Sub实现,我们用它来作为工作流执行的事件订阅/消费系统。核心是一个Executor接口,他提供一个订阅的方法用来订阅事件,以及一个执行的方法来执行工作流。

type Executor interface {
	Subscribe() Subscriber[WorkflowEvent]
	Close()
	ExecSync(ctx WorkflowContext) error
}

使用Go routine实现异步运行

前面我们有提到,尽管拓扑排序可以将工作流节点排序成一个有序的列表,但并不代表节点需要串行执行。比如说,如果有多个入度为0的节点, 那么我们可以同时运行它们。在go语言中,很容易通过go routine来实现。

进一步思考,我们会发现,不仅需要做拓扑排序,还需要根据节点的依赖来执行节点。只有当节点的依赖项全部执行完成后,才能开始该节点的执行。 可以通过一个队列来实现,不断执行和删除就绪的节点,直到全部节点执行完成。 其运行原理如下面的伪代码所示:

queue <- dag.typological_sort()
while queue.is_not_empty():
    ready_nodes <- queue.pop_zero_degree_nodes()
        for node in ready_nodes start:
            node.execute()

再进一步思考,如果某一个节点运行失败,我们应该如何处理?一个比较合理的做法是直接停止工作流的运行。这种逻辑可以通过errgroup来实现。 最终,我们执行工作流的方法逻辑如下:

func (e *AsyncExecutor) ExecSync(c WorkflowContext) error {
	defer e.Close()
	e.eventPublisher.Publish(NewWorkflowStartedEvent())
	errGroup, ctx := errgroup.WithContext(c.GetContext())
	var doneGroup sync.WaitGroup
	for _, node := range e.pendingNodes {
		node := node
		waitChannel := e.notifyChannels[node.Id]
		errGroup.Go(func() error {
			select {
			case <-ctx.Done():
				return errCancelledByContext
			case inputs := <-waitChannel:
				nodeInstance, ok := node.Payload.(Node)
				if !ok {
					return errFailedToGetNodeInstance
				}
				return e.Execute(ctx, nodeInstance, inputs, &doneGroup)
			}
		})
	}

	e.popReadyNodes()

	err := errGroup.Wait()
	if err != nil {
		return err
	}
	doneGroup.Wait()
	e.eventPublisher.Publish(NewWorkflowFinishedEvent())
	return nil
}

有意思的是,我们并不是等节点的依赖项执行完成之后,再启动一个go routine,而是一开始就为每一个节点启动了一个go routine, 但是通过一个信号来控制何时开始运行。我们可以通过context.Context来进行额外的控制,比如设定超时时间之类, 这对于避免程序出现异常情况导致工作流无休止运行非常有帮助。

select {
case <-ctx.Done():            // 如果超时或者取消,go routine会从这里直接返回,从而中断执行
	return errCancelledByContext
case inputs := <-waitChannel: // 控制节点开始执行的信号channel
	// ... 
	return e.Execute(ctx, nodeInstance, inputs, &doneGroup)
}

注意在上面的流程中我们通过e.popReadyNodes()这个方法来通知已经就绪的节点开始运行, 这个方法的主要作用是找到当前节点中,所有可以运行的节点。 例如,考虑一个包含A->B->C三个节点的工作流,如果A已经执行结束, 那么B就可以开始运行。 这时候,A的运行结果将作为B的输入信息。

func (e *AsyncExecutor) popReadyNodes() error {
	defer e.lock.Unlock()
	e.lock.Lock()

	for _, node := range e.pendingNodes {
		inputs, ready, err := e.getInputs(node.Id)
		if err != nil {
			return err
		}
		if !ready {
			return nil
		}
		picked := e.pendingNodes[0]
		e.pendingNodes = e.pendingNodes[1:]          // 从尚未执行的节点队列中移除队首
		notifyChannel := e.notifyChannels[picked.Id] // 通知移除的节点开始执行
		notifyChannel <- inputs
	}
	return nil
}

这里每一个节点对应一个通知channel,在准备运行的阶段就已经进行了初始化。 整体思路是,每当有节点执行完成,那么就检查是否有新的节点可以开始运行,否则,不可能执行其他的节点,因为剩余的节点都是有依赖关系的。 细心的读者容易想到一个问题,就是当工作流开始执行的时候, 总是有起始节点是没有依赖的,按照这个逻辑怎么触发运行呢?这个问题很容易解决,那就是工作流开始的时候,手动调用一次popReadyNodes。

执行的关键在于,一个节点运行的结束,会触发其后继节点的运行。这个“触发”怎么实现,实际上是有很多种办法的。 其实第一版实现并不是这样为每一个节点设置一个通知channel(全局唯一,每一个节点有且仅有一个通知channel);而是通过Pub/Sub模式来等待所有依赖项完成并通知。

这是最开始的逻辑:

// 等待所有依赖节点发送通知
func blockForNodeResults(waitChannels []<-chan NodeResult) []NodeResult {
	results := []NodeResult{}
	for _, ch := range waitChannels {
		result := <-ch
		results = append(results, result)
	}
	return results
}

仔细思考其实这样实现问题挺多的:

  • 需要为每一个依赖创建一个channel来发送和接收通知,且运行的时候还需要等待所有channel都接收到结果,相当于把“我是否能够执行”的逻辑交给节点自己去判断
  • 由于channel发送和接收需要对应,需要至少为channel设置一个大小为1的buffer,否则接受channel的动作有可能导致节点运行阻塞

条件分支控制

在上述的工作流执行模型中,一个节点运行完成后会触发后继节点的运行。这在大多数情况下都有效, 但是,如果考虑到条件节点这一种特殊的节点, 就会存在问题。

如图所示,条件节点有两个分支(if/else),这两个分支有且仅能命中一个, 我们期望它的后继分支有且仅有一个能够运行。为此一种特殊的节点状态被引入:Skipped(跳过执行), 对于不需要被实际执行的节点,例如没有命中的分支,那么它的运行状态将是Skipped。

而在具体的实现上,我们为节点定义了一些属性:

  • TriggerRule:触发条件,包括AllSuccess(所有依赖节点全部执行成功,不包含Skipped), NoneFailed(没有失败的情况,包含Skipped)等
  • ConnectionType:连接方式,包含Selective(选择执行,如条件节点、分流)、Direct(普通情况)
  • OutputValue:节点的输出值,对于条件节点,我们会输入特殊的变量来表示分支的选择结果,比如条件节点会生成$<port>=true/fase来表示某一个Port是否被命中

通过这些信息的组合,可以对节点如何执行进行有效的控制。这里的设计部分参考了Apache Airflow的做法 [5],但尽力避免不必要的复杂性。

引用和表达式解析

在工作流中,我们需要在一个节点中使用其他节点的结果。 比如,在条件节点中引用开始节点的某个变量作为判断的依据。这部分涉及到一些较为复杂的逻辑:

  • 在构建工作流的时候,前端页面需要知道有哪些变量能够选择。这不仅跟节点类型有关,跟输出变量的类型、以及节点之间是否有连接也有关系。举个例子,如果两个节点之间没有连线,那么它们之间是不能相互引用变量的;条件节点中的比较操作是需要判断类型的,不能拿一个数字跟字符串去比较是否相等
  • 在上一步获取可用变量列表的时候,工作流尚处于一个不完善的状态,可能是不合法的
  • 变量可能涉及到复杂的嵌套结构,比如在开始节点定义一个嵌套的输入A.B.C,然后在后面的节点中引用

在Workflow的定义中,我们通过一个简单的表达式来指代引用:

"expressions": [
  {
    "variable": "$start1.input", // 获取开始节点(start1)中的input输出变量
    "operator": "contains",
    "target": {
      "type": "fixed",
      "value": "kingsoft"
    }
  }
]

这一部分的实现较为麻烦,涉及到一些递归解析算法,比如上述的$start1.input,我们首先要从节点的输出参数中找到节点start1,然后再从start1的结果中找到input变量。 但由于start1这个节点不一定是该节点的直接依赖,很可能会是依赖的依赖,这样在我们的实现中输入是一个树形结构, 需要进行递归寻址:

func resolveNodeResultRecursively(input NodeResult, nodeId string) (NodeResult, error) {
	if input.NodeId == nodeId {
		return input, nil
	} else {
		for _, parent := range input.Inputs {
			result, err := resolveNodeResultRecursively(parent, nodeId)
			if err == nil {
				return result, nil
			}
		}
		return NodeResult{}, errVariableNodeNotAccessable
	}
}

获取到变量之后,还可能需要根据变量类型进行一些自动类型检测和转换:

func tryCastValue(v any, expectedType *TypeDescriptor) (any, error) {
	if expectedType == nil {
		return v, nil
	}
	if v == nil {
		return nil, errors.New("value is nil")
	}
	switch *expectedType {
	case StringType:
		return castToString(v)
	case IntegerType:
		return castToInt32(v)
	case FloatType:
		return castToFloat32(v)
	case ArrayIntegerType:
		return castToInt32Array(v)
	case ArrayFloatType:
		return castToFloat32Array(v)
    // ...

这时候,go的劣势就被发挥的淋漓尽致,比如把一个any类型转换为字符串,得这样写:

func castToString(v any) (string, error) {
	if v == nil {
		return "", nil
	}
	switch value := v.(type) {
	case string:
		return value, nil
	case int:
		return strconv.FormatInt(int64(value), 10), nil
	case int32:
		return strconv.FormatInt(int64(value), 10), nil
	case int64:
		return strconv.FormatInt(value, 10), nil
	case float32:
		return strconv.FormatFloat(float64(value), 'f', -1, 32), nil
	case float64:
		return strconv.FormatFloat(value, 'f', -1, 64), nil
	case bool:
		return strconv.FormatBool(value), nil
	default:
		return "", errVariableTypeIncompatible
	}
}

测试工作流

工作流实际是一个非常复杂的系统, 很难按照传统的排列组合的方式来进行测试——有太多种组合的可能了。 那么怎样来保证代码质量呢?

使用TDD

曾经在知乎上看到一个很有意思的问题和回答:

“如何激怒一个Golang开发者?” “你写的代码像Java……”

其实在实现复杂逻辑的时候,很多方法都是可以借鉴的。尽管Go并不是一个严格意义上的面向对象的语言, 但将OO的思想用到工作流的实现中也未尝不可:如果逻辑过于复杂,那么我们就进行拆分,并遵循单一职责原则来实现。

在工作流的实现中,我们充分运用这一思想, 通过将逻辑拆分,并独立进行单元测试,来确保程序代码的正确。 举个例子, 大模型节点的实现又依赖GatewayClient、LlmRequestBuilder,其中GatewayClient用来请求网关;LlmRequestBuilder负责构建大模型请求:


type LlmNode struct {
	Id             string
	Name           string
	client         GatewayClient
	Properties     LlmNodeProperties
	requestBuilder LlmRequestBuilder
}

而大模型请求的构建同样具有一定的复杂性,不仅仅是因为不同模型的参数有差异,还有一个原因是大模型节点支持一种JSON模式,允许用户直接定义大模型的输出类型:

而大模型并不都具有这样的魔法,具体的实现方式是这样的:

  • 对于OpenAI,它支持json_mode,可以设置该参数并直接在提示词中描述json的结构[6]
  • 对于Minimax,它支持设定返回格式,需要构建一个特殊的参数来控制json结构[7]

因此,我们进一步对这些逻辑进行拆解,通过Adaptor模式来解决不同厂商的差异:

type LlmRequestBuilderImpl struct {
	schemaGenerator JsonSchemaGenerator
	minimaxAdaptor  MinimaxAdaptor
	gptAdaptor      GptAdaptor
}

对于单元测试来讲,每一个方法或者struct都足够专一,我们只需要测试其相关的逻辑。例如,在上面的例子中, 对于GptAdaptor我们有这些测试用例:

// 如果大模型节点的返回格式是JSON且模型支持JSON模式,那么需要设置json_object参数,并在提示词中描述json结构
func TestGptAdaptor_ShouldAddJsonMode_GivenOutputIsJson(t *testing.T) { 
	// ... 
	err := adaptor.FixRequest(&req, p)
	assert.NilError(t, err)
	ext := req.ExtendedLLMArguments["azure_gpt-35-turbo"].(map[string]any)
	x := ext["response_format"].(map[string]string)
	assert.Equal(t, x["type"], "json_object")
	assert.Equal(t, req.Context, "you're a helpful assistant.\n\nPlease reply exactly in the following json schema:\n```json\n"+
		`{"type":"object","properties":{"age":{"type":"number","description":"user age"},"name":{"type":"string","description":"user name"}}}`+"\n```\n")
	assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?")
}

// 如果大模型节点的返回格式是JSON且模型不支持json_object参数,那么只需要在提示词中描述JSON格式即可
func TestGptAdaptor_ShouldAddJsonPrompt_GivenOutputIsJson_JsonModeIsNotSupported(t *testing.T) {
	// ...
	err := adaptor.FixRequest(&req, p)
	assert.NilError(t, err)
	assert.Equal(t, req.ExtendedLLMArguments == nil, true)
	assert.Equal(t, req.Context, "you're a helpful assistant.\n\nPlease reply exactly in the following json schema:\n```json\n"+
		`{"type":"object","properties":{"age":{"type":"number","description":"user age"},"name":{"type":"string","description":"user name"}}}`+"\n```\n")
	assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?")
}

// 如果输出不是JSON格式,那么什么也不做
func TestGptAdaptor_ShouldDoNothing_GivenOutputIsNotJson(t *testing.T) {
	// ...
	err := adaptor.FixRequest(&req, p)
	assert.NilError(t, err)
	assert.Equal(t, req.ExtendedLLMArguments == nil, true)
	assert.Equal(t, req.Context, `you're a helpful assistant.`)
	assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?")
}

而在上层的LlmNode的单元测试中,我们不需要关心这些独立的逻辑,采取测试替身进行Mock即可:

func TestExecuteLlmNode_ShouldReturnChoice_WhenExecute_GivenIsNotStream(t *testing.T) {
	client := NewMockGatewayClient(t)
	builder := NewMockLlmRequestBuilder(t)
	builder.On("CreateGatewayRequest", mock.Anything, mock.Anything).Return(nil, gateway.LLMChatUniversalReq{}, nil)
	node := LlmNode{
		Id:     "llm1",
		client: client,
		Properties: LlmNodeProperties{
			Context: "you're helpful assistant",
			Prompt:  "give me a name",
			Stream:  false,
		},
		requestBuilder: builder,
	}
    // ...

使用mockery[8]可以为指定的interface自动生成mock对象。 注意不是字节开源的那个Mockey[9],这个框架实际上存在一些坑:

  • 需要添加一些参数来禁用内联和编译优化
  • 因为它全局Mock方法,可能存在多个测试执行时的互相干扰问题,比如在测试A中Mock时影响测试B的运行结果

自动化集成测试

对于一个工作流执行,如何进行集成测试呢?得益于事件驱动架构, 我们不需要关心工作流执行的Side effect,只需要关心事件即可。 也就是说,对于一个给定的工作流,它是否产生了符合预期的事件。

例如,我们有一个用例测试条件节点:

func TestExecutor_ShouldReturnLlm1Result_WhenExecute_GivenConditionMatchesLlm1(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		w.Write([]byte(`{"code":"Success","message":"成功","external_id":"a9331df1-2adb-4fad-a154-5d8767196bb2","created":1718264704,"task_id":"llm-chat-azure-gpt-35-turbo-version-0125/ac57f2f54fe1d40be193966e5875b3a8","usage":{"completion_tokens":77,"prompt_tokens":52,"total_tokens":129},"choices":[{"index":0,"finish_reason":"stop","logprobs":0,"text":"Yes"}]}`))
	}))

	data, _ := os.ReadFile("./schema/cases/llm-condition.json")
	json, err := workflow.Parse(data)
	assert.NilError(t, err)
	builder := workflow.NewWorkflowBuilder(json, workflow.GatewayConfig{
		BaseUrl: server.URL,
	})
	dag, errs := builder.ValidateAndToDAG()
	assert.Equal(t, len(errs), 0)
	executor, err := workflow.NewExecutor(*dag)
	assert.NilError(t, err)
	channel := executor.Subscribe()
	go func() {
		w := workflow.NewEmptyWorkflowContext(context.TODO())
		w.SetParameters(map[string]any{
			"input": "what's the main product of kingsoft?",
		})
		err := executor.ExecSync(w)
		assert.NilError(t, err)
	}()
	events := []workflow.WorkflowEvent{}
	for e := range channel {
		events = append(events, e)
	}

	assert.Equal(t, len(events), 12)

	assertEventsContains(t, events, func(e workflow.WorkflowEvent) bool {
		return e.Type() == workflow.NodeSkipped && e.(workflow.NodeSkippedEvent).Node == "llm2"
	})
	assertEventsContains(t, events, func(e workflow.WorkflowEvent) bool {
		return e.Type() == workflow.JsonResponseGenerated && e.(workflow.JsonResponseEvent).Data["output"] == "Yes"
	})
}

在这个集成测试中,除了网关的调用是通过Mock Server替代外,其他的调用方式都与实际一致。 在执行完成后,我们捕获事件,验证产生的事件数目以及是否包含预期的事件,来判断执行结果是否符合预期。

当然,集成测试的成本较高,我们需要维护一些典型的测试场景(工作流JSON),因此很难覆盖到逻辑的细节。 这也意味着集成测试的数目理论上应该远小于单元测试, 或者说单元测试应该更充分。

CLI测试工具

除了单元测试、集成测试等方式来测试工作流的执行之外, 很多时候,我们需要定位分析问题, 是很麻烦的。

比如,工作流A运行有问题,但是由于权限问题,排查问题的人员无法在界面上看到A; 或者由于实现的层级嵌套较深,很难判断到底是前端问题,还是后端接口问题,还是工作流执行引擎的问题。

为此,我们开发了一个CLI工具用来直接执行工作流JSON,可以比较方便地运行和查看工作流执行结果:

未来展望

工作流模型是一个高度可扩展的模型, 可以通过添加新的节点类型、插件等, 实现更复杂的功能。

一些在计划内的功能有:

  • RAG:知识库检索
  • 代码执行:执行一段python或者Javascript代码, 可以用来定制一些特殊的逻辑
  • 图像生成:类似Coze的图像流,包含文生图以及图像处理等
  • 自定义插件:允许用户接入已有的API,或者通过FaaS部署一个新的自定义插件
  • 嵌套的Workflow:允许在Workflow中嵌套调用

尽管Workflow已经具备实现复杂的功能的可能,但在AI应用的层级中尚只处于第二层, 目前来看, 大概率Agent才是AI应用的尽头。

在Coze中,Workflow可以作为Agent的一种工具来执行,相对于其他的插件和工具, 我认为其最大的价值在于,通过工作流这种低代码的形式,赋予非技术人员无限的创造力。

尽管工作流很灵活,可以根据用户的需求来随意配置,但是我们不禁思考,这真的是“Agentic Workflow”么? 在上面的例子中,我们只是用到了大模型,但大模型在其中并没有辅助或者参与决策。 这个问题留给聪明的读者去思考:p

关于我们为什么要做Workflow,感兴趣可以从这里[10]了解更多的背景信息。

  1. 虽然模型本身可以推理,一些情况下是可以推算出结果,但无法做到准确。即使是人,也很难直接计算诸如10988x727664=?这样的问题
  2. https://www.deeplearning.ai/the-batch/issue-242/
  3. https://www.53ai.com/news/qianyanjishu/1317.html
  4. ComfyUI是一个用来配置Stable diffusion图像生成工作流的系统。尽管它一个节点到另一个节点可以有多条边,这看起来不像是一个DAG,但从实现原理上来说,这仍然可以通过DAG来表示。
  5. https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/index.html
  6. https://platform.openai.com/docs/guides/json-mode
  7. https://platform.minimaxi.com/document/ChatCompletion%20Pro?key=66718f6ba427f0c8a57015ff#MKTRq06DJQTye5EdtIf5j3d8
  8. https://github.com/vektra/mockery
  9. https://github.com/bytedance/mockey/blob/main/README_cn.md
  10. AI Con总结和KPP工作流 方案汇报 https://365.kdocs.cn/l/crXXklzrerKF