2026年7月9日 周四晚上19:30,报名腾讯会议了解“如何构建自进化的动态知识库(Brain)”(限30人)
免费POC, 零成本试错
FDE知识库

FDE知识库

学习大模型的前沿技术与行业落地应用


收藏

如何从网关层降低 AI 的调用成本

发布日期:2025-02-15 20:41:44 浏览次数: 2556
作者:Higress

微信搜一搜,关注“Higress”

推荐语

降低AI调用成本的实用策略,由AI领域博士生分享。

核心内容:
1. WebAssembly插件在网关层的应用
2. AI请求缓存与语义级别缓存命中逻辑设计
3. 网关环境搭建与本地测试方法

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家


《Higress  AI 网关挑战赛》正在火热进行中,Higress 社区邀请了目前位于排行榜 top5 的选手杨贝宁同学分享他的心得。下面是他整理的参赛攻略:

背景


我们要在 Higress 网关中编写 WebAssembly(wasm)插件,使得在 http 请求的各个阶段(requestHeader,requestBody,responseHeader,responseBody)能够将相应的请求或返回捕获进行业务逻辑的处理。具体到本比赛,主要需要实现的是缓存对大模型的请求(openai 接口的形式)在本地(或云数据库),并设计语义级别的缓存命中逻辑来实现降低响应请求且减少 token 费用的目的。


AI Cache 示例

以上图为例,本比赛主要的问题可以归纳为:(1)如何根据 Query 字符串生成合适的 Query 向量 ⇒ 向量生成器选型。(2)如何根据 Query 向量进行语义级别的查找,快速找到合适的缓存向量 ⇒ 缓存命中逻辑设计。(3)如何管理大量的缓存⇒向量数据库选型及重复初始化逻辑。

实际上 Redis 也具备 Vector Store 能力,这里的 Cache Store 和 Vector Store 是可以合并的。不过本 Demo 将二者分开了,Cache Store 使用 Redis,Vector Store 使用阿里云 DashVector 服务。

一、网关环境搭建

首先我们需要在线上搭建网关环境以供测试和评测使用,本文也提供了本地搭建网关环境的方法供读者参考。注意这里的线上搭建环境是以赛题介绍为基础展开的,若读者已经搭建好线上的开源网关或企业网关可跳过本节。


1.1 搭建线上企业版 Higress 环境


赛题支持开源 Higress 和企业版 Higress 两种不同的配置,本文以企业版 Higress 为例进行展示。

1.1.1 申请免费试用并创建相应资源

1.1.2 创建服务 (通义千问的地址、Redis 服务、Dashscope 服务)

1.1.3 创建路由转发给通义千问

1.1.4 开启 Higress 插件市场中的 AI Proxy 插件

网关需要 AI Proxy 插件作为处理 AI 请求的支撑,我们可以采用插件市场中已有的 ai-proxy 插件。从源码编译的命令和上次如下所示。最后,配置环节需要提供大模型服务商的 api 和 token key 等,注意比赛需要使用通义千问的 qwen_long 模型。

1.1.5 编译上传 AI Cache 插件* (注意可能需要修改 “ai-cache” 名称防止和插件市场已有插件重复)

本比赛的核心在于自定义 AI Cache 插件以实现更鲁棒的缓存逻辑。首先还是以 Higress 源码提供的基础代码为例进行编译和上传,配置环节需要提供 redis 服务的名称。注意此步骤会在后续迭代代码中反复使用。
git clone https://github.com/alibaba/higress.gitcd higress/plugins/wasm-goPLUGIN_NAME=ai-cache EXTRA_TAGS=proxy_wasm_version_0_2_100  make build
1.2 本地测试环境搭建和代码更新逻辑
由于线上环境的测试的成本较高,我们也可以采用 Higress + LobeChat 快速搭建私人 GPT 助理[1]的方式起两个 Docker 容器进行本地测试,参考 dockerfile 如下:
version: '3.9'
networks:higress-net:external: false
services:higress:image: registry.cn-hangzhou.aliyuncs.com/ztygw/aio-redis:1.4.1-rc.1environment:- GATEWAY_COMPONENT_LOG_LEVEL=misc:error,wasm:debug # 重要,开启日志- CONFIG_TEMPLATE=ai-proxy- DEFAULT_AI_SERVICE=qwen- DASHSCOPE_API_KEY= [YOUR_KEY]networks:- higress-netports:- "9080:8080/tcp"- "9001:8001/tcp"volumes:- 本地data目录:/data- 本地log目录:/var/log/higress/ # 重要,方便在容器restrat之后查看日志restart: alwayslobechat:image: lobehub/lobe-chatenvironment:- CODE=123456ed- OPENAI_API_KEY=unused- OPENAI_PROXY_URL=http://higress:8080/v1networks:- higress-netports:- "3210:3210/tcp"    restart: always

主要更改了 Higress 的 image,environment 以及 volumes 的配置,启动和重启就是 docker compose up -d docker compose restart。

进一步地,我们需要了解本地代码编写的逻辑如何能反馈到测试环境中。和线上网关环境直接上传编译后的二进制 wasm 插件不同的是,这里需要采用的是:本地编写代码 ⇒ 本地编译 wasm 插件 ⇒ Docker 打包镜像并上传 ⇒ 修改本地测试环境配置中的镜像版本 ⇒ 开始测试并打印日志的流程。具体参考代码如下:
cd ${workspaceFolder}/higress/plugins/wasm-goPLUGIN_NAME=ai-cache EXTRA_TAGS=proxy_wasm_version_0_2_100 make build // 修改版本号(version.txt)export cur_version=$(cat ${workspaceFolder}/version.txt) && docker build -t [YOUR IMAGE_BASE_URL]:$cur_version -f Dockerfile . && docker push [YOUR_IMAGE_BASE_URL]:$cur_version// 修改本地测试环境配置中的镜像版本sudo bash -c \"sed -i 's|oci://registry.cn-hangzhou.aliyuncs.com/XXX:[0-9]*\\\\.[0-9]*\\\\.[0-9]*|oci://registry.cn-hangzhou.aliyuncs.com/XXX:$(cat version.txt)|g' data/wasmplugins/ai-cache-1.0.0.yaml\

二、文本向量请求逻辑及缓存命中逻辑编写

在本节中,我们将通过一个简单的示例来说明如何在网关中编写请求外部服务的缓存逻辑。

当查询到达时,与 Redis 中存储的键进行匹配(`redisSearchHandler`)。如果完全一致,则直接返回结果(`handleCacheHit`)。如果不匹配,则请求 `text_embedding` 接口将查询转换为 `query_embedding`(`fetchAndProcessEmbeddings`)。使用 `query_embedding` 与向量数据库中的向量进行 ANN 搜索,返回最接近的键,并通过阈值进行过滤(`performQueryAndRespond`)。如果返回结果为空或距离大于阈值,则丢弃结果,本轮缓存未命中,最后将 `query_embedding` 存入向量数据库(`uploadQueryEmbedding`)。如果距离小于阈值,则再次调用 Redis 对最相似的键进行匹配(`redisSearchHandler`)。在响应阶段,请求 Redis 新增键值对,键为查询的问题,值为LLM 返回结果。

可以看到,除了 Redis 服务外,我们还需要请求文本向量化服务和向量数据库服务,这里我们分别选取向量生成器:阿里灵积通用文本向量接口[2]和向量数据库:阿里向量检索服务 DashVector[3]作为服务商。

注意:由于 wasm 插件不支持协程等特性,调用外部服务需遵循:如何在插件中请求外部服务[4]
2.1 外部服务声明和注册
为了实现思路 1-5 的连续外部服务调用。在 Higress 相关的配置上,我们首先需要声明外部服务:
DashVectorClientwrapper.HttpClient `yaml:"-" json:"-"`DashScopeClient wrapper.HttpClient `yaml:"-" json:"-"`redisClient    wrapper.RedisClient `yaml:"-" json:"-"`

并且在 ParseConfig 函数中注册外部服务:

c.DashVectorInfo.DashVectorClient = wrapper.NewClusterClient(wrapper.DnsCluster{ServiceName: c.DashVectorInfo.DashVectorServiceName,Port:443,Domain:c.DashVectorInfo.DashVectorAuthApiEnd,})c.DashVectorInfo.DashScopeClient = wrapper.NewClusterClient(wrapper.DnsCluster{ServiceName: c.DashVectorInfo.DashScopeServiceName,Port:443,Domain:"dashscope.aliyuncs.com",})
这里的 ParseConfig 函数是在 http 请求的各个阶段回调函数(requestHeader,requestBody,responseHeader,responseBody)之前的注册函数。
2.2 AI Cache 配置文件
在增加了上述外部服务的基础上,对应的 AI Cache 的配置文件也需要进行修改,此处对应 1.1.5 节的配置。示例配置如下:
Dash:dashScopeKey: "YOUR_DASHSCOPE_KEY" # 这个是文本向量的keydashScopeServiceName: "qwen" # 重要,需要和scope对应的服务名匹配dashVectorCollection: "YOUR_CLUSTER_NAME"dashVectorEnd: "YOUR_VECTOR_END" dashVectorKey: "YOUR_DASHVECTOR_KEY" # 这个是DASHVECTOR的keydashVectorServiceName: "DashVector.dns" # 重要,需要新建一个vector对应的DNS服务 sessionID: "XXX" # 可用可不用,主要用于重复初始化逻辑redis: # 重要serviceName: "redis.static"timeout: 2000

2.3 连续 callback 实现连续服务调用
基于上述思路,实现的核心代码如下。其中的核心难点仍在于如何实现服务间的连续调用问题,以 onHttpRequestBody 函数为例,代码需要实现并发逻辑,而不是简单的顺序逻辑。因此,主函数代码必须返回 types.Action,即声明是阻塞还是继续执行。当前逻辑要求在处理完缓存命中逻辑后才能继续操作,因此主函数需要返回 types.Pause。最后,根据我们的处理逻辑,在调用外部服务的回调函数中,根据是否命中缓存执行 proxywasm.ResumeHttpRequest() 或直接返回 proxywasm.SendHttpResponse()。
// ===================== 以下是主要逻辑 =====================// 主handler函数,根据key从redis中获取value ,如果不命中,则首先调用文本向量化接口向量化query,然后调用向量搜索接口搜索最相似的出现过的key,最后再次调用redis获取结果// 可以把所有handler单独提取为文件,这里为了方便读者复制就和主逻辑放在一个文件中了// // 1. query 进来和 redis 中存的 key 匹配 (redisSearchHandler) ,若完全一致则直接返回 (handleCacheHit)// 2. 否则请求 text_embdding 接口将 query 转换为 query_embedding (fetchAndProcessEmbeddings)// 3. 用 query_embedding 和向量数据库中的向量做 ANN search,返回最接近的 key ,并用阈值过滤 (performQueryAndRespond)// 4. 若返回结果为空或大于阈值,舍去,本轮 cache 未命中, 最后将 query_embedding 存入向量数据库 (uploadQueryEmbedding)// 5. 若小于阈值,则再次调用 redis对 most similar key 做匹配。(redisSearchHandler)// 7. 在 response 阶段请求 redis 新增key/LLM返回结果
func redisSearchHandler(key string, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, stream bool, ifUseEmbedding bool) error {err := config.redisClient.Get(config.CacheKeyPrefix+key, func(response resp.Value) {if err := response.Error(); err == nil && !response.IsNull() {log.Warnf("cache hit, key:%s", key)handleCacheHit(key, response, stream, ctx, config, log)} else {log.Warnf("cache miss, key:%s", key)if ifUseEmbedding {handleCacheMiss(key, err, response, ctx, config, log, key, stream)} else {proxywasm.ResumeHttpRequest()return}}})return err}
// 简单处理缓存命中的情况, 从redis中获取到value后,直接返回func handleCacheHit(key string, response resp.Value, stream bool, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log) {log.Warnf("cache hit, key:%s", key)ctx.SetContext(CacheKeyContextKey, nil)if !stream {proxywasm.SendHttpResponse(200, [][2]string{{"content-type", "application/json; charset=utf-8"}}, []byte(fmt.Sprintf(config.ReturnResponseTemplate, response.String())), -1)} else {proxywasm.SendHttpResponse(200, [][2]string{{"content-type", "text/event-stream; charset=utf-8"}}, []byte(fmt.Sprintf(config.ReturnStreamResponseTemplate, response.String())), -1)}}
// 处理缓存未命中的情况,调用fetchAndProcessEmbeddings函数向量化queryfunc handleCacheMiss(key string, err error, response resp.Value, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, queryString string, stream bool) {if err != nil {log.Warnf("redis get key:%s failed, err:%v", key, err)}if response.IsNull() {log.Warnf("cache miss, key:%s", key)}fetchAndProcessEmbeddings(key, ctx, config, log, queryString, stream)}
// 调用文本向量化接口向量化query, 向量化成功后调用processFetchedEmbeddings函数处理向量化结果func fetchAndProcessEmbeddings(key string, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, queryString string, stream bool) {Emb_url, Emb_requestBody, Emb_headers := ConstructTextEmbeddingParameters(&config, log, []string{queryString})config.DashVectorInfo.DashScopeClient.Post(Emb_url,Emb_headers,Emb_requestBody,func(statusCode int, responseHeaders http.Header, responseBody []byte) {// log.Infof("statusCode:%d, responseBody:%s", statusCode, string(responseBody))log.Infof("Successfully fetched embeddings for key: %s", key)if statusCode != 200 {log.Errorf("Failed to fetch embeddings, statusCode: %d, responseBody: %s", statusCode, string(responseBody))ctx.SetContext(QueryEmbeddingKey, nil)proxywasm.ResumeHttpRequest()} else {processFetchedEmbeddings(key, responseBody, ctx, config, log, stream)}},10000)}
// 先将向量化的结果存入上下文ctx变量,其次发起向量搜索请求func processFetchedEmbeddings(key string, responseBody []byte, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, stream bool) {text_embedding_raw, _ := ParseTextEmbedding(responseBody)text_embedding := text_embedding_raw.Output.Embeddings[0].Embedding// ctx.SetContext(CacheKeyContextKey, text_embedding)ctx.SetContext(QueryEmbeddingKey, text_embedding)ctx.SetContext(CacheKeyContextKey, key)performQueryAndRespond(key, text_embedding, ctx, config, log, stream)}
// 调用向量搜索接口搜索最相似的key,搜索成功后调用redisSearchHandler函数获取最相似的key的结果func performQueryAndRespond(key string, text_embedding []float64, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, stream bool) {vector_url, vector_request, vector_headers, err := ConstructEmbeddingQueryParameters(config, text_embedding)if err != nil {log.Errorf("Failed to perform query, err: %v", err)proxywasm.ResumeHttpRequest()return}config.DashVectorInfo.DashVectorClient.Post(vector_url,vector_headers,vector_request,func(statusCode int, responseHeaders http.Header, responseBody []byte) {log.Infof("statusCode:%d, responseBody:%s", statusCode, string(responseBody))query_resp, err_query := ParseQueryResponse(responseBody)if err_query != nil {log.Errorf("Failed to parse response: %v", err)proxywasm.ResumeHttpRequest()return}if len(query_resp.Output) < 1 {log.Warnf("query response is empty")uploadQueryEmbedding(ctx, config, log, key, text_embedding)return}most_similar_key := query_resp.Output[0].Fields["query"].(string)log.Infof("most similar key:%s", most_similar_key)most_similar_score := query_resp.Output[0].Scoreif most_similar_score < 0.1 {ctx.SetContext(CacheKeyContextKey, nil)redisSearchHandler(most_similar_key, ctx, config, log, stream, false)} else {log.Infof("the most similar key's score is too high, key:%s, score:%f", most_similar_key, most_similar_score)uploadQueryEmbedding(ctx, config, log, key, text_embedding)proxywasm.ResumeHttpRequest()return}},100000)}
// 未命中cache,则将新的query embedding和对应的key存入向量数据库func uploadQueryEmbedding(ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, key string, text_embedding []float64) error {vector_url, vector_body, err := ConsturctEmbeddingInsertParameters(&config, log, text_embedding, key)if err != nil {log.Errorf("Failed to construct embedding insert parameters: %v", err)proxywasm.ResumeHttpRequest()return nil}err = config.DashVectorInfo.DashVectorClient.Post(vector_url,[][2]string{{"Content-Type", "application/json"},{"dashvector-auth-token", config.DashVectorInfo.DashVectorKey},},vector_body,func(statusCode int, responseHeaders http.Header, responseBody []byte) {if statusCode != 200 {log.Errorf("Failed to upload query embedding: %s", responseBody)} else {log.Infof("Successfully uploaded query embedding for key: %s", key)}proxywasm.ResumeHttpRequest()},10000,)if err != nil {log.Errorf("Failed to upload query embedding: %v", err)proxywasm.ResumeHttpRequest()return nil}return nil}
// ===================== 以上是主要逻辑 =====================

此外,该逻辑只能在返回值为 types.Action 的函数中使用,例如 onHttpResponseBody 这样的流式处理函数无法以类似方式处理。尽管可以确保请求被发送出去,但由于没有阻塞操作,无法调用回调函数。如果有需要,可以参考 wasm-go/pkg/wrapper/http_wrapper.go,添加信号变量进行修改。


53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询

扫码登录
登录即表示您同意《53AI网站服务协议》
服务协议

欢迎您使用【53AI 官方网站】(以下简称“本网站”或“我们”)。本《会员服务协议》(以下简称“本协议”)是您(以下简称“会员”或“用户”)与【深圳市博思协创网络科技有限公司】之间关于注册、登录及使用本网站会员服务所订立的法律协议。

在您注册或登录前,请务必审慎阅读、充分理解各条款内容,特别是免除或限制责任的条款、知识产权条款、争议解决条款等。此类条款将以加粗形式提示您注意。 当您通过微信公众号授权、手机验证码验证或其他方式成功登录本网站时,即视为您已完全理解并同意接受本协议的全部内容。

一、 定义

本网站:指由【深圳市博思协创网络科技有限公司】运营的,域名为【53ai.com】的网站及相关移动端页面。

会员服务:指本网站向注册会员提供的知识库文章查阅、内容检索及其他相关增值服务。

知识库内容:指本网站发布的包括但不限于文字、图表、数据、研究报告、行业分析等数字化内容资源。

二、 账号注册与登录

登录方式:本网站支持以下登录方式,您可根据实际情况选择:

微信公众号授权登录:您同意将您的微信OpenID信息授权给本网站,用于创建或关联会员账号。

手机验证码登录:您需提供真实有效的手机号码,并通过短信验证码完成身份验证与登录/注册。

账号安全:您的账号仅限您本人使用,禁止赠与、借用、租用、转让或售卖。因您保管不善导致的账号被盗、密码泄露等损失,由您自行承担。

实名认证:根据相关法律法规要求,我们可能要求您在特定功能下完成实名认证。如您拒绝提供,可能无法使用部分或全部服务。

未成年人保护:若您未满18周岁,请在法定监护人的陪同下阅读本协议,并在征得监护人同意后使用本服务。

三、 服务内容与规范

知识库查阅权限:会员登录后,有权按照其会员等级对应的权限范围,在线浏览、检索本网站知识库中的相关文章及内容。

服务变更:我们有权根据业务发展需要,调整、变更或终止部分服务内容,并将以网站公告、公众号消息等方式提前通知。

禁止行为:您在使用服务时不得实施以下行为:

利用技术手段批量爬取、下载、转存知识库内容;

将知识库内容用于商业目的或未经授权地向第三方传播;

干扰本网站正常运行或侵犯其他用户合法权益;

发布违法违规信息或从事违反公序良俗的活动。

四、 知识产权声明

权利归属:本网站知识库中的排版设计、软件代码等内容的知识产权均归【公司全称】或原权利人所有,受《中华人民共和国著作权法》等法律保护。

有限许可:本网站授予会员一项非独占、不可转让、不可转授权的普通许可,仅限于个人学习、研究之目的在线查阅知识库内容。

侵权追责:未经书面许可,任何单位或个人不得以任何形式复制、转载、摘编、镜像、汇编或以其他方式使用上述内容。一经发现,我们保留追究其法律责任的权利。

五、 个人信息保护

我们重视对您个人信息的保护。关于我们如何收集、使用、存储和保护您的个人信息,请单独阅读 《隐私政策》。

您通过微信公众号授权或手机号验证所提供的信息,我们将严格按照《个人信息保护法》的规定处理,仅用于身份识别、服务提供及安全验证等必要用途。

您可以随时通过网站设置或联系客服行使查阅、更正、删除个人信息及撤回授权同意的权利。

六、 免责声明

内容准确性:知识库内容仅供参考,不构成专业建议。我们不对其完整性、准确性、时效性作任何明示或暗示的保证,您应自行判断并承担使用风险。

不可抗力:因自然灾害、政策法规变化、网络故障、第三方平台接口异常(如微信接口维护、运营商短信通道故障)等不可抗力导致的服务中断或延迟,我们不承担违约责任。

第三方链接:本网站可能包含指向第三方网站的链接,该等网站的内容和服务不受我们控制,请您自行甄别风险。

七、 违约责任

如您违反本协议约定,我们有权视情节采取警告、限制功能、暂停服务、注销账号等措施,并保留要求赔偿损失的权利。

如因您的违约行为导致我们遭受行政处罚、第三方索赔或商誉损失,您应承担全部赔偿责任(包括但不限于罚款、赔偿金、律师费、公证费等)。

八、 法律适用与争议解决

本协议的订立、执行和解释均适用中华人民共和国大陆地区法律。

因本协议产生的或与本协议有关的任何争议,双方应友好协商解决;协商不成的,任何一方均可向【公司所在地】有管辖权的人民法院提起诉讼。

九、 其他

本协议构成双方就本服务达成的完整协议,取代此前任何口头或书面约定。

本协议任一条款被认定为无效或不可执行的,不影响其他条款的效力。

我们对本协议享有最终解释权,并在法律允许的范围内保留随时修改的权利。修改后的协议一经公布即生效,继续使用服务即视为同意修订内容。


已查阅