微信扫码
添加专属顾问
 
                        我要投稿
深度解析RAGFlow分片引擎,掌握硬核架构与视觉增强技巧。 核心内容: 1. RAGFlow分片引擎的整体流程解析 2. 切片设置与接口参数详细解读 3. 代码实现与功能增强策略详解
 
                                
上次讲解代码以后,把rag这块遗留了下来,rag的代码相对来说比较复杂,一环套一环。我们今天先来拆解下分片的整体流程。
整体分为3个阶段
我们挨个来解析下。
注意:代码为当天更新的代码,部分功能还未发布。
doc_id 文档idparser_config 文档解析配置task_page_size 任务页面大小layout_recognize 使用哪种解析器,默认用的DeepDOCchunk_token_num 块token数delimiter 分段标识符auto_keywords 自动关键词抽取auto_questions自动问题抽取raptor :召回增强RAPTOR策略use_raptor: 是否开启use_raptor 使用召回增强RAPTOR策略graphrag: 知识图谱配置use_graphrag 是否使用知识图谱entity_types 实体类型method 知识图谱方法community实体归一化use_graphrag 社区报告我们按照上一篇解剖RAGFlow!全网最硬核源码架构解析讲的,找到documnet_app.py文件,搜索change_parser方法。
@manager.route('/change_parser', methods=['POST'])  # noqa: F821  
@login_required  
@validate_request("doc_id", "parser_id")  
defchange_parser():  
    req = request.json  
    #权限校验,看下你有没有操作的权限
    ifnot DocumentService.accessible(req["doc_id"], current_user.id):  
    try:  
        # 解析器切换验证,
        e, doc = DocumentService.get_by_id(req["doc_id"])  
        if ((doc.type == FileType.VISUAL and req["parser_id"] != "picture")  
                or (re.search(  
                    r"\.(ppt|pptx|pages)$", doc.name) and req["parser_id"] != "presentation")):  
            return get_data_error_result(message="Not supported yet!")  
        e = DocumentService.update_by_id(doc.id,  
                                         {"parser_id": req["parser_id"], "progress": 0, "progress_msg": "",  
                                          "run": TaskStatus.UNSTART.value})  
        ifnot e:  
            return get_data_error_result(message="Document not found!")  
        if"parser_config"in req:  
            DocumentService.update_parser_config(doc.id, req["parser_config"])  
        if doc.token_num > 0:  
            e = DocumentService.increment_chunk_num(doc.id, doc.kb_id, doc.token_num * -1, doc.chunk_num * -1,  
                                                    doc.process_duation * -1)  
            ifnot e:  
                return get_data_error_result(message="Document not found!")  
            tenant_id = DocumentService.get_tenant_id(req["doc_id"])  
            ifnot tenant_id:  
                return get_data_error_result(message="Tenant not found!")  
            if settings.docStoreConn.indexExist(search.index_name(tenant_id), doc.kb_id):  
                settings.docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), doc.kb_id)  
        return get_json_result(data=True)  
    except Exception as e:  
        return server_error_response(e)这段代码
未启动这里只是把切片的配置做了设置。整体流程如下:
IndexDBServerClientIndexDBServerClientPOST /change_parser权限检查权限结果解析器类型验证更新解析器配置删除旧索引返回操作结果
api/db/services/task_service.py中rag_flow_svr_queue。这段代码看图片里的注释吧。最后这块的整体流程如下:
PDFExcel其他开始文档类型判断PDF分片处理表格分片处理单任务处理生成任务摘要历史任务检查结果复用处理数据清理任务持久化未完成任务入队结束
rag/svr/task_executor.py中async defhandle_task():
    #通过redis消息队列从`rag_flow_svr_queue`获取任务
    redis_msg, task = await collect()
    ifnot task:  
        # 没有获取到,休眠5秒
        await trio.sleep(5)
        return
    try:
        # 状态记录
        CURRENT_TASKS[task["id"]] = copy.deepcopy(task) 
        # 核心处理
        await do_handle_task(task)
        # 成功处理
        DONE_TASKS += 1
    except Exception as e:
        # 异常处理
        FAILED_TASKS += 1
        set_progress(task_id, -1, str(e))
我们接着看do_handle_task 方法,
# 绑定进度回调
progress_callback = partial(set_progress, task_id, ...) 
# 任务状态检查
if TaskService.do_cancel(task_id):  
    # 主动取消
    progress_callback(-1, "Task canceled")
    return标准模式GraphRAG模式RAPTOR模式raptorgraphragdefault任务类型判断类型层次化摘要知识图谱构建标准分片初始化Chat模型执行run_raptor调整并发限制加载配置执行run_graphrag生成原始分片向量编码
我们看下默认分片中的关键方法
# 分片,这个是核心
chunks = await build_chunks(task, progress_callback)
# 所有的分片向量化,并向量结果写入到每个chunk的["q_%d_vec" % len(v)] = v
token_count, vector_size = await embedding(chunks, embedding_model, task_parser_config, progress_callback)简单看下build_chunks方法
async with chunk_limiter:  
    cks = await trio.to_thread.run_sync(lambda: chunker.chunk(task["name"], binary=binary, from_page=task["from_page"],  
                        to_page=task["to_page"], lang=task["language"], callback=progress_callback,  
                        kb_id=task["kb_id"], parser_config=task["parser_config"], tenant_id=task["tenant_id"]))最后到了rag/app/naive.py 文件中的chunk方法。在这个方法里根据切片配置进行了处理。整体流程如下:
DOCXPDFExcelTXT/CodeMarkdownHTML/JSON是否输入文件格式判断DOCX解析器PDF解析器+布局识别表格解析器文本分割器MD表格提取结构化解析原始分片生成是否视觉增强?视觉模型处理图表基础分片处理分片合并Token化处理输出结构化分片
在最新的版本中,使用视觉模型,对图表进行增强。该代码还有发布。
整个异步处理如下:
资源清理异常处理任务处理任务收集初始化阶段有消息无消息异常main入口初始化系统启动监控线程任务处理循环获取任务处理任务清理资源打印Banner加载配置设置信号处理初始化内存追踪检查未ACK消息获取任务详情拉取新消息验证任务有效性记录开始状态执行核心处理更新进度记录完成状态捕获错误构造错误信息更新失败状态释放内存引用发送ACK更新计数器
通过最近的源码解析,ragflow后面的升级有几块
Agent添加了版本,最多保留20个版本agent增加团队权限功能53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
 
            2025-09-15
2025-09-02
2025-08-05
2025-08-18
2025-08-25
2025-08-25
2025-08-25
2025-09-03
2025-08-20
2025-09-08
2025-10-04
2025-09-30
2025-09-10
2025-09-10
2025-09-03
2025-08-28
2025-08-25
2025-08-20