Workflow design:修订间差异
无编辑摘要 |
|||
(未显示同一用户的73个中间版本) | |||
第1行: | 第1行: | ||
随着2023年大模型在国内的发展成熟,现在大家开始真正关注到大模型应用的开发上。有人说,2024年是大模型应用的落地元年。看似无所不能的大模型,却并不能直接集成到一个系统中,如何开发大模型应用,目前仍然处于一个刚起步的阶段。一些成熟的技术,经历过许多人摸索后,总能找到一个现有的"best practice";对于大模型应用开发来说,谁也不敢说他的做法已经是“best practice”。 | |||
目前,有一个看起来比较靠谱的模型,是这样定义大模型应用开发的几个Level的: | |||
[[Image: LLM-app-levels.png|600px]] | [[Image: LLM-app-levels.png|600px]] | ||
第35行: | 第36行: | ||
== 使用拓扑排序确定执行顺序 == | == 使用拓扑排序确定执行顺序 == | ||
执行DAG第一件事情就是确定节点的执行顺序,从哪里开始,又如何结束? | |||
考虑如下的几种场景: | |||
[[Image:DAG-cases.png|600px]] | [[Image:DAG-cases.png|600px]] | ||
从图中我们可以看出这也一些信息: | |||
* 严格而言,理论上DAG只要求有向无环,但是不必要“连通”的。也即是说,图中可以存在孤立的点,或者多个分散的DAG构成一个大的DAG | |||
* 一个节点可能有多个后继节点,意味着某个任务完成后,可能触发多个任务的执行 | |||
* 一个节点也可能有多个依赖,那么只有依赖节点全部执行完成后,才能开始它的执行 | |||
我们可以通过拓扑排序(Topological sorting)来确定节点的执行顺序。比较著名的算法是Kahn's algorithm,原理也很简单,从图中选出入度为零的节点,然后删掉这些节点和边,再重复这一过程直到原图为空。这样选出的这些节点就是拓扑排序的结果。 | |||
[[Image:topological-sorting.png|600px]] | |||
拓扑排序还有一个附带的好处,就是可以检测图中是否有环。如果图中没有入度为零的节点,但是选出的节点数目仍然小于图的节点数,那么拓扑排序失败,证明图中有环存在。 | |||
拓扑排序的结果并不一定是唯一的。比如场景一中,由于节点相互之间完全无依赖,因此任意一个组合都是正确的拓扑排序结果。 | |||
值得注意的是,尽管拓扑排序得到的是一个有序的列表,这并不意味着你需要“串行”执行。场景三的拓扑排序结果为[A,B,C,D]或者[A,B,D,C],其中,C和D是可以并行执行的。 | |||
== 事件驱动架构 == | == 事件驱动架构 == | ||
确定了节点的执行顺序之后,我们可以按照顺序来执行节点,看起来工作流的执行就这样完成了: | |||
<syntaxhighlight lang="python"> | |||
sorted_nodes <- dag.topological_sort() | |||
for node in sorted_nodes: | |||
node.wait_for_dependencies() | |||
node.execute() | |||
</syntaxhighlight> | |||
但是我们忽略了几个重要的问题: | |||
* 节点之间是有交互的。考虑场景三中,节点B是可能需要使用A的运行结果的,也就是说,一个节点的运行结果应该对其后继节点可见 | |||
* 我们需要能够实时观测工作流的运行情况,比如当前执行到哪一个节点了?这对于界面调试、API调用等都十分有用。 | |||
使用事件驱动架构可以比较容易来解决这个问题。我们将工作流的运行抽象成一系列的事件,节点的执行产生事件(比如节点开始、节点结束),而这些事件具体如何使用则由消费者来决定。 | |||
这样,在UI上调试工作流时,我们只需要根据这些事件来更新页面上的节点状态;当使用API调用时,我们则可以过滤掉不感兴趣的事件,只监听结果事件并返回给调用方即可。 | |||
这样对于工作流执行引擎将有非常强的扩展性和可测试性。 | |||
[[Image:dag-event-driven.png|600px]] | |||
== 传递闭包简化 == | == 传递闭包简化 == | ||
对于使用DAG的系统,一个不可忽视的问题就是DAG的不确定性:用户很可能构建出奇奇怪怪的巨复杂的图出来,其中可能会有不必要的边。 | |||
这对于理解和执行工作流都可能造成麻烦。例如,下图中左边a→d, a→e 都是不必要的边。通过传递闭包简化(Transitive reduction)算法可以对无用的连接进行删除,从而得到一个唯一确定的简化工作流。 | |||
[[Image:transitive-reduction.png|400px]] | |||
= 使用Go语言实现工作流 = | |||
在有以上的一些理论基础后,实现思路已经非常清晰,尽管仍然有许多技术细节需要确定,就看如何具体设计和实现了。我们使用Go语言来实现,初步支持下面几种节点: | |||
* 开始、结束 | |||
* 条件判断 | |||
* 大模型调用 | |||
* 插件:包括bing搜索、高召回、分流 | |||
== 概念模型 == | |||
尽管前面我们认为工作流是一个DAG,但并不是直接搞一个DAG就能实现向Coze这样的工作流系统,这里面最大的问题在于,DAG中边只包含了方向, | |||
但实际工作流产品的设计上,一条边还可能附带一些其他的信息。 | |||
例如,ComfyUI中一个节点到另一个节点连接是区分不同的点的<ref>ComfyUI是一个用来配置Stable diffusion图像生成工作流的系统。尽管它一个节点到另一个节点可以有多条边,这看起来不像是一个DAG,但从实现原理上来说,这仍然可以通过DAG来表示。</ref>: | |||
[[Image:sdxl-flow.png|600px]] | |||
当然它的业务场景跟我们差距比较大。但是,考虑到条件节点的表示,我们发现也存在类似的情况: | |||
[[Image:coze-workflow.png|600px]] | |||
注意其中的条件节点,不同的分支连接到下游节点时,起点是标记到分支上的,而不是单纯的两个节点连接。当然我们可以不采用这种方式,可以在节点属性中设置一些字段来表示什么情况下执行,甚至设置到边的信息中。但不可否认的是,这种边直接连接到条件上的方式是最直观的。 | |||
经过一些权衡之后,我们提出了如下的一个工作流概念模型: | |||
[[Image:Workflow-concept-model.png|600px]] | |||
这个概念模型,通过对边添加属性来区分是从哪个点连接到哪个点。但并未改变DAG的本质, | |||
因此在KPP的工作流中,是无法直接从一个条件节点的两个分支连接到同一个节点的,因为这样相当于生成了两条边。 | |||
这里,对于这个“点”叫什么名字好其实有一个小故事。 | |||
最开始,我们从Qt的Singal/Slot概念中借用“Slot”的概念叫“槽位”,后来觉得不妥,借用IP端口的概念叫Port。 | |||
彼时,我们抓包过Coze的协议,想参考它的定义来设计,但结果发现它表示的也乱七八糟,根本看不懂。 | |||
过了很久,等我再次抓包Coze的时候,惊讶的发现他也改名叫Port了。 | |||
若合一契,岂不快哉! | |||
== 定义工作流Schema == | |||
在实现之前,首先需要解决的一个问题是,我们如何来表示一个工作流?比如条件判断的条件怎么存储?大模型调用的提示词怎么表示? | |||
这是一个非常重要的问题,因为它是一切的基础,我们需要通过一个标准的协议来统一前端、后端等对于工作流的表示。不难发现,工作流的定义包括: | |||
* DAG的表示,即节点和边 | |||
* 节点的自身属性,根据节点类型不同,其可用的属性也不尽相同 | |||
我们使用JSON Schema定义了一个工作流格式,大概长这样: | |||
<syntaxhighlight lang="json"> | |||
{ | |||
"type": "object", | |||
"properties": { | |||
"name": { | |||
"type": "string" | |||
}, | |||
"nodes": { | |||
"type": "array", | |||
"items": { | |||
"$ref": "#/definitions/node" | |||
} | |||
}, | |||
"edges": { | |||
"type": "array", | |||
"items": { | |||
"$ref": "#/definitions/link" | |||
} | |||
}, | |||
... | |||
} | |||
} | |||
</syntaxhighlight> | |||
对于不同的节点,我们可以分别定义节点具体有哪些属性,格式是什么,是否必须等,例如: | |||
<syntaxhighlight lang="json"> | |||
{ | |||
... | |||
"bingSearchPluginProperties": { | |||
"properties": { | |||
"pluginType": { | |||
"type": "string" | |||
}, | |||
"version": { | |||
"type": "string" | |||
}, | |||
"query": { | |||
"$ref": "#/definitions/variableBinding" | |||
}, | |||
"count": { | |||
"type": "number" | |||
} | |||
}, | |||
"required": [ | |||
"pluginType", | |||
"version", | |||
"query" | |||
] | |||
} | |||
} | |||
</syntaxhighlight> | |||
完整的定义比较复杂,但通过这样一个Schema,我们可以解决很多问题: | |||
* 这个工作流是否合法?至少格式上是否正确 | |||
* 前端开发时,如何清楚工作流支持多少个节点?每个节点属性是什么? | |||
对于这种复杂的工作流来说,传统的API文档是无法精确到这样的程度的,不仅如此,它的最大好处是,它是真的可以运行的! | |||
== 工作流的构建== | |||
=== 对工作流的抽象定义 === | |||
首先我们定义了一个业务无关的单纯DAG实现,仅做了必要的修改来适配概念模型中的槽位: | |||
<syntaxhighlight lang="go"> | |||
type Port struct { | |||
Direction PortDirection `json:"type"` | |||
Id string `json:"id"` | |||
} | |||
type Vertex struct { | |||
Id string | |||
Type string | |||
Ports []Port | |||
Payload any | |||
} | |||
type Connection struct { | |||
From string | |||
To string | |||
SourcePort *Port | |||
TargetPort *Port | |||
} | |||
type PortawareDAG interface { | |||
AddVertex(node Vertex) error | |||
GetVertex(id string) (Vertex, error) | |||
Connect(fromId string, toId string) error | |||
ConnectFromPort(fromId string, toId string, toPort string) error | |||
TopologicalSorting() ([]Vertex, error) | |||
TransitiveReduction() (PortawareDAG, error) | |||
// ... | |||
} | |||
</syntaxhighlight> | |||
而对于具体的不同的节点,是通过另外的Node接口来定义: | |||
<syntaxhighlight lang="bash"> | |||
type Node interface { | |||
GetId() string | |||
GetType() NodeType | |||
Execute(ctx context.Context, options NodeExecuteOptions, doneGroup *sync.WaitGroup) ([]InputValue, []OutputValue, error) | |||
// ... | |||
} | |||
</syntaxhighlight> | |||
其中,最重要的方法就是Execute,它将设计执行节点,比如请求大模型,或者执行条件判断。这样,不同类型的Node会有不同类型的实现, | |||
比如大模型节点的实现有单独的定义: | |||
<syntaxhighlight lang="bash"> | |||
type LlmNode struct { | |||
Id string | |||
Name string | |||
client GatewayClient | |||
Properties LlmNodeProperties | |||
requestBuilder LlmRequestBuilder | |||
} | |||
</syntaxhighlight> | |||
=== 解析动态类型 === | |||
由于工作流节点的类型不一样,我们在实例化成Node类型的时候,一个问题是需要进行动态类型的转换。 | |||
比如,我们JSON中大概是这样定义的: | |||
<syntaxhighlight lang="bash"> | |||
{ | |||
"nodes": [ | |||
{ "type": "start", ...}, | |||
{ "type": "llm", ...} | |||
] | |||
} | |||
</syntaxhighlight> | |||
而我们需要根据type实例化成不同的类型(StartNode,LlmNode)。不得不吐槽,在go里面没有优雅的实现方式, | |||
不得不将JSON反序列化成any(实际上是一个map),然后再部分序列化成JSON,然后再根据类型再次反序列化成实际的类型: | |||
<syntaxhighlight lang="go"> | |||
for index, node := range w.Nodes { | |||
jsonData, _ := json.Marshal(node.Properties) | |||
var p any | |||
var err error | |||
t := node.Type | |||
if t == TypeStart { | |||
p, err = loadStartNode(jsonData) | |||
} else if t == TypeLlm { | |||
p, err = loadLlmNode(jsonData) | |||
} else if t == TypeEnd { | |||
p, err = loadHttpNode(jsonData) | |||
} | |||
// ... | |||
</syntaxhighlight> | |||
== 异步工作流运行模型 == | |||
=== Pub/Sub模式 === | |||
在事件处理架构中,我们通过事件来接偶工作流执行引擎,以及其后面的消费者。 | |||
Go语言天然提供了channel来进行消息通信,但不幸的是,它是一个单生产者/消费者的模型。 | |||
在我们需求中,我们应该允许多个消费者存在,彼此只关心自己感兴趣的事件。比如,我们可以有一个消费者来打印日志;另一个消费者来返回HTTP消息。 | |||
这是一个Pub/Sub模式。尽管Go原生不支持,但是我们很容易实现: | |||
<syntaxhighlight lang="bash"> | |||
type Subscriber[T any] chan T | |||
type TopicFilter func(event any) bool | |||
type Publisher[T any] struct { | |||
name string | |||
mutex sync.RWMutex | |||
buffer int | |||
closed bool | |||
subscribers map[Subscriber[T]]TopicFilter | |||
} | |||
</syntaxhighlight> | |||
实现原理也很简单,那就是为每一个消费者创建一个channel,然后发送消息的时候,发送多次: | |||
<syntaxhighlight lang="bash"> | |||
func (p *Publisher[T]) Publish(event T) { | |||
p.mutex.RLock() | |||
defer p.mutex.RUnlock() | |||
var wg sync.WaitGroup | |||
for subscriber, filter := range p.subscribers { | |||
subscriber := subscriber | |||
if filter == nil || filter(event) { | |||
wg.Add(1) | |||
go func(event T, channel chan<- T) { | |||
defer wg.Done() | |||
channel <- event | |||
}(event, subscriber) | |||
} | |||
} | |||
wg.Wait() | |||
} | |||
</syntaxhighlight> | |||
这样我们就有了一个Pub/Sub实现,我们用它来作为工作流执行的事件订阅/消费系统。核心是一个Executor接口,他提供一个订阅的方法用来订阅事件,以及一个执行的方法来执行工作流。 | |||
<syntaxhighlight lang="bash"> | |||
type Executor interface { | |||
Subscribe() Subscriber[WorkflowEvent] | |||
Close() | |||
ExecSync(ctx WorkflowContext) error | |||
} | |||
</syntaxhighlight> | |||
=== 使用Go routine实现异步运行 === | |||
前面我们有提到,尽管拓扑排序可以将工作流节点排序成一个有序的列表,但并不代表节点需要串行执行。比如说,如果有多个入度为0的节点, | |||
那么我们可以同时运行它们。在go语言中,很容易通过go routine来实现。 | |||
进一步思考,我们会发现,不仅需要做拓扑排序,还需要根据节点的依赖来执行节点。只有当节点的依赖项全部执行完成后,才能开始该节点的执行。 | |||
可以通过一个队列来实现,不断执行和删除就绪的节点,直到全部节点执行完成。 | |||
其运行原理如下面的伪代码所示: | |||
<syntaxhighlight lang="python"> | |||
queue <- dag.typological_sort() | |||
while queue.is_not_empty(): | |||
ready_nodes <- queue.pop_zero_degree_nodes() | |||
for node in ready_nodes start: | |||
node.execute() | |||
</syntaxhighlight> | |||
再进一步思考,如果某一个节点运行失败,我们应该如何处理?一个比较合理的做法是直接停止工作流的运行。这种逻辑可以通过errgroup来实现。 | |||
最终,我们执行工作流的方法逻辑如下: | |||
<syntaxhighlight lang="go"> | |||
func (e *AsyncExecutor) ExecSync(c WorkflowContext) error { | |||
defer e.Close() | |||
e.eventPublisher.Publish(NewWorkflowStartedEvent()) | |||
errGroup, ctx := errgroup.WithContext(c.GetContext()) | |||
var doneGroup sync.WaitGroup | |||
for _, node := range e.pendingNodes { | |||
node := node | |||
waitChannel := e.notifyChannels[node.Id] | |||
errGroup.Go(func() error { | |||
select { | |||
case <-ctx.Done(): | |||
return errCancelledByContext | |||
case inputs := <-waitChannel: | |||
nodeInstance, ok := node.Payload.(Node) | |||
if !ok { | |||
return errFailedToGetNodeInstance | |||
} | |||
return e.Execute(ctx, nodeInstance, inputs, &doneGroup) | |||
} | |||
}) | |||
} | |||
e.popReadyNodes() | |||
err := errGroup.Wait() | |||
if err != nil { | |||
return err | |||
} | |||
doneGroup.Wait() | |||
e.eventPublisher.Publish(NewWorkflowFinishedEvent()) | |||
return nil | |||
} | |||
</syntaxhighlight> | |||
有意思的是,我们并不是等节点的依赖项执行完成之后,再启动一个go routine,而是一开始就为每一个节点启动了一个go routine, | |||
但是通过一个信号来控制何时开始运行。我们可以通过context.Context来进行额外的控制,比如设定超时时间之类, | |||
这对于避免程序出现异常情况导致工作流无休止运行非常有帮助。 | |||
<syntaxhighlight lang="go"> | |||
select { | |||
case <-ctx.Done(): // 如果超时或者取消,go routine会从这里直接返回,从而中断执行 | |||
return errCancelledByContext | |||
case inputs := <-waitChannel: // 控制节点开始执行的信号channel | |||
// ... | |||
return e.Execute(ctx, nodeInstance, inputs, &doneGroup) | |||
} | |||
</syntaxhighlight> | |||
注意在上面的流程中我们通过e.popReadyNodes()这个方法来通知已经就绪的节点开始运行, | |||
这个方法的主要作用是找到当前节点中,所有可以运行的节点。 | |||
例如,考虑一个包含A->B->C三个节点的工作流,如果A已经执行结束, | |||
那么B就可以开始运行。 | |||
这时候,A的运行结果将作为B的输入信息。 | |||
<syntaxhighlight lang="bash"> | |||
func (e *AsyncExecutor) popReadyNodes() error { | |||
defer e.lock.Unlock() | |||
e.lock.Lock() | |||
for _, node := range e.pendingNodes { | |||
inputs, ready, err := e.getInputs(node.Id) | |||
if err != nil { | |||
return err | |||
} | |||
if !ready { | |||
return nil | |||
} | |||
picked := e.pendingNodes[0] | |||
e.pendingNodes = e.pendingNodes[1:] // 从尚未执行的节点队列中移除队首 | |||
notifyChannel := e.notifyChannels[picked.Id] // 通知移除的节点开始执行 | |||
notifyChannel <- inputs | |||
} | |||
return nil | |||
} | |||
</syntaxhighlight> | |||
这里每一个节点对应一个通知channel,在准备运行的阶段就已经进行了初始化。 | |||
整体思路是,每当有节点执行完成,那么就检查是否有新的节点可以开始运行,否则,不可能执行其他的节点,因为剩余的节点都是有依赖关系的。 | |||
细心的读者容易想到一个问题,就是当工作流开始执行的时候, | |||
总是有起始节点是没有依赖的,按照这个逻辑怎么触发运行呢?这个问题很容易解决,那就是工作流开始的时候,手动调用一次popReadyNodes。 | |||
执行的关键在于,一个节点运行的结束,会触发其后继节点的运行。这个“触发”怎么实现,实际上是有很多种办法的。 | |||
其实第一版实现并不是这样为每一个节点设置一个通知channel(全局唯一,每一个节点有且仅有一个通知channel);而是通过Pub/Sub模式来等待所有依赖项完成并通知。 | |||
这是最开始的逻辑: | |||
<syntaxhighlight lang="python"> | |||
// 等待所有依赖节点发送通知, | |||
func blockForNodeResults(waitChannels []<-chan NodeResult) []NodeResult { | |||
results := []NodeResult{} | |||
for _, ch := range waitChannels { | |||
result := <-ch | |||
results = append(results, result) | |||
} | |||
return results | |||
} | |||
</syntaxhighlight> | |||
仔细思考其实这样实现问题挺多的: | |||
* 需要为每一个依赖创建一个channel来发送和接收通知,且运行的时候还需要等待所有channel都接收到结果,相当于把“我是否能够执行”的逻辑交给节点自己去判断 | |||
* 由于channel发送和接收需要对应,需要至少为channel设置一个大小为1的buffer,否则接受channel的动作有可能导致节点运行阻塞 | |||
==条件分支控制 == | |||
在上述的工作流执行模型中,一个节点运行完成后会触发后继节点的运行。这在大多数情况下都有效, | |||
但是,如果考虑到条件节点这一种特殊的节点, | |||
就会存在问题。 | |||
[[Image:Cond-node.png|600px]] | |||
如图所示,条件节点有两个分支(if/else),这两个分支有且仅能命中一个, | |||
我们期望它的后继分支有且仅有一个能够运行。为此一种特殊的节点状态被引入:Skipped(跳过执行), | |||
对于不需要被实际执行的节点,例如没有命中的分支,那么它的运行状态将是Skipped。 | |||
而在具体的实现上,我们为节点定义了一些属性: | |||
* TriggerRule:触发条件,包括AllSuccess(所有依赖节点全部执行成功,不包含Skipped), NoneFailed(没有失败的情况,包含Skipped)等 | |||
* ConnectionType:连接方式,包含Selective(选择执行,如条件节点、分流)、Direct(普通情况) | |||
* OutputValue:节点的输出值,对于条件节点,我们会输入特殊的变量来表示分支的选择结果,比如条件节点会生成$<port>=true/fase来表示某一个Port是否被命中 | |||
通过这些信息的组合,可以对节点如何执行进行有效的控制。这里的设计部分参考了Apache Airflow的做法 <ref>https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/index.html</ref>,但尽力避免不必要的复杂性。 | |||
==引用和表达式解析== | |||
在工作流中,我们需要在一个节点中使用其他节点的结果。 | |||
比如,在条件节点中引用开始节点的某个变量作为判断的依据。这部分涉及到一些较为复杂的逻辑: | |||
* 在构建工作流的时候,前端页面需要知道有哪些变量能够选择。这不仅跟节点类型有关,跟输出变量的类型、以及节点之间是否有连接也有关系。举个例子,如果两个节点之间没有连线,那么它们之间是不能相互引用变量的;条件节点中的比较操作是需要判断类型的,不能拿一个数字跟字符串去比较是否相等 | |||
* 在上一步获取可用变量列表的时候,工作流尚处于一个不完善的状态,可能是不合法的 | |||
* 变量可能涉及到复杂的嵌套结构,比如在开始节点定义一个嵌套的输入A.B.C,然后在后面的节点中引用 | |||
在Workflow的定义中,我们通过一个简单的表达式来指代引用: | |||
<syntaxhighlight lang="json"> | |||
"expressions": [ | |||
{ | |||
"variable": "$start1.input", // 获取开始节点(start1)中的input输出变量 | |||
"operator": "contains", | |||
"target": { | |||
"type": "fixed", | |||
"value": "kingsoft" | |||
} | |||
} | |||
] | |||
</syntaxhighlight> | |||
这一部分的实现较为麻烦,涉及到一些递归解析算法,比如上述的$start1.input,我们首先要从节点的输出参数中找到节点start1,然后再从start1的结果中找到input变量。 | |||
但由于start1这个节点不一定是该节点的直接依赖,很可能会是依赖的依赖,这样在我们的实现中输入是一个树形结构, | |||
需要进行递归寻址: | |||
<syntaxhighlight lang="go"> | |||
func resolveNodeResultRecursively(input NodeResult, nodeId string) (NodeResult, error) { | |||
if input.NodeId == nodeId { | |||
return input, nil | |||
} else { | |||
for _, parent := range input.Inputs { | |||
result, err := resolveNodeResultRecursively(parent, nodeId) | |||
if err == nil { | |||
return result, nil | |||
} | |||
} | |||
return NodeResult{}, errVariableNodeNotAccessable | |||
} | |||
} | |||
</syntaxhighlight> | |||
获取到变量之后,还可能需要根据变量类型进行一些自动类型检测和转换: | |||
<syntaxhighlight lang="go"> | |||
func tryCastValue(v any, expectedType *TypeDescriptor) (any, error) { | |||
if expectedType == nil { | |||
return v, nil | |||
} | |||
if v == nil { | |||
return nil, errors.New("value is nil") | |||
} | |||
switch *expectedType { | |||
case StringType: | |||
return castToString(v) | |||
case IntegerType: | |||
return castToInt32(v) | |||
case FloatType: | |||
return castToFloat32(v) | |||
case ArrayIntegerType: | |||
return castToInt32Array(v) | |||
case ArrayFloatType: | |||
return castToFloat32Array(v) | |||
// ... | |||
</syntaxhighlight> | |||
这时候,go的劣势就被发挥的淋漓尽致,比如把一个any类型转换为字符串,得这样写: | |||
<syntaxhighlight lang="go"> | |||
func castToString(v any) (string, error) { | |||
if v == nil { | |||
return "", nil | |||
} | |||
switch value := v.(type) { | |||
case string: | |||
return value, nil | |||
case int: | |||
return strconv.FormatInt(int64(value), 10), nil | |||
case int32: | |||
return strconv.FormatInt(int64(value), 10), nil | |||
case int64: | |||
return strconv.FormatInt(value, 10), nil | |||
case float32: | |||
return strconv.FormatFloat(float64(value), 'f', -1, 32), nil | |||
case float64: | |||
return strconv.FormatFloat(value, 'f', -1, 64), nil | |||
case bool: | |||
return strconv.FormatBool(value), nil | |||
default: | |||
return "", errVariableTypeIncompatible | |||
} | |||
} | |||
</syntaxhighlight> | |||
== 测试工作流 == | |||
工作流实际是一个非常复杂的系统, | |||
很难按照传统的排列组合的方式来进行测试——有太多种组合的可能了。 | |||
那么怎样来保证代码质量呢? | |||
=== 使用TDD === | |||
曾经在知乎上看到一个很有意思的问题和回答: | |||
“如何激怒一个Golang开发者?” | |||
“你写的代码像Java……” | |||
其实在实现复杂逻辑的时候,很多方法都是可以借鉴的。尽管Go并不是一个严格意义上的面向对象的语言, | |||
但将OO的思想用到工作流的实现中也未尝不可:如果逻辑过于复杂,那么我们就进行拆分,并遵循单一职责原则来实现。 | |||
在工作流的实现中,我们充分运用这一思想, | |||
通过将逻辑拆分,并独立进行单元测试,来确保程序代码的正确。 | |||
举个例子, | |||
大模型节点的实现又依赖GatewayClient、LlmRequestBuilder,其中GatewayClient用来请求网关;LlmRequestBuilder负责构建大模型请求: | |||
<syntaxhighlight lang="go"> | |||
type LlmNode struct { | |||
Id string | |||
Name string | |||
client GatewayClient | |||
Properties LlmNodeProperties | |||
requestBuilder LlmRequestBuilder | |||
} | |||
</syntaxhighlight> | |||
而大模型请求的构建同样具有一定的复杂性,不仅仅是因为不同模型的参数有差异,还有一个原因是大模型节点支持一种JSON模式,允许用户直接定义大模型的输出类型: | |||
[[Image:coze-json.png|600px]] | |||
而大模型并不都具有这样的魔法,具体的实现方式是这样的: | |||
* 对于OpenAI,它支持json_mode,可以设置该参数并直接在提示词中描述json的结构<ref>https://platform.openai.com/docs/guides/json-mode</ref> | |||
* 对于Minimax,它支持设定返回格式,需要构建一个特殊的参数来控制json结构<ref>https://platform.minimaxi.com/document/ChatCompletion%20Pro?key=66718f6ba427f0c8a57015ff#MKTRq06DJQTye5EdtIf5j3d8</ref> | |||
因此,我们进一步对这些逻辑进行拆解,通过Adaptor模式来解决不同厂商的差异: | |||
<syntaxhighlight lang="go"> | |||
type LlmRequestBuilderImpl struct { | |||
schemaGenerator JsonSchemaGenerator | |||
minimaxAdaptor MinimaxAdaptor | |||
gptAdaptor GptAdaptor | |||
} | |||
</syntaxhighlight> | |||
对于单元测试来讲,每一个方法或者struct都足够专一,我们只需要测试其相关的逻辑。例如,在上面的例子中, | |||
对于GptAdaptor我们有这些测试用例: | |||
<syntaxhighlight lang="go"> | |||
// 如果大模型节点的返回格式是JSON且模型支持JSON模式,那么需要设置json_object参数,并在提示词中描述json结构 | |||
func TestGptAdaptor_ShouldAddJsonMode_GivenOutputIsJson(t *testing.T) { | |||
// ... | |||
err := adaptor.FixRequest(&req, p) | |||
assert.NilError(t, err) | |||
ext := req.ExtendedLLMArguments["azure_gpt-35-turbo"].(map[string]any) | |||
x := ext["response_format"].(map[string]string) | |||
assert.Equal(t, x["type"], "json_object") | |||
assert.Equal(t, req.Context, "you're a helpful assistant.\n\nPlease reply exactly in the following json schema:\n```json\n"+ | |||
`{"type":"object","properties":{"age":{"type":"number","description":"user age"},"name":{"type":"string","description":"user name"}}}`+"\n```\n") | |||
assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?") | |||
} | |||
// 如果大模型节点的返回格式是JSON且模型不支持json_object参数,那么只需要在提示词中描述JSON格式即可 | |||
func TestGptAdaptor_ShouldAddJsonPrompt_GivenOutputIsJson_JsonModeIsNotSupported(t *testing.T) { | |||
// ... | |||
err := adaptor.FixRequest(&req, p) | |||
assert.NilError(t, err) | |||
assert.Equal(t, req.ExtendedLLMArguments == nil, true) | |||
assert.Equal(t, req.Context, "you're a helpful assistant.\n\nPlease reply exactly in the following json schema:\n```json\n"+ | |||
`{"type":"object","properties":{"age":{"type":"number","description":"user age"},"name":{"type":"string","description":"user name"}}}`+"\n```\n") | |||
assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?") | |||
} | |||
// 如果输出不是JSON格式,那么什么也不做 | |||
func TestGptAdaptor_ShouldDoNothing_GivenOutputIsNotJson(t *testing.T) { | |||
// ... | |||
err := adaptor.FixRequest(&req, p) | |||
assert.NilError(t, err) | |||
assert.Equal(t, req.ExtendedLLMArguments == nil, true) | |||
assert.Equal(t, req.Context, `you're a helpful assistant.`) | |||
assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?") | |||
} | |||
</syntaxhighlight> | |||
而在上层的LlmNode的单元测试中,我们不需要关心这些独立的逻辑,采取测试替身进行Mock即可: | |||
<syntaxhighlight lang="go"> | |||
func TestExecuteLlmNode_ShouldReturnChoice_WhenExecute_GivenIsNotStream(t *testing.T) { | |||
client := NewMockGatewayClient(t) | |||
builder := NewMockLlmRequestBuilder(t) | |||
builder.On("CreateGatewayRequest", mock.Anything, mock.Anything).Return(nil, gateway.LLMChatUniversalReq{}, nil) | |||
node := LlmNode{ | |||
Id: "llm1", | |||
client: client, | |||
Properties: LlmNodeProperties{ | |||
Context: "you're helpful assistant", | |||
Prompt: "give me a name", | |||
Stream: false, | |||
}, | |||
requestBuilder: builder, | |||
} | |||
// ... | |||
</syntaxhighlight> | |||
使用mockery<ref>https://github.com/vektra/mockery</ref>可以为指定的interface自动生成mock对象。 | |||
注意不是字节开源的那个Mockey<ref>https://github.com/bytedance/mockey/blob/main/README_cn.md</ref>,这个框架实际上存在一些坑: | |||
* 需要添加一些参数来禁用内联和编译优化 | |||
* 因为它全局Mock方法,可能存在多个测试执行时的互相干扰问题,比如在测试A中Mock时影响测试B的运行结果 | |||
=== 自动化集成测试 === | |||
对于一个工作流执行,如何进行集成测试呢?得益于事件驱动架构, | |||
我们不需要关心工作流执行的Side effect,只需要关心事件即可。 | |||
也就是说,对于一个给定的工作流,它是否产生了符合预期的事件。 | |||
例如,我们有一个用例测试条件节点: | |||
<syntaxhighlight lang="go"> | |||
func TestExecutor_ShouldReturnLlm1Result_WhenExecute_GivenConditionMatchesLlm1(t *testing.T) { | |||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |||
w.Header().Set("Content-Type", "application/json") | |||
w.WriteHeader(http.StatusOK) | |||
w.Write([]byte(`{"code":"Success","message":"成功","external_id":"a9331df1-2adb-4fad-a154-5d8767196bb2","created":1718264704,"task_id":"llm-chat-azure-gpt-35-turbo-version-0125/ac57f2f54fe1d40be193966e5875b3a8","usage":{"completion_tokens":77,"prompt_tokens":52,"total_tokens":129},"choices":[{"index":0,"finish_reason":"stop","logprobs":0,"text":"Yes"}]}`)) | |||
})) | |||
data, _ := os.ReadFile("./schema/cases/llm-condition.json") | |||
json, err := workflow.Parse(data) | |||
assert.NilError(t, err) | |||
builder := workflow.NewWorkflowBuilder(json, workflow.GatewayConfig{ | |||
BaseUrl: server.URL, | |||
}) | |||
dag, errs := builder.ValidateAndToDAG() | |||
assert.Equal(t, len(errs), 0) | |||
executor, err := workflow.NewExecutor(*dag) | |||
assert.NilError(t, err) | |||
channel := executor.Subscribe() | |||
go func() { | |||
w := workflow.NewEmptyWorkflowContext(context.TODO()) | |||
w.SetParameters(map[string]any{ | |||
"input": "what's the main product of kingsoft?", | |||
}) | |||
err := executor.ExecSync(w) | |||
assert.NilError(t, err) | |||
}() | |||
events := []workflow.WorkflowEvent{} | |||
for e := range channel { | |||
events = append(events, e) | |||
} | |||
assert.Equal(t, len(events), 12) | |||
assertEventsContains(t, events, func(e workflow.WorkflowEvent) bool { | |||
return e.Type() == workflow.NodeSkipped && e.(workflow.NodeSkippedEvent).Node == "llm2" | |||
}) | |||
assertEventsContains(t, events, func(e workflow.WorkflowEvent) bool { | |||
return e.Type() == workflow.JsonResponseGenerated && e.(workflow.JsonResponseEvent).Data["output"] == "Yes" | |||
}) | |||
} | |||
</syntaxhighlight> | |||
在这个集成测试中,除了网关的调用是通过Mock Server替代外,其他的调用方式都与实际一致。 | |||
在执行完成后,我们捕获事件,验证产生的事件数目以及是否包含预期的事件,来判断执行结果是否符合预期。 | |||
当然,集成测试的成本较高,我们需要维护一些典型的测试场景(工作流JSON),因此很难覆盖到逻辑的细节。 | |||
这也意味着集成测试的数目理论上应该远小于单元测试, | |||
或者说单元测试应该更充分。 | |||
=== CLI测试工具 === | |||
除了单元测试、集成测试等方式来测试工作流的执行之外, | |||
很多时候,我们需要定位分析问题, | |||
是很麻烦的。 | |||
比如,工作流A运行有问题,但是由于权限问题,排查问题的人员无法在界面上看到A; | |||
或者由于实现的层级嵌套较深,很难判断到底是前端问题,还是后端接口问题,还是工作流执行引擎的问题。 | |||
为此,我们开发了一个CLI工具用来直接执行工作流JSON,可以比较方便地运行和查看工作流执行结果: | |||
[[Image:workflow-cli.gif|600px]] | |||
= 未来展望 = | = 未来展望 = | ||
工作流模型是一个高度可扩展的模型, | |||
可以通过添加新的节点类型、插件等, | |||
实现更复杂的功能。 | |||
一些在计划内的功能有: | |||
* RAG:知识库检索 | |||
* 代码执行:执行一段python或者Javascript代码, 可以用来定制一些特殊的逻辑 | |||
* 图像生成:类似Coze的图像流,包含文生图以及图像处理等 | |||
* 自定义插件:允许用户接入已有的API,或者通过FaaS部署一个新的自定义插件 | |||
* 嵌套的Workflow:允许在Workflow中嵌套调用 | |||
尽管Workflow已经具备实现复杂的功能的可能,但在AI应用的层级中尚只处于第二层, | |||
目前来看, | |||
大概率Agent才是AI应用的尽头。 | |||
在Coze中,Workflow可以作为Agent的一种工具来执行,相对于其他的插件和工具, | |||
我认为其最大的价值在于,通过工作流这种低代码的形式,赋予非技术人员无限的创造力。 | |||
尽管工作流很灵活,可以根据用户的需求来随意配置,但是我们不禁思考,这真的是“Agentic Workflow”么? | |||
在上面的例子中,我们只是用到了大模型,但大模型在其中并没有辅助或者参与决策。 | |||
这个问题留给聪明的读者去思考:p | |||
关于我们为什么要做Workflow,感兴趣可以从这里<ref>AI Con总结和KPP工作流 方案汇报 https://365.kdocs.cn/l/crXXklzrerKF</ref>了解更多的背景信息。 |
2024年7月29日 (一) 07:11的最新版本
随着2023年大模型在国内的发展成熟,现在大家开始真正关注到大模型应用的开发上。有人说,2024年是大模型应用的落地元年。看似无所不能的大模型,却并不能直接集成到一个系统中,如何开发大模型应用,目前仍然处于一个刚起步的阶段。一些成熟的技术,经历过许多人摸索后,总能找到一个现有的"best practice";对于大模型应用开发来说,谁也不敢说他的做法已经是“best practice”。
目前,有一个看起来比较靠谱的模型,是这样定义大模型应用开发的几个Level的:
最简单的Wrapper层级,实际就是把大模型的提示词封装成一个固定功能的”方法“,然后集成到业务系统中。KPP的Function(或者Prompt),就是一个Wrapper。Wrapper能做什么完全取决于大模型的能力,比如生成一段文字,甚至写一段代码,都没有问题。但是如果想做一个复杂的功能,恐怕就不行了,比如,先生成代码再执行得到代码运行结果,这样就没法直接通过大模型请求来做了,原因是大模型本身无法直接执行代码 [1]。
显然,对于复杂的功能,我们可以尝试分步骤来解决。比如一键生成PPT就是很好的例子,第一步先生成一个outline;然后再根据outline,去填充内容;最后通过一个生成文档的API生成一个美化后的PPT。这样每一步都可以用最适合的技术来进行,比如生成outline和填充内容用LLM,可以选择最合适的模型;美化PPT则可以直接使用已有的API来处理。
这其实就是Flow了,或者更明确一点,Agentic Workflow[2]。不是普通的工作流,是“智能”的工作流。通过流程的编排,我们可以扩充LLM的能力,通过一些组合来实现复杂的业务功能。相对于Wrapper而言,Workflow可以更好地实现复杂的AI能力。 因此,我们在KPP中也实现了Workflow的能力。这里就技术层面分享一下KPP在构建Workflow时的一些设计和实现。
为什么要自研,搞个开源的不行么?
实际上,在开始设计工作流之前,就已经有不少公司已经在做了。其中最具有参考价值的主要是Dify和扣子。其中,Dify是一个开源的系统,理论上可以直接拿过来用。 但是,考虑到WPS AI的实际情况,有几个比较重要的问题是Dify难以支撑的:
- 无法直接嵌入到WPS AI的现有体系,包括网关、KPP、AI Server等,一定会存在一个二次开发的成本
- Dify是用Python写的,如果大规模使用,很可能会遇到性能问题,包括工作流本身的异步多线程执行、以及水平扩展的性价比
实际上,字节的Coze是一个非常适合我们的开发平台。它的产品设计相当专业和完善,Workflow、图像流、Agent、多Agent协作等,是目前所有平台中定义最清晰的。相对而言,一些平台自己都不知道自己在做什么,比如百度的Agent Builder和App Builder,做着做着就成一样的了[3]。 Dify在这方面也有明显的问题,看起来还没有搞清楚Agent和Workflow的关系。
然而很遗憾目前为止没有一个开源的系统可以达到Coze的高度。 正是由于种种原因,最终KPP决定完全自研Workflow系统。
理论基础
即便从零开始实现一个Workflow其实也比较简单,因为从原理上来说,Workflow无非就是一个DAG(有向无环图)而已。 它是工作流的核心。DAG相信大家都比较熟悉,之所以要用到DAG正式因为它有两个重要的特点:
- 节点之间的连接是有向的,否则我们将无法判断节点之间的依赖关系和执行顺序
- 整个图是无环的,这也限制了在Workflow中是不可以存在循环的。这点非常重要,如果是一个Agent,那么我们可以一直给Agent发消息聊下去,这是一个循环;但是Workflow一定不能存在这种不确定的循环,或者说,我们期望在Workflow能够在确定有限的时间内运行完成并获取结果,它是Stateless(无状态)的
使用拓扑排序确定执行顺序
执行DAG第一件事情就是确定节点的执行顺序,从哪里开始,又如何结束?
考虑如下的几种场景:
从图中我们可以看出这也一些信息:
- 严格而言,理论上DAG只要求有向无环,但是不必要“连通”的。也即是说,图中可以存在孤立的点,或者多个分散的DAG构成一个大的DAG
- 一个节点可能有多个后继节点,意味着某个任务完成后,可能触发多个任务的执行
- 一个节点也可能有多个依赖,那么只有依赖节点全部执行完成后,才能开始它的执行
我们可以通过拓扑排序(Topological sorting)来确定节点的执行顺序。比较著名的算法是Kahn's algorithm,原理也很简单,从图中选出入度为零的节点,然后删掉这些节点和边,再重复这一过程直到原图为空。这样选出的这些节点就是拓扑排序的结果。
拓扑排序还有一个附带的好处,就是可以检测图中是否有环。如果图中没有入度为零的节点,但是选出的节点数目仍然小于图的节点数,那么拓扑排序失败,证明图中有环存在。
拓扑排序的结果并不一定是唯一的。比如场景一中,由于节点相互之间完全无依赖,因此任意一个组合都是正确的拓扑排序结果。
值得注意的是,尽管拓扑排序得到的是一个有序的列表,这并不意味着你需要“串行”执行。场景三的拓扑排序结果为[A,B,C,D]或者[A,B,D,C],其中,C和D是可以并行执行的。
事件驱动架构
确定了节点的执行顺序之后,我们可以按照顺序来执行节点,看起来工作流的执行就这样完成了:
sorted_nodes <- dag.topological_sort()
for node in sorted_nodes:
node.wait_for_dependencies()
node.execute()
但是我们忽略了几个重要的问题:
- 节点之间是有交互的。考虑场景三中,节点B是可能需要使用A的运行结果的,也就是说,一个节点的运行结果应该对其后继节点可见
- 我们需要能够实时观测工作流的运行情况,比如当前执行到哪一个节点了?这对于界面调试、API调用等都十分有用。
使用事件驱动架构可以比较容易来解决这个问题。我们将工作流的运行抽象成一系列的事件,节点的执行产生事件(比如节点开始、节点结束),而这些事件具体如何使用则由消费者来决定。 这样,在UI上调试工作流时,我们只需要根据这些事件来更新页面上的节点状态;当使用API调用时,我们则可以过滤掉不感兴趣的事件,只监听结果事件并返回给调用方即可。 这样对于工作流执行引擎将有非常强的扩展性和可测试性。
传递闭包简化
对于使用DAG的系统,一个不可忽视的问题就是DAG的不确定性:用户很可能构建出奇奇怪怪的巨复杂的图出来,其中可能会有不必要的边。 这对于理解和执行工作流都可能造成麻烦。例如,下图中左边a→d, a→e 都是不必要的边。通过传递闭包简化(Transitive reduction)算法可以对无用的连接进行删除,从而得到一个唯一确定的简化工作流。
使用Go语言实现工作流
在有以上的一些理论基础后,实现思路已经非常清晰,尽管仍然有许多技术细节需要确定,就看如何具体设计和实现了。我们使用Go语言来实现,初步支持下面几种节点:
- 开始、结束
- 条件判断
- 大模型调用
- 插件:包括bing搜索、高召回、分流
概念模型
尽管前面我们认为工作流是一个DAG,但并不是直接搞一个DAG就能实现向Coze这样的工作流系统,这里面最大的问题在于,DAG中边只包含了方向, 但实际工作流产品的设计上,一条边还可能附带一些其他的信息。
例如,ComfyUI中一个节点到另一个节点连接是区分不同的点的[4]:
当然它的业务场景跟我们差距比较大。但是,考虑到条件节点的表示,我们发现也存在类似的情况:
注意其中的条件节点,不同的分支连接到下游节点时,起点是标记到分支上的,而不是单纯的两个节点连接。当然我们可以不采用这种方式,可以在节点属性中设置一些字段来表示什么情况下执行,甚至设置到边的信息中。但不可否认的是,这种边直接连接到条件上的方式是最直观的。 经过一些权衡之后,我们提出了如下的一个工作流概念模型:
这个概念模型,通过对边添加属性来区分是从哪个点连接到哪个点。但并未改变DAG的本质, 因此在KPP的工作流中,是无法直接从一个条件节点的两个分支连接到同一个节点的,因为这样相当于生成了两条边。
这里,对于这个“点”叫什么名字好其实有一个小故事。 最开始,我们从Qt的Singal/Slot概念中借用“Slot”的概念叫“槽位”,后来觉得不妥,借用IP端口的概念叫Port。 彼时,我们抓包过Coze的协议,想参考它的定义来设计,但结果发现它表示的也乱七八糟,根本看不懂。 过了很久,等我再次抓包Coze的时候,惊讶的发现他也改名叫Port了。
若合一契,岂不快哉!
定义工作流Schema
在实现之前,首先需要解决的一个问题是,我们如何来表示一个工作流?比如条件判断的条件怎么存储?大模型调用的提示词怎么表示?
这是一个非常重要的问题,因为它是一切的基础,我们需要通过一个标准的协议来统一前端、后端等对于工作流的表示。不难发现,工作流的定义包括:
- DAG的表示,即节点和边
- 节点的自身属性,根据节点类型不同,其可用的属性也不尽相同
我们使用JSON Schema定义了一个工作流格式,大概长这样:
{
"type": "object",
"properties": {
"name": {
"type": "string"
},
"nodes": {
"type": "array",
"items": {
"$ref": "#/definitions/node"
}
},
"edges": {
"type": "array",
"items": {
"$ref": "#/definitions/link"
}
},
...
}
}
对于不同的节点,我们可以分别定义节点具体有哪些属性,格式是什么,是否必须等,例如:
{
...
"bingSearchPluginProperties": {
"properties": {
"pluginType": {
"type": "string"
},
"version": {
"type": "string"
},
"query": {
"$ref": "#/definitions/variableBinding"
},
"count": {
"type": "number"
}
},
"required": [
"pluginType",
"version",
"query"
]
}
}
完整的定义比较复杂,但通过这样一个Schema,我们可以解决很多问题:
- 这个工作流是否合法?至少格式上是否正确
- 前端开发时,如何清楚工作流支持多少个节点?每个节点属性是什么?
对于这种复杂的工作流来说,传统的API文档是无法精确到这样的程度的,不仅如此,它的最大好处是,它是真的可以运行的!
工作流的构建
对工作流的抽象定义
首先我们定义了一个业务无关的单纯DAG实现,仅做了必要的修改来适配概念模型中的槽位:
type Port struct {
Direction PortDirection `json:"type"`
Id string `json:"id"`
}
type Vertex struct {
Id string
Type string
Ports []Port
Payload any
}
type Connection struct {
From string
To string
SourcePort *Port
TargetPort *Port
}
type PortawareDAG interface {
AddVertex(node Vertex) error
GetVertex(id string) (Vertex, error)
Connect(fromId string, toId string) error
ConnectFromPort(fromId string, toId string, toPort string) error
TopologicalSorting() ([]Vertex, error)
TransitiveReduction() (PortawareDAG, error)
// ...
}
而对于具体的不同的节点,是通过另外的Node接口来定义:
type Node interface {
GetId() string
GetType() NodeType
Execute(ctx context.Context, options NodeExecuteOptions, doneGroup *sync.WaitGroup) ([]InputValue, []OutputValue, error)
// ...
}
其中,最重要的方法就是Execute,它将设计执行节点,比如请求大模型,或者执行条件判断。这样,不同类型的Node会有不同类型的实现, 比如大模型节点的实现有单独的定义:
type LlmNode struct {
Id string
Name string
client GatewayClient
Properties LlmNodeProperties
requestBuilder LlmRequestBuilder
}
解析动态类型
由于工作流节点的类型不一样,我们在实例化成Node类型的时候,一个问题是需要进行动态类型的转换。 比如,我们JSON中大概是这样定义的:
{
"nodes": [
{ "type": "start", ...},
{ "type": "llm", ...}
]
}
而我们需要根据type实例化成不同的类型(StartNode,LlmNode)。不得不吐槽,在go里面没有优雅的实现方式, 不得不将JSON反序列化成any(实际上是一个map),然后再部分序列化成JSON,然后再根据类型再次反序列化成实际的类型:
for index, node := range w.Nodes {
jsonData, _ := json.Marshal(node.Properties)
var p any
var err error
t := node.Type
if t == TypeStart {
p, err = loadStartNode(jsonData)
} else if t == TypeLlm {
p, err = loadLlmNode(jsonData)
} else if t == TypeEnd {
p, err = loadHttpNode(jsonData)
}
// ...
异步工作流运行模型
Pub/Sub模式
在事件处理架构中,我们通过事件来接偶工作流执行引擎,以及其后面的消费者。 Go语言天然提供了channel来进行消息通信,但不幸的是,它是一个单生产者/消费者的模型。 在我们需求中,我们应该允许多个消费者存在,彼此只关心自己感兴趣的事件。比如,我们可以有一个消费者来打印日志;另一个消费者来返回HTTP消息。
这是一个Pub/Sub模式。尽管Go原生不支持,但是我们很容易实现:
type Subscriber[T any] chan T
type TopicFilter func(event any) bool
type Publisher[T any] struct {
name string
mutex sync.RWMutex
buffer int
closed bool
subscribers map[Subscriber[T]]TopicFilter
}
实现原理也很简单,那就是为每一个消费者创建一个channel,然后发送消息的时候,发送多次:
func (p *Publisher[T]) Publish(event T) {
p.mutex.RLock()
defer p.mutex.RUnlock()
var wg sync.WaitGroup
for subscriber, filter := range p.subscribers {
subscriber := subscriber
if filter == nil || filter(event) {
wg.Add(1)
go func(event T, channel chan<- T) {
defer wg.Done()
channel <- event
}(event, subscriber)
}
}
wg.Wait()
}
这样我们就有了一个Pub/Sub实现,我们用它来作为工作流执行的事件订阅/消费系统。核心是一个Executor接口,他提供一个订阅的方法用来订阅事件,以及一个执行的方法来执行工作流。
type Executor interface {
Subscribe() Subscriber[WorkflowEvent]
Close()
ExecSync(ctx WorkflowContext) error
}
使用Go routine实现异步运行
前面我们有提到,尽管拓扑排序可以将工作流节点排序成一个有序的列表,但并不代表节点需要串行执行。比如说,如果有多个入度为0的节点, 那么我们可以同时运行它们。在go语言中,很容易通过go routine来实现。
进一步思考,我们会发现,不仅需要做拓扑排序,还需要根据节点的依赖来执行节点。只有当节点的依赖项全部执行完成后,才能开始该节点的执行。 可以通过一个队列来实现,不断执行和删除就绪的节点,直到全部节点执行完成。 其运行原理如下面的伪代码所示:
queue <- dag.typological_sort()
while queue.is_not_empty():
ready_nodes <- queue.pop_zero_degree_nodes()
for node in ready_nodes start:
node.execute()
再进一步思考,如果某一个节点运行失败,我们应该如何处理?一个比较合理的做法是直接停止工作流的运行。这种逻辑可以通过errgroup来实现。 最终,我们执行工作流的方法逻辑如下:
func (e *AsyncExecutor) ExecSync(c WorkflowContext) error {
defer e.Close()
e.eventPublisher.Publish(NewWorkflowStartedEvent())
errGroup, ctx := errgroup.WithContext(c.GetContext())
var doneGroup sync.WaitGroup
for _, node := range e.pendingNodes {
node := node
waitChannel := e.notifyChannels[node.Id]
errGroup.Go(func() error {
select {
case <-ctx.Done():
return errCancelledByContext
case inputs := <-waitChannel:
nodeInstance, ok := node.Payload.(Node)
if !ok {
return errFailedToGetNodeInstance
}
return e.Execute(ctx, nodeInstance, inputs, &doneGroup)
}
})
}
e.popReadyNodes()
err := errGroup.Wait()
if err != nil {
return err
}
doneGroup.Wait()
e.eventPublisher.Publish(NewWorkflowFinishedEvent())
return nil
}
有意思的是,我们并不是等节点的依赖项执行完成之后,再启动一个go routine,而是一开始就为每一个节点启动了一个go routine, 但是通过一个信号来控制何时开始运行。我们可以通过context.Context来进行额外的控制,比如设定超时时间之类, 这对于避免程序出现异常情况导致工作流无休止运行非常有帮助。
select {
case <-ctx.Done(): // 如果超时或者取消,go routine会从这里直接返回,从而中断执行
return errCancelledByContext
case inputs := <-waitChannel: // 控制节点开始执行的信号channel
// ...
return e.Execute(ctx, nodeInstance, inputs, &doneGroup)
}
注意在上面的流程中我们通过e.popReadyNodes()这个方法来通知已经就绪的节点开始运行, 这个方法的主要作用是找到当前节点中,所有可以运行的节点。 例如,考虑一个包含A->B->C三个节点的工作流,如果A已经执行结束, 那么B就可以开始运行。 这时候,A的运行结果将作为B的输入信息。
func (e *AsyncExecutor) popReadyNodes() error {
defer e.lock.Unlock()
e.lock.Lock()
for _, node := range e.pendingNodes {
inputs, ready, err := e.getInputs(node.Id)
if err != nil {
return err
}
if !ready {
return nil
}
picked := e.pendingNodes[0]
e.pendingNodes = e.pendingNodes[1:] // 从尚未执行的节点队列中移除队首
notifyChannel := e.notifyChannels[picked.Id] // 通知移除的节点开始执行
notifyChannel <- inputs
}
return nil
}
这里每一个节点对应一个通知channel,在准备运行的阶段就已经进行了初始化。 整体思路是,每当有节点执行完成,那么就检查是否有新的节点可以开始运行,否则,不可能执行其他的节点,因为剩余的节点都是有依赖关系的。 细心的读者容易想到一个问题,就是当工作流开始执行的时候, 总是有起始节点是没有依赖的,按照这个逻辑怎么触发运行呢?这个问题很容易解决,那就是工作流开始的时候,手动调用一次popReadyNodes。
执行的关键在于,一个节点运行的结束,会触发其后继节点的运行。这个“触发”怎么实现,实际上是有很多种办法的。 其实第一版实现并不是这样为每一个节点设置一个通知channel(全局唯一,每一个节点有且仅有一个通知channel);而是通过Pub/Sub模式来等待所有依赖项完成并通知。
这是最开始的逻辑:
// 等待所有依赖节点发送通知,
func blockForNodeResults(waitChannels []<-chan NodeResult) []NodeResult {
results := []NodeResult{}
for _, ch := range waitChannels {
result := <-ch
results = append(results, result)
}
return results
}
仔细思考其实这样实现问题挺多的:
- 需要为每一个依赖创建一个channel来发送和接收通知,且运行的时候还需要等待所有channel都接收到结果,相当于把“我是否能够执行”的逻辑交给节点自己去判断
- 由于channel发送和接收需要对应,需要至少为channel设置一个大小为1的buffer,否则接受channel的动作有可能导致节点运行阻塞
条件分支控制
在上述的工作流执行模型中,一个节点运行完成后会触发后继节点的运行。这在大多数情况下都有效, 但是,如果考虑到条件节点这一种特殊的节点, 就会存在问题。
如图所示,条件节点有两个分支(if/else),这两个分支有且仅能命中一个, 我们期望它的后继分支有且仅有一个能够运行。为此一种特殊的节点状态被引入:Skipped(跳过执行), 对于不需要被实际执行的节点,例如没有命中的分支,那么它的运行状态将是Skipped。
而在具体的实现上,我们为节点定义了一些属性:
- TriggerRule:触发条件,包括AllSuccess(所有依赖节点全部执行成功,不包含Skipped), NoneFailed(没有失败的情况,包含Skipped)等
- ConnectionType:连接方式,包含Selective(选择执行,如条件节点、分流)、Direct(普通情况)
- OutputValue:节点的输出值,对于条件节点,我们会输入特殊的变量来表示分支的选择结果,比如条件节点会生成$<port>=true/fase来表示某一个Port是否被命中
通过这些信息的组合,可以对节点如何执行进行有效的控制。这里的设计部分参考了Apache Airflow的做法 [5],但尽力避免不必要的复杂性。
引用和表达式解析
在工作流中,我们需要在一个节点中使用其他节点的结果。 比如,在条件节点中引用开始节点的某个变量作为判断的依据。这部分涉及到一些较为复杂的逻辑:
- 在构建工作流的时候,前端页面需要知道有哪些变量能够选择。这不仅跟节点类型有关,跟输出变量的类型、以及节点之间是否有连接也有关系。举个例子,如果两个节点之间没有连线,那么它们之间是不能相互引用变量的;条件节点中的比较操作是需要判断类型的,不能拿一个数字跟字符串去比较是否相等
- 在上一步获取可用变量列表的时候,工作流尚处于一个不完善的状态,可能是不合法的
- 变量可能涉及到复杂的嵌套结构,比如在开始节点定义一个嵌套的输入A.B.C,然后在后面的节点中引用
在Workflow的定义中,我们通过一个简单的表达式来指代引用:
"expressions": [
{
"variable": "$start1.input", // 获取开始节点(start1)中的input输出变量
"operator": "contains",
"target": {
"type": "fixed",
"value": "kingsoft"
}
}
]
这一部分的实现较为麻烦,涉及到一些递归解析算法,比如上述的$start1.input,我们首先要从节点的输出参数中找到节点start1,然后再从start1的结果中找到input变量。 但由于start1这个节点不一定是该节点的直接依赖,很可能会是依赖的依赖,这样在我们的实现中输入是一个树形结构, 需要进行递归寻址:
func resolveNodeResultRecursively(input NodeResult, nodeId string) (NodeResult, error) {
if input.NodeId == nodeId {
return input, nil
} else {
for _, parent := range input.Inputs {
result, err := resolveNodeResultRecursively(parent, nodeId)
if err == nil {
return result, nil
}
}
return NodeResult{}, errVariableNodeNotAccessable
}
}
获取到变量之后,还可能需要根据变量类型进行一些自动类型检测和转换:
func tryCastValue(v any, expectedType *TypeDescriptor) (any, error) {
if expectedType == nil {
return v, nil
}
if v == nil {
return nil, errors.New("value is nil")
}
switch *expectedType {
case StringType:
return castToString(v)
case IntegerType:
return castToInt32(v)
case FloatType:
return castToFloat32(v)
case ArrayIntegerType:
return castToInt32Array(v)
case ArrayFloatType:
return castToFloat32Array(v)
// ...
这时候,go的劣势就被发挥的淋漓尽致,比如把一个any类型转换为字符串,得这样写:
func castToString(v any) (string, error) {
if v == nil {
return "", nil
}
switch value := v.(type) {
case string:
return value, nil
case int:
return strconv.FormatInt(int64(value), 10), nil
case int32:
return strconv.FormatInt(int64(value), 10), nil
case int64:
return strconv.FormatInt(value, 10), nil
case float32:
return strconv.FormatFloat(float64(value), 'f', -1, 32), nil
case float64:
return strconv.FormatFloat(value, 'f', -1, 64), nil
case bool:
return strconv.FormatBool(value), nil
default:
return "", errVariableTypeIncompatible
}
}
测试工作流
工作流实际是一个非常复杂的系统, 很难按照传统的排列组合的方式来进行测试——有太多种组合的可能了。 那么怎样来保证代码质量呢?
使用TDD
曾经在知乎上看到一个很有意思的问题和回答:
“如何激怒一个Golang开发者?” “你写的代码像Java……”
其实在实现复杂逻辑的时候,很多方法都是可以借鉴的。尽管Go并不是一个严格意义上的面向对象的语言, 但将OO的思想用到工作流的实现中也未尝不可:如果逻辑过于复杂,那么我们就进行拆分,并遵循单一职责原则来实现。
在工作流的实现中,我们充分运用这一思想, 通过将逻辑拆分,并独立进行单元测试,来确保程序代码的正确。 举个例子, 大模型节点的实现又依赖GatewayClient、LlmRequestBuilder,其中GatewayClient用来请求网关;LlmRequestBuilder负责构建大模型请求:
type LlmNode struct {
Id string
Name string
client GatewayClient
Properties LlmNodeProperties
requestBuilder LlmRequestBuilder
}
而大模型请求的构建同样具有一定的复杂性,不仅仅是因为不同模型的参数有差异,还有一个原因是大模型节点支持一种JSON模式,允许用户直接定义大模型的输出类型:
而大模型并不都具有这样的魔法,具体的实现方式是这样的:
因此,我们进一步对这些逻辑进行拆解,通过Adaptor模式来解决不同厂商的差异:
type LlmRequestBuilderImpl struct {
schemaGenerator JsonSchemaGenerator
minimaxAdaptor MinimaxAdaptor
gptAdaptor GptAdaptor
}
对于单元测试来讲,每一个方法或者struct都足够专一,我们只需要测试其相关的逻辑。例如,在上面的例子中, 对于GptAdaptor我们有这些测试用例:
// 如果大模型节点的返回格式是JSON且模型支持JSON模式,那么需要设置json_object参数,并在提示词中描述json结构
func TestGptAdaptor_ShouldAddJsonMode_GivenOutputIsJson(t *testing.T) {
// ...
err := adaptor.FixRequest(&req, p)
assert.NilError(t, err)
ext := req.ExtendedLLMArguments["azure_gpt-35-turbo"].(map[string]any)
x := ext["response_format"].(map[string]string)
assert.Equal(t, x["type"], "json_object")
assert.Equal(t, req.Context, "you're a helpful assistant.\n\nPlease reply exactly in the following json schema:\n```json\n"+
`{"type":"object","properties":{"age":{"type":"number","description":"user age"},"name":{"type":"string","description":"user name"}}}`+"\n```\n")
assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?")
}
// 如果大模型节点的返回格式是JSON且模型不支持json_object参数,那么只需要在提示词中描述JSON格式即可
func TestGptAdaptor_ShouldAddJsonPrompt_GivenOutputIsJson_JsonModeIsNotSupported(t *testing.T) {
// ...
err := adaptor.FixRequest(&req, p)
assert.NilError(t, err)
assert.Equal(t, req.ExtendedLLMArguments == nil, true)
assert.Equal(t, req.Context, "you're a helpful assistant.\n\nPlease reply exactly in the following json schema:\n```json\n"+
`{"type":"object","properties":{"age":{"type":"number","description":"user age"},"name":{"type":"string","description":"user name"}}}`+"\n```\n")
assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?")
}
// 如果输出不是JSON格式,那么什么也不做
func TestGptAdaptor_ShouldDoNothing_GivenOutputIsNotJson(t *testing.T) {
// ...
err := adaptor.FixRequest(&req, p)
assert.NilError(t, err)
assert.Equal(t, req.ExtendedLLMArguments == nil, true)
assert.Equal(t, req.Context, `you're a helpful assistant.`)
assert.Equal(t, req.Messages[0].Content, "so he is a clean robot? what else do you know about him?")
}
而在上层的LlmNode的单元测试中,我们不需要关心这些独立的逻辑,采取测试替身进行Mock即可:
func TestExecuteLlmNode_ShouldReturnChoice_WhenExecute_GivenIsNotStream(t *testing.T) {
client := NewMockGatewayClient(t)
builder := NewMockLlmRequestBuilder(t)
builder.On("CreateGatewayRequest", mock.Anything, mock.Anything).Return(nil, gateway.LLMChatUniversalReq{}, nil)
node := LlmNode{
Id: "llm1",
client: client,
Properties: LlmNodeProperties{
Context: "you're helpful assistant",
Prompt: "give me a name",
Stream: false,
},
requestBuilder: builder,
}
// ...
使用mockery[8]可以为指定的interface自动生成mock对象。 注意不是字节开源的那个Mockey[9],这个框架实际上存在一些坑:
- 需要添加一些参数来禁用内联和编译优化
- 因为它全局Mock方法,可能存在多个测试执行时的互相干扰问题,比如在测试A中Mock时影响测试B的运行结果
自动化集成测试
对于一个工作流执行,如何进行集成测试呢?得益于事件驱动架构, 我们不需要关心工作流执行的Side effect,只需要关心事件即可。 也就是说,对于一个给定的工作流,它是否产生了符合预期的事件。
例如,我们有一个用例测试条件节点:
func TestExecutor_ShouldReturnLlm1Result_WhenExecute_GivenConditionMatchesLlm1(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"code":"Success","message":"成功","external_id":"a9331df1-2adb-4fad-a154-5d8767196bb2","created":1718264704,"task_id":"llm-chat-azure-gpt-35-turbo-version-0125/ac57f2f54fe1d40be193966e5875b3a8","usage":{"completion_tokens":77,"prompt_tokens":52,"total_tokens":129},"choices":[{"index":0,"finish_reason":"stop","logprobs":0,"text":"Yes"}]}`))
}))
data, _ := os.ReadFile("./schema/cases/llm-condition.json")
json, err := workflow.Parse(data)
assert.NilError(t, err)
builder := workflow.NewWorkflowBuilder(json, workflow.GatewayConfig{
BaseUrl: server.URL,
})
dag, errs := builder.ValidateAndToDAG()
assert.Equal(t, len(errs), 0)
executor, err := workflow.NewExecutor(*dag)
assert.NilError(t, err)
channel := executor.Subscribe()
go func() {
w := workflow.NewEmptyWorkflowContext(context.TODO())
w.SetParameters(map[string]any{
"input": "what's the main product of kingsoft?",
})
err := executor.ExecSync(w)
assert.NilError(t, err)
}()
events := []workflow.WorkflowEvent{}
for e := range channel {
events = append(events, e)
}
assert.Equal(t, len(events), 12)
assertEventsContains(t, events, func(e workflow.WorkflowEvent) bool {
return e.Type() == workflow.NodeSkipped && e.(workflow.NodeSkippedEvent).Node == "llm2"
})
assertEventsContains(t, events, func(e workflow.WorkflowEvent) bool {
return e.Type() == workflow.JsonResponseGenerated && e.(workflow.JsonResponseEvent).Data["output"] == "Yes"
})
}
在这个集成测试中,除了网关的调用是通过Mock Server替代外,其他的调用方式都与实际一致。 在执行完成后,我们捕获事件,验证产生的事件数目以及是否包含预期的事件,来判断执行结果是否符合预期。
当然,集成测试的成本较高,我们需要维护一些典型的测试场景(工作流JSON),因此很难覆盖到逻辑的细节。 这也意味着集成测试的数目理论上应该远小于单元测试, 或者说单元测试应该更充分。
CLI测试工具
除了单元测试、集成测试等方式来测试工作流的执行之外, 很多时候,我们需要定位分析问题, 是很麻烦的。
比如,工作流A运行有问题,但是由于权限问题,排查问题的人员无法在界面上看到A; 或者由于实现的层级嵌套较深,很难判断到底是前端问题,还是后端接口问题,还是工作流执行引擎的问题。
为此,我们开发了一个CLI工具用来直接执行工作流JSON,可以比较方便地运行和查看工作流执行结果:
未来展望
工作流模型是一个高度可扩展的模型, 可以通过添加新的节点类型、插件等, 实现更复杂的功能。
一些在计划内的功能有:
- RAG:知识库检索
- 代码执行:执行一段python或者Javascript代码, 可以用来定制一些特殊的逻辑
- 图像生成:类似Coze的图像流,包含文生图以及图像处理等
- 自定义插件:允许用户接入已有的API,或者通过FaaS部署一个新的自定义插件
- 嵌套的Workflow:允许在Workflow中嵌套调用
尽管Workflow已经具备实现复杂的功能的可能,但在AI应用的层级中尚只处于第二层, 目前来看, 大概率Agent才是AI应用的尽头。
在Coze中,Workflow可以作为Agent的一种工具来执行,相对于其他的插件和工具, 我认为其最大的价值在于,通过工作流这种低代码的形式,赋予非技术人员无限的创造力。
尽管工作流很灵活,可以根据用户的需求来随意配置,但是我们不禁思考,这真的是“Agentic Workflow”么? 在上面的例子中,我们只是用到了大模型,但大模型在其中并没有辅助或者参与决策。 这个问题留给聪明的读者去思考:p
关于我们为什么要做Workflow,感兴趣可以从这里[10]了解更多的背景信息。
- ↑ 虽然模型本身可以推理,一些情况下是可以推算出结果,但无法做到准确。即使是人,也很难直接计算诸如10988x727664=?这样的问题
- ↑ https://www.deeplearning.ai/the-batch/issue-242/
- ↑ https://www.53ai.com/news/qianyanjishu/1317.html
- ↑ ComfyUI是一个用来配置Stable diffusion图像生成工作流的系统。尽管它一个节点到另一个节点可以有多条边,这看起来不像是一个DAG,但从实现原理上来说,这仍然可以通过DAG来表示。
- ↑ https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/index.html
- ↑ https://platform.openai.com/docs/guides/json-mode
- ↑ https://platform.minimaxi.com/document/ChatCompletion%20Pro?key=66718f6ba427f0c8a57015ff#MKTRq06DJQTye5EdtIf5j3d8
- ↑ https://github.com/vektra/mockery
- ↑ https://github.com/bytedance/mockey/blob/main/README_cn.md
- ↑ AI Con总结和KPP工作流 方案汇报 https://365.kdocs.cn/l/crXXklzrerKF