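Alibaba Cloud AI Guardrails brings comprehensive AI security protection to the Dify platform. It builds an end-to-end security loop from input to output and helps developers handle the many risks of LLM applications with little effort.
Key points:
1. Core strengths and features of Alibaba Cloud AI Guardrails
2. The two integration modes optimized for Dify
3. Multimodal protection and elastic performance configuration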
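LLMs are now widely used in customer-facing enterprise services, from intelligent customer service and AI search to virtual assistants, and every user interaction carries security risks that cannot be ignored. A model can output non-compliant content, leak sensitive information, or be maliciously manipulated. When that happens, the damage goes beyond a poor user experience: it can cause legal and compliance problems, harm brand reputation, or even trigger system-level security incidents.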
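Against this background, Alibaba Cloud AI Guardrails is now available on the Dify Marketplace. It gives developers who build AI applications with Dify a natively integrated, one-stop AI security solution. Developers can deploy it as a plugin or as an API extension to quickly enable bidirectional protection in Workflows, Agents, and Chatflows, checking both user input and model output.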
Figure: Alibaba Cloud AI Guardrails product architecture
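The product builds an end-to-end security loop from input to output. It addresses the key challenges LLMs face in real business use: content security, external attacks, privacy leaks, and uncontrolled output.
It deeply integrates Qwen3-Guard and moderation models built with supervised fine-tuning (SFT) on the Qwen series. By combining adversarial detection with semantic understanding, it accurately identifies well-hidden risks such as variant spellings, homophones, metaphorical expressions, and ideological infiltration.
It supports end-to-end streaming moderation. Content is sent for checking in real time while the model generates it segment by segment. This significantly shortens the delay between token generation and risk detection and keeps interactions smooth and safe under high concurrency.
It detects risks in both single-turn and multi-turn Q&A. By incorporating the conversation history, it identifies cross-turn inducement, semantic drift, and jailbreak attempts. This lets it understand the intent of the whole conversation and avoid misjudgments caused by context being split across turns.
It supports mixed detection across text, images, files, and other modalities. This lets it identify hidden cross-modal instructions and compound attacks, giving multimodal risk coverage.
It offers All-in-One API access: a single call runs detection across all modalities, and individual protection capabilities can be enabled as needed. Integration is simple and fits mainstream AI application architectures, which helps customers go live quickly. AI Guardrails currently offers several one-click integration modes, including API access, Alibaba Cloud Model Studio (Bailian) integration, and Alibaba Cloud WAF integration.
Algorithm orchestration dynamically balances accuracy, latency, and cost. For high-concurrency, low-latency scenarios, it can provide high-performance service without sacrificing detection quality, which meets demanding production requirements.
A visual console supports risk-policy configuration, blacklists and whitelists, threshold tuning, and effect verification. Users can also create custom detection Agents with their own labels and prompts. These Agents can precisely identify business risks in industries such as finance, healthcare, and education, or in specific scenarios, so the security capabilities can be extended and customized in depth.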
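Because many developers build AI applications with Dify, AI Guardrails is now listed on Dify's official plugin marketplace. It offers two integration paths: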
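◆ Plugin integration, suited to: standard Workflows and applications with non-streaming output;
◆ API extension integration, suited to: Agents, Chatflows, and applications that need streaming output;
◆ Setup: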
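The AI Guardrails API accepts at most 2,000 characters per call, so inputs longer than 2,000 characters need special handling:
Input moderation: split the input into segments of at most 2,000 characters each and call the Guardrails API for all segments concurrently.
Output moderation: Dify calls the moderation API about every 300 characters. The service checks only the most recent 2,000 characters of the accumulated output.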
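The following sketch simulates the output path to show why checking only the most recent 2,000 characters is enough. It assumes, as the sample code below does, that each moderation call carries the full accumulated output. It also assumes that Dify triggers a call every 300 characters plus once at the end. The real buffer size is approximate, and the function names are hypothetical.

```python
MAX_LENGTH = 2000  # per-call limit of the Guardrails API, as stated above
DIFY_STEP = 300    # assumed interval at which Dify triggers output moderation


def output_window(accumulated: str, max_length: int = MAX_LENGTH) -> str:
    """Return the tail of the accumulated output that is sent in one API call."""
    return accumulated[-max_length:]


def uncovered_positions(total_length: int, step: int = DIFY_STEP,
                        max_length: int = MAX_LENGTH) -> list[int]:
    """Simulate a moderation call every `step` characters (plus one at the end)
    and return the character positions that no checked window ever contained."""
    covered = [False] * total_length
    end = 0
    while end < total_length:
        end = min(end + step, total_length)  # Dify has accumulated `end` characters
        start = max(0, end - max_length)     # the service checks only the tail window
        for i in range(start, end):
            covered[i] = True
    return [i for i, ok in enumerate(covered) if not ok]
```

With a step of about 300 characters, every position falls inside at least one 2,000-character window. If the step were larger than the window (for example `step=2500`), some characters would never be checked. This is why the window should stay larger than Dify's moderation buffer.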
Below is sample code for the processing logic, followed by a startup script:
import base64
import concurrent.futures
import hashlib
import hmac
import json
import re
import uuid
from datetime import datetime, timezone
from urllib.parse import quote

import requests
from fastapi import Body, FastAPI, Header, HTTPException
from pydantic import BaseModel

# Choose the regional endpoint you need: Shanghai (cn-shanghai), Beijing (cn-beijing),
# Hangzhou (cn-hangzhou) or Shenzhen (cn-shenzhen)
SERVICE_URL = "https://green-cip.cn-shanghai.aliyuncs.com"
# Text longer than this is split before it is sent
MAX_LENGTH = 2000
# ServiceCodes for the Guardrails input and output checks
SERVICE_INPUT = "query_security_check"
SERVICE_OUTPUT = "response_security_check"
ENCODING = "UTF-8"
ISO8601_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# Severity order used to merge suggestions from several segments (strictest wins)
SEVERITY = {"": 0, "pass": 1, "mask": 2, "block": 3}


def format_iso8601_date():
    return datetime.now(timezone.utc).strftime(ISO8601_DATE_FORMAT)


def percent_encode(value):
    if value is None:
        return ""
    return quote(value.encode(ENCODING), safe="~").replace("+", "%20").replace("*", "%2A")


def create_signature(string_to_sign, secret):
    # HMAC-SHA1 keyed with "<AccessKeySecret>&", then Base64-encoded
    signature = hmac.new(
        (secret + "&").encode(ENCODING), string_to_sign.encode(ENCODING), hashlib.sha1
    ).digest()
    return base64.b64encode(signature).decode(ENCODING)


def create_string_to_sign(http_method, parameters):
    # Canonicalized query string: keys sorted, keys and values percent-encoded
    canonicalized = "&".join(
        percent_encode(key) + "=" + percent_encode(parameters[key])
        for key in sorted(parameters)
    )
    return http_method + "&" + percent_encode("/") + "&" + percent_encode(canonicalized)


def split_text(text: str, max_length: int = 1950) -> list[str]:
    """Split text into segments of at most max_length characters, keeping whole sentences where possible."""
    segments = []
    while len(text) > max_length:
        chunk = text[:max_length]
        # Last run of sentence-ending punctuation (full-width Chinese or ASCII) in the chunk
        matches = list(re.finditer(r"[。!?;:.?!]+", chunk))
        # Cut after the punctuation; if none is found, cut hard at max_length
        cut_point = matches[-1].end() if matches else max_length
        segments.append(text[:cut_point])
        text = text[cut_point:]
    if text:
        segments.append(text)
    return segments


def request(content_segment, check_type, aliyun_access_key, aliyun_access_secret):
    """Call MultiModalGuard for one segment; check_type is "input" or "output"."""
    print(datetime.now(), f" [{check_type} request content]-> {content_segment}")
    # Build the request parameters
    parameters = {
        "Action": "MultiModalGuard",
        "Version": "2022-03-02",
        "AccessKeyId": aliyun_access_key,
        "Timestamp": format_iso8601_date(),
        "SignatureMethod": "HMAC-SHA1",
        "SignatureVersion": "1.0",
        "SignatureNonce": str(uuid.uuid4()),
        "Format": "JSON",
        "Service": SERVICE_INPUT if check_type == "input" else SERVICE_OUTPUT,
        "ServiceParameters": json.dumps({"content": content_segment}, ensure_ascii=False),
    }
    string_to_sign = create_string_to_sign("POST", parameters)
    parameters["Signature"] = create_signature(string_to_sign, aliyun_access_secret)
    # Send the request (the 10-second timeout is an example value)
    response = requests.post(SERVICE_URL, data=parameters, timeout=10)
    if response.status_code != 200:
        raise Exception(
            f"response http status_code not 200. status_code: {response.status_code}, body: {response.text}"
        )
    body = response.json()
    print(datetime.now(), " [response body]-> ", body)
    if body.get("Code") != 200:
        raise Exception(f"response code not 200. code: {body.get('Code')}, body: {body}")
    return body


def stricter(a, b):
    """Return the stricter of two suggestions (block > mask > pass)."""
    return a if SEVERITY.get(a, 1) >= SEVERITY.get(b, 1) else b


def parse_suggestions(bodies):
    """Merge the overall and per-type suggestions of one or more responses."""
    merged = dict.fromkeys(
        ["final", "contentModeration", "sensitiveData", "promptAttack", "maliciousUrl"], ""
    )
    for body in bodies:
        data = body.get("Data", {})
        merged["final"] = stricter(merged["final"], data.get("Suggestion", ""))
        for detail in data.get("Detail", []):
            item = detail.get("Type", "")
            if item in merged:
                merged[item] = stricter(merged[item], detail.get("Suggestion", ""))
    return merged


def masked_text(body):
    """Desensitized text from a response's sensitiveData item, or "" if there is none."""
    for detail in body.get("Data", {}).get("Detail", []):
        if detail.get("Type") == "sensitiveData":
            results = detail.get("Result") or []
            if results:
                return results[0].get("Ext", {}).get("Desensitization", "")
    return ""


def refusal_message(s):
    # A different refusal text can be returned for each risk type
    if s["contentModeration"] == "block":
        return "Your content involves content security."
    if s["sensitiveData"] in ("block", "mask"):
        return "Your content involves sensitive data."
    if s["promptAttack"] == "block":
        return "Your content involves prompt attack."
    if s["maliciousUrl"] == "block":
        return "Your content involves malicious url."
    return "Your content violates our usage policy."


def decide(s):
    """Map merged suggestions to Dify's (flagged, action) pair."""
    if s["final"] == "block":
        return True, "direct_output"  # reply with a preset refusal
    if s["sensitiveData"] == "mask":
        return True, "overridden"  # replace the content with its masked version
    return False, "direct_output"


app = FastAPI()


class InputData(BaseModel):
    point: str
    params: dict = {}


# A plain "def" endpoint runs in FastAPI's thread pool, so the blocking HTTP calls
# below do not stall the event loop
@app.post("/api/dify/receive")
def dify_receive(data: InputData = Body(...), authorization: str = Header(None)):
    """Receive API extension requests from Dify."""
    # Dify sends "Authorization: Bearer <API-Key>"; the key is base64("<AK>:<SK>")
    auth_scheme, _, api_key = (authorization or "").partition(" ")
    if auth_scheme.lower() != "bearer":
        raise HTTPException(status_code=401, detail="Unauthorized")
    try:
        ak, sk = base64.b64decode(api_key).decode("utf-8").split(":", 1)
    except Exception as e:
        raise HTTPException(status_code=401, detail=f"Base64 Decode AK/SK fail: {e}")
    if data.point == "ping":
        return {"result": "pong"}
    if data.point == "app.moderation.input":
        return handle_app_moderation_input(params=data.params, ak=ak, sk=sk)
    if data.point == "app.moderation.output":
        return handle_app_moderation_output(params=data.params, ak=ak, sk=sk)
    raise HTTPException(status_code=400, detail="Not implemented")


def handle_app_moderation_input(params: dict, ak: str, sk: str):
    inputs = params.get("inputs", {})
    query = params.get("query") or ""
    if not query:
        return {"flagged": False, "action": "direct_output"}
    segments = [query] if len(query) <= MAX_LENGTH else split_text(query, MAX_LENGTH - 50)
    # Check the segments concurrently; executor.map keeps the results in segment order
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        bodies = list(executor.map(lambda seg: request(seg, "input", ak, sk), segments))
    s = parse_suggestions(bodies)
    flagged, action = decide(s)
    response = {"flagged": flagged, "action": action}
    if flagged and action == "direct_output":
        response["preset_response"] = refusal_message(s)
    elif flagged:
        # Rebuild the full query from the masked segments (unmasked segments stay as they are)
        response["inputs"] = inputs
        response["query"] = "".join(masked_text(b) or seg for b, seg in zip(bodies, segments))
    print(response)
    return response


def handle_app_moderation_output(params: dict, ak: str, sk: str):
    text = params.get("text") or ""
    print(f"handle_app_moderation_output length:{len(text)}")
    if not text:
        return {"flagged": False, "action": "direct_output"}
    # Check only the most recent MAX_LENGTH characters; adjust as needed, but keep this
    # larger than Dify's moderation window (about 300 characters)
    content = text[-MAX_LENGTH:]
    body = request(content, "output", ak, sk)
    s = parse_suggestions([body])
    flagged, action = decide(s)
    response = {"flagged": flagged, "action": action}
    if flagged and action == "direct_output":
        response["preset_response"] = refusal_message(s)
    elif flagged:
        # Keep the unchecked prefix and replace only the checked tail with its masked version
        response["text"] = text[: len(text) - len(content)] + (masked_text(body) or content)
    print(response)
    return response


if __name__ == "__main__":
    import uvicorn

    # Pick any open port; reload=True would require the "main:app" import string, so it is omitted
    uvicorn.run(app, host="0.0.0.0", port=8000)

Save the Python code above as main.py and start it with the following commands:
# Example startup commands
pip install fastapi uvicorn requests
uvicorn main:app --reload --host 0.0.0.0 --port 8000
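In the output moderation sample above, flagged content is answered with a refusal message by default. You can change the returned action field to switch to content-replacement mode. In that mode, the matched keywords or sensitive data are replaced with * (asterisks).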
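Below is a minimal sketch of content-replacement mode. It assumes you have already extracted the matched keywords from the Guardrails response. The article does not show which response field carries them, so `hits` is a hypothetical input, and both helper names are illustrative. The sketch masks each hit with asterisks of the same length. It then builds an `overridden` response with the masked text in the `text` field, the same shape the sample code returns for Dify output moderation.

```python
def mask_hits(text: str, hits: list[str]) -> str:
    """Replace every occurrence of each hit with asterisks of the same length."""
    # Mask longer hits first so a shorter hit contained in a longer one cannot leave part of it visible
    for word in sorted(set(hits), key=len, reverse=True):
        if word:
            text = text.replace(word, "*" * len(word))
    return text


def overridden_output_response(text: str, hits: list[str]) -> dict:
    """Build a Dify output-moderation response that replaces content instead of refusing."""
    masked = mask_hits(text, hits)
    if masked == text:
        # Nothing was masked: let the original output through
        return {"flagged": False, "action": "direct_output"}
    return {"flagged": True, "action": "overridden", "text": masked}
```

Keeping each masked span the same length as the original preserves the layout of the reply. Whether that matters depends on the application.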
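Add an API extension: go to Settings > API Extension and create a new API extension.
API Endpoint: the reachable address of the deployed forwarding service. With the sample code above, this is http://<host>:8000/api/dify/receive.
API-Key: the Base64 encoding of your Alibaba Cloud AccessKey ID and AccessKey Secret joined by a colon, i.e. base64({aliyun_accessKey_id}:{aliyun_accessKey_secret}). It can be generated as follows:
import base64

# Fill in your AccessKey ID and AccessKey Secret
access_key_id = ""
access_key_secret = ""

# Join with ":" and Base64-encode
auth_str = f"{access_key_id}:{access_key_secret}"
encoded_auth = base64.b64encode(auth_str.encode("utf-8")).decode("utf-8")
print(encoded_auth)

Configure the API extension in the Agent: configure the API extension in the Agent to complete the integration.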
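In the lower-right corner of the Agent page, click Manage to configure content moderation.
Select API Extension.
Select the AI Guardrails API extension you created.
Turn input and output moderation on or off as your business requires.
During output, Dify runs a moderation check after about every 300 accumulated characters.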
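Dify's API extension protocol includes a `ping` point, which the sample service answers with `{"result": "pong"}`. The sketch below uses only the standard library to send the same kind of request, so you can check a deployment before or after connecting it to an Agent. The helper name `ping_extension` is hypothetical. The sample service decodes the API-Key before it checks `point`, so even a ping needs a validly encoded `AccessKeyId:AccessKeySecret` string.

```python
import json
import urllib.request


def ping_extension(endpoint: str, api_key: str, timeout: float = 5.0) -> bool:
    """POST a Dify-style ping to the extension and report whether it answers pong.

    endpoint: full URL configured as the API Endpoint (e.g. .../api/dify/receive)
    api_key:  the Base64 "AccessKeyId:AccessKeySecret" string configured as the API-Key
    """
    req = urllib.request.Request(
        endpoint,
        data=json.dumps({"point": "ping"}).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # Same header format the sample service parses: "Bearer <API-Key>"
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError for non-2xx replies such as the sample's 401
    with urllib.request.urlopen(req, timeout=timeout) as response:
        body = json.loads(response.read().decode("utf-8"))
    return body.get("result") == "pong"
```

For example, `ping_extension("http://<host>:8000/api/dify/receive", encoded_auth)` should return `True` once the service started by the commands above is reachable.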
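Building AI security capabilities should not become an extra burden on application development. The arrival of Alibaba Cloud AI Guardrails gives the Dify community a new option: bringing professional security detection into the generative AI build process in a standardized, configurable way. This is not only an extension of the toolchain but also an exploration of how to put "trustworthy AI" into practice.
We believe developers can only truly control the balance between innovation and risk when security capabilities are lightweight, flexible, and easy to integrate. Dify remains committed to an open, transparent, and sustainable plugin ecosystem. We look forward to every developer validating these capabilities in real-world scenarios, sharing feedback, and helping improve them. AI should be more useful, and it should also be safer to rely on.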