2026年7月2日 周四晚上19:30,报名腾讯会议了解“如何构建自进化的动态知识库(Brain)”(限30人)
免费POC, 零成本试错
FDE知识库

FDE知识库

学习大模型的前沿技术与行业落地应用


收藏

【Agent智能体指北】LlamaIndex 工作流:一种创建复杂 AI 应用程序的新方法

发布日期:2024-08-15 07:03:25 浏览次数: 3849
作者:AI模数师

微信搜一搜,关注“AI模数师”


LlamaIndex 推出了一个的功能:工作流,这是一种在日益复杂的 AI 应用程序中协调动作的机制。

随着大型语言模型(LLMs)的出现,一种趋势已经变成了事实上的标准:AI 应用程序由不同组件实现的多个任务组成。市场上的开源框架致力于通过提供易于使用的基础组件抽象,如数据加载器、LLMs、向量数据库和重排器,甚至外部服务,来简化 AI 工程师的工作。同时,所有这些框架也在寻找最佳的抽象方式来协调这些组件,研究对于 AI 开发者来说,实现将复合 AI 系统集成在一起的逻辑,哪种方式最直观和高效。

两种潜在的协调模式是链(Chains)和管道(Pipelines),它们都是有向无环图(DAG)抽象的实现。我们在今年年初发布的查询管道中尝试了这种方法——它是一个声明式 API,允许您针对不同用例(如问答、结构化提取和代理自动化)对数据执行简单到高级的查询工作流。但是,当我们尝试在其基础上构建并尝试添加循环以更好地支持更复杂的工作流时,我们注意到了几个问题,这让我们反思为什么 DAG 可能不适合代理景观,以及我们可以在框架中引入哪些替代方案。

图形基础 UX 的局限性

DAG 的一个基本方面是 DAG 中的“A”,它们是无环的,也就是说没有循环。但在一个越来越代理化的世界中,AI 应用程序逻辑中无法执行循环是不可接受的。例如,如果一个组件提供了不良结果,AI 开发者应该有办法告诉系统自我纠正并重试。

即使没有向 DAG 添加循环和循环,查询管道也存在一些明显的问题:

  • 当出现问题时很难调试

  • 它们隐藏了组件和模块的执行方式

  • 我们的管道协调器变得越来越复杂,必须处理大量不同的边缘情况

  • 对于复杂的管道来说,它们很难阅读

一旦我们向查询管道添加了循环,这些围绕图形的开发人员 UX 问题就被放大了。我们在诸如以下领域亲身经历了开发人员的痛苦:

  • 许多核心协调逻辑,如 if-else 语句和 while 循环,被嵌入到图形的边缘中。定义这些边缘变得繁琐且冗长。

  • 处理可选和默认值的边缘情况变得困难。对于我们这样的框架来说,很难弄清楚一个参数是否会从上游节点传递过来。

  • 定义具有循环的图形并不总是对构建代理的开发人员感觉自然。在这里,图形 UX 强制“代理”节点明确定义了传入边缘和传出边缘,迫使用户与其它节点定义冗长的通信模式。

我们问:图形真的是我们可以用来协调复合 AI 系统中组件的唯一抽象吗?

从图形到 EDA:转向事件驱动

复合 AI 系统可以通过 LlamaIndex 工作流来实现。工作流通过称为 步骤 的 Python 函数集合来回分发事件。每个步骤可以看作是系统的一个组件:一个处理查询的,一个与 LLM 交谈的,一个从向量数据库加载数据等等。每个步骤接收一个或多个事件进行处理,并可以选择发送回事件,这些事件将根据需要被中继到其他组件。

转向事件驱动架构会导致设计上的根本转变。在许多图形实现中,图形遍历算法负责确定下一个应该运行的组件以及应该传递的数据。在事件驱动架构中,组件订阅某些类型的事件,最终负责根据收到的数据决定要做什么。

在事件驱动系统中,像输入的可选性和默认值这样的概念在组件级别上得到解决,大大简化了协调代码。

工作流入门

为了澄清这个想法,让我们看一个例子。一个最小的 LlamaIndex 工作流看起来像这样:

from llama_index.core.workflow import(StartEvent,StopEvent,Workflow,step,)
from llama_index.llms.openai import OpenAI
classOpenAIGenerator(Workflow):@step()asyncdefgenerate(self, ev: StartEvent) -> StopEvent:query = ev.get("query")llm = OpenAI()response =await llm.acomplete(query)return StopEvent(result=str(response))
w = OpenAIGenerator(timeout=10, verbose=False)result =await w.run(query="What's LlamaIndex?")print(result)

generate 函数使用 @step 装饰器标记为工作流步骤,并声明它想要接收和发送回的事件类型,使用方法签名和适当的类型注释。为了运行工作流,我们创建 OpenAIGenerator 类的实例,传递一些配置参数,如所需的超时时间,然后调用 run 方法。传递给 run 的任何关键字参数将被打包成一个特殊的 StartEvent 类型的事件,该事件将被中继到请求它的步骤(在这种情况下,只有 generate 步骤)。generate 步骤返回一个特殊的 StopEvent 类型的事件,这将向工作流发出信号,优雅地停止其执行。StopEvent 携带我们想要作为工作流结果返回给调用者的任何数据,在这种情况下是 LLM 响应。

工作流循环

在事件驱动架构中,循环与通信有关,而不是拓扑结构。任何步骤都可以通过创建和发送适当的事件多次调用另一个步骤。让我们看一个自我纠正循环的例子:

classExtractionDone(Event):output: strpassage: str
classValidationErrorEvent(Event):error: strwrong_output: strpassage: str
classReflectionWorkflow(Workflow):@step()asyncdefextract(self, ev: StartEvent | ValidationErrorEvent) -> StopEvent | ExtractionDone:ifisinstance(ev, StartEvent):passage = ev.get("passage")ifnot passage:return StopEvent(result="Please provide some text in input")reflection_prompt =""elifisinstance(ev, ValidationErrorEvent):passage = ev.passagereflection_prompt = REFLECTION_PROMPT.format(wrong_answer=ev.wrong_output, error=ev.error)
llm = Ollama(model="llama3", request_timeout=30)prompt = EXTRACTION_PROMPT.format(passage=passage, schema=CarCollection.schema_json())if reflection_prompt:prompt += reflection_prompt
output =await llm.acomplete(prompt)
return ExtractionDone(output=str(output), passage=passage)
@step()asyncdefvalidate(self, ev: ExtractionDone) -> StopEvent | ValidationErrorEvent:try:json.loads(ev.output)except Exception as e:print("Validation failed, retrying...")return ValidationErrorEvent(error=str(e), wrong_output=ev.output, passage=ev.passage)
return StopEvent(result=ev.output)
w = ReflectionWorkflow(timeout=60, verbose=True)result =await w.run(passage="There are two cars available: a Fiat Panda with 45Hp and a Honda Civic with 330Hp.")print(result)

在这个例子中,validate 步骤接收尝试的模式提取结果作为事件,并可以决定通过返回 ValidationErrorEvent 再次尝试,最终将被传递到 extract 步骤,该步骤将执行另一次尝试。请注意,在这个例子中,如果这个提取/验证循环长时间持续提供不良结果,工作流可能会超时,但另一种策略可能是在精确尝试次数后放弃,仅举一个例子。

工作流持久化

工作流在执行期间保持全局状态,并且这个状态可以根据请求共享并传播到其步骤。这个共享状态实现为 Context 对象,并且可以被步骤用来在迭代之间存储数据,也可以作为不同步骤之间的通信形式。让我们看一个更复杂的 RAG 例子的摘录,展示如何使用全局上下文:

classRAGWorkflow(Workflow):@step(pass_context=True)asyncdefingest(self, ctx: Context, ev: StartEvent) -> Optional[StopEvent]:dataset_name = ev.get("dataset")_, documents = download_llama_dataset(dsname, "./data")ctx.data["INDEX"] = VectorStoreIndex.from_documents(documents=documents)return StopEvent(result=f"Indexed {len(documents)} documents.")


在这种情况下,ingest 步骤创建了一个索引,并希望在工作流执行期间使其对可能需要它的任何其他步骤都可用。在 LlamaIndex 工作流中以符合规范的方式进行此操作的方法是声明步骤需要全局上下文的实例(@step(pass_context=True) 可以完成这个操作)并将索引存储在上下文中本身,使用其他步骤可能稍后访问的预定义键。

自定义工作流

除了工作流,我们还将发布一系列预定义的工作流,以便最常用的用例可以用一行代码实现。使用这些预定义流程,用户可能仍然想要稍微更改预定义的工作流,以引入一些自定义行为,而不必从头开始重写整个工作流。假设您想要

自定义 RAG 工作流并使用自定义重新排名步骤,您所需要做的就是子类化一个假设的内置 RAGWorkflow 类并覆盖 rerank 步骤,如下所示:

classMyWorkflow(RAGWorkflow):@step(pass_context=True)defrerank(self, ctx: Context, ev: Union[RetrieverEvent, StartEvent]) -> Optional[QueryResult]:# 我的自定义重新排名逻辑在这里
w = MyWorkflow(timeout=60, verbose=True)result =await w.run(query="Who is Paul Graham?")

调试工作流

您的工作流的复杂性将随着应用程序逻辑的复杂性而增长,有时仅通过查看 Python 代码可能很难理解事件在执行期间将如何流动。为了便于理解复杂的工作流并支持工作流执行的调试,LlamaIndex 提供了两个功能:

  • draw_all_possible_flows 生成一个图片,显示工作流中的所有步骤以及事件可能的流动方式

  • draw_most_recent_execution 生成一个类似的图片,只显示上次工作流执行期间实际发送的事件

除此之外,工作流可以手动执行,通过多次调用 run_step() 直到所有步骤都完成。每次 run_step 调用后,可以检查工作流,检查任何中间结果或调试日志。

开启工作流之旅

尽管处于开发的早期阶段,LlamaIndex 工作流已经代表了与查询管道相比的向前迈出的一步,扩展了它们的功能并增加了更多的灵活性。除此之外,工作流带有一套您通常会期望从更成熟的软件中获得的功能:

  • 完全异步,支持流式传输

  • 默认情况下进行了仪器化,提供一键可观察性与支持的集成

  • 逐步执行,便于调试

  • 事件驱动依赖的验证和可视化

  • 事件实现为 pydantic 模型,以简化自定义和进一步开发新功能

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询

扫码登录
登录即表示您同意《53AI网站服务协议》
服务协议

欢迎您使用【53AI 官方网站】(以下简称“本网站”或“我们”)。本《会员服务协议》(以下简称“本协议”)是您(以下简称“会员”或“用户”)与【深圳市博思协创网络科技有限公司】之间关于注册、登录及使用本网站会员服务所订立的法律协议。

在您注册或登录前,请务必审慎阅读、充分理解各条款内容,特别是免除或限制责任的条款、知识产权条款、争议解决条款等。此类条款将以加粗形式提示您注意。 当您通过微信公众号授权、手机验证码验证或其他方式成功登录本网站时,即视为您已完全理解并同意接受本协议的全部内容。

一、 定义

本网站:指由【深圳市博思协创网络科技有限公司】运营的,域名为【53ai.com】的网站及相关移动端页面。

会员服务:指本网站向注册会员提供的知识库文章查阅、内容检索及其他相关增值服务。

知识库内容:指本网站发布的包括但不限于文字、图表、数据、研究报告、行业分析等数字化内容资源。

二、 账号注册与登录

登录方式:本网站支持以下登录方式,您可根据实际情况选择:

微信公众号授权登录:您同意将您的微信OpenID信息授权给本网站,用于创建或关联会员账号。

手机验证码登录:您需提供真实有效的手机号码,并通过短信验证码完成身份验证与登录/注册。

账号安全:您的账号仅限您本人使用,禁止赠与、借用、租用、转让或售卖。因您保管不善导致的账号被盗、密码泄露等损失,由您自行承担。

实名认证:根据相关法律法规要求,我们可能要求您在特定功能下完成实名认证。如您拒绝提供,可能无法使用部分或全部服务。

未成年人保护:若您未满18周岁,请在法定监护人的陪同下阅读本协议,并在征得监护人同意后使用本服务。

三、 服务内容与规范

知识库查阅权限:会员登录后,有权按照其会员等级对应的权限范围,在线浏览、检索本网站知识库中的相关文章及内容。

服务变更:我们有权根据业务发展需要,调整、变更或终止部分服务内容,并将以网站公告、公众号消息等方式提前通知。

禁止行为:您在使用服务时不得实施以下行为:

利用技术手段批量爬取、下载、转存知识库内容;

将知识库内容用于商业目的或未经授权地向第三方传播;

干扰本网站正常运行或侵犯其他用户合法权益;

发布违法违规信息或从事违反公序良俗的活动。

四、 知识产权声明

权利归属:本网站知识库中的排版设计、软件代码等内容的知识产权均归【公司全称】或原权利人所有,受《中华人民共和国著作权法》等法律保护。

有限许可:本网站授予会员一项非独占、不可转让、不可转授权的普通许可,仅限于个人学习、研究之目的在线查阅知识库内容。

侵权追责:未经书面许可,任何单位或个人不得以任何形式复制、转载、摘编、镜像、汇编或以其他方式使用上述内容。一经发现,我们保留追究其法律责任的权利。

五、 个人信息保护

我们重视对您个人信息的保护。关于我们如何收集、使用、存储和保护您的个人信息,请单独阅读 《隐私政策》。

您通过微信公众号授权或手机号验证所提供的信息,我们将严格按照《个人信息保护法》的规定处理,仅用于身份识别、服务提供及安全验证等必要用途。

您可以随时通过网站设置或联系客服行使查阅、更正、删除个人信息及撤回授权同意的权利。

六、 免责声明

内容准确性:知识库内容仅供参考,不构成专业建议。我们不对其完整性、准确性、时效性作任何明示或暗示的保证,您应自行判断并承担使用风险。

不可抗力:因自然灾害、政策法规变化、网络故障、第三方平台接口异常(如微信接口维护、运营商短信通道故障)等不可抗力导致的服务中断或延迟,我们不承担违约责任。

第三方链接:本网站可能包含指向第三方网站的链接,该等网站的内容和服务不受我们控制,请您自行甄别风险。

七、 违约责任

如您违反本协议约定,我们有权视情节采取警告、限制功能、暂停服务、注销账号等措施,并保留要求赔偿损失的权利。

如因您的违约行为导致我们遭受行政处罚、第三方索赔或商誉损失,您应承担全部赔偿责任(包括但不限于罚款、赔偿金、律师费、公证费等)。

八、 法律适用与争议解决

本协议的订立、执行和解释均适用中华人民共和国大陆地区法律。

因本协议产生的或与本协议有关的任何争议,双方应友好协商解决;协商不成的,任何一方均可向【公司所在地】有管辖权的人民法院提起诉讼。

九、 其他

本协议构成双方就本服务达成的完整协议,取代此前任何口头或书面约定。

本协议任一条款被认定为无效或不可执行的,不影响其他条款的效力。

我们对本协议享有最终解释权,并在法律允许的范围内保留随时修改的权利。修改后的协议一经公布即生效,继续使用服务即视为同意修订内容。


已查阅