微信扫码
添加专属顾问
我要投稿
RAG系统的真正潜力不仅在于检索,更在于这四个关键后处理步骤,它们决定了最终答案的质量与可靠性。 核心内容: 1. 检索结果精炼:通过重排序、压缩与筛选提升上下文信噪比 2. 架构优化:引入查询路由等模式构建智能弹性系统 3. 生成控制与防范:通过Prompt工程和事实护栏确保回答准确可靠
# 示例: 使用Flashrank 进行重排序from langchain.retrievers.document_compressors import FlashrankRerankfrom langchain.retrievers import ContextualCompressionRetriever# 1. 定义一个重排序压缩器# top_n 是重排序后返回多少个文档rerank_compressor = FlashrankRerank( model="miniReranker_arabic_v1", top_n=3)# 2. 创建一个 ContextualCompressionRetriever, 使用 Flashrank 作为压缩器rerank_retriever = ContextualCompressionRetriever( base_compressor=rerank_compressor, base_retriever=vectorstore.as_retriever(search_kwargs={"k": 5}) # 先检索5个再重排)# 3. 使用print("\n--- Reranking (Flashrank) 示例 ---")query_rerank = "LangChain的最新功能是什么?"retrieved_reranked_docs = rerank_retriever.invoke(query_rerank)print(f"对查询 '{query_rerank}' 的重排序检索结果({len(retrieved_reranked_docs)} 个文档):")for i, doc in enumerate(retrieved_reranked_docs): print(f"文档 {i+1} (分数: {doc.metadata.get('relevance_score', 'N/A')}):\n{doc.page_content[:100]}...")print("-" * 30)from langchain.retrievers import ContextualCompressionRetrieverfrom langchain.retrievers.document_compressors import LLMChainExtractorfrom langchain_openai import ChatOpenAI# 1. 定义一个基础检索器(先多检索一些,再压缩)base_retriever_for_comp = vectorstore.as_retriever(search_kwargs={"k": 5})# 2. 定义一个 LLMChainExtractor (压缩器)compressor = LLMChainExtractor.from_llm(llm=llm)# 3. 创建 ContextualCompressionRetrievercompression_retriever = ContextualCompressionRetriever( base_compressor=compressor, base_retriever=base_retriever_for_comp)# 4. 使用query_comp = "LangChain的调试工具叫什么?它的主要作用是什么?"retrieved_compressed_docs = compression_retriever.invoke(query_comp)print(f"对查询 '{query_comp}' 的ContextualCompressionRetriever 检索结果:")for i, doc in enumerate(retrieved_compressed_docs): original_len = len(doc.metadata.get('original_content', doc.page_content)) compressed_len = len(doc.page_content) print(f"文档 {i+1}(原始长度: {original_len}, 压缩后长度: {compressed_len}):") print(doc.page_content) print("-" * 30)from typing import List, Tupleimport numpy as npfrom langchain_core.documents import Documentdef find_elbow_point(scores: np.ndarray) -> int:"""使用点到直线最大距离的纯几何方法。返回的是拐点在原始列表中的索引。"""n_points = len(scores)if n_points < 3:return n_points -1 # 返回最后一个点的索引# 创建点坐标 (x, y),x是索引,y是分数points = np.column_stack((np.arange(n_points), scores))# 获取第一个点和最后一个点first_point = points[0]last_point = points[-1]# 计算每个点到首末点连线的垂直距离# 使用向量射影的方法line_vec = last_point - first_pointline_vec_normalized = line_vec / np.linalg.norm(line_vec)vec_from_first = points - first_point# scalar_product 是每个点向量在直线方向上的投影长度scalar_product = np.dot(vec_from_first, line_vec_normalized)# vec_parallel 是投影向量vec_parallel = np.outer(scalar_product, line_vec_normalized)# vec_perpendicular 是垂直向量,它的模长就是距离vec_perpendicular = vec_from_first - vec_paralleldist_to_line = np.linalg.norm(vec_perpendicular, axis=1)# 找到距离最大的点的索引elbow_index = np.argmax(dist_to_line)return elbow_indexdef truncate_with_elbow_method_final(reranked_docs: List[Tuple[float, Document]]) -> List[Document]:if not reranked_docs or len(reranked_docs) < 3:print("文档数量不足3个,无法进行拐点检测,返回所有文档。")return [doc for _, doc in reranked_docs]scores = np.array([score for score, _ in reranked_docs])docs = [doc for _, doc in reranked_docs]# 调用我们验证过有效的拐点检测函数elbow_index = find_elbow_point(scores)# 我们需要包含拐点本身,所以截取到 elbow_index + 1num_docs_to_keep = elbow_index + 1final_docs = docs[:num_docs_to_keep]print(f"检测到分数拐点在第 {elbow_index + 1} 位。截断后返回 {len(final_docs)} 个文档。")return final_docsprint("\n--- 拐点检测示例 ---")# 假设 reranked_docs 是你的输入数据reranked_docs = [(0.98, "文档1"),(0.95, "文档2"),(0.92, "文档3"),(0.75, "文档4"),(0.5, "文档5"),(0.48, "文档6")]final_documents = truncate_with_elbow_method_final(reranked_docs)print(final_documents)# 输出前三个文档
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2025-09-15
2025-09-02
2025-08-05
2025-08-18
2025-08-25
2025-08-25
2025-08-25
2025-09-03
2025-08-20
2025-09-08
2025-10-04
2025-09-30
2025-09-10
2025-09-10
2025-09-03
2025-08-28
2025-08-25
2025-08-20