微信扫码
添加专属顾问
我要投稿
告别繁琐工具链!RAG-Anything × Milvus一站式解决PDF多模态解析难题,让知识库构建效率提升数倍。 核心内容: 1. 传统RAG处理PDF的痛点与集成难题 2. RAG-Anything的"1+3+N"创新架构解析 3. 并行处理与分层存储带来的性能突破
AI落地主流场景之一是知识库,而做知识库,必定少不了PDF文件。
传统RAG要想精准读取这些图文并茂的PDF,就需要集成PyPDF2、OpenCV、Camelot、Tesseract等多个工具,系统庞杂且低效。此外,不同 PDF 各有侧重:报告重图表、财报重表格、论文重公式,如何精准调用这些工具同样难度不低。
香港大学数据科学学院刚刚开源的RAG-Anything项目,结合开源的Milvus向量数据库,让我们逐渐看到了解决这个问题的曙光。
通过将所有功能封装在一个框架内,并引入VLM增强查询机制,RAG-Anything可以在一个框架内实现分析文本、自动理解图像、表格等多模态内容,提供更全面的答案。
下面,我们将从架构以及实验案例出发,解释为什么它能处理文本、图像、表格与公式,并且在性能上站得住脚。
RAG-Anything的核心架构可以用"1+3+N"来概括:
1个核心引擎:基于LightRAG的知识图谱构建引擎,负责实体关系抽取和向量化存储。这个引擎的特别之处在于,它不仅处理文本实体,还能理解图像中的对象、表格中的数据关系。
3大模态处理器:
ImageModalProcessor:除了能做简单的图像识别,还能深度理解图像内容与上下文的关系
TableModalProcessor:智能解析表格结构,理解数据之间的逻辑关系
EquationModalProcessor:简单的符号识别基础上,能做到对数学公式的语义理解
N种解析器:支持MinerU和Docling两大解析引擎,可以根据文档类型自动选择最优解析策略。
在"1+3+N"技术架构的基础上,RAG-Anything实现了处理机制的性能突破。把传统传统RAG系统的串行处理方式(先解析文本,再处理图像,最后处理表格)升级成为了并行处理。
# 核心配置展示了并行处理的设计思路config = RAGAnythingConfig( working_dir="./rag_storage", parser="mineru", parse_method="auto", # 自动选择最优解析策略 enable_image_processing=True, enable_table_processing=True, enable_equation_processing=True, max_workers=8 # 支持多线程并行处理)
这种并行架构带来显著效果:处理大型技术文档的速度有了明显提升。测试表明,随着CPU核心数增加,系统处理能力几乎呈线性提升,大幅缩短了文档处理时间。
除此之外,RAG-Anything还采用了分层存储与检索优化
文本内容存储在传统向量数据库中
图像特征使用专门的视觉向量存储
表格数据采用结构化存储
公式采用语义向量化存储
这种分层设计的优势在于,系统可以为不同类型的查询使用最适合的检索策略,而非简单地采用一刀切的向量相似度搜索。
原理与架构已经讲清楚了。下面我们用一个最小可运行的示例,在 5 分钟内把文本与图像检索问答跑通。
本实验展示如何基于RAG-Anything框架集成Milvus向量数据库以及阿里通义大模型,实现支持文本和图像处理的多模态问答系统。(展示核心代码实现部分,并非完整代码)
Milvus的核心优势在于其存算分离的云原生架构,带来极致的弹性伸缩能力和成本效益。通过读写分离与流批一体设计,它在保证高并发性能的同时,还能实现插入即可查的实时性能。此外,无单点故障的设计确保了企业级的高可用与高可靠性。
环境准备
Python 环境:Python 3.10+
向量数据库:Milvus 服务(MilvusLite)
云服务:阿里云 API 密钥(LLM + 嵌入服务)
LLM模型:qwen-vl-max(视觉)
Embedding模型:tongyi-embedding-vision-plus
- python -m venv .venv && source .venv/bin/activate # Windows 使用 .venvScriptsactivate- pip install -r requirements-min.txt- cp .env.example .env 并填写 DASHSCOPE_API_KEY
运行
python minimal_[main.py](<http://main.py>)
预期结果
终端打印文本问答答案与图像检索命中描述
.├─ requirements-min.txt├─ .env.example├─ [config.py](<http://config.py>)├─ milvus_[store.py](<http://store.py>)├─ [adapters.py](<http://adapters.py>)├─ minimal_[main.py](<http://main.py>)└─ sample ├─ docs │ └─ faq_milvus.txt └─ images └─ milvus_arch.png
raganythinglightragpymilvus[lite]>=2.3.0aiohttp>=3.8.0orjson>=3.8.0python-dotenv>=1.0.0Pillow>=9.0.0numpy>=1.21.0,<2.0.0rich>=12.0.0
# 阿里云 DashScopeDASHSCOPE_API_KEY=your_api_key_here# 端点如官方变更,请按需替换ALIYUN_LLM_URL=https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completionsALIYUN_VLM_URL=https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completionsALIYUN_EMBED_URL=https://dashscope.aliyuncs.com/api/v1/services/embeddings/text-embedding# 模型名(统一处配置)LLM_TEXT_MODEL=qwen-maxLLM_VLM_MODEL=qwen-vl-maxEMBED_MODEL=tongyi-embedding-vision-plus# Milvus LiteMILVUS_URI=milvus_lite.dbMILVUS_COLLECTION=rag_multimodal_collectionEMBED_DIM=1152
import osfrom dotenv import load_dotenvload_dotenv()DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY", "")LLM_TEXT_MODEL = os.getenv("LLM_TEXT_MODEL", "qwen-max")LLM_VLM_MODEL = os.getenv("LLM_VLM_MODEL", "qwen-vl-max")EMBED_MODEL = os.getenv("EMBED_MODEL", "tongyi-embedding-vision-plus")ALIYUN_LLM_URL = os.getenv("ALIYUN_LLM_URL")ALIYUN_VLM_URL = os.getenv("ALIYUN_VLM_URL")ALIYUN_EMBED_URL = os.getenv("ALIYUN_EMBED_URL")MILVUS_URI = os.getenv("MILVUS_URI", "milvus_lite.db")MILVUS_COLLECTION = os.getenv("MILVUS_COLLECTION", "rag_multimodal_collection")EMBED_DIM = int(os.getenv("EMBED_DIM", "1152"))# 基础运行参数TIMEOUT = 60MAX_RETRIES = 2import osimport base64import aiohttpimport asynciofrom typing import List, Dict, Any, Optionalfrom config import ( DASHSCOPE_API_KEY, LLM_TEXT_MODEL, LLM_VLM_MODEL, EMBED_MODEL, ALIYUN_LLM_URL, ALIYUN_VLM_URL, ALIYUN_EMBED_URL, EMBED_DIM, TIMEOUT)HEADERS = { "Authorization": f"Bearer {DASHSCOPE_API_KEY}", "Content-Type": "application/json",}class AliyunLLMAdapter: def __init__(self): self.text_url = ALIYUN_LLM_URL self.vlm_url = ALIYUN_VLM_URL self.text_model = LLM_TEXT_MODEL self.vlm_model = LLM_VLM_MODEL async def chat(self, prompt: str) -> str: payload = { "model": self.text_model, "input": {"messages": [{"role": "user", "content": prompt}]}, "parameters": {"max_tokens": 1024, "temperature": 0.5}, } async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as s: async with [s.post](<http://s.post>)(self.text_url, json=payload, headers=HEADERS) as r: r.raise_for_status() data = await r.json() return data["output"]["choices"][0]["message"]["content"] async def chat_vlm_with_image(self, prompt: str, image_path: str) -> str: with open(image_path, "rb") as f: image_b64 = base64.b64encode([f.read](<http://f.read>)()).decode("utf-8") payload = { "model": self.vlm_model, "input": {"messages": [{"role": "user", "content": [ {"text": prompt}, {"image": f"data:image/png;base64,{image_b64}"} ]}]}, "parameters": {"max_tokens": 1024, "temperature": 0.2}, } async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as s: async with [s.post](<http://s.post>)(self.vlm_url, json=payload, headers=HEADERS) as r: r.raise_for_status() data = await r.json() return data["output"]["choices"][0]["message"]["content"]class AliyunEmbeddingAdapter: def __init__(self): self.url = ALIYUN_EMBED_URL self.model = EMBED_MODEL self.dim = EMBED_DIM async def embed_text(self, text: str) -> List[float]: payload = { "model": self.model, "input": {"texts": [text]}, "parameters": {"text_type": "query", "dimensions": self.dim}, } async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as s: async with [s.post](<http://s.post>)(self.url, json=payload, headers=HEADERS) as r: r.raise_for_status() data = await r.json() return data["output"]["embeddings"][0]["embedding"]import jsonimport timefrom typing import List, Dict, Any, Optionalfrom pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utilityfrom config import MILVUS_URI, MILVUS_COLLECTION, EMBED_DIMclass MilvusVectorStore: def __init__(self, uri: str = MILVUS_URI, collection_name: str = MILVUS_COLLECTION, dim: int = EMBED_DIM): self.uri = uri self.collection_name = collection_name self.dim = dim self.collection: Optional[Collection] = None self._connect_and_prepare() def _connect_and_prepare(self): connections.connect("default", uri=self.uri) if utility.has_collection(self.collection_name): self.collection = Collection(self.collection_name) else: fields = [ FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=512, is_primary=True), FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=self.dim), FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535), FieldSchema(name="content_type", dtype=DataType.VARCHAR, max_length=32), FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=1024), FieldSchema(name="ts", dtype=[DataType.INT](<http://DataType.INT>)64), ] schema = CollectionSchema(fields, "Minimal multimodal collection") self.collection = Collection(self.collection_name, schema) self.collection.create_index("vector", { "metric_type": "COSINE", "index_type": "IVF_FLAT", "params": {"nlist": 1024} }) self.collection.load() def upsert(self, ids: List[str], vectors: List[List[float]], contents: List[str], content_types: List[str], sources: List[str]) -> None: data = [ ids, vectors, contents, content_types, sources, [int(time.time() * 1000)] * len(ids) ] self.collection.upsert(data) self.collection.flush() def search(self, query_vectors: List[List[float]], top_k: int = 5, content_type: Optional[str] = None): expr = f'content_type == "{content_type}"' if content_type else None params = {"metric_type": "COSINE", "params": {"nprobe": 16}} results = [self.collection.search](<http://self.collection.search>)( data=query_vectors, anns_field="vector", param=params, limit=top_k, expr=expr, output_fields=["id", "content", "content_type", "source", "ts"] ) out = [] for hits in results: out.append([{ "id": h.entity.get("id"), "content": h.entity.get("content"), "content_type": h.entity.get("content_type"), "source": h.entity.get("source"), "score": h.score } for h in hits]) return out"""最小可运行示例:- 将一段文本 FAQ 写入 LightRAG(仅作文本语境)- 将一张图片描述向量写入 Milvus(图像检索语境)- 执行两条查询:文本问答、图像问答"""import asyncioimport uuidfrom pathlib import Pathfrom rich import printfrom lightrag import LightRAG, QueryParamfrom lightrag.utils import EmbeddingFuncfrom adapters import AliyunLLMAdapter, AliyunEmbeddingAdapterfrom milvus_store import MilvusVectorStorefrom config import EMBED_DIMSAMPLE_DOC = Path("sample/docs/faq_milvus.txt")SAMPLE_IMG = Path("sample/images/milvus_arch.png")async def main(): # 1) 初始化组件 llm = AliyunLLMAdapter() emb = AliyunEmbeddingAdapter() store = MilvusVectorStore() # 2) 初始化 LightRAG(仅文本检索) async def llm_complete(prompt: str, max_tokens: int = 1024) -> str: return await [llm.chat](<http://llm.chat>)(prompt) async def embed_func(text: str) -> list: return await emb.embed_text(text) rag = LightRAG( working_dir="rag_workdir_min", llm_model_func=llm_complete, embedding_func=EmbeddingFunc( embedding_dim=EMBED_DIM, max_token_size=8192, func=embed_func ), ) # 3) 数据插入:文本 if SAMPLE_DOC.exists(): text = SAMPLE_[DOC.read](<http://DOC.read>)_text(encoding="utf-8") await rag.ainsert(text) print("[green]已插入文本 FAQ 到 LightRAG[/green]") else: print("[yellow]未找到 sample/docs/faq_milvus.txt[/yellow]") # 4) 数据插入:图像(描述存 Milvus) if SAMPLE_IMG.exists(): # 用 VLM 生成该图片的简要描述,作为图像语义内容 desc = await [llm.chat](<http://llm.chat>)_vlm_with_image("请简要描述图中的 Milvus 架构要点。", str(SAMPLE_IMG)) vec = await emb.embed_text(desc) # 采用文本嵌入统一维度,便于最小示例复用 store.upsert( ids=[str(uuid.uuid4())], vectors=[vec], contents=[desc], content_types=["image"], sources=[str(SAMPLE_IMG)] ) print("[green]已插入图像描述到 Milvus(content_type=image)[/green]") else: print("[yellow]未找到 sample/images/milvus_arch.png[/yellow]") # 5) 查询:文本问答(从 LightRAG) q1 = "Milvus 是否支持同时插入与搜索?请给出简短回答。" ans1 = await rag.aquery(q1, param=QueryParam(mode="hybrid")) print("\\n[bold]文本问答[/bold]") print(ans1) # 6) 查询:图像相关(从 Milvus) q2 = "Milvus 架构的关键组件有哪些?" q2_vec = await emb.embed_text(q2) img_hits = [store.search](<http://store.search>)([q2_vec], top_k=3, content_type="image") print("\\n[bold]图像检索(返回图像语义描述)[/bold]") print(img_hits[0] if img_hits else [])if __name__ == "__main__": [asyncio.run](<http://asyncio.run>)(main())Milvus 的成本是多少?Milvus 是一个 100% 免费的开源项目。在使用 Milvus 进行生产或发布时,请遵守Apache License 2.0。Milvus 背后的公司 Zilliz 还为那些不想构建和维护自己的分布式实例的用户提供完全托管的云版平台。Zilliz Cloud可自动维护数据的可靠性,并允许用户只为其使用付费。Milvus 支持非 x86 架构吗?Milvus不能在非x86平台上安装或运行。您的 CPU 必须支持以下指令集之一才能运行 Milvus:SSE4.2、AVX、AVX2、AVX512。这些都是 x86 专用 SIMD 指令集。Milvus 在哪里存储数据?Milvus 处理两种类型的数据:插入数据和元数据。插入数据(包括向量数据、标量数据和特定于 Collections 的 Schema)以增量日志的形式存储在持久存储中。Milvus 支持多种对象存储后端,包括MinIO、AWS S3、谷歌云存储(GCS)、Azure Blob 存储、阿里云 OSS 和腾讯云对象存储(COS)。元数据在 Milvus 内部生成。每个 Milvus 模块都有自己的元数据,这些元数据存储在 etcd 中。为什么 etcd 中没有向量数据?etcd 存储 Milvus 模块元数据;MinIO 存储实体。Milvus 支持同时插入和搜索数据吗?是的。插入操作和查询操作由两个相互独立的模块处理。从客户端的角度来看,当插入的数据进入消息队列时,插入操作符就完成了。但是,插入的数据在加载到查询节点之前是不可查询的。对于具有增量数据的增长数据段,Milvus 会自动建立临时索引,以确保高效的搜索性能,即使数据段大小未达到索引建立阈值(计算公式为dataCoord.segment.maxSize ×dataCoord.segment.sealProportion )。你可以通过Milvus 配置文件中的配置参数queryNode.segcore.interimIndex.enableIndex 来控制这种行为--将其设置为true 可启用临时索引(默认),而将其设置为false 则会禁用临时索引。能否在 Milvus 中插入主键重复的向量?可以。Milvus 不检查向量主键是否重复。当插入主键重复的向量时,Milvus 是否将其视为更新操作?Milvus目前不支持更新操作,也不检查实体主键是否重复。你有责任确保实体主键是唯一的,如果不是唯一的,Milvus 可能包含多个主键重复的实体。如果出现这种情况,查询时将返回哪个数据副本仍是未知行为。这一限制将在今后的版本中修复。自定义实体主键的最大长度是多少?实体主键必须是非负 64 位整数。每次插入操作可添加的最大数据量是多少?插入操作的大小不得超过 1,024 MB。这是 gRPC 规定的限制。在特定分区中搜索时,Collection 的大小会影响查询性能吗?不会。如果指定了搜索的分区,Milvus 只搜索指定的分区。
到这里,我们已经能在本地跑通最小示例并理解其工作原理。
分析RAG-Anything项目后,我们不难发现,多模态RAG技术正处于关键转折点。
三个关键技术趋势如下:
第一,模态覆盖扩大。RAG-Anything当前支持文本、图像、表格和公式,未来,建立包含视频、音频和3D模型在内的统一多模态RAG框架是大势所趋。
第二,实时处理能力提升。目前多模态RAG主要针对静态文档,但流数据处理需求增长要求支持实时文档更新和增量索引功能。
第三,边缘计算将普及。随着MilvusLite等轻量方案发展,多模态RAG将延伸至移动设备和IoT领域,创造新应用可能。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2025-09-15
2025-09-02
2025-08-05
2025-08-18
2025-08-25
2025-08-25
2025-08-25
2025-09-03
2025-08-20
2025-09-08
2025-10-04
2025-09-30
2025-09-10
2025-09-10
2025-09-03
2025-08-28
2025-08-25
2025-08-20