Tired of your AI support bot answering the same questions over and over? CAG teaches the AI to "remember" and can triple its efficiency! What's covered: 1. Where traditional RAG falls short 2. How CAG and its caching mechanism work 3. A complete, code-level walkthrough from RAG to CAG
Does this sound familiar? Your AI support bot answers thousands of questions a day, and at least a third of them are repeats: "How is annual leave calculated?", "How do I claim travel expenses?", "What is the housing fund rate?"... The answers all live in company policy documents that change maybe once every few months.
Yet every time someone asks, the AI goes back and digs through the document store again.
It's like knowing perfectly well where you left your house keys, but still ransacking the whole room every time you head out. What a waste of time.
In this article I'll walk through the full evolution from traditional RAG to CAG with working code. Every step is runnable, so you can see exactly how the technique works.
First, some context on RAG, currently the most popular approach.
RAG stands for "retrieval-augmented generation". It sounds academic, but the idea is simple: before answering, the AI looks up relevant material in a knowledge base, then generates an answer grounded in that material.
This does curb hallucination. But it has a built-in flaw: it has no memory.
The RAG (retrieval-augmented generation) workflow is straightforward:
The user asks a question
The system retrieves relevant documents from the knowledge base
The retrieved passages and the question are handed to the model together
The model generates an answer grounded in the retrieved content
Sounds great, except for one thing: retrieval happens every single time.
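Stripped to its skeleton, that per-query loop is just a few lines. The sketch below is a minimal illustration, not a real system: `retrieve` is a toy word-overlap ranker standing in for a vector search, and the answer string stands in for an LLM call.

```python
def retrieve(kb, question, top_k=3):
    # Toy retriever: rank documents by word overlap with the question
    words = set(question.lower().split())
    ranked = sorted(kb, key=lambda doc: -len(words & set(doc.lower().split())))
    return ranked[:top_k]

def rag_query(question, kb):
    docs = retrieve(kb, question, top_k=2)         # retrieval happens on EVERY call
    context = "\n".join(docs)                      # context + question go to the model
    return f"[answer grounded in] {context[:40]}"  # mock generation step

kb = ["annual leave is 5 days after one year",
      "travel per diem is 200 RMB per day"]
print(rag_query("how many days of annual leave", kb))
```

Notice that nothing in `rag_query` remembers the previous call: ask the same question twice and `retrieve` runs twice.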
Let's start by implementing a standard RAG system, using a corporate HR knowledge base as the example:
```python
import time
from datetime import datetime
from typing import List, Dict

import numpy as np


# Mock vector database
class SimpleVectorDB:
    """A minimal in-memory vector database."""

    def __init__(self):
        self.documents = []
        self.embeddings = []
        self.metadata = []

    def add_document(self, text: str, metadata: Dict = None):
        """Add a document to the database."""
        # A toy frequency-style vector stands in for a real embedding
        embedding = self._text_to_vector(text)
        self.documents.append(text)
        self.embeddings.append(embedding)
        self.metadata.append(metadata or {})

    def _text_to_vector(self, text: str) -> np.ndarray:
        """Convert text to a vector (simplified).

        In production you would use an OpenAI/HuggingFace embedding
        model; here we simply encode character codes.
        """
        vector = np.zeros(100)
        for i, char in enumerate(text[:100]):
            vector[i] = ord(char) / 1000
        return vector

    def search(self, query: str, top_k: int = 3) -> List[Dict]:
        """Retrieve the most relevant documents."""
        query_vector = self._text_to_vector(query)
        # Cosine similarity against every stored embedding
        similarities = []
        for i, doc_vector in enumerate(self.embeddings):
            similarity = np.dot(query_vector, doc_vector) / (
                np.linalg.norm(query_vector) * np.linalg.norm(doc_vector) + 1e-10
            )
            similarities.append({
                'index': i,
                'score': similarity,
                'text': self.documents[i],
                'metadata': self.metadata[i],
            })
        # Return the top_k results
        similarities.sort(key=lambda x: x['score'], reverse=True)
        return similarities[:top_k]


class TraditionalRAG:
    """A traditional RAG system: retrieve on every single query."""

    def __init__(self):
        self.vector_db = SimpleVectorDB()
        self.search_count = 0    # number of retrievals performed
        self.search_times = []   # latency of each retrieval

    def add_knowledge(self, text: str, metadata: Dict = None):
        """Add knowledge to the system."""
        self.vector_db.add_document(text, metadata)

    def query(self, question: str) -> Dict:
        """Handle a query."""
        start_time = time.time()
        # Retrieval happens every time, no matter what
        search_results = self.vector_db.search(question, top_k=2)
        search_time = time.time() - start_time
        self.search_count += 1
        self.search_times.append(search_time)
        # Assemble the context
        context = "\n\n".join(r['text'] for r in search_results)
        # Mock LLM answer generation (call GPT/Claude in production)
        answer = self._generate_answer(question, context)
        return {
            'question': question,
            'answer': answer,
            'context': context,
            'search_time': search_time,
            'total_searches': self.search_count,
        }

    def _generate_answer(self, question: str, context: str) -> str:
        """Mock LLM answer generation."""
        return f"Based on the knowledge base: {context[:100]}... Answer: [mock answer]"

    def get_statistics(self) -> Dict:
        """Return performance statistics."""
        return {
            'total_searches': self.search_count,
            'avg_search_time': np.mean(self.search_times) if self.search_times else 0,
            'total_time': sum(self.search_times),
        }


def demo_traditional_rag():
    """Demonstrate the weakness of traditional RAG."""
    print("=" * 60)
    print("Traditional RAG demo")
    print("=" * 60)

    rag = TraditionalRAG()

    # Company knowledge (all of it is stable policy text)
    knowledge_base = [
        {"text": "Annual leave policy: 5 days after 1 year of service, 10 days after 3 years, "
                 "15 days after 5 years. Leave must be used within the year and cannot be carried over.",
         "metadata": {"category": "HR policy", "update_date": "2024-01-01"}},
        {"text": "Travel reimbursement: 200 RMB/day domestic per diem; hotel reimbursed against "
                 "receipts up to 500 RMB/day. Invoices and a travel request form are required.",
         "metadata": {"category": "Finance", "update_date": "2024-01-01"}},
        {"text": "Housing fund contribution: company and employee each pay 12%, based on last "
                 "year's average monthly salary. Adjusted every July.",
         "metadata": {"category": "Compensation", "update_date": "2024-01-01"}},
        {"text": "Sick leave: a doctor's note is required. Sick pay is 80% of base salary, "
                 "capped at 30 days per year.",
         "metadata": {"category": "HR policy", "update_date": "2024-01-01"}},
    ]
    for kb in knowledge_base:
        rag.add_knowledge(kb['text'], kb['metadata'])
    print(f"\nLoaded {len(knowledge_base)} knowledge entries\n")

    # Repeated queries -- this is where the problem shows up
    repeated_questions = [
        "How is annual leave calculated?",
        "What is the annual leave policy?",
        "How many days of leave do I get?",
        "How do I claim travel expenses?",
        "What is the travel per diem?",
        "Can annual leave be carried over?",   # annual leave again
        "What is the housing fund rate?",
        "Annual leave policy, in detail",      # annual leave yet again
    ]

    print("Processing queries...\n")
    for i, question in enumerate(repeated_questions, 1):
        result = rag.query(question)
        print(f"Query {i}: {question}")
        print(f"  retrieval time: {result['search_time']*1000:.2f}ms")
        print(f"  total retrievals so far: {result['total_searches']}")
        print()

    stats = rag.get_statistics()
    print("=" * 60)
    print("Performance statistics")
    print("=" * 60)
    print(f"Total retrievals: {stats['total_searches']}")
    print(f"Average retrieval time: {stats['avg_search_time']*1000:.2f}ms")
    print(f"Total time: {stats['total_time']*1000:.2f}ms")
    print()
    print("⚠️ Problem analysis:")
    print("  - Annual leave was asked about 4 times, and retrieved 4 times")
    print("  - These policy documents change once every few months, yet hit the database constantly")
    print("  - Cost and latency grow linearly with query volume")
    print()


demo_traditional_rag()
```
Run the code above and you will see:
The "annual leave" question was asked 4 times and retrieved 4 times
Every retrieval hits the vector database
The retrieval count grows linearly with query volume
In production this translates to:
Cost: vector-database query fees (Pinecone, for example, bills per query)
Latency: network round trip plus similarity computation, typically 50-200ms
Resources: database connections and CPU time
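To put rough numbers on it, here is a back-of-the-envelope estimate. All figures below (traffic volume, repeat ratio, per-query price, latency) are illustrative assumptions, not vendor quotes:

```python
# Illustrative assumptions: 100k queries/day, 30% repeats,
# $0.0004 per vector-DB query, 100 ms average retrieval latency.
queries_per_day = 100_000
repeat_ratio = 0.30
price_per_query = 0.0004   # USD, assumed
latency_s = 0.100          # seconds, assumed

monthly_cost = queries_per_day * 30 * price_per_query
wasted_cost = monthly_cost * repeat_ratio
wasted_hours = queries_per_day * 30 * repeat_ratio * latency_s / 3600

print(f"monthly retrieval spend:          ${monthly_cost:,.0f}")
print(f"spent re-answering repeats:       ${wasted_cost:,.0f}")
print(f"user time burned on repeat waits: {wasted_hours:,.0f} h/month")
```

Under these assumptions, almost a third of the retrieval bill and roughly 25 hours of cumulative user waiting per month go to questions the system has already answered.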
The example makes the problem obvious: ask the same question a hundred times and the AI will dutifully retrieve a hundred times. Hitting the database, matching documents, extracting passages... the whole pipeline costs both time and money.
For knowledge that almost never changes (company policies, product manuals, legal texts), re-retrieving on every query is serious overkill.
To solve this, there is a newer approach: cache-augmented generation (CAG).
The idea: give the AI a "working memory". Stable knowledge gets pinned inside the system's own memory, so the next related question is answered straight from there instead of rummaging through external storage.
It's like keeping your everyday tools on the workbench instead of walking to the warehouse each time.
The effects are immediate:
Faster: no repeated database round trips, so response time drops sharply
Cheaper: fewer retrievals mean less server load and lower bills
More consistent: answers about fixed knowledge stay stable instead of saying A today and B tomorrow
CAG (cache-augmented generation) does something very simple:
Identify which knowledge is "static" (long-lived)
Pin that knowledge in an in-memory cache
On each query, check the cache first; a hit means no retrieval at all
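The three steps above boil down to one guard clause in the query path. A minimal sketch, using a plain dict as the cache and a caller-supplied function as the retrieval fallback:

```python
cache = {}  # static knowledge pinned in memory

def cag_query(question, fallback_retrieve):
    key = question.lower().strip()          # normalize so trivial variants collide
    if key in cache:                        # cache first
        return cache[key], "CACHE"
    answer = fallback_retrieve(question)    # miss: fall back to retrieval
    cache[key] = answer                     # remember it for next time
    return answer, "RETRIEVAL"

print(cag_query("What is the leave policy?", lambda q: "5 days after one year"))
# → ('5 days after one year', 'RETRIEVAL')
print(cag_query("  what is the leave policy?", lambda q: "never called"))
# → ('5 days after one year', 'CACHE')
```

The second call never touches the retriever at all; production systems replace the exact-match key with semantic similarity, which the full implementations below demonstrate.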
So should every piece of knowledge go into the cache?
Definitely not. Cache indiscriminately and you will quickly exhaust the available memory.
```python
import hashlib
from typing import Optional, Dict


class KnowledgeCache:
    """Knowledge cache manager."""

    def __init__(self, max_size: int = 100):
        self.cache = {}           # cached entries
        self.max_size = max_size
        self.hit_count = 0
        self.miss_count = 0
        self.access_log = []

    def _generate_key(self, query: str) -> str:
        """Generate a cache key for a query.

        A text hash stands in for semantic hashing here; production
        code would match by embedding similarity instead.
        """
        normalized = query.lower().strip()
        return hashlib.md5(normalized.encode()).hexdigest()[:16]

    def get(self, query: str, similarity_threshold: float = 0.85) -> Optional[Dict]:
        """Fetch an answer from the cache.

        Simplified to keyword overlap; production code would use
        semantic (embedding) similarity.
        """
        query_key = query.lower().strip()
        # Look for a semantically similar cached entry
        for cached_query, cached_data in self.cache.items():
            if self._is_similar(query_key, cached_query):
                self.hit_count += 1
                self.access_log.append({
                    'query': query,
                    'result': 'HIT',
                    'timestamp': datetime.now().isoformat(),
                })
                return cached_data
        self.miss_count += 1
        self.access_log.append({
            'query': query,
            'result': 'MISS',
            'timestamp': datetime.now().isoformat(),
        })
        return None

    def _is_similar(self, query1: str, query2: str) -> bool:
        """Decide whether two queries are similar (Jaccard word overlap)."""
        keywords1 = set(query1.split())
        keywords2 = set(query2.split())
        if not keywords1 or not keywords2:
            return False
        intersection = keywords1 & keywords2
        union = keywords1 | keywords2
        return len(intersection) / len(union) > 0.5

    def set(self, query: str, context: str, answer: str, metadata: Dict = None):
        """Store an entry in the cache."""
        query_key = query.lower().strip()
        # Enforce the capacity limit
        if len(self.cache) >= self.max_size:
            # Crude LRU: drop the oldest entry
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]
        self.cache[query_key] = {
            'query': query,
            'context': context,
            'answer': answer,
            'metadata': metadata or {},
            'cached_at': datetime.now().isoformat(),
        }

    def get_statistics(self) -> Dict:
        """Return cache statistics."""
        total_access = self.hit_count + self.miss_count
        hit_rate = self.hit_count / total_access if total_access > 0 else 0
        return {
            'hit_count': self.hit_count,
            'miss_count': self.miss_count,
            'total_access': total_access,
            'hit_rate': hit_rate,
            'cache_size': len(self.cache),
        }


class CAGSystem:
    """CAG (cache-augmented generation) system."""

    def __init__(self, cache_size: int = 100):
        self.vector_db = SimpleVectorDB()
        self.cache = KnowledgeCache(max_size=cache_size)
        self.search_count = 0
        self.search_times = []
        self.cache_hit_times = []

    def add_knowledge(self, text: str, metadata: Dict = None, cacheable: bool = False):
        """Add knowledge; pre-warm the cache for common questions."""
        self.vector_db.add_document(text, metadata)
        if cacheable and metadata and 'common_questions' in metadata:
            for question in metadata['common_questions']:
                # Pre-cache an answer for each common question
                answer = f"From cache: {text[:100]}..."
                self.cache.set(question, text, answer, metadata)

    def query(self, question: str) -> Dict:
        """Handle a query, cache first."""
        start_time = time.time()
        cached_result = self.cache.get(question)
        if cached_result:
            # Cache hit!
            cache_time = time.time() - start_time
            self.cache_hit_times.append(cache_time)
            return {
                'question': question,
                'answer': cached_result['answer'],
                'context': cached_result['context'],
                'source': 'CACHE',
                'response_time': cache_time,
                'total_searches': self.search_count,
            }
        # Cache miss: fall back to retrieval
        search_results = self.vector_db.search(question, top_k=2)
        search_time = time.time() - start_time
        self.search_count += 1
        self.search_times.append(search_time)
        context = "\n\n".join(r['text'] for r in search_results)
        answer = self._generate_answer(question, context)
        # Cache the result if the underlying knowledge is static
        if search_results and self._is_cacheable(search_results[0]):
            self.cache.set(question, context, answer,
                           search_results[0].get('metadata', {}))
        return {
            'question': question,
            'answer': answer,
            'context': context,
            'source': 'RETRIEVAL',
            'response_time': search_time,
            'total_searches': self.search_count,
        }

    def _is_cacheable(self, search_result: Dict) -> bool:
        """Decide whether a retrieval result should be cached."""
        metadata = search_result.get('metadata', {})
        # Simplified: anything carrying an update_date counts as static
        return metadata.get('update_date') is not None

    def _generate_answer(self, question: str, context: str) -> str:
        """Mock LLM answer generation."""
        return f"Based on the knowledge base: {context[:100]}... Answer: [mock answer]"

    def get_statistics(self) -> Dict:
        """Return full statistics."""
        cache_stats = self.cache.get_statistics()
        return {
            'retrieval': {
                'total_searches': self.search_count,
                'avg_search_time': np.mean(self.search_times) if self.search_times else 0,
                'total_time': sum(self.search_times),
            },
            'cache': {
                'hit_count': cache_stats['hit_count'],
                'miss_count': cache_stats['miss_count'],
                'hit_rate': cache_stats['hit_rate'],
                'avg_hit_time': np.mean(self.cache_hit_times) if self.cache_hit_times else 0,
                'cache_size': cache_stats['cache_size'],
            },
            'overall': {
                'total_queries': cache_stats['total_access'],
                'searches_saved': cache_stats['hit_count'],
                'cost_reduction': f"{cache_stats['hit_rate']*100:.1f}%",
            },
        }


def demo_cag_system():
    """Demonstrate the advantage of the CAG system."""
    print("=" * 60)
    print("CAG demo (with cache optimization)")
    print("=" * 60)

    cag = CAGSystem(cache_size=50)

    # Static knowledge entries are flagged as cacheable
    knowledge_base = [
        {"text": "Annual leave policy: 5 days after 1 year of service, 10 days after 3 years, "
                 "15 days after 5 years. Leave must be used within the year and cannot be carried over.",
         "metadata": {"category": "HR policy", "update_date": "2024-01-01",
                      "common_questions": ["How is annual leave calculated",
                                           "What is the annual leave policy",
                                           "How many days of leave do I get",
                                           "Can annual leave be carried over"]},
         "cacheable": True},
        {"text": "Travel reimbursement: 200 RMB/day domestic per diem; hotel reimbursed against "
                 "receipts up to 500 RMB/day. Invoices and a travel request form are required.",
         "metadata": {"category": "Finance", "update_date": "2024-01-01",
                      "common_questions": ["How do I claim travel expenses",
                                           "What is the travel per diem",
                                           "Hotel reimbursement on business trips"]},
         "cacheable": True},
        {"text": "Housing fund contribution: company and employee each pay 12%, based on last "
                 "year's average monthly salary. Adjusted every July.",
         "metadata": {"category": "Compensation", "update_date": "2024-01-01",
                      "common_questions": ["What is the housing fund rate",
                                           "How is the housing fund paid"]},
         "cacheable": True},
    ]
    for kb in knowledge_base:
        cag.add_knowledge(kb['text'], kb['metadata'], kb['cacheable'])
    print(f"\nLoaded {len(knowledge_base)} knowledge entries (common questions pre-cached)\n")

    # The same repeated queries as in the RAG demo
    test_questions = [
        "How is annual leave calculated?",    # cache hit
        "What is the annual leave policy?",   # cache hit
        "How many days of leave do I get?",   # cache hit
        "How do I claim travel expenses?",    # cache hit
        "What is the travel per diem?",       # cache hit
        "Can annual leave be carried over?",  # cache hit
        "What is the housing fund rate?",     # cache hit
        "Annual leave policy, in detail",     # cache hit
    ]

    print("Processing queries...\n")
    for i, question in enumerate(test_questions, 1):
        result = cag.query(question)
        source_icon = "⚡ [cache]" if result['source'] == 'CACHE' else "🔍 [retrieval]"
        print(f"Query {i}: {question}")
        print(f"  source: {source_icon}")
        print(f"  response time: {result['response_time']*1000:.2f}ms")
        print(f"  total retrievals: {result['total_searches']}")
        print()

    # Detailed statistics
    stats = cag.get_statistics()
    print("=" * 60)
    print("Performance statistics")
    print("=" * 60)
    print("\n[Retrieval]")
    print(f"  actual retrievals: {stats['retrieval']['total_searches']}")
    print(f"  avg retrieval time: {stats['retrieval']['avg_search_time']*1000:.2f}ms")
    print("\n[Cache]")
    print(f"  hits: {stats['cache']['hit_count']}")
    print(f"  misses: {stats['cache']['miss_count']}")
    print(f"  hit rate: {stats['cache']['hit_rate']*100:.1f}%")
    print(f"  avg cache response: {stats['cache']['avg_hit_time']*1000:.2f}ms")
    print(f"  cache size: {stats['cache']['cache_size']}")
    print("\n[Overall]")
    print(f"  total queries: {stats['overall']['total_queries']}")
    print(f"  retrievals saved: {stats['overall']['searches_saved']}")
    print(f"  cost reduction: {stats['overall']['cost_reduction']}")
    print("\n✅ Summary:")
    print("  - Repeated questions are answered straight from the cache, no retrieval")
    print("  - Response time drops from 50-200ms to under 5ms")
    print("  - Far fewer database hits, significantly lower cost")
    print()


demo_cag_system()
```
Let's benchmark the two systems head to head:
```python
def compare_rag_vs_cag():
    """Benchmark RAG against CAG directly."""
    print("=" * 60)
    print("RAG vs CAG benchmark")
    print("=" * 60)

    # Test data
    knowledge = {
        "text": "Annual leave policy: 5 days after 1 year of service, "
                "10 days after 3 years, 15 days after 5 years.",
        "metadata": {
            "category": "HR policy",
            "common_questions": ["How is annual leave calculated",
                                 "Annual leave policy",
                                 "How many days of leave"],
        },
    }
    # The same question, 100 times
    questions = ["How is annual leave calculated?"] * 100

    # Test 1: traditional RAG
    print("\n[Test 1: traditional RAG]")
    rag = TraditionalRAG()
    rag.add_knowledge(knowledge['text'], knowledge['metadata'])
    rag_start = time.time()
    for q in questions:
        rag.query(q)
    rag_total_time = time.time() - rag_start
    rag_stats = rag.get_statistics()
    print(f"total time: {rag_total_time*1000:.2f}ms")
    print(f"retrievals: {rag_stats['total_searches']}")
    print(f"avg latency: {rag_stats['avg_search_time']*1000:.2f}ms")

    # Test 2: CAG
    print("\n[Test 2: CAG]")
    cag = CAGSystem()
    cag.add_knowledge(knowledge['text'], knowledge['metadata'], cacheable=True)
    cag_start = time.time()
    for q in questions:
        cag.query(q)
    cag_total_time = time.time() - cag_start
    cag_stats = cag.get_statistics()
    print(f"total time: {cag_total_time*1000:.2f}ms")
    print(f"retrievals: {cag_stats['retrieval']['total_searches']}")
    print(f"cache hit rate: {cag_stats['cache']['hit_rate']*100:.1f}%")
    print(f"avg latency: {cag_stats['cache']['avg_hit_time']*1000:.2f}ms")

    # Improvement
    print("\n" + "=" * 60)
    print("Improvement")
    print("=" * 60)
    speedup = rag_total_time / cag_total_time
    search_reduction = (rag_stats['total_searches']
                        - cag_stats['retrieval']['total_searches']) / rag_stats['total_searches']
    print(f"speedup: {speedup:.1f}x")
    print(f"retrievals avoided: {search_reduction*100:.1f}%")
    print(f"cost saved: ~{search_reduction*100:.1f}%")


compare_rag_vs_cag()
```
It works like your own brain: the multiplication table and your home address are memorized, but today's lunch menu and tomorrow's weather still need a lookup.
This "memory plus external brain" dual-engine design is what knowledge-centric AI systems are converging on.
```python
class HybridRAGCAG:
    """Hybrid RAG + CAG system."""

    def __init__(self, cache_size: int = 100):
        # Cache for static knowledge
        self.static_cache = KnowledgeCache(max_size=cache_size)
        # Vector store for dynamic knowledge
        self.dynamic_db = SimpleVectorDB()
        # Vector store for static knowledge (fallback on cache miss)
        self.static_db = SimpleVectorDB()
        self.stats = {
            'static_cache_hits': 0,
            'static_db_queries': 0,
            'dynamic_db_queries': 0,
            'total_queries': 0,
        }

    def add_static_knowledge(self, text: str, metadata: Dict = None,
                             common_questions: List[str] = None):
        """Add static knowledge (changes rarely)."""
        self.static_db.add_document(text, metadata)
        # Pre-warm the cache with common questions
        if common_questions:
            for question in common_questions:
                answer = f"[static knowledge] {text}"
                self.static_cache.set(question, text, answer, metadata)

    def add_dynamic_knowledge(self, text: str, metadata: Dict = None):
        """Add dynamic knowledge (changes often) -- indexed, never cached."""
        self.dynamic_db.add_document(text, metadata)

    def query(self, question: str, require_realtime: bool = False) -> Dict:
        """Smart query: decide between cache and retrieval.

        Args:
            question: the user's question
            require_realtime: force fresh data, bypassing the cache
        """
        self.stats['total_queries'] += 1
        start_time = time.time()

        # Unless realtime data is required, try the static cache first
        if not require_realtime:
            cached_result = self.static_cache.get(question)
            if cached_result:
                self.stats['static_cache_hits'] += 1
                return {
                    'question': question,
                    'answer': cached_result['answer'],
                    'source': 'STATIC_CACHE',
                    'response_time': time.time() - start_time,
                    'confidence': 'high',
                }

        # Classify the question: does it need dynamic or static data?
        question_type = self._classify_question(question)
        if question_type == 'dynamic' or require_realtime:
            results = self.dynamic_db.search(question, top_k=2)
            self.stats['dynamic_db_queries'] += 1
            source = 'DYNAMIC_RETRIEVAL'
        else:
            results = self.static_db.search(question, top_k=2)
            self.stats['static_db_queries'] += 1
            source = 'STATIC_RETRIEVAL'
            # Cache static results so the next hit skips the database
            # (dynamic results are deliberately never cached)
            if results:
                context = results[0]['text']
                answer = f"[static knowledge] {context}"
                self.static_cache.set(question, context, answer,
                                      results[0].get('metadata', {}))

        context = "\n".join(r['text'] for r in results) if results else ""
        answer = self._generate_answer(question, context, source)
        return {
            'question': question,
            'answer': answer,
            'source': source,
            'response_time': time.time() - start_time,
            'confidence': 'high' if results else 'low',
        }

    def _classify_question(self, question: str) -> str:
        """Route a question to dynamic or static data (keyword heuristic).

        A production system would use an embedding-based classifier.
        """
        dynamic_keywords = ['today', 'latest', 'now', 'current', 'realtime',
                            'yesterday', 'recent', 'this week']
        static_keywords = ['policy', 'rule', 'regulation', 'standard', 'process', 'how']
        question_lower = question.lower()
        # Dynamic keywords win first
        for keyword in dynamic_keywords:
            if keyword in question_lower:
                return 'dynamic'
        for keyword in static_keywords:
            if keyword in question_lower:
                return 'static'
        # Default to static
        return 'static'

    def _generate_answer(self, question: str, context: str, source: str) -> str:
        """Generate an answer."""
        if not context:
            return "Sorry, no relevant information was found."
        return f"From {source}: {context[:150]}..."

    def get_statistics(self) -> Dict:
        """Return detailed statistics."""
        cache_stats = self.static_cache.get_statistics()
        total_queries = self.stats['total_queries']
        return {
            'total_queries': total_queries,
            'static_cache_hits': self.stats['static_cache_hits'],
            'static_db_queries': self.stats['static_db_queries'],
            'dynamic_db_queries': self.stats['dynamic_db_queries'],
            'cache_hit_rate': cache_stats['hit_rate'],
            'db_access_rate': (self.stats['static_db_queries']
                               + self.stats['dynamic_db_queries']) / total_queries
                              if total_queries > 0 else 0,
        }


def demo_hybrid_system():
    """Demonstrate the hybrid system."""
    print("=" * 60)
    print("Hybrid RAG + CAG demo")
    print("=" * 60)

    hybrid = HybridRAGCAG(cache_size=50)

    # Static knowledge (policy documents)
    print("\n[Loading static knowledge]")
    static_knowledge = [
        {"text": "Annual leave policy: 5 days after 1 year, 10 after 3 years, 15 after 5 years. "
                 "Leave must be used within the year and cannot be carried over.",
         "common_questions": ["How is annual leave calculated", "Annual leave policy",
                              "How many days of leave", "Can annual leave be carried over"]},
        {"text": "Reimbursement process: submit the request form -> manager approval -> "
                 "finance review -> payout. Takes about 3-5 business days.",
         "common_questions": ["How do I get reimbursed", "Reimbursement process",
                              "How long does reimbursement take"]},
    ]
    for kb in static_knowledge:
        hybrid.add_static_knowledge(kb['text'],
                                    {'type': 'static', 'category': 'policy'},
                                    kb['common_questions'])
    print(f"Loaded {len(static_knowledge)} static entries (cache pre-warmed)")

    # Dynamic knowledge (live data)
    print("\n[Loading dynamic knowledge]")
    dynamic_knowledge = [
        {"text": "Today's cafeteria menu: braised pork, steamed fish and mapo tofu for lunch; "
                 "kung pao chicken, fish with pickled greens and stir-fried vegetables for dinner.",
         "metadata": {'type': 'dynamic', 'date': '2025-11-05'}},
        {"text": "This week's meetings: all-hands Wednesday 3pm, team meeting Friday 10am. "
                 "Please prepare your materials in advance.",
         "metadata": {'type': 'dynamic', 'date': '2025-11-05'}},
    ]
    for kb in dynamic_knowledge:
        hybrid.add_dynamic_knowledge(kb['text'], kb['metadata'])
    print(f"Loaded {len(dynamic_knowledge)} dynamic entries")

    # Exercise different query types
    print("\n" + "=" * 60)
    print("Running test queries")
    print("=" * 60)
    test_cases = [
        {"q": "How is annual leave calculated?", "type": "static (should hit the cache)"},
        {"q": "What is the annual leave policy?", "type": "static (should hit the cache)"},
        {"q": "What is the reimbursement process?", "type": "static (should hit the cache)"},
        {"q": "What's in the cafeteria today?", "type": "dynamic (should query the dynamic store)"},
        {"q": "What meetings are there this week?", "type": "dynamic (should query the dynamic store)"},
        {"q": "Can annual leave be carried over?", "type": "static (should hit the cache)"},
        {"q": "What dishes does the cafeteria have today?", "type": "dynamic (should query the dynamic store)"},
    ]

    for i, test in enumerate(test_cases, 1):
        result = hybrid.query(test['q'])
        # Pick an icon for the data source
        if result['source'] == 'STATIC_CACHE':
            icon, label = "⚡", "cache"
        elif result['source'] == 'STATIC_RETRIEVAL':
            icon, label = "📚", "static store"
        else:
            icon, label = "🔄", "dynamic store"
        print(f"\nQuery {i}: {test['q']}")
        print(f"  type:   {test['type']}")
        print(f"  source: {icon} {label}")
        print(f"  response time: {result['response_time']*1000:.2f}ms")

    # Statistics
    print("\n" + "=" * 60)
    print("System statistics")
    print("=" * 60)
    stats = hybrid.get_statistics()
    print(f"\nTotal queries: {stats['total_queries']}")
    print(f"  ├─ static cache hits:    {stats['static_cache_hits']} ({stats['cache_hit_rate']*100:.1f}%)")
    print(f"  ├─ static store lookups:  {stats['static_db_queries']}")
    print(f"  └─ dynamic store lookups: {stats['dynamic_db_queries']}")
    print(f"\nDatabase access rate: {stats['db_access_rate']*100:.1f}%")
    print(f"Cost saved: ~{(1-stats['db_access_rate'])*100:.1f}%")
    print("\n✅ Hybrid architecture advantages:")
    print("  - Static knowledge served from cache: extremely fast")
    print("  - Dynamic knowledge served by retrieval: always fresh")
    print("  - Question type is classified automatically and routed accordingly")
    print("  - Balances speed, cost and accuracy")


demo_hybrid_system()
```
Not all knowledge deserves a cache slot. Cache carelessly and you hit two problems:
Memory blow-up: too many entries eat RAM
Low hit rate: rarely used entries waste space
So you need a smarter caching policy.
```python
class SmartCache:
    """Smart cache combining LFU and LRU eviction."""

    def __init__(self, max_size: int = 100, min_access_count: int = 3):
        self.cache = {}
        self.access_count = {}   # access counter per cached key
        self.last_access = {}    # last access timestamp per cached key
        self.max_size = max_size
        self.min_access_count = min_access_count  # accesses required before caching
        # Candidate pool: entries not yet accessed often enough to cache
        self.candidate_pool = {}
        self.candidate_access = {}

    def should_cache(self, key: str) -> bool:
        """Decide whether a key has earned a cache slot."""
        if key in self.candidate_access:
            self.candidate_access[key] += 1
            # Promote once the access count reaches the threshold
            return self.candidate_access[key] >= self.min_access_count
        # First sighting: start counting in the candidate pool
        self.candidate_access[key] = 1
        return False

    def set(self, key: str, value: Dict) -> bool:
        """Cache an entry (hot data only)."""
        if not self.should_cache(key):
            # Not hot enough yet: park it in the candidate pool
            self.candidate_pool[key] = value
            return False
        # Threshold reached: promote to the real cache
        if len(self.cache) >= self.max_size:
            self._evict()  # LFU + LRU eviction
        self.cache[key] = value
        self.access_count[key] = self.candidate_access.get(key, 1)
        self.last_access[key] = time.time()
        self.candidate_pool.pop(key, None)
        return True

    def get(self, key: str) -> Optional[Dict]:
        """Look up an entry."""
        if key in self.cache:
            self.access_count[key] += 1
            self.last_access[key] = time.time()
            return self.cache[key]
        # Check the candidate pool
        if key in self.candidate_pool:
            self.candidate_access[key] += 1
            if self.candidate_access[key] >= self.min_access_count:
                # Hot enough now: promote to the real cache
                self.set(key, self.candidate_pool[key])
                return self.cache[key]
            return self.candidate_pool[key]
        return None

    def _evict(self):
        """Evict one entry: least frequently used, ties broken by least recently used."""
        if not self.cache:
            return
        min_count = min(self.access_count.values())
        candidates = [k for k, v in self.access_count.items() if v == min_count]
        if len(candidates) > 1:
            evict_key = min(candidates, key=lambda k: self.last_access[k])
        else:
            evict_key = candidates[0]
        del self.cache[evict_key]
        del self.access_count[evict_key]
        del self.last_access[evict_key]

    def get_statistics(self) -> Dict:
        """Return statistics."""
        return {
            'cache_size': len(self.cache),
            'candidate_size': len(self.candidate_pool),
            'total_size': len(self.cache) + len(self.candidate_pool),
            'avg_access_count': np.mean(list(self.access_count.values()))
                                if self.access_count else 0,
            'hot_items': sorted(self.access_count.items(),
                                key=lambda x: x[1], reverse=True)[:5],  # top-5 hot entries
        }


class SmartCachingSystem:
    """Complete system with smart caching."""

    def __init__(self, cache_size: int = 50, min_access: int = 3):
        self.vector_db = SimpleVectorDB()
        self.smart_cache = SmartCache(max_size=cache_size, min_access_count=min_access)
        self.stats = {
            'total_queries': 0,
            'cache_hits': 0,
            'db_queries': 0,
            'promoted_to_cache': 0,  # promotions from candidate pool to cache
        }

    def add_knowledge(self, text: str, metadata: Dict = None):
        """Add knowledge."""
        self.vector_db.add_document(text, metadata)

    def query(self, question: str) -> Dict:
        """Handle a query."""
        self.stats['total_queries'] += 1
        start_time = time.time()
        # Check the cache
        cached = self.smart_cache.get(question)
        if cached and question in self.smart_cache.cache:  # hit in the real cache
            self.stats['cache_hits'] += 1
            return {
                'question': question,
                'answer': cached['answer'],
                'source': 'CACHE',
                'response_time': time.time() - start_time,
            }
        # Retrieval path
        results = self.vector_db.search(question, top_k=2)
        self.stats['db_queries'] += 1
        context = "\n".join(r['text'] for r in results) if results else ""
        answer = f"From retrieval: {context[:100]}..."
        # Let the smart cache decide whether this entry is worth keeping
        promoted = self.smart_cache.set(question, {
            'answer': answer,
            'context': context,
            'metadata': results[0].get('metadata', {}) if results else {},
        })
        if promoted:
            self.stats['promoted_to_cache'] += 1
        return {
            'question': question,
            'answer': answer,
            'source': 'RETRIEVAL',
            'response_time': time.time() - start_time,
            'will_cache': promoted,
        }

    def get_statistics(self) -> Dict:
        """Return statistics."""
        cache_stats = self.smart_cache.get_statistics()
        return {
            'queries': self.stats,
            'cache': cache_stats,
            'cache_hit_rate': self.stats['cache_hits'] / self.stats['total_queries']
                              if self.stats['total_queries'] > 0 else 0,
        }


def demo_smart_caching():
    """Demonstrate smart caching under a realistic access pattern."""
    print("=" * 60)
    print("Smart caching demo")
    print("=" * 60)

    system = SmartCachingSystem(cache_size=10, min_access=3)

    knowledge = [
        "Annual leave: 5 days after 1 year, 10 after 3 years, 15 after 5 years",
        "Reimbursement flow: submit request -> approval -> finance review -> payout",
        "Housing fund rate: 12% each from company and employee",
        "Overtime pay: 1.5x on weekdays, 2x on weekends, 3x on public holidays",
        "Social insurance: pension 8%, medical 2%, unemployment 0.5%",
    ]
    for kb in knowledge:
        system.add_knowledge(kb)
    print(f"\nLoaded {len(knowledge)} knowledge entries\n")

    # Simulate a realistic query distribution (Pareto-style)
    print("Simulating real traffic (80% of queries hit 20% of the questions)\n")
    # Hot questions (asked frequently)
    hot_questions = ["How is annual leave calculated",
                     "How do I get reimbursed",
                     "Housing fund rate"]
    # Cold questions (asked occasionally)
    cold_questions = ["How is overtime paid", "Social insurance rates",
                      "Sick leave policy", "Late-arrival deductions",
                      "Resignation process"]

    # Build a query sequence with an 80/20 split
    query_sequence = []
    for _ in range(50):
        if np.random.random() < 0.8:   # 80% chance of a hot question
            query_sequence.append(np.random.choice(hot_questions))
        else:                          # 20% chance of a cold question
            query_sequence.append(np.random.choice(cold_questions))

    print("Processing 50 queries...\n")
    cache_hits_timeline = []
    for i, question in enumerate(query_sequence, 1):
        result = system.query(question)
        if i <= 10 or i % 10 == 0:  # show only a sample of the results
            source_icon = "⚡" if result['source'] == 'CACHE' else "🔍"
            cached_tag = " [promoted to cache]" if result.get('will_cache') else ""
            print(f"Query {i:2d}: {question:35s} {source_icon} {result['source']}{cached_tag}")
        # Track how the hit rate evolves
        stats = system.get_statistics()
        cache_hits_timeline.append(stats['cache_hit_rate'])

    # Final statistics
    print("\n" + "=" * 60)
    print("Final statistics")
    print("=" * 60)
    final_stats = system.get_statistics()
    print("\n[Queries]")
    print(f"  total queries:    {final_stats['queries']['total_queries']}")
    print(f"  cache hits:       {final_stats['queries']['cache_hits']}")
    print(f"  database queries: {final_stats['queries']['db_queries']}")
    print(f"  promotions:       {final_stats['queries']['promoted_to_cache']}")
    print("\n[Cache]")
    print(f"  cached entries:   {final_stats['cache']['cache_size']}")
    print(f"  candidate pool:   {final_stats['cache']['candidate_size']}")
    print(f"  total stored:     {final_stats['cache']['total_size']}")
    print(f"  hit rate:         {final_stats['cache_hit_rate']*100:.1f}%")
    print(f"  avg access count: {final_stats['cache']['avg_access_count']:.1f}")
    print("\n[Top 5 hot questions]")
    for i, (question, count) in enumerate(final_stats['cache']['hot_items'], 1):
        print(f"  {i}. {question} - {count} accesses")
    print("\n✅ Smart cache properties:")
    print("  - Only questions asked repeatedly (≥3 times) get cached")
    print("  - Cold questions never occupy precious cache slots")
    print("  - Rarely used entries are evicted automatically")
    print("  - Matches the access distribution of real workloads")

    # Hit-rate trend over the first 20 queries
    print("\n[Hit-rate trend] first 20 queries:")
    for i in range(0, min(20, len(cache_hits_timeline)), 5):
        rate = cache_hits_timeline[i]
        bar = "█" * int(rate * 50)
        print(f"  query {i+1:2d}: {bar} {rate*100:.1f}%")


demo_smart_caching()
```
Even static knowledge changes eventually:
Company policies get revised
Product information changes
Laws and regulations are amended
That's where cache invalidation comes in.
```python
from datetime import datetime


class CacheWithTTL:
    """Cache with per-entry expiry (TTL)."""

    def __init__(self, max_size: int = 100, default_ttl: int = 86400):
        """
        Args:
            max_size: maximum number of cached entries
            default_ttl: default time-to-live in seconds (24 hours)
        """
        self.cache = {}
        self.max_size = max_size
        self.default_ttl = default_ttl
        self.stats = {'hits': 0, 'misses': 0, 'expires': 0, 'invalidations': 0}

    def set(self, key: str, value: Dict, ttl: Optional[int] = None):
        """Store an entry.

        Args:
            key: cache key
            value: cached value
            ttl: expiry in seconds; None uses the default
        """
        if len(self.cache) >= self.max_size:
            self._evict_oldest()
        expire_at = time.time() + (ttl if ttl is not None else self.default_ttl)
        self.cache[key] = {
            'value': value,
            'expire_at': expire_at,
            'created_at': time.time(),
            'version': value.get('metadata', {}).get('version', 1),
        }

    def get(self, key: str) -> Optional[Dict]:
        """Fetch an entry, honoring expiry."""
        if key not in self.cache:
            self.stats['misses'] += 1
            return None
        item = self.cache[key]
        # Expired?
        if time.time() > item['expire_at']:
            self.stats['expires'] += 1
            del self.cache[key]
            return None
        self.stats['hits'] += 1
        return item['value']

    def invalidate(self, key: str):
        """Proactively invalidate one entry."""
        if key in self.cache:
            del self.cache[key]
            self.stats['invalidations'] += 1

    def invalidate_by_pattern(self, pattern: str):
        """Invalidate every key containing the pattern."""
        for key in [k for k in self.cache if pattern in k]:
            self.invalidate(key)

    def update_version(self, key: str, new_version: int):
        """Bump the version; a newer version invalidates the old entry."""
        if key in self.cache and new_version > self.cache[key]['version']:
            self.invalidate(key)

    def _evict_oldest(self):
        """Evict the oldest entry."""
        if not self.cache:
            return
        oldest_key = min(self.cache, key=lambda k: self.cache[k]['created_at'])
        del self.cache[oldest_key]

    def get_statistics(self) -> Dict:
        """Return statistics."""
        total = self.stats['hits'] + self.stats['misses']
        return {
            **self.stats,
            'hit_rate': self.stats['hits'] / total if total > 0 else 0,
            'cache_size': len(self.cache),
        }


class VersionedKnowledgeBase:
    """Knowledge base with version control."""

    def __init__(self):
        self.documents = {}  # doc_id -> {content, version, metadata}
        self.cache = CacheWithTTL(max_size=50, default_ttl=3600)  # 1-hour TTL
        self.vector_db = SimpleVectorDB()

    def add_or_update_document(self, doc_id: str, content: str,
                               metadata: Dict = None, version: int = 1):
        """Add a document or update it to a new version."""
        is_update = doc_id in self.documents
        old_version = self.documents[doc_id]['version'] if is_update else 0
        # Save the document
        self.documents[doc_id] = {
            'content': content,
            'version': version,
            'metadata': metadata or {},
            'updated_at': datetime.now().isoformat(),
        }
        # Index the content (a real vector store would upsert; this toy DB appends)
        self.vector_db.add_document(content, {
            'doc_id': doc_id,
            'version': version,
            **(metadata or {}),
        })
        # On update, invalidate every cached answer derived from this document
        if is_update and version > old_version:
            print(f"📝 Document {doc_id} updated: v{old_version} -> v{version}")
            stale_keys = [k for k, item in self.cache.cache.items()
                          if item['value'].get('metadata', {}).get('doc_id') == doc_id]
            for k in stale_keys:
                self.cache.invalidate(k)
            return True
        return False

    def query(self, question: str, doc_id: Optional[str] = None) -> Dict:
        """Query with version awareness."""
        # Build the cache key
        cache_key = f"{doc_id}:{question}" if doc_id else question
        # Check the cache
        cached = self.cache.get(cache_key)
        if cached:
            return {
                'question': question,
                'answer': cached['answer'],
                'source': 'CACHE',
                'version': cached.get('metadata', {}).get('version', 'unknown'),
            }
        # Retrieve
        results = self.vector_db.search(question, top_k=2)
        if not results:
            return {'question': question, 'answer': 'No relevant information found',
                    'source': 'NONE'}
        # Generate the answer
        context = results[0]['text']
        result_doc_id = results[0]['metadata'].get('doc_id', 'unknown')
        result_version = results[0]['metadata'].get('version', 1)
        answer = f"[v{result_version}] {context}"
        # Cache the result
        self.cache.set(cache_key, {
            'answer': answer,
            'context': context,
            'metadata': {'doc_id': result_doc_id, 'version': result_version},
        }, ttl=3600)  # expire after 1 hour
        return {
            'question': question,
            'answer': answer,
            'source': 'RETRIEVAL',
            'version': result_version,
            'doc_id': result_doc_id,
        }


def demo_cache_update():
    """Demonstrate cache invalidation and version control."""
    print("=" * 60)
    print("Cache update and version control demo")
    print("=" * 60)

    kb = VersionedKnowledgeBase()

    # Scene 1: initial knowledge
    print("\n[Scene 1: initial load]")
    kb.add_or_update_document(
        doc_id="policy_annual_leave",
        content="Annual leave policy v1: 5 days after 1 year, 10 after 3 years, 15 after 5 years",
        metadata={'category': 'HR'},
        version=1)
    print("✅ Added: annual leave policy v1")

    print("\nQuery 1: how is annual leave calculated?")
    result1 = kb.query("how is annual leave calculated")
    print(f"  source:  {result1['source']}")
    print(f"  version: {result1['version']}")
    print(f"  answer:  {result1['answer'][:50]}...")

    # Second query should hit the cache
    print("\nQuery 2: how is annual leave calculated?")
    result2 = kb.query("how is annual leave calculated")
    print(f"  source:  {result2['source']} ⚡")
    print(f"  version: {result2['version']}")

    # Scene 2: the policy changes
    print("\n" + "=" * 60)
    print("[Scene 2: policy update]")
    print("The company revises its annual leave policy...")
    kb.add_or_update_document(
        doc_id="policy_annual_leave",
        content="Annual leave policy v2: 7 days after 1 year, 12 after 3 years, "
                "20 after 5 years. New: 25 days after 10 years",
        metadata={'category': 'HR'},
        version=2)

    # The cache was invalidated, so this should return the new version
    print("\nQuery 3: how is annual leave calculated?")
    result3 = kb.query("how is annual leave calculated")
    print(f"  source:  {result3['source']}")
    print(f"  version: {result3['version']}")
    print(f"  answer:  {result3['answer'][:60]}...")

    # Scene 3: the cache is rebuilt
    print("\nQuery 4: how is annual leave calculated?")
    result4 = kb.query("how is annual leave calculated")
    print(f"  source:  {result4['source']} ⚡ (new version now cached)")
    print(f"  version: {result4['version']}")

    # Statistics
    print("\n" + "=" * 60)
    print("Cache statistics")
    print("=" * 60)
    stats = kb.cache.get_statistics()
    print(f"hits:          {stats['hits']}")
    print(f"misses:        {stats['misses']}")
    print(f"invalidations: {stats['invalidations']}")
    print(f"hit rate:      {stats['hit_rate']*100:.1f}%")
    print("\n✅ Update mechanism summary:")
    print("  - Updating a document automatically invalidates related cache entries")
    print("  - Version numbers guarantee consistency")
    print("  - TTL handles automatic expiry")
    print("  - The next query fetches and re-caches the latest version")


demo_cache_update()
```
```python
def compare_invalidation_strategies():
    """Compare three cache invalidation strategies."""
    print("=" * 60)
    print("Three cache invalidation strategies")
    print("=" * 60)

    print("\n[Strategy 1: fixed TTL (time to live)]")
    print("How:  every entry expires after a fixed interval")
    print("Pros: trivial to implement, cleans up automatically")
    print("Cons: may serve stale data until expiry")
    print("Fits: scenarios that tolerate short staleness windows")
    print("\nExample:")
    print("""
    cache.set('question', answer, ttl=3600)  # expires after 1 hour
    """)

    print("\n[Strategy 2: version numbers]")
    print("How:  every update bumps a version number")
    print("Pros: precise control, never serves stale data")
    print("Cons: a versioning scheme must be maintained")
    print("Fits: scenarios with strict consistency requirements")
    print("\nExample:")
    print("""
    # when updating a document
    doc.version += 1
    cache.invalidate_by_version(doc.id, doc.version)
    """)

    print("\n[Strategy 3: push-based invalidation]")
    print("How:  content updates actively notify the cache")
    print("Pros: best freshness")
    print("Cons: needs an extra notification channel")
    print("Fits: distributed systems, multi-node deployments")
    print("\nExample:")
    print("""
    # publish an update event
    event_bus.publish('document_updated', doc_id='policy_123')

    # a listener invalidates the cache
    @event_bus.subscribe('document_updated')
    def on_document_updated(doc_id):
        cache.invalidate_by_pattern(doc_id)
    """)

    # A concrete comparison
    print("\n" + "=" * 60)
    print("Scenario test")
    print("=" * 60)

    # Assume the document updates hourly and queries arrive every minute
    ttl_configs = [
        {'name': 'TTL=10 min', 'ttl': 600, 'update_interval': 3600},
        {'name': 'TTL=30 min', 'ttl': 1800, 'update_interval': 3600},
        {'name': 'TTL=60 min', 'ttl': 3600, 'update_interval': 3600},
    ]

    print("\nAssume: document updates hourly, one query per minute (120 queries)")
    print("\nEffect of different TTLs:\n")
    for config in ttl_configs:
        ttl = config['ttl']
        update_interval = config['update_interval']
        # Worst case: after an update, stale answers can be served for up
        # to one TTL (capped by the update interval), i.e. ttl/60 minutes
        stale_minutes = min(ttl, update_interval) / 60
        freshness_rate = (60 - stale_minutes) / 60 * 100
        print(f"{config['name']}:")
        print(f"  potentially stale responses: ~{int(stale_minutes)} per hour")
        print(f"  data freshness: {freshness_rate:.1f}%")
        print()

    print("💡 Recommendations:")
    print("  - Policy documents: TTL = 24h plus version control")
    print("  - Product info:     TTL = 1h plus version control")
    print("  - Realtime data:    don't cache, or TTL < 5 min")


compare_invalidation_strategies()
```
import logging
from typing import Callable
from dataclasses import dataclass
from enum import Enum

class CacheStrategy(Enum):
    """Caching strategies"""
    ALWAYS = "always"   # always cache
    SMART = "smart"     # decide intelligently
    NEVER = "never"     # never cache

@dataclass
class CacheConfig:
    """Cache configuration"""
    max_size: int = 100
    default_ttl: int = 3600
    min_access_count: int = 3
    strategy: CacheStrategy = CacheStrategy.SMART
    enable_metrics: bool = True

class ProductionCAGSystem:
    """Production-grade CAG system"""

    def __init__(self, config: CacheConfig = None):
        self.config = config or CacheConfig()
        # Core components
        self.static_cache = CacheWithTTL(
            max_size=self.config.max_size,
            default_ttl=self.config.default_ttl
        )
        self.smart_cache = SmartCache(
            max_size=self.config.max_size,
            min_access_count=self.config.min_access_count
        )
        self.vector_db = SimpleVectorDB()
        # Monitoring metrics
        self.metrics = {
            'total_queries': 0,
            'cache_hits': 0,
            'db_queries': 0,
            'avg_response_time': [],
            'errors': 0
        }
        # Logging
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Configure logging"""
        logger = logging.getLogger('CAGSystem')
        logger.setLevel(logging.INFO)
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def add_knowledge(self,
                      text: str,
                      doc_id: str,
                      metadata: Dict = None,
                      cache_strategy: CacheStrategy = None,
                      ttl: int = None):
        """Add knowledge"""
        try:
            full_metadata = {
                'doc_id': doc_id,
                'cache_strategy': (cache_strategy or self.config.strategy).value,
                'ttl': ttl or self.config.default_ttl,
                **(metadata or {})
            }
            self.vector_db.add_document(text, full_metadata)
            self.logger.info(f"Added document: {doc_id}")
        except Exception as e:
            self.logger.error(f"Error adding document {doc_id}: {str(e)}")
            self.metrics['errors'] += 1
            raise

    def query(self, question: str, force_refresh: bool = False) -> Dict:
        """Handle a query"""
        start_time = time.time()
        self.metrics['total_queries'] += 1
        try:
            # Skip the cache on a forced refresh
            if not force_refresh:
                # Check the static cache first
                cached = self.static_cache.get(question)
                if cached:
                    self.metrics['cache_hits'] += 1
                    response_time = time.time() - start_time
                    self.metrics['avg_response_time'].append(response_time)
                    self.logger.info(f"Cache hit: {question[:30]}...")
                    return {
                        'question': question,
                        'answer': cached['answer'],
                        'source': 'STATIC_CACHE',
                        'response_time': response_time,
                        'cached': True
                    }
                # Then check the smart cache
                smart_cached = self.smart_cache.get(question)
                if smart_cached and question in self.smart_cache.cache:
                    self.metrics['cache_hits'] += 1
                    response_time = time.time() - start_time
                    self.metrics['avg_response_time'].append(response_time)
                    self.logger.info(f"Smart cache hit: {question[:30]}...")
                    return {
                        'question': question,
                        'answer': smart_cached['answer'],
                        'source': 'SMART_CACHE',
                        'response_time': response_time,
                        'cached': True
                    }

            # Cache miss: run retrieval
            self.logger.info(f"Cache miss, retrieving: {question[:30]}...")
            results = self.vector_db.search(question, top_k=2)
            self.metrics['db_queries'] += 1
            if not results:
                response_time = time.time() - start_time
                self.metrics['avg_response_time'].append(response_time)
                return {
                    'question': question,
                    'answer': 'No relevant information found',
                    'source': 'NONE',
                    'response_time': response_time,
                    'cached': False
                }

            # Generate the answer
            context = results[0]['text']
            metadata = results[0].get('metadata', {})
            answer = f"Based on retrieval: {context[:100]}..."

            # Decide whether to cache according to the strategy
            strategy = CacheStrategy(metadata.get('cache_strategy', 'smart'))
            ttl = metadata.get('ttl', self.config.default_ttl)
            if strategy == CacheStrategy.ALWAYS:
                # Write straight into the static cache
                self.static_cache.set(question, {
                    'answer': answer,
                    'context': context,
                    'metadata': metadata
                }, ttl=ttl)
                self.logger.info(f"Cached to static (ALWAYS): {question[:30]}...")
            elif strategy == CacheStrategy.SMART:
                # Let the smart cache decide
                cached = self.smart_cache.set(question, {
                    'answer': answer,
                    'context': context,
                    'metadata': metadata
                })
                if cached:
                    self.logger.info(f"Promoted to smart cache: {question[:30]}...")

            response_time = time.time() - start_time
            self.metrics['avg_response_time'].append(response_time)
            return {
                'question': question,
                'answer': answer,
                'source': 'RETRIEVAL',
                'response_time': response_time,
                'cached': False,
                'cache_strategy': strategy.value
            }
        except Exception as e:
            self.logger.error(f"Error processing query '{question}': {str(e)}")
            self.metrics['errors'] += 1
            raise

    def invalidate_cache(self, pattern: str = None, doc_id: str = None):
        """Invalidate cache entries"""
        try:
            if pattern:
                self.static_cache.invalidate_by_pattern(pattern)
                self.logger.info(f"Invalidated cache by pattern: {pattern}")
            if doc_id:
                self.static_cache.invalidate_by_pattern(doc_id)
                self.logger.info(f"Invalidated cache for doc: {doc_id}")
        except Exception as e:
            self.logger.error(f"Error invalidating cache: {str(e)}")
            raise

    def get_health_status(self) -> Dict:
        """Report system health"""
        total_queries = self.metrics['total_queries']
        cache_hit_rate = self.metrics['cache_hits'] / total_queries if total_queries > 0 else 0
        avg_response = np.mean(self.metrics['avg_response_time']) if self.metrics['avg_response_time'] else 0
        # Health score
        health_score = 100
        if cache_hit_rate < 0.3:
            health_score -= 20  # low hit rate
        if avg_response > 0.1:
            health_score -= 15  # slow responses
        if self.metrics['errors'] > 0:
            health_score -= 30  # errors occurred
        status = 'healthy' if health_score >= 80 else 'degraded' if health_score >= 50 else 'unhealthy'
        return {
            'status': status,
            'health_score': health_score,
            'metrics': {
                'total_queries': total_queries,
                'cache_hit_rate': f"{cache_hit_rate*100:.1f}%",
                'avg_response_time': f"{avg_response*1000:.2f}ms",
                'db_queries': self.metrics['db_queries'],
                'errors': self.metrics['errors']
            },
            'cache_info': {
                'static_cache_size': self.static_cache.get_statistics()['cache_size'],
                'smart_cache_size': self.smart_cache.get_statistics()['cache_size']
            }
        }

    def export_metrics(self) -> Dict:
        """Export metrics (for a monitoring system)"""
        static_stats = self.static_cache.get_statistics()
        smart_stats = self.smart_cache.get_statistics()
        return {
            'timestamp': datetime.now().isoformat(),
            'queries': {
                'total': self.metrics['total_queries'],
                'cache_hits': self.metrics['cache_hits'],
                'db_queries': self.metrics['db_queries'],
                'errors': self.metrics['errors']
            },
            'performance': {
                'cache_hit_rate': self.metrics['cache_hits'] / self.metrics['total_queries'] if self.metrics['total_queries'] > 0 else 0,
                'avg_response_time': np.mean(self.metrics['avg_response_time']) if self.metrics['avg_response_time'] else 0,
                'p95_response_time': np.percentile(self.metrics['avg_response_time'], 95) if len(self.metrics['avg_response_time']) > 0 else 0
            },
            'cache': {
                'static': static_stats,
                'smart': smart_stats
            }
        }

def demo_production_system():
    """Demonstrate the production-grade system"""
    print("=" * 60)
    print("Production-grade CAG system demo")
    print("=" * 60)

    # Create the system (custom configuration)
    config = CacheConfig(
        max_size=50,
        default_ttl=3600,
        min_access_count=2,
        strategy=CacheStrategy.SMART,
        enable_metrics=True
    )
    system = ProductionCAGSystem(config)
    print(f"\nSystem configuration:")
    print(f"  Cache size: {config.max_size}")
    print(f"  Default TTL: {config.default_ttl}s")
    print(f"  Min access count: {config.min_access_count}")
    print(f"  Cache strategy: {config.strategy.value}")

    # Add different kinds of knowledge
    print("\n" + "=" * 60)
    print("Adding knowledge")
    print("=" * 60)

    # 1. Static knowledge (always cached)
    system.add_knowledge(
        text="Company annual leave policy: 5 days after 1 year, 10 days after 3 years, 15 days after 5 years",
        doc_id="policy_001",
        metadata={'category': 'policy', 'type': 'static'},
        cache_strategy=CacheStrategy.ALWAYS,
        ttl=86400  # 24 hours
    )
    print("✅ Added static knowledge: annual leave policy (ALWAYS cache)")

    # 2. Semi-static knowledge (smart caching)
    system.add_knowledge(
        text="Price list: Basic 99 CNY/month, Pro 199 CNY/month, Enterprise 499 CNY/month",
        doc_id="product_002",
        metadata={'category': 'product', 'type': 'semi-static'},
        cache_strategy=CacheStrategy.SMART,
        ttl=3600  # 1 hour
    )
    print("✅ Added semi-static knowledge: product pricing (SMART cache)")

    # 3. Dynamic knowledge (never cached)
    system.add_knowledge(
        text="Today's promotion: 20% off all products, today only!",
        doc_id="promo_003",
        metadata={'category': 'promotion', 'type': 'dynamic'},
        cache_strategy=CacheStrategy.NEVER,
        ttl=300  # 5 minutes
    )
    print("✅ Added dynamic knowledge: promotion (NEVER cache)")

    # Simulate realistic query traffic
    print("\n" + "=" * 60)
    print("Simulating real queries")
    print("=" * 60)
    queries = [
        # Static questions (high frequency)
        ("How is annual leave calculated?", 5),
        ("Annual leave policy", 3),
        # Semi-static questions (medium frequency)
        ("Product pricing", 3),
        ("How much does it cost?", 2),
        # Dynamic questions (low frequency)
        ("Any discounts today?", 2),
        ("Current promotions", 1)
    ]
    print("\nRunning queries...")
    for question, count in queries:
        for i in range(count):
            result = system.query(question)
            if i == 0:  # only show the first query
                cache_tag = "⚡" if result['cached'] else "🔍"
                print(f"  {cache_tag} {question}: {result['source']} ({result['response_time']*1000:.2f}ms)")

    # Show health status
    print("\n" + "=" * 60)
    print("System health")
    print("=" * 60)
    health = system.get_health_status()
    status_icon = "✅" if health['status'] == 'healthy' else "⚠️" if health['status'] == 'degraded' else "❌"
    print(f"\nStatus: {status_icon} {health['status'].upper()}")
    print(f"Health score: {health['health_score']}/100")
    print(f"\nMetrics:")
    for key, value in health['metrics'].items():
        print(f"  {key}: {value}")
    print(f"\nCache info:")
    for key, value in health['cache_info'].items():
        print(f"  {key}: {value}")

    # Export metrics
    print("\n" + "=" * 60)
    print("Performance metrics (can feed Prometheus/Grafana)")
    print("=" * 60)
    metrics = system.export_metrics()
    print(f"\nTimestamp: {metrics['timestamp']}")
    print(f"\nQuery stats:")
    print(f"  Total queries: {metrics['queries']['total']}")
    print(f"  Cache hits: {metrics['queries']['cache_hits']}")
    print(f"  DB queries: {metrics['queries']['db_queries']}")
    print(f"  Errors: {metrics['queries']['errors']}")
    print(f"\nPerformance:")
    print(f"  Cache hit rate: {metrics['performance']['cache_hit_rate']*100:.1f}%")
    print(f"  Avg response time: {metrics['performance']['avg_response_time']*1000:.2f}ms")
    print(f"  P95 response time: {metrics['performance']['p95_response_time']*1000:.2f}ms")

    # Best-practice summary
    print("\n" + "=" * 60)
    print("Production best practices")
    print("=" * 60)
    print("""
1. [Tiered caching]
   - Static knowledge: ALWAYS + long TTL (24h)
   - Semi-static knowledge: SMART + medium TTL (1h)
   - Dynamic knowledge: NEVER, or short TTL (5 min)

2. [Monitoring targets]
   - Cache hit rate: > 50%
   - Avg response time: < 50ms
   - P95 response time: < 100ms
   - Error rate: < 0.1%

3. [Capacity planning]
   - Cache size = daily query volume × 0.2 (the 80/20 rule)
   - Reserve 20% headroom
   - Alert thresholds: hit rate < 30%, response > 100ms

4. [Invalidation]
   - Timed: TTL
   - Active: triggered on document updates
   - Batch: pattern-based invalidation

5. [High availability]
   - Fall back to retrieval on cache failure
   - Exception handling and logging
   - Health-check endpoint
   - Metrics export to monitoring
""")

# Run the production system demo
demo_production_system()
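If the dict produced by `export_metrics` really is fed to Prometheus, it first has to be flattened into the Prometheus text exposition format (`# HELP` / `# TYPE` lines followed by `name value` samples). This is a hedged sketch: the metric names (`cag_queries_total`, etc.) and the `sample` payload are my own choices for illustration, not part of the article's system.

```python
def to_prometheus_text(metrics: dict) -> str:
    """Render a metrics dict in the Prometheus text exposition format."""
    lines = [
        "# HELP cag_queries_total Total queries handled.",
        "# TYPE cag_queries_total counter",
        f"cag_queries_total {metrics['queries']['total']}",
        "# HELP cag_cache_hit_rate Fraction of queries served from cache.",
        "# TYPE cag_cache_hit_rate gauge",
        f"cag_cache_hit_rate {metrics['performance']['cache_hit_rate']:.4f}",
        "# HELP cag_response_seconds Average response time in seconds.",
        "# TYPE cag_response_seconds gauge",
        f"cag_response_seconds {metrics['performance']['avg_response_time']:.6f}",
    ]
    # Prometheus expects a trailing newline after the last sample
    return "\n".join(lines) + "\n"

# Hypothetical payload in the same shape as export_metrics()
sample = {
    'queries': {'total': 120},
    'performance': {'cache_hit_rate': 0.68, 'avg_response_time': 0.0031},
}
print(to_prometheus_text(sample))
```

Serving this string from an HTTP endpoint (conventionally `/metrics`) is all a Prometheus scraper needs; in practice the official `prometheus_client` library handles the formatting for you.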
Now let's pull all the code together into one complete, runnable demo:
def run_complete_demo():
    """Run the complete demo"""
    print("\n\n")
    print("=" * 80)
    print(" " * 20 + "Complete CAG walkthrough: from RAG to production-grade CAG")
    print("=" * 80)
    print("\nThis demo covers:")
    print("  1. The performance problems of traditional RAG")
    print("  2. How CAG solves them")
    print("  3. A hybrid RAG+CAG architecture")
    print("  4. Smart caching strategies")
    print("  5. Cache-update mechanisms")
    print("  6. A production-grade implementation")
    print("\n" + "=" * 80)
    input("Press Enter to start...")

    # Run each demo in turn
    demos = [
        ("Traditional RAG system", demo_traditional_rag),
        ("CAG system", demo_cag_system),
        ("RAG vs CAG benchmark", compare_rag_vs_cag),
        ("Hybrid RAG+CAG system", demo_hybrid_system),
        ("Smart caching", demo_smart_caching),
        ("Cache-update mechanism", demo_cache_update),
        ("Invalidation strategy comparison", compare_invalidation_strategies),
        ("Production-grade system", demo_production_system),
    ]
    for i, (name, demo_func) in enumerate(demos, 1):
        print(f"\n\n{'='*80}")
        print(f"Demo {i}/{len(demos)}: {name}")
        print("=" * 80)
        input("Press Enter to continue...")
        demo_func()
        print("\nDemo finished!")
        if i < len(demos):
            input("Press Enter for the next demo...")

    print("\n\n" + "=" * 80)
    print(" " * 30 + "All demos finished!")
    print("=" * 80)
    print("\n📚 You have learned:")
    print("  ✅ How RAG works and where it falls short")
    print("  ✅ How CAG boosts performance with caching")
    print("  ✅ How to design a hybrid architecture")
    print("  ✅ How to implement smart caching strategies")
    print("  ✅ Cache-update and invalidation mechanisms")
    print("  ✅ A complete production-grade implementation")
    print("\n💡 Next steps:")
    print("  - Apply these techniques in your own projects")
    print("  - Tune the caching strategy for your scenario")
    print("  - Hook into a monitoring system and keep optimizing")
    print("  - Consider a distributed cache (Redis, etc.)")

# Run the complete demo when this file is executed directly
if __name__ == "__main__":
    run_complete_demo()
AI used to cram for every exam: it had to look everything up from scratch, every single time.

CAG gives AI real memory: core knowledge stays loaded and can be recalled instantly whenever it's needed.

This is more than an incremental upgrade; it moves AI from a "lookup tool" toward a genuine intelligent assistant.

Imagine a future assistant that not only knows where to find information, but also remembers your habits, your preferences, and every conversation you've had...

Only then does it truly become your digital twin.

And it all starts with teaching AI to remember.