Workflow design

来自WHY42

随着2023年大模型在国内的发展成熟,现在大家逐步开始真正关注到大模型应用的开发上。有人说,2024年是大模型应用的落地元年。的确,看似无所不能的大模型,也并不是直接丢到一个系统就能用的,如何开发大模型应用,实际上还处于一个刚起步的阶段。一些成熟的技术,经历过许多人摸索后,总能给出一个"best practice";对于大模型应用开发来说,却仍还在探索阶段。 目前,一个看起来比较靠谱的模型,是这样定义大模型应用开发的几个level的:

最简单的Wrapper层级,实际就是把大模型的提示词封装成一个固定功能的”方法“,然后集成到业务系统中。KPP的Function(或者Prompt),就是一个Wrapper。Wrapper能做什么完全取决于大模型的能力,比如生成一段文字,甚至写一段代码,都没有问题。但是如果想做一个复杂的功能,恐怕就不行了,比如,先生成代码再执行得到代码运行结果,这样就没法直接通过大模型请求来做了,原因是大模型本身无法直接执行代码 [1]

显然,对于复杂的功能,我们可以尝试分步骤来解决。比如一键生成PPT就是很好的例子,第一步先生成一个outline;然后再根据outline,去填充内容;最后通过一个生成文档的API生成一个美化后的PPT。这样每一步都可以用最适合的技术来进行,比如生成outline和填充内容用LLM,可以选择最合适的模型;美化PPT则可以直接使用已有的API来处理。

这其实就是Flow了,或者更明确一点,Agentic Workflow[2]。不是普通的工作流,是“智能”的工作流。通过流程的编排,我们可以扩充LLM的能力,通过一些组合来实现复杂的业务功能。相对于Wrapper而言,Workflow可以更好地实现复杂的AI能力。 因此,我们在KPP中也实现了Workflow的能力。这里就技术层面分享一下KPP在构建Workflow时的一些设计和实现。

为什么要自研,搞个开源的不行么?

实际上,在开始设计工作流之前,就已经有不少公司已经在做了。其中最具有参考价值的主要是Dify和扣子。其中,Dify是一个开源的系统,理论上可以直接拿过来用。 但是,考虑到WPS AI的实际情况,有几个比较重要的问题是Dify难以支撑的:

  • 无法直接嵌入到WPS AI的现有体系,包括网关、KPP、AI Server等,一定会存在一个二次开发的成本
  • Dify是用Python写的,如果大规模使用,很可能会遇到性能问题,包括工作流本身的异步多线程执行、以及水平扩展的性价比

实际上,字节的Coze是一个非常适合我们的开发平台。它的产品设计相当专业和完善,Workflow、图像流、Agent、多Agent协作等,是目前所有平台中定义最清晰的。相对而言,一些平台自己都不知道自己在做什么,比如百度的Agent Builder和App Builder,做着做着就成一样的了[3]。 Dify在这方面也有明显的问题,看起来还没有搞清楚Agent和Workflow的关系。

然而很遗憾目前为止没有一个开源的系统可以达到Coze的高度。 正是由于种种原因,最终KPP决定完全自研Workflow系统。

理论基础

即便从零开始实现一个Workflow其实也比较简单,因为从原理上来说,Workflow无非就是一个DAG(有向无环图)而已。 它是工作流的核心。DAG相信大家都比较熟悉,之所以要用到DAG正式因为它有两个重要的特点:

  • 节点之间的连接是有向的,否则我们将无法判断节点之间的依赖关系和执行顺序
  • 整个图是无环的,这也限制了在Workflow中是不可以存在循环的。这点非常重要,如果是一个Agent,那么我们可以一直给Agent发消息聊下去,这是一个循环;但是Workflow一定不能存在这种不确定的循环,或者说,我们期望在Workflow能够在确定有限的时间内运行完成并获取结果,它是Stateless(无状态)的

使用拓扑排序确定执行顺序

执行DAG第一件事情就是确定节点的执行顺序,从哪里开始,又如何结束?

考虑如下的几种场景:

从图中我们可以看出这也一些信息:

  • 严格而言,理论上DAG只要求有向无环,但是不必要“连通”的。也即是说,图中可以存在孤立的点,或者多个分散的DAG构成一个大的DAG
  • 一个节点可能有多个后继节点,意味着某个任务完成后,可能触发多个任务的执行
  • 一个节点也可能有多个依赖,那么只有依赖节点全部执行完成后,才能开始它的执行

我们可以通过拓扑排序(Topological sorting)来确定节点的执行顺序。比较著名的算法是Kahn's algorithm,原理也很简单,从图中选出入度为零的节点,然后删掉这些节点和边,再重复这一过程直到原图为空。这样选出的这些节点就是拓扑排序的结果。

拓扑排序还有一个附带的好处,就是可以检测图中是否有环。如果图中没有入度为零的节点,但是选出的节点数目仍然小于图的节点数,那么拓扑排序失败,证明图中有环存在。

拓扑排序的结果并不一定是唯一的。比如场景一中,由于节点相互之间完全无依赖,因此任意一个组合都是正确的拓扑排序结果。

值得注意的是,尽管拓扑排序得到的是一个有序的列表,这并不意味着你需要“串行”执行。场景三的拓扑排序结果为[A,B,C,D]或者[A,B,D,C],其中,C和D是可以并行执行的。

事件驱动架构

确定了节点的执行顺序之后,我们可以按照顺序来执行节点,看起来工作流的执行就这样完成了:

sorted_nodes <- dag.topological_sort()
for node in sorted_nodes:
    node.wait_for_dependencies()
    node.execute()

但是我们忽略了几个重要的问题:

  • 节点之间是有交互的。考虑场景三中,节点B是可能需要使用A的运行结果的,也就是说,一个节点的运行结果应该对其后继节点可见
  • 我们需要能够实时观测工作流的运行情况,比如当前执行到哪一个节点了?这对于界面调试、API调用等都十分有用。

使用事件驱动架构可以比较容易来解决这个问题。我们将工作流的运行抽象成一系列的事件,节点的执行产生事件(比如节点开始、节点结束),而这些事件具体如何使用则由消费者来决定。 这样,在UI上调试工作流时,我们只需要根据这些事件来更新页面上的节点状态;当使用API调用时,我们则可以过滤掉不感兴趣的事件,只监听结果事件并返回给调用方即可。 这样对于工作流执行引擎将有非常强的扩展性和可测试性。

传递闭包简化

对于使用DAG的系统,一个不可忽视的问题就是DAG的不确定性:用户很可能构建出奇奇怪怪的巨复杂的图出来,其中可能会有不必要的边。 这对于理解和执行工作流都可能造成麻烦。例如,下图中左边a→d, a→e 都是不必要的边。通过传递闭包简化(Transitive reduction)算法可以对无用的连接进行删除,从而得到一个唯一确定的简化工作流。

使用Go语言实现工作流

在有以上的一些理论基础后,实现思路已经非常清晰,尽管仍然有许多技术细节需要确定,就看如何具体设计和实现了。我们使用Go语言来实现,初步支持下面几种节点:

  • 开始、结束
  • 条件判断
  • 大模型调用
  • 插件:包括bing搜索、高召回、分流

概念模型

尽管前面我们认为工作流是一个DAG,但并不是直接搞一个DAG就能实现向Coze这样的工作流系统,这里面最大的问题在于,DAG中边只包含了方向, 但实际工作流产品的设计上,一条边还可能附带一些其他的信息。

例如,ComfyUI中一个节点到另一个节点连接是区分不同的点的[4]

当然它的业务场景跟我们差距比较大。但是,考虑到条件节点的表示,我们发现也存在类似的情况:

注意其中的条件节点,不同的分支连接到下游节点时,起点是标记到分支上的,而不是单纯的两个节点连接。当然我们可以不采用这种方式,可以在节点属性中设置一些字段来表示什么情况下执行,甚至设置到边的信息中。但不可否认的是,这种边直接连接到条件上的方式是最直观的。 经过一些权衡之后,我们提出了如下的一个工作流概念模型:

这个概念模型,通过对边添加属性来区分是从哪个点连接到哪个点。但并未改变DAG的本质, 因此在KPP的工作流中,是无法直接从一个条件节点的两个分支连接到同一个节点的,因为这样相当于生成了两条边。

这里,对于这个“点”叫什么名字好其实有一个小故事。 最开始,我们从Qt的Singal/Slot概念中借用“Slot”的概念叫“槽位”,后来觉得不妥,借用IP端口的概念叫Port。 彼时,我们抓包过Coze的协议,想参考它的定义来设计,但结果发现它表示的也乱七八糟,根本看不懂。 过了很久,等我再次抓包Coze的时候,惊讶的发现他也改名叫Port了。

若合一契,岂不快哉!

定义工作流Schema

在实现之前,首先需要解决的一个问题是,我们如何来表示一个工作流?比如条件判断的条件怎么存储?大模型调用的提示词怎么表示?

这是一个非常重要的问题,因为它是一切的基础,我们需要通过一个标准的协议来统一前端、后端等对于工作流的表示。不难发现,工作流的定义包括:

  • DAG的表示,即节点和边
  • 节点的自身属性,根据节点类型不同,其可用的属性也不尽相同

我们使用JSON Schema定义了一个工作流格式,大概长这样:

{
  "type": "object",
  "properties": {
    "name": {
      "type": "string"
    },
    "nodes": {
      "type": "array",
      "items": {
        "$ref": "#/definitions/node"
      }
    },
    "edges": {
      "type": "array",
      "items": {
        "$ref": "#/definitions/link"
      }
    },
    ...
  }
}

对于不同的节点,我们可以分别定义节点具体有哪些属性,格式是什么,是否必须等,例如:

{
  ...
  "bingSearchPluginProperties": {
    "properties": {
      "pluginType": {
        "type": "string"
      },
      "version": {
        "type": "string"
      },
      "query": {
        "$ref": "#/definitions/variableBinding"
      },
      "count": {
        "type": "number"
      }
    },
    "required": [
      "pluginType",
      "version",
      "query"
    ]
  }
}

完整的定义比较复杂,但通过这样一个Schema,我们可以解决很多问题:

  • 这个工作流是否合法?至少格式上是否正确
  • 前端开发时,如何清楚工作流支持多少个节点?每个节点属性是什么?

对于这种复杂的工作流来说,传统的API文档是无法精确到这样的程度的,不仅如此,它的最大好处是,它是真的可以运行的!

对工作流的抽象

首先我们定义了一个业务无关的单纯DAG实现,仅做了必要的修改来适配概念模型中的槽位:

type Port struct {
	Direction PortDirection `json:"type"`
	Id        string        `json:"id"`
}

type Vertex struct {
	Id      string
	Type    string
	Ports   []Port
	Payload any
}

type Connection struct {
	From       string
	To         string
	SourcePort *Port
	TargetPort *Port
}

type PortawareDAG interface {
	AddVertex(node Vertex) error
	GetVertex(id string) (Vertex, error)
	Connect(fromId string, toId string) error
	ConnectFromPort(fromId string, toId string, toPort string) error
	Break(fromId string, toId string) error
	HasConnection(fromId string, toId string) (bool, error)
	GetConnection(fromId string, toId string) (Connection, error)
	GetConnections() ([]Connection, error)
	DeleteVertex(id string) error
	GetVertexes() []Vertex
	GetStartVertexes() []Vertex
	GetEndVertexes() []Vertex
	TopologicalSorting() ([]Vertex, error)
	TransitiveReduction() (PortawareDAG, error)
	GetDependencies(id string) ([]string, error)
	GetDependencyMap() (map[string][]string, error)
	GetChildren(id string) ([]string, error)
	GetChildrenMap() (map[string][]string, error)
}

而对于具体的不同的节点,是通过另外的Node接口来定义:

type Node interface {
	GetId() string
	GetName() string
	GetType() NodeType
	GetNamedPorts() []Port
	GetConnectionType() ConnectionType
	GetOutputVariables() []VariableDefination
	GetTriggerRule() TriggerRule
	Execute(ctx context.Context, options NodeExecuteOptions, doneGroup *sync.WaitGroup) ([]InputValue, []OutputValue, error)
}

其中,最重要的方法就是Execute,它将设计执行节点,比如请求大模型,或者执行条件判断。这样,不同类型的Node会有不同类型的实现, 比如大模型节点的实现有单独的定义:

type LlmNode struct {
	Id             string
	Name           string
	client         GatewayClient
	Properties     LlmNodeProperties
	requestBuilder LlmRequestBuilder
}

而最终工作流的执行,是一个Executor接口,他提供一个订阅的方法用来订阅事件,以及一个执行的方法来执行工作流。

type Executor interface {
	Subscribe() Subscriber[WorkflowEvent]
	Close()
	ExecSync(ctx WorkflowContext) error
}

使用Go routine 实现异步工作流运行模型

前面我们有提到,尽管拓扑排序

Pub/Sub 模式

未来展望

关于为什么要做Workflow,感兴趣可以从这里了解一些其他的信息[5]

  1. 虽然模型本身可以推理,一些情况下是可以推算出结果,但无法做到准确。即使是人,也很难直接计算诸如10988x727664=?这样的问题
  2. https://www.deeplearning.ai/the-batch/issue-242/
  3. https://www.53ai.com/news/qianyanjishu/1317.html
  4. ComfyUI是一个用来配置Stable diffusion图像生成工作流的系统。尽管它一个节点到另一个节点可以有多条边,这看起来不像是一个DAG,但从实现原理上来说,这仍然可以通过DAG来表示。
  5. AI Con总结和KPP工作流 方案汇报 https://365.kdocs.cn/l/crXXklzrerKF