微信扫码
添加专属顾问
我要投稿
深入解析Dify知识库的7个关键节点,带你彻底拆解RAG黑盒的实现原理。 核心内容: 1. 知识库创建与文档上传的完整流程 2. 文档解析规则设置与处理机制 3. 后端API调用与索引任务的触发过程
在上一篇刨开源码看门道:dify 数据集的那些事(一),我们了解了dify的数据集架构,在这篇文章中,我们了解下dify内部知识库的整个流转实现。通过该篇,我们能详细的了解到dify数据是如何解析的。
整个流程如下:
console/api/datasets
接口console/api/files/upload
进行了上传。console/api/workspaces/current/default-model?model_type=text-embedding
不同的model_type 加载了默认embedding模型和rerank 模型console/api/datasets/process-rule
加载到了默认处理规则/console/api/datasets/5acd7076-8fb4-46ba-833d-89b0196ef918/documents
接口,DatasetDocumentListApi
的post方法中。具体代码位置如下:controllers/console/datasets/datasets_document.py 中的DatasetDocumentListApi.post
services/dataset_service.py 中的DocumentService.save_document_with_dataset_id
代码流程如下:
是否上传文件Notion导入网站爬取DatasetDocumentListApi.post验证用户权限解析请求参数为KnowledgeConfig调用DocumentService.save_document_with_dataset_id检查文档上传配额是否更新现有文档?更新文档并触发索引任务创建新文档处理检查批量上传限制保存处理规则处理不同类型数据源检查文件是否存在验证Notion连接准备爬取任务创建文档记录触发异步索引任务返回文档和批次信息
BILLING_API_URL
中配置的系统中获取的,在这里只限制在企业版中实现这里有近两百行代码来处理不同类型数据源,确保文件和连接的可用。
在这里如果是
upload_file
,如果开启了重复文件校验,会拿到当前知识库中第一个文件,进行特殊处理。
当所有的规则处理完成以后,会把任务通过document_indexing_task.delay(dataset.id, document_ids)
推到队列中。
document_indexing_task.py
进行异步解析。整体流程梳理如下:是否是否是否开始获取数据集信息数据集存在?检查资源限制记录日志并退出通过限制检查?更新文档状态为parsing标记文档为错误状态创建IndexingRunner执行索引处理处理成功?记录处理时间捕获异常并记录记录错误原因结束
IndexingRunner.run
中IndexProcessorFactory
,获取对应的索引处理器: ParagraphIndexProcessor(通用)
、QAIndexProcessor(问答)
、ParentChildIndexProcessor(父子分段)
_extract
,解决了不同文档的差异性_transform
_load_segments
_load
重点看下5、6、7 这三个阶段。
_transform
转换阶段的核心对应的是ParagraphIndexProcessor(通用)
、QAIndexProcessor(问答)
、ParentChildIndexProcessor(父子分段)
这三个处理器的transform
方法。
重点分析下普通分段和父子分段。
Q&A分段
走的是这里整体逻辑如下:
自动自定义有效无效所有文档处理完成开始验证处理规则规则模式加载自动规则加载自定义规则初始化分块器遍历文档文本清洗执行分块分块后处理生成文档ID和哈希去除引导符号有效内容检查添加到结果集返回结果
splitter = self._get_splitter(
processing_rule_mode=process_rule.get("mode"),
max_tokens=rules.segmentation.max_tokens,
chunk_overlap=rules.segmentation.chunk_overlap,
separator=rules.segmentation.separator,
embedding_model_instance=kwargs.get("embedding_model_instance"),
)
_get_splitter
内支持两种分块器类型:创建好分块器以后,接下来就是文档处理了
for document in documents:
# 清洗
document_text = CleanProcessor.clean(document.page_content, kwargs.get("process_rule", {}))
document.page_content = document_text
# 分块
document_nodes = splitter.split_documents([document])
# 分块后处理
for document_node in document_nodes:
if document_node.page_content.strip():
# 生成ID和哈希
doc_id = str(uuid.uuid4())
hash = helper.generate_text_hash(document_node.page_content)
# 去符号处理
page_content = remove_leading_symbols(document_node.page_content).strip()
iflen(page_content) > 0:
document_node.page_content = page_content
all_documents.append(document_node)
# TextSplitter.split_documents内部实现的典型处理:
defsplit_documents(self, documents: Iterable[Document]) -> list[Document]:
"""Split documents."""
texts, metadatas = [], []
for doc in documents:
texts.append(doc.page_content)
metadatas.append(doc.metadata or {})
returnself.create_documents(texts, metadatas=metadatas)
defcreate_documents(self, texts: list[str], metadatas: Optional[list[dict]] = None) -> list[Document]:
"""Create documents from a list of texts."""
_metadatas = metadatas or [{}] * len(texts)
documents = []
for i, text inenumerate(texts):
index = -1
for chunk inself.split_text(text):
metadata = copy.deepcopy(_metadatas[i])
ifself._add_start_index:
index = text.find(chunk, index + 1)
metadata["start_index"] = index
new_doc = Document(page_content=chunk, metadata=metadata)
documents.append(new_doc)
return documents
# FixedRecursiveCharacterTextSplitter.split_text
defsplit_text(self, text: str) -> list[str]:
"""Split incoming text and return chunks."""
ifself._fixed_separator:
chunks = text.split(self._fixed_separator)
else:
chunks = [text]
final_chunks = []
chunks_lengths = self._length_function(chunks)
for chunk, chunk_length inzip(chunks, chunks_lengths):
if chunk_length > self._chunk_size:
final_chunks.extend(self.recursive_split_text(chunk))
else:
final_chunks.append(chunk)
return final_chunks
这里的逻辑稍微有点绕,通过继承+模板抽象方法。通过子类实现对应的文本切分。主要在_get_splitter
的时候创建的分块器
整体流程如下:
PARAGRAPHFULL_DOC所有文档处理完成开始验证处理规则父文档模式遍历原始文档合并全文内容文本清洗父文档分块生成子文档构建层次化文档添加到结果集生成子文档构建层次化文档添加到结果集返回结果
相比于通用分段,多了一层,整体逻辑上差不多,只不过父子分段先用父分段规则,分出来的父分段,又用了子分段规则。
_load_segments
继续回到index_runner.py
中的_load_segments
方法中,这块的逻辑比较简单。 将处理后的文档分段通过dataset_docstore.py的 add_documents
保存到数据库,并且更新文档和分段的状态为indexing
。
数据库的操作细节如下:存储到pg里的
# DocumentStore.add_documents内部逻辑:
defadd_documents(self, docs, save_child=False):
segments = []
for doc in docs:
# 创建父分段
segment = DocumentSegment(
dataset_id=self.dataset.id,
document_id=self.document_id,
content=doc.page_content,
index_node_id=doc.metadata["doc_id"],
index_node_hash=doc.metadata["doc_hash"]
)
segments.append(segment)
if save_child andhasattr(doc, 'children'):
for child in doc.children:
# 创建子文档记录
child_chunk = ChildChunk(
segment_id=segment.id,
content=child.page_content,
index_node_id=child.metadata["doc_id"]
)
db.session.add(child_chunk)
db.session.bulk_save_objects(segments)
如果文档多了,这块是一个性能瓶颈。
_load
分段完成,并保存完成以后,就开始做向量化了。在这里
整体逻辑如下:
高质量经济型开始索引技术类型初始化嵌入模型启动关键词索引线程初始化线程池文档分块分组提交并行任务等待任务完成等待线程完成统计总tokens更新文档状态结束
高质量的时候,必须使用向量模型。高质量处理的关键逻辑
with ThreadPoolExecutor(max_workers=10) as executor:
futures = []
# 文档分块逻辑(避免哈希冲突)
for chunk in document_groups:
futures.append(executor.submit(
self._process_chunk,
flask_app,
index_processor,
chunk,
dataset,
dataset_document,
embedding_model_instance
))
tokens = sum(future.result() for future in futures)
# _process_chunk 的核心逻辑如下:
# 先检测任务是否暂停
self._check_document_paused_status(dataset_document.id)
# Token计算
tokens = embedding_model_instance.get_text_embedding_num_tokens(texts)
# 索引构建
index_processor.load(dataset, chunk_documents, with_keywords=False)
# 状态更新
db.session.query(DocumentSegment).filter(
DocumentSegment.index_node_id.in_(document_ids)
).update({
"status": "completed",
"completed_at": datetime.now()
})
将分段chunk,通过线程池,批量的向量化。在索引构建的时候,又跑到了index_processor这里,也就是对应的ParagraphIndexProcessor(通用)
、QAIndexProcessor(问答)
、ParentChildIndexProcessor(父子分段)
看到这里,我就有个疑问,在父子分段中,高质量索引的时候,关键词处理直接为false,在paragraph_index_processor.py中,只是进行了向量化存储,没有提取关键词。在parent_child_index_processor.py只是对子分段进行了向量化并存储,这个可以理解
向量完以后,更新为完成。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2025-08-29
Dify 1.8.0 实测:多模型管理、MCP OAuth、异步存储,真升级还是鸡肋?
2025-08-28
Dify v1.8.0大版本更新:多模型凭证系统的底层架构革新与MCP的 OAuth 集成能力突破!
2025-08-27
Dify发布了V1.8.0版本,安全性和性能有了重大改进,让我们一起来看看吧!
2025-08-25
4300字长文:使用dify搭建合同审核Agent
2025-08-23
Dify集成MCP服务
2025-08-23
Dify v1.7.2 实战爆破:6 大特性颠覆开发,23 处修复稳如老狗
2025-08-20
深度实战:我用 Dify 复刻了 1688 的 AI 搜索,“多路召回”才是灵魂
2025-08-20
Dify Java Client
2025-06-04
2025-06-25
2025-06-03
2025-06-02
2025-06-05
2025-06-30
2025-06-29
2025-06-10
2025-06-24
2025-06-09
2025-08-29
2025-08-18
2025-08-02
2025-07-30
2025-06-26
2025-06-17
2025-05-29
2025-05-28