免费POC, 零成本试错
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


【万字长文】Dify 知识库全链路图解:7 个关键节点,彻底拆解 RAG 黑盒

发布日期:2025-08-19 19:57:31 浏览次数: 1764
作者:5ycode

微信搜一搜,关注“5ycode”

推荐语

深入解析Dify知识库的7个关键节点,带你彻底拆解RAG黑盒的实现原理。

核心内容:
1. 知识库创建与文档上传的完整流程
2. 文档解析规则设置与处理机制
3. 后端API调用与索引任务的触发过程

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家

 

在上一篇刨开源码看门道:dify 数据集的那些事(一),我们了解了dify的数据集架构,在这篇文章中,我们了解下dify内部知识库的整个流转实现。通过该篇,我们能详细的了解到dify数据是如何解析的。


整个流程如下:

  • • 知识库(数据集)的创建
  • • 知识库的设置
  • • 文档的上传
  • • 文档解析规则的设置
  • • 文档解析

知识库的解析流程

知识库创建


知识库的创建,调用的是console/api/datasets接口

创建完知识库以后,有三个选项
  • • 文档:列出所有文档,或上传文档
  • • 召回测试:检索
  • • 设置:设置知识库的基本信息

文档上传


选择文件,以后,已经通过console/api/files/upload进行了上传。

上传以后,在设置的docker存储目录指定的租户下面。

分片设置


上传完以后,点击下一步进入分片设置页面。在这里
  • • 通过接口console/api/workspaces/current/default-model?model_type=text-embedding 不同的model_type 加载了默认embedding模型和rerank 模型
  • • 通过接口console/api/datasets/process-rule加载到了默认处理规则
  • • 同时也会把所有的embedding模型和rerank 模型加载到用于选择

保存并处理

点击保存并处理的时候,调用的是/console/api/datasets/5acd7076-8fb4-46ba-833d-89b0196ef918/documents接口,

在这里把数据集和文档都传递到了后端。

最终定位到DatasetDocumentListApi的post方法中。具体代码位置如下:
controllers/console/datasets/datasets_document.py  中的DatasetDocumentListApi.post

services/dataset_service.py 中的DocumentService.save_document_with_dataset_id

代码流程如下:

是否上传文件Notion导入网站爬取DatasetDocumentListApi.post验证用户权限解析请求参数为KnowledgeConfig调用DocumentService.save_document_with_dataset_id检查文档上传配额是否更新现有文档?更新文档并触发索引任务创建新文档处理检查批量上传限制保存处理规则处理不同类型数据源检查文件是否存在验证Notion连接准备爬取任务创建文档记录触发异步索引任务返回文档和批次信息
  • • 先对当前用户是否具有资源权限进行判断
  • • 计费体系开启,校验配额
  • • 创建或更新文档参数,并触发索引任务

关于配额

  • • 配额是按租户获取的,一些参数是从配置文件中获取,一些是从配置文件中BILLING_API_URL中配置的系统中获取的,在这里只限制在企业版中实现
  • • 当计费体系开启的时候,会计算上传的额度(数量)已经上传的数量和当前上传的数量做对比

索引方式

  • • 经济索引是每个数据库使用10个关键词进行检索
  • • 高质量索引,使用向量模型+关键词

    在代码里,对高质量索引进行了必要参数的补充。

更新或保持处理规则

这里有近两百行代码来处理不同类型数据源,确保文件和连接的可用。

在这里如果是upload_file,如果开启了重复文件校验,会拿到当前知识库中第一个文件,进行特殊处理。

异步任务推送

当所有的规则处理完成以后,会把任务通过document_indexing_task.delay(dataset.id, document_ids)推到队列中。

异步解析


在tasks目录下,有一个document_indexing_task.py进行异步解析。整体流程梳理如下:
是否是否是否开始获取数据集信息数据集存在?检查资源限制记录日志并退出通过限制检查?更新文档状态为parsing标记文档为错误状态创建IndexingRunner执行索引处理处理成功?记录处理时间捕获异常并记录记录错误原因结束
  • • 首先进行文件是否存在的校验(异步任务,可能中断,也可能用户取消)
  • • 开启计费模式以后,对额度进行再次校验(可能并发的问题)
  • • 更新状态为解析中
  • • 核心逻辑在IndexingRunner.run

解析

indexing_runner.py

这里的流程也很清晰,整个用了模板方法+工厂+策略模式实现
  1. 1. 首先对数据集进行校验,不存在,直接终止
  2. 2. 获取处理规则,没有处理规则也直接终止
  3. 3. 根据索引类型IndexProcessorFactory,获取对应的索引处理器: ParagraphIndexProcessor(通用)QAIndexProcessor(问答)ParentChildIndexProcessor(父子分段)
  4. 4. 获取文档内容_extract,解决了不同文档的差异性
  • • 支持三种数据源类型
  • • 统一返回Document对象列表
  • • 自动更新处理进度
  • 5. 转换阶段 _transform
    • • 执行文本清洗(根据处理规则)
    • • 分块处理(自定义或自动分块)
    • • 语言处理(国际化支持)
    • • 向量化准备(高质量索引模式)
  • 6. 分段处理 _load_segments
    • • 使用DatasetDocumentStore管理分段
    • • 支持父子文档结构
    • • 原子化状态更新
  • 7. 索引构建 _load
    • • 高质量模式:使用嵌入模型并行处理
    • • 经济模式:构建关键词倒排索引
    • • 状态一致性管理

    重点看下5、6、7 这三个阶段。

    转换阶段 _transform

    转换阶段的核心对应的是ParagraphIndexProcessor(通用)QAIndexProcessor(问答)ParentChildIndexProcessor(父子分段) 这三个处理器的transform 方法。

    重点分析下普通分段和父子分段。

    ParagraphIndexProcessor

    • • 在通用分段中,如果不勾选使用Q&A分段走的是这里

    整体逻辑如下:

    自动自定义有效无效所有文档处理完成开始验证处理规则规则模式加载自动规则加载自定义规则初始化分块器遍历文档文本清洗执行分块分块后处理生成文档ID和哈希去除引导符号有效内容检查添加到结果集返回结果
    • • 在验证处理规则这里,其实是一个兜底策略,正常情况下,如果没有设置,直接使用默认的分段策略
    splitter = self._get_splitter(  
        processing_rule_mode=process_rule.get("mode"),  
        max_tokens=rules.segmentation.max_tokens,  
        chunk_overlap=rules.segmentation.chunk_overlap,  
        separator=rules.segmentation.separator,  
        embedding_model_instance=kwargs.get("embedding_model_instance"),  
    )
    • • 根据规则参数创建分块器
    • • 在_get_splitter内支持两种分块器类型:
      • • 固定分块器(FixedRecursiveCharacterTextSplitter)
      • • 增强型分块器(EnhanceRecursiveCharacterTextSplitter)

    创建好分块器以后,接下来就是文档处理了

    for document in documents:
        # 清洗
        document_text = CleanProcessor.clean(document.page_content, kwargs.get("process_rule", {}))
        document.page_content = document_text
        # 分块
        document_nodes = splitter.split_documents([document])
        # 分块后处理
        for document_node in document_nodes:
            if document_node.page_content.strip():
                # 生成ID和哈希
                doc_id = str(uuid.uuid4())
                hash = helper.generate_text_hash(document_node.page_content) 
                # 去符号处理
                page_content = remove_leading_symbols(document_node.page_content).strip()
                iflen(page_content) > 0:
                    document_node.page_content = page_content
                    all_documents.append(document_node)
    • • 遍历所有的文档,处理步骤如下:
      • • 文本清洗(根据页面上的文本预处理规则进行清洗)
      • • 按规则分块
      • • 为每个分块生成唯一标识
      • • 内容标准化处理,把一些标题进行移除
    # TextSplitter.split_documents内部实现的典型处理:
    defsplit_documents(self, documents: Iterable[Document]) -> list[Document]:  
        """Split documents."""
        texts, metadatas = [], []  
        for doc in documents:  
            texts.append(doc.page_content)  
            metadatas.append(doc.metadata or {})  
        returnself.create_documents(texts, metadatas=metadatas)

    defcreate_documents(self, texts: list[str], metadatas: Optional[list[dict]] = None) -> list[Document]:  
        """Create documents from a list of texts."""
        _metadatas = metadatas or [{}] * len(texts)  
        documents = []  
        for i, text inenumerate(texts):  
            index = -1
            for chunk inself.split_text(text):  
                metadata = copy.deepcopy(_metadatas[i])  
                ifself._add_start_index:  
                    index = text.find(chunk, index + 1)  
                    metadata["start_index"] = index  
                new_doc = Document(page_content=chunk, metadata=metadata)  
                documents.append(new_doc)  
        return documents


    # FixedRecursiveCharacterTextSplitter.split_text

    defsplit_text(self, text: str) -> list[str]:  
        """Split incoming text and return chunks."""
        ifself._fixed_separator:  
            chunks = text.split(self._fixed_separator)  
        else:  
            chunks = [text]  

        final_chunks = []  
        chunks_lengths = self._length_function(chunks)  
        for chunk, chunk_length inzip(chunks, chunks_lengths):  
            if chunk_length > self._chunk_size:  
                final_chunks.extend(self.recursive_split_text(chunk))  
            else:  
                final_chunks.append(chunk)  

        return final_chunks

    这里的逻辑稍微有点绕,通过继承+模板抽象方法。通过子类实现对应的文本切分。主要在_get_splitter的时候创建的分块器

    ParentChildIndexProcessor


    在父子分段中,支持两种父文档模式:段落模式(PARAGRAPH)和全文模式(FULL_DOC),主要处理流程分支:
    • • PARAGRAPH模式:先分块父文档再生成子文档
    • • FULL_DOC模式:直接处理全文生成子文档

    整体流程如下:

    PARAGRAPHFULL_DOC所有文档处理完成开始验证处理规则父文档模式遍历原始文档合并全文内容文本清洗父文档分块生成子文档构建层次化文档添加到结果集生成子文档构建层次化文档添加到结果集返回结果

    相比于通用分段,多了一层,整体逻辑上差不多,只不过父子分段先用父分段规则,分出来的父分段,又用了子分段规则。

    分段处理 _load_segments

    继续回到index_runner.py中的_load_segments方法中,这块的逻辑比较简单。 将处理后的文档分段通过dataset_docstore.py的 add_documents保存到数据库,并且更新文档和分段的状态为indexing

    数据库的操作细节如下:存储到pg里的

    # DocumentStore.add_documents内部逻辑:
    defadd_documents(self, docs, save_child=False):
        segments = []
        for doc in docs:
            # 创建父分段
            segment = DocumentSegment(
                dataset_id=self.dataset.id,
                document_id=self.document_id,
                content=doc.page_content,
                index_node_id=doc.metadata["doc_id"],
                index_node_hash=doc.metadata["doc_hash"]
            )
            segments.append(segment)
            
            if save_child andhasattr(doc, 'children'):
                for child in doc.children:
                    # 创建子文档记录
                    child_chunk = ChildChunk(
                        segment_id=segment.id,
                        content=child.page_content,
                        index_node_id=child.metadata["doc_id"]
                    )
                    db.session.add(child_chunk)
        
        db.session.bulk_save_objects(segments)

    如果文档多了,这块是一个性能瓶颈。

    索引构建 _load

    分段完成,并保存完成以后,就开始做向量化了。在这里

    • • 根据索引技术类型(高质量/经济型)选择不同处理方式
    • • 更新文档和分段的状态为"completed"

    整体逻辑如下:

    高质量经济型开始索引技术类型初始化嵌入模型启动关键词索引线程初始化线程池文档分块分组提交并行任务等待任务完成等待线程完成统计总tokens更新文档状态结束

    高质量的时候,必须使用向量模型。高质量处理的关键逻辑

    with ThreadPoolExecutor(max_workers=10as executor:
        futures = []
        # 文档分块逻辑(避免哈希冲突)
        for chunk in document_groups:
            futures.append(executor.submit(
                self._process_chunk,
                flask_app,
                index_processor,
                chunk,
                dataset,
                dataset_document,
                embedding_model_instance
            ))
        tokens = sum(future.result() for future in futures)

    # _process_chunk 的核心逻辑如下:
    # 先检测任务是否暂停
    self._check_document_paused_status(dataset_document.id)
    # Token计算
    tokens = embedding_model_instance.get_text_embedding_num_tokens(texts)

    # 索引构建
    index_processor.load(dataset, chunk_documents, with_keywords=False)

    # 状态更新
    db.session.query(DocumentSegment).filter(
        DocumentSegment.index_node_id.in_(document_ids)
    ).update({
        "status""completed",
        "completed_at": datetime.now()
    })

    将分段chunk,通过线程池,批量的向量化。在索引构建的时候,又跑到了index_processor这里,也就是对应的ParagraphIndexProcessor(通用)QAIndexProcessor(问答)ParentChildIndexProcessor(父子分段)

    看到这里,我就有个疑问,在父子分段中,高质量索引的时候,关键词处理直接为false,在paragraph_index_processor.py中,只是进行了向量化存储,没有提取关键词。在parent_child_index_processor.py只是对子分段进行了向量化并存储,这个可以理解

    向量完以后,更新为完成。

    后记

    • • 通过深入dify的解析过程,了解了整体的分段过程
    • • 检索效果的提升,还需要对检索这块进行代码分析,方便后续优化dify的rag效果

 

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询