Workflow design

来自WHY42

随着2023年大模型在国内的发展成熟,现在大家逐步开始真正关注到大模型应用的开发上。有人说,2024年是大模型应用的落地元年。的确,看似无所不能的大模型,也并不是直接丢到一个系统就能用的,如何开发大模型应用,实际上还处于一个刚起步的阶段。一些成熟的技术,经历过许多人摸索后,总能给出一个"best practice";对于大模型应用开发来说,却仍还在探索阶段。 目前,一个看起来比较靠谱的模型,是这样定义大模型应用开发的几个level的:

最简单的Wrapper层级,实际就是把大模型的提示词封装成一个固定功能的”方法“,然后集成到业务系统中。KPP的Function(或者Prompt),就是一个Wrapper。Wrapper能做什么完全取决于大模型的能力,比如生成一段文字,甚至写一段代码,都没有问题。但是如果想做一个复杂的功能,恐怕就不行了,比如,先生成代码再执行得到代码运行结果,这样就没法直接通过大模型请求来做了,原因是大模型本身无法直接执行代码 [1]

显然,对于复杂的功能,我们可以尝试分步骤来解决。比如一键生成PPT就是很好的例子,第一步先生成一个outline;然后再根据outline,去填充内容;最后通过一个生成文档的API生成一个美化后的PPT。这样每一步都可以用最适合的技术来进行,比如生成outline和填充内容用LLM,可以选择最合适的模型;美化PPT则可以直接使用已有的API来处理。

这其实就是Flow了,或者更明确一点,Agentic Workflow[2]。不是普通的工作流,是“智能”的工作流。通过流程的编排,我们可以扩充LLM的能力,通过一些组合来实现复杂的业务功能。相对于Wrapper而言,Workflow可以更好地实现复杂的AI能力。 因此,我们在KPP中也实现了Workflow的能力。这里就技术层面分享一下KPP在构建Workflow时的一些设计和实现。

为什么要自研,搞个开源的不行么?

实际上,在开始设计工作流之前,就已经有不少公司已经在做了。其中最具有参考价值的主要是Dify和扣子。其中,Dify是一个开源的系统,理论上可以直接拿过来用。 但是,考虑到WPS AI的实际情况,有几个比较重要的问题是Dify难以支撑的:

  • 无法直接嵌入到WPS AI的现有体系,包括网关、KPP、AI Server等,一定会存在一个二次开发的成本
  • Dify是用Python写的,如果大规模使用,很可能会遇到性能问题,包括工作流本身的异步多线程执行、以及水平扩展的性价比

实际上,字节的Coze是一个非常适合我们的开发平台。它的产品设计相当专业和完善,Workflow、图像流、Agent、多Agent协作等,是目前所有平台中定义最清晰的。相对而言,一些平台自己都不知道自己在做什么,比如百度的Agent Builder和App Builder,做着做着就成一样的了[3]。 Dify在这方面也有明显的问题,看起来还没有搞清楚Agent和Workflow的关系。

然而很遗憾目前为止没有一个开源的系统可以达到Coze的高度。 正是由于种种原因,最终KPP决定完全自研Workflow系统。

理论基础

即便从零开始实现一个Workflow其实也比较简单,因为从原理上来说,Workflow无非就是一个DAG(有向无环图)而已。 它是工作流的核心。DAG相信大家都比较熟悉,之所以要用到DAG正式因为它有两个重要的特点:

  • 节点之间的连接是有向的,否则我们将无法判断节点之间的依赖关系和执行顺序
  • 整个图是无环的,这也限制了在Workflow中是不可以存在循环的。这点非常重要,如果是一个Agent,那么我们可以一直给Agent发消息聊下去,这是一个循环;但是Workflow一定不能存在这种不确定的循环,或者说,我们期望在Workflow能够在确定有限的时间内运行完成并获取结果,它是Stateless(无状态)的

使用拓扑排序确定执行顺序

执行DAG第一件事情就是确定节点的执行顺序,从哪里开始,又如何结束?

考虑如下的几种场景:

从图中我们可以看出这也一些信息:

  • 严格而言,理论上DAG只要求有向无环,但是不必要“连通”的。也即是说,图中可以存在孤立的点,或者多个分散的DAG构成一个大的DAG
  • 一个节点可能有多个后继节点,意味着某个任务完成后,可能触发多个任务的执行
  • 一个节点也可能有多个依赖,那么只有依赖节点全部执行完成后,才能开始它的执行

我们可以通过拓扑排序(Topological sorting)来确定节点的执行顺序。比较著名的算法是Kahn's algorithm,原理也很简单,从图中选出入度为零的节点,然后删掉这些节点和边,再重复这一过程直到原图为空。这样选出的这些节点就是拓扑排序的结果。

拓扑排序还有一个附带的好处,就是可以检测图中是否有环。如果图中没有入度为零的节点,但是选出的节点数目仍然小于图的节点数,那么拓扑排序失败,证明图中有环存在。

拓扑排序的结果并不一定是唯一的。比如场景一中,由于节点相互之间完全无依赖,因此任意一个组合都是正确的拓扑排序结果。

值得注意的是,尽管拓扑排序得到的是一个有序的列表,这并不意味着你需要“串行”执行。场景三的拓扑排序结果为[A,B,C,D]或者[A,B,D,C],其中,C和D是可以并行执行的。

事件驱动架构

确定了节点的执行顺序之后,我们可以按照顺序来执行节点,看起来工作流的执行就这样完成了:

sorted_nodes <- dag.topological_sort()
for node in sorted_nodes:
    node.wait_for_dependencies()
    node.execute()

但是我们忽略了几个重要的问题:

  • 节点之间是有交互的。考虑场景三中,节点B是可能需要使用A的运行结果的,也就是说,一个节点的运行结果应该对其后继节点可见
  • 我们需要能够实时观测工作流的运行情况,比如当前执行到哪一个节点了?这对于界面调试、API调用等都十分有用。

使用事件驱动架构可以比较容易来解决这个问题。我们将工作流的运行抽象成一系列的事件,节点的执行产生事件(比如节点开始、节点结束),而这些事件具体如何使用则由消费者来决定。 这样,在UI上调试工作流时,我们只需要根据这些事件来更新页面上的节点状态;当使用API调用时,我们则可以过滤掉不感兴趣的事件,只监听结果事件并返回给调用方即可。 这样对于工作流执行引擎将有非常强的扩展性和可测试性。

传递闭包简化

对于使用DAG的系统,一个不可忽视的问题就是DAG的不确定性:用户很可能构建出奇奇怪怪的巨复杂的图出来,其中可能会有不必要的边。 这对于理解和执行工作流都可能造成麻烦。例如,下图中左边a→d, a→e 都是不必要的边。通过传递闭包简化(Transitive reduction)算法可以对无用的连接进行删除,从而得到一个唯一确定的简化工作流。

使用Go语言实现工作流

在有以上的一些理论基础后,实现思路已经非常清晰,尽管仍然有许多技术细节需要确定,就看如何具体设计和实现了。我们使用Go语言来实现,初步支持下面几种节点:

  • 开始、结束
  • 条件判断
  • 大模型调用
  • 插件:包括bing搜索、高召回、分流

概念模型

尽管前面我们认为工作流是一个DAG,但并不是直接搞一个DAG就能实现向Coze这样的工作流系统,这里面最大的问题在于,DAG中边只包含了方向, 但实际工作流产品的设计上,一条边还可能附带一些其他的信息。

例如,ComfyUI中一个节点到另一个节点连接是区分不同的点的[4]

当然它的业务场景跟我们差距比较大。但是,考虑到条件节点的表示,我们发现也存在类似的情况:

注意其中的条件节点,不同的分支连接到下游节点时,起点是标记到分支上的,而不是单纯的两个节点连接。当然我们可以不采用这种方式,可以在节点属性中设置一些字段来表示什么情况下执行,甚至设置到边的信息中。但不可否认的是,这种边直接连接到条件上的方式是最直观的。 经过一些权衡之后,我们提出了如下的一个工作流概念模型:

这个概念模型,通过对边添加属性来区分是从哪个点连接到哪个点。但并未改变DAG的本质, 因此在KPP的工作流中,是无法直接从一个条件节点的两个分支连接到同一个节点的,因为这样相当于生成了两条边。

这里,对于这个“点”叫什么名字好其实有一个小故事。 最开始,我们从Qt的Singal/Slot概念中借用“Slot”的概念叫“槽位”,后来觉得不妥,借用IP端口的概念叫Port。 彼时,我们抓包过Coze的协议,想参考它的定义来设计,但结果发现它表示的也乱七八糟,根本看不懂。 过了很久,等我再次抓包Coze的时候,惊讶的发现他也改名叫Port了。

若合一契,岂不快哉!

定义工作流Schema

在实现之前,首先需要解决的一个问题是,我们如何来表示一个工作流?比如条件判断的条件怎么存储?大模型调用的提示词怎么表示?

这是一个非常重要的问题,因为它是一切的基础,我们需要通过一个标准的协议来统一前端、后端等对于工作流的表示。不难发现,工作流的定义包括:

  • DAG的表示,即节点和边
  • 节点的自身属性,根据节点类型不同,其可用的属性也不尽相同

我们使用JSON Schema定义了一个工作流格式,大概长这样:

{
  "type": "object",
  "properties": {
    "name": {
      "type": "string"
    },
    "nodes": {
      "type": "array",
      "items": {
        "$ref": "#/definitions/node"
      }
    },
    "edges": {
      "type": "array",
      "items": {
        "$ref": "#/definitions/link"
      }
    },
    ...
  }
}

对于不同的节点,我们可以分别定义节点具体有哪些属性,格式是什么,是否必须等,例如:

{
  ...
  "bingSearchPluginProperties": {
    "properties": {
      "pluginType": {
        "type": "string"
      },
      "version": {
        "type": "string"
      },
      "query": {
        "$ref": "#/definitions/variableBinding"
      },
      "count": {
        "type": "number"
      }
    },
    "required": [
      "pluginType",
      "version",
      "query"
    ]
  }
}

完整的定义比较复杂,但通过这样一个Schema,我们可以解决很多问题:

  • 这个工作流是否合法?至少格式上是否正确
  • 前端开发时,如何清楚工作流支持多少个节点?每个节点属性是什么?

对于这种复杂的工作流来说,传统的API文档是无法精确到这样的程度的,不仅如此,它的最大好处是,它是真的可以运行的!

工作流的构建

对工作流的抽象定义

首先我们定义了一个业务无关的单纯DAG实现,仅做了必要的修改来适配概念模型中的槽位:

type Port struct {
	Direction PortDirection `json:"type"`
	Id        string        `json:"id"`
}

type Vertex struct {
	Id      string
	Type    string
	Ports   []Port
	Payload any
}

type Connection struct {
	From       string
	To         string
	SourcePort *Port
	TargetPort *Port
}

type PortawareDAG interface {
	AddVertex(node Vertex) error
	GetVertex(id string) (Vertex, error)
	Connect(fromId string, toId string) error
	ConnectFromPort(fromId string, toId string, toPort string) error
	TopologicalSorting() ([]Vertex, error)
	TransitiveReduction() (PortawareDAG, error)
    // ...
}

而对于具体的不同的节点,是通过另外的Node接口来定义:

type Node interface {
	GetId() string
	GetType() NodeType
	Execute(ctx context.Context, options NodeExecuteOptions, doneGroup *sync.WaitGroup) ([]InputValue, []OutputValue, error)
    // ...
}

其中,最重要的方法就是Execute,它将设计执行节点,比如请求大模型,或者执行条件判断。这样,不同类型的Node会有不同类型的实现, 比如大模型节点的实现有单独的定义:

type LlmNode struct {
	Id             string
	Name           string
	client         GatewayClient
	Properties     LlmNodeProperties
	requestBuilder LlmRequestBuilder
}

解析动态类型

由于工作流节点的类型不一样,我们在实例化成Node类型的时候,一个问题是需要进行动态类型的转换。 比如,我们JSON中大概是这样定义的:

{
    "nodes": [ 
        { "type": "start", ...},
        { "type": "llm", ...}
    ]
}

而我们需要根据type实例化成不同的类型(StartNode,LlmNode)。不得不吐槽,在go里面没有优雅的实现方式, 不得不将JSON反序列化成any(实际上是一个map),然后再部分序列化成JSON,然后再根据类型再次反序列化成实际的类型:

for index, node := range w.Nodes {
	jsonData, _ := json.Marshal(node.Properties)
	var p any
	var err error
	t := node.Type
	if t == TypeStart {
		p, err = loadStartNode(jsonData)
	} else if t == TypeLlm {
		p, err = loadLlmNode(jsonData)
	} else if t == TypeEnd {
		p, err = loadHttpNode(jsonData)
	}
	// ...

异步工作流运行模型

Pub/Sub模式

在事件处理架构中,我们通过事件来接偶工作流执行引擎,以及其后面的消费者。 Go语言天然提供了channel来进行消息通信,但不幸的是,它是一个单生产者/消费者的模型。 在我们需求中,我们应该允许多个消费者存在,彼此只关心自己感兴趣的事件。比如,我们可以有一个消费者来打印日志;另一个消费者来返回HTTP消息。

这是一个Pub/Sub模式。尽管Go原生不支持,但是我们很容易实现:

type Subscriber[T any] chan T
type TopicFilter func(event any) bool

type Publisher[T any] struct {
	name        string
	mutex       sync.RWMutex
	buffer      int
	closed      bool
	subscribers map[Subscriber[T]]TopicFilter
}

实现原理也很简单,那就是为每一个消费者创建一个channel,然后发送消息的时候,发送多次:

func (p *Publisher[T]) Publish(event T) {
	p.mutex.RLock()
	defer p.mutex.RUnlock()

	var wg sync.WaitGroup
	for subscriber, filter := range p.subscribers {
		subscriber := subscriber
		if filter == nil || filter(event) {
			wg.Add(1)
			go func(event T, channel chan<- T) {
				defer wg.Done()

				channel <- event
			}(event, subscriber)
		}
	}
	wg.Wait()
}

这样我们就有了一个Pub/Sub实现,我们用它来作为工作流执行的事件订阅/消费系统。核心是一个Executor接口,他提供一个订阅的方法用来订阅事件,以及一个执行的方法来执行工作流。

type Executor interface {
	Subscribe() Subscriber[WorkflowEvent]
	Close()
	ExecSync(ctx WorkflowContext) error
}

使用Go routine实现异步运行

前面我们有提到,尽管拓扑排序可以将工作流节点排序成一个有序的列表,但并不代表节点需要串行执行。比如说,如果有多个入度为0的节点, 那么我们可以同时运行它们。在go语言中,很容易通过go routine来实现。

进一步思考,我们会发现,不仅需要做拓扑排序,还需要根据节点的依赖来执行节点。只有当节点的依赖项全部执行完成后,才能开始该节点的执行。 可以通过一个队列来实现,不断执行和删除就绪的节点,直到全部节点执行完成。 其运行原理如下面的伪代码所示:

queue <- dag.typological_sort()
while queue.is_not_empty():
    ready_nodes <- queue.pop_zero_degree_nodes()
        for node in ready_nodes start:
            node.execute()

再进一步思考,如果某一个节点运行失败,我们应该如何处理?一个比较合理的做法是直接停止工作流的运行。这种逻辑可以通过errgroup来实现。 最终,我们执行工作流的方法逻辑如下:

func (e *AsyncExecutor) ExecSync(c WorkflowContext) error {
	defer e.Close()
	e.eventPublisher.Publish(NewWorkflowStartedEvent())
	errGroup, ctx := errgroup.WithContext(c.GetContext())
	var doneGroup sync.WaitGroup
	for _, node := range e.pendingNodes {
		node := node
		waitChannel := e.notifyChannels[node.Id]
		errGroup.Go(func() error {
			select {
			case <-ctx.Done():
				return errCancelledByContext
			case inputs := <-waitChannel:
				nodeInstance, ok := node.Payload.(Node)
				if !ok {
					return errFailedToGetNodeInstance
				}
				return e.Execute(ctx, nodeInstance, inputs, &doneGroup)
			}
		})
	}

	e.popReadyNodes()

	err := errGroup.Wait()
	if err != nil {
		return err
	}
	doneGroup.Wait()
	e.eventPublisher.Publish(NewWorkflowFinishedEvent())
	return nil
}

有意思的是,我们并不是等节点的依赖项执行完成之后,再启动一个go routine,而是一开始就为每一个节点启动了一个go routine, 但是通过一个信号来控制何时开始运行。我们可以通过context.Context来进行额外的控制,比如设定超时时间之类, 这对于避免程序出现异常情况导致工作流无休止运行非常有帮助。

select {
case <-ctx.Done():            // 如果超时或者取消,go routine会从这里直接返回,从而中断执行
	return errCancelledByContext
case inputs := <-waitChannel: // 控制节点开始执行的信号channel
	// ... 
	return e.Execute(ctx, nodeInstance, inputs, &doneGroup)
}

注意在上面的流程中我们通过e.popReadyNodes()这个方法来通知已经就绪的节点开始运行, 这个方法的主要作用是找到当前节点中,所有可以运行的节点。 例如,考虑一个包含A->B->C三个节点的工作流,如果A已经执行结束, 那么B就可以开始运行。 这时候,A的运行结果将作为B的输入信息。

func (e *AsyncExecutor) popReadyNodes() error {
	defer e.lock.Unlock()
	e.lock.Lock()

	for _, node := range e.pendingNodes {
		inputs, ready, err := e.getInputs(node.Id)
		if err != nil {
			return err
		}
		if !ready {
			return nil
		}
		picked := e.pendingNodes[0]
		e.pendingNodes = e.pendingNodes[1:]          // 从尚未执行的节点队列中移除队首
		notifyChannel := e.notifyChannels[picked.Id] // 通知移除的节点开始执行
		notifyChannel <- inputs
	}
	return nil
}

这里每一个节点对应一个通知channel,在准备运行的阶段就已经进行了初始化。 整体思路是,每当有节点执行完成,那么就检查是否有新的节点可以开始运行,否则,不可能执行其他的节点,因为剩余的节点都是有依赖关系的。 细心的读者容易想到一个问题,就是当工作流开始执行的时候, 总是有起始节点是没有依赖的,按照这个逻辑怎么触发运行呢?这个问题很容易解决,那就是工作流开始的时候,手动调用一次popReadyNodes。

执行的关键在于,一个节点运行的结束,会触发其后继节点的运行。这个“触发”怎么实现,实际上是有很多种办法的。 其实第一版实现并不是这样为每一个节点设置一个通知channel(全局唯一,每一个节点有且仅有一个通知channel);而是通过Pub/Sub模式来等待所有依赖项完成并通知。

这是最开始的逻辑:

// 等待所有依赖节点发送通知
func blockForNodeResults(waitChannels []<-chan NodeResult) []NodeResult {
	results := []NodeResult{}
	for _, ch := range waitChannels {
		result := <-ch
		results = append(results, result)
	}
	return results
}

仔细思考其实这样实现问题挺多的:

  • 需要为每一个依赖创建一个channel来发送和接收通知,且运行的时候还需要等待所有channel都接收到结果,相当于把“我是否能够执行”的逻辑交给节点自己去判断
  • 由于channel发送和接收需要对应,需要至少为channel设置一个大小为1的buffer,否则接受channel的动作有可能导致节点运行阻塞

Pub/Sub 模式

未来展望

关于为什么要做Workflow,感兴趣可以从这里了解一些其他的信息[5]

  1. 虽然模型本身可以推理,一些情况下是可以推算出结果,但无法做到准确。即使是人,也很难直接计算诸如10988x727664=?这样的问题
  2. https://www.deeplearning.ai/the-batch/issue-242/
  3. https://www.53ai.com/news/qianyanjishu/1317.html
  4. ComfyUI是一个用来配置Stable diffusion图像生成工作流的系统。尽管它一个节点到另一个节点可以有多条边,这看起来不像是一个DAG,但从实现原理上来说,这仍然可以通过DAG来表示。
  5. AI Con总结和KPP工作流 方案汇报 https://365.kdocs.cn/l/crXXklzrerKF