微信扫码
添加专属顾问
我要投稿
RAG检索结果相似度高但不可用?CRAG通过评估和纠正机制解决这一痛点,让检索结果真正精准可用。 核心内容: 1. 传统RAG的三个检索问题:检索偏差、时效性缺失、记忆污染 2. CRAG的四步闭环机制:检索→评估→纠正→生成 3. 如何用langchain+Milvus搭建CRAG系统
Partition Key 实现零成本多租户隔离
原生混合检索,搞定边界场景的检索失效
JSON 字段,支持记忆结构灵活演化
# Milvus collection schema for CRAG agent memory.
# Fix: extraction had fused the "# BM25" comment with the metadata FieldSchema and
# the list's closing bracket, commenting out real code — restored as separate fields.
fields = [
    FieldSchema(name="agent_id", dtype=DataType.VARCHAR, is_partition_key=True),  # multi-tenant isolation
    FieldSchema(name="dense_embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),   # semantic retrieval
    FieldSchema(name="sparse_embedding", dtype=DataType.SPARSE_FLOAT_VECTOR),     # BM25 / sparse retrieval
    FieldSchema(name="metadata", dtype=DataType.JSON),                            # dynamic schema
]

# Hybrid search (dense + sparse) with RRF re-ranking and metadata filtering.
results = collection.hybrid_search(
    reqs=[
        AnnSearchRequest(data=[dense_vec], anns_field="dense_embedding", limit=20),
        AnnSearchRequest(data=[sparse_vec], anns_field="sparse_embedding", limit=20),
    ],
    rerank=RRFRanker(),
    output_fields=["metadata"],
    expr='metadata["confidence"] > 0.9',  # CRAG confidence filter
    limit=5,
)
# API keys required by the CRAG agent (OpenAI for LLM/embeddings, Tavily for web search).
# Fix: the two export statements were fused onto one line with no separator (invalid shell).
export OPENAI_API_KEY="your-api-key"
export TAVILY_API_KEY="your-tavily-key"
# filename: crag_agent.py
#
# Reconstructed from a garbled extraction:
#  - the whole script appeared twice verbatim (duplicate removed),
#  - several inline comments had swallowed the code that followed them,
#  - the structured-grader upgrade used ChatOpenAI / PromptTemplate / BaseModel
#    without importing them (imports consolidated below).

# ============ Imports ============
from typing import Literal, List

from pydantic import BaseModel

from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware, before_model, dynamic_prompt
from langchain.chat_models import init_chat_model
from langchain.prompts import PromptTemplate
from langchain_milvus import Milvus
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.documents import Document
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_community.tools.tavily_search import TavilySearchResults


# ============ CRAG Middleware (minimal-change version) ============
class CRAGMiddleware(AgentMiddleware):
    """CRAG evaluate-and-correct middleware.

    Uses the official decorator hooks so the synthesized context is injected
    only for the current model call instead of being permanently appended to
    the message history.
    """

    def __init__(self, vector_store: Milvus, agent_id: str):
        super().__init__()
        self.vector_store = vector_store
        self.agent_id = agent_id  # multi-tenant isolation (Milvus partition key)
        # Lightweight relevance evaluator (can be swapped for the structured
        # grader defined at the bottom of this file).
        self.evaluator = init_chat_model("openai:gpt-4o-mini", temperature=0)
        # Web search fallback when internal memory is insufficient.
        self.web_search = TavilySearchResults(max_results=3)

    @before_model
    def run_crag(self, state):
        """Run retrieve -> evaluate -> correct before the model call and stash
        the final context for the dynamic-prompt hook."""
        # Last user message; supports both message objects and plain dicts.
        last_msg = state["messages"][-1]
        query = getattr(last_msg, "content", "") if hasattr(last_msg, "content") else last_msg.get("content", "")

        # 1. Retrieve: fetch documents from Milvus (partition key + confidence filter).
        docs = self._retrieve_from_milvus(query)

        # 2. Evaluate: ternary verdict on retrieval quality.
        verdict = self._evaluate_relevance(query, docs)

        # 3. Correct: pick a strategy based on the verdict.
        if verdict == "incorrect":
            # Retrieval failed — rely entirely on web search.
            web_results = self._web_search_fallback(query)
            final_context = self._format_web_results(web_results)
        elif verdict == "ambiguous":
            # Partially relevant — refine documents AND supplement with web search.
            refined_docs = self._refine_documents(docs, query)
            web_results = self._web_search_fallback(query)
            final_context = self._merge_context(refined_docs, web_results)
        else:
            # Good retrieval — refine documents only.
            refined_docs = self._refine_documents(docs, query)
            final_context = self._format_internal_docs(refined_docs)

        # 4. Stash context under a temporary key, consumed only by the
        #    dynamic-prompt hook for the CURRENT model call.
        state["_crag_context"] = final_context
        return state

    @dynamic_prompt
    def attach_context(self, state, prompt_messages: List):
        """Prepend the CRAG-synthesized context as a SystemMessage for this
        model call only (never written into state["messages"])."""
        final_context = state.get("_crag_context")
        if final_context:
            sys_msg = SystemMessage(
                content=f"以下是相关背景信息,请基于这些信息回答用户问题:\n\n{final_context}"
            )
            prompt_messages = [sys_msg] + prompt_messages
        return prompt_messages

    # ======== Internal helpers: retrieve / evaluate / refine / format ========
    def _retrieve_from_milvus(self, query: str) -> list:
        """Retrieve documents from Milvus (partition key + confidence filter)."""
        try:
            # NOTE: adapters differ on where the filter expression goes across
            # versions; here it is passed via search_kwargs["expr"].
            docs = self.vector_store.similarity_search(
                query,
                k=3,
                search_kwargs={"expr": f'agent_id == "{self.agent_id}"'},
            )
            # Confidence filter — keeps low-quality memories from polluting context.
            filtered_docs = [
                doc for doc in docs
                if (doc.metadata or {}).get("confidence", 0.0) > 0.7
            ]
            # If nothing clears the bar, fall back to the raw results so the
            # evaluator can still issue a verdict.
            return filtered_docs or docs
        except Exception as e:
            print(f"[CRAG] 检索失败: {e}")
            return []

    def _evaluate_relevance(self, query: str, docs: list) -> Literal["relevant", "ambiguous", "incorrect"]:
        """Ternary relevance verdict; simplified version where the LLM returns
        the verdict word directly."""
        if not docs:
            return "incorrect"
        # Evaluate top-3 documents only, first 500 chars each.
        doc_content = "\n\n".join([
            f"[文档{i+1}] {(doc.page_content or '')[:500]}..."
            for i, doc in enumerate(docs[:3])
        ])
        prompt = f"""你是文档相关性评估专家。评估以下文档是否能回答查询。

查询:{query}

文档内容:
{doc_content}

评估标准:
- relevant:文档直接包含答案,高度相关
- ambiguous:文档部分相关,需要补充外部知识
- incorrect:文档不相关,无法回答查询

只返回一个词:relevant 或 ambiguous 或 incorrect"""
        try:
            result = self.evaluator.invoke(prompt)
            verdict = (getattr(result, "content", "") or "").strip().lower()
            if verdict not in {"relevant", "ambiguous", "incorrect"}:
                verdict = "ambiguous"  # anything unparseable degrades safely
            return verdict
        except Exception as e:
            print(f"[CRAG] 评估失败: {e}")
            return "ambiguous"

    def _refine_documents(self, docs: list, query: str) -> list:
        """Refine documents (simplified strip: keyword-based sentence filter).

        NOTE(review): keywords come from whitespace-splitting the query, so a
        Chinese query without spaces is treated as a single keyword — the
        original article acknowledges this is a simplification.
        """
        refined = []
        keywords = [kw.strip() for kw in query.split() if kw.strip()]
        for doc in docs:
            text = doc.page_content or ""
            # Crude sentence split: Chinese full stop + English terminators.
            sentences = (
                text.replace("。", "。\n")
                .replace(". ", ".\n")
                .replace("! ", "!\n")
                .replace("? ", "?\n")
                .split("\n")
            )
            sentences = [s.strip() for s in sentences if s.strip()]
            # Keep sentences that hit any keyword.
            relevant_sentences = [
                s for s in sentences if any(keyword in s for keyword in keywords)
            ]
            if relevant_sentences:
                refined_text = "。".join(relevant_sentences[:3])
                refined.append(Document(page_content=refined_text, metadata=doc.metadata or {}))
        # Fall back to the original documents when nothing was extracted.
        return refined if refined else docs

    def _web_search_fallback(self, query: str) -> list:
        """Web search safety net; returns [] on failure."""
        try:
            return self.web_search.invoke(query) or []
        except Exception as e:
            print(f"[CRAG] Web 搜索失败: {e}")
            return []

    def _merge_context(self, internal_docs: list, web_results: list) -> str:
        """Merge internal memory and external knowledge into one context string."""
        parts = []
        if internal_docs:
            parts.append("【内部记忆】")
            for i, doc in enumerate(internal_docs, 1):
                parts.append(f"{i}. {doc.page_content}")
        if web_results:
            parts.append("【外部知识】")
            for i, result in enumerate(web_results, 1):
                content = (result or {}).get("content", "")
                url = (result or {}).get("url", "")
                parts.append(f"{i}. {content}\n 来源: {url}")
        return "\n\n".join(parts) if parts else "未找到相关信息"

    def _format_internal_docs(self, docs: list) -> str:
        """Format internal documents as a numbered context block."""
        if not docs:
            return "未找到相关信息"
        parts = ["【内部记忆】"]
        for i, doc in enumerate(docs, 1):
            parts.append(f"{i}. {doc.page_content}")
        return "\n\n".join(parts)

    def _format_web_results(self, results: list) -> str:
        """Format web search results as a numbered context block with sources."""
        if not results:
            return "未找到相关信息"
        parts = ["【外部知识】"]
        for i, result in enumerate(results, 1):
            content = (result or {}).get("content", "")
            url = (result or {}).get("url", "")
            parts.append(f"{i}. {content}\n 来源: {url}")
        return "\n\n".join(parts)


# ============ Milvus vector store ============
vector_store = Milvus(
    embedding_function=OpenAIEmbeddings(),
    connection_args={"host": "localhost", "port": "19530"},
    collection_name="agent_memory",
)

# ============ Agent ============
agent = create_agent(
    model="openai:gpt-4o",
    tools=[TavilySearchResults(max_results=3)],  # web search tool
    middleware=[
        CRAGMiddleware(
            vector_store=vector_store,
            agent_id="user_123_session_456",  # multi-tenant: unique ID per agent instance
        )
    ],
)

# ============ Example run ============
if __name__ == "__main__":
    # Use HumanMessage explicitly for compatibility.
    response = agent.invoke({
        "messages": [
            HumanMessage(content="Nike 最新季度财报中的运营成本是多少?")
        ]
    })
    print(response["messages"][-1].content)


# ============ Optional upgrade: structured relevance grader ============
class RelevanceVerdict(BaseModel):
    """Structured output of the relevance evaluation."""
    verdict: Literal["relevant", "ambiguous", "incorrect"]
    confidence: float  # confidence score, used for memory-quality monitoring
    reasoning: str     # rationale, used for debugging and auditing


# NOTE: the CRAG paper uses a fine-tuned T5-Large grader (10-20ms latency);
# gpt-4o-mini is the pragmatic engineering substitute (easier to deploy,
# slightly higher latency).
grader_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

grader_prompt = PromptTemplate(
    template="""你是文档相关性评估专家。评估以下文档是否能回答查询。

查询:{query}

文档内容:
{document}

评估标准:
- relevant:文档直接包含答案,置信度 > 0.9
- ambiguous:文档部分相关,置信度 0.5-0.9
- incorrect:文档不相关,置信度 < 0.5

返回 JSON 格式:{{"verdict": "...", "confidence": 0.xx, "reasoning": "..."}}""",
    input_variables=["query", "document"],
)

grader_chain = grader_prompt | grader_llm.with_structured_output(RelevanceVerdict)


# Drop-in replacement for CRAGMiddleware._evaluate_relevance() using the
# structured grader above.
def _evaluate_relevance(self, query: str, docs: list) -> Literal["relevant", "ambiguous", "incorrect"]:
    """Evaluate document relevance and return a structured verdict."""
    if not docs:
        return "incorrect"
    # Top-3 documents only, first 500 chars each.
    doc_content = "\n\n".join([
        f"[文档{i+1}] {doc.page_content[:500]}..."
        for i, doc in enumerate(docs[:3])
    ])
    result = grader_chain.invoke({"query": query, "document": doc_content})
    # Surface confidence for logging/monitoring.
    print(f"[CRAG 评估] verdict={result.verdict}, confidence={result.confidence:.2f}")
    print(f"[CRAG 推理] {result.reasoning}")
    # Optional: persist evaluation results to Milvus for memory-quality analysis.
    self._store_evaluation_metrics(query, result)
    return result.verdict


def _store_evaluation_metrics(self, query: str, verdict_result: RelevanceVerdict):
    """Store evaluation metrics to Milvus (memory-quality monitoring).

    Example hook: write verdicts to a dedicated evaluation_metrics collection;
    that collection must be created before enabling this.
    """
    pass
第二,可观测性:把监控体系搭起来
第三,长期治理:严防记忆污染
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2026-03-11
上下文腐烂:拖垮企业AI与LLM表现的隐患与对策
2026-03-10
从向量里逆向出原始文本和模型来源
2026-02-27
如何用 AI 做业务级 Code Review
2026-02-22
不用向量数据库的 RAG,居然跑得更准了?
2026-02-22
AIOps探索:做运维领域的RAG,如何做数据清洗
2026-02-21
Claude Code 每次都要重新探索代码?这个工具直接省下30%成本
2026-02-18
函数计算 AgentRun 重磅上线知识库功能,赋能智能体更“懂”你
2026-02-15
当RAG遇上Agent记忆:为什么相似度检索会"塌方"?
2026-01-15
2026-01-02
2025-12-23
2026-02-13
2026-02-03
2025-12-18
2026-02-03
2025-12-31
2026-01-06
2025-12-29
2026-03-11
2026-02-22
2026-02-15
2026-02-04
2026-02-03
2026-01-19
2026-01-12
2026-01-08