Ragflow 文档处理深度解析:从解析到存储的完整流程
1. 总体架构概览
Ragflow 的文档处理流程可以分为以下几个核心阶段:
文档上传 → 解析文档构建为Chunk → 数据增强阶段 → Embedding计算 → ES存储
- 文档上传:用户通过前端界面上传原始文档,并配置解析参数,如:切片方法、自动关键词提取、自动问题提取数量等。
- 解析文档构建为Chunk:后端根据设定的切片方法选择合适的解析器,将文档内容切分为结构化的知识片段(Chunk)
- 数据增强阶段:对切分后的 Chunk 进行数据增强,比如对每个 Chunk 通过AI提取一些关键词、检索问题等
- Embedding计算:对每个 Chunk 计算向量表示(Embedding)。
- ES存储:将完整的 Chunk 信息写入 Elasticsearch,在后续知识检索时被使用。信息包括每个 Chunk 的文本、AI提取的关键词、AI提取的问题、Embedding等
源码主要对应于:
- Task Executor (
rag/svr/task_executor.py
):解析任务执行 - Parser Factory (
rag/app/
):各种文档解析器
2. 解析文档构建Chunk阶段 (build_chunks
)
这个阶段负责将原始文档转换为结构化的知识片段。
不同类型的文档(如PDF、Word、Excel等)需要采用不同的解析策略来保证内容提取的准确性。
Ragflow 前端界面提供了多种切片方法选项,每种方法都对应特定的Python解析器类。下面详细说明前端切片方法与后端源码的对应关系:
前端切片方法与源码映射表
rag/app/naive.py
naive.chunk()
rag/app/resume.py
resume.chunk()
rag/app/manual.py
manual.chunk()
rag/app/table.py
table.chunk()
rag/app/paper.py
paper.chunk()
rag/app/book.py
book.chunk()
rag/app/laws.py
laws.chunk()
rag/app/presentation.py
presentation.chunk()
rag/app/one.py
one.chunk()
rag/app/picture.py
picture.chunk()
rag/app/audio.py
audio.chunk()
rag/app/email.py
email.chunk()
rag/app/qa.py
qa.chunk()
rag/app/tag.py
tag.chunk()
rag/app/naive.py
naive.chunk()
工厂模式实现
在 rag/svr/task_executor.py
中,系统通过工厂模式将前端选择映射到具体的解析器:
from rag.app import laws, paper, presentation, manual, qa, table, book, resume, picture, naive, one, audio, email, tagFACTORY = { \"general\": naive, ParserType.NAIVE.value: naive, // 通用文档解析器,适用于PDF、Word、HTML等常规格式 ParserType.PAPER.value: paper, // 学术论文解析器,能识别摘要、作者、章节等学术结构 ParserType.BOOK.value: book, // 书籍解析器,支持目录识别和章节分割 ParserType.PRESENTATION.value: presentation, // 演示文稿(PPT)解析器,按页面分割并保留图像信息 ParserType.MANUAL.value: manual, // 手册类文档解析器,支持层次化章节结构 ParserType.LAWS.value: laws, // 法律法规解析器,按条款和章节结构化分割 ParserType.QA.value: qa, // 问答类文档解析器 ParserType.TABLE.value: table, // 表格数据解析器,保持表格结构和关系 ParserType.RESUME.value: resume, // 简历解析器,能识别姓名、经历、技能等结构化信息 ParserType.PICTURE.value: picture, // 图片文档解析器 ParserType.ONE.value: one, // 整文档模式,将整个文档作为单一chunk ParserType.AUDIO.value: audio, // 音频文档解析器 ParserType.EMAIL.value: email, // 邮件文档解析器 ParserType.KG.value: naive, // 知识图谱模式,复用naive解析器 ParserType.TAG.value: tag // 标签文档解析器}
当用户在前端选择某种切片方法时,系统会根据选择调用对应的解析器:
# 根据用户选择获取解析器chunker = FACTORY[task[\"parser_id\"].lower()] // task[\"parser_id\"].lower() 示例:table# 调用对应解析器的chunk方法chunks = chunker.chunk(filename, binary=binary, **params)
Chunk数据格式详解
每个解析器的 chunk()
方法都会返回一个chunk列表,每个chunk是一个包含标准化字段的字典对象。不同类型的解析器会生成一些特定的字段,但所有chunk都包含以下核心字段:
基础字段(所有chunk共有):
docnm_kwd
\"技术文档.pdf\"
title_tks
[\"技术\", \"文档\"]
title_sm_tks
[\"技\", \"术\", \"文\", \"档\"]
content_with_weight
\"这是文档的主要内容...\"
content_ltks
[\"这是\", \"文档\", \"的\", \"主要\", \"内容\"]
content_sm_ltks
[\"这\", \"是\", \"文\", \"档\", \"的\", \"主\", \"要\", \"内\", \"容\"]
位置字段(PDF等结构化文档):
page_num_int
[1, 2]
position_int
[(1, 100, 500, 200, 300)]
top_int
[200]
图像字段(包含图片的chunk):
image
doc_type_kwd
\"image\"
其它解析器不再一一展开介绍,需要用到的时候可以读对应源码。
3. 数据增强阶段(通过设定的AI Promt提示词生成新的数据)
如果切片方法选择 General、Book、One 等,可以在前端界面上指定 自动关键词提取、自动问题提取 数量。如果指定数量不为 0,那么在基础的文档解析完成后,Ragflow 还会调用AI服务来增强chunk的语义信息。
3.1 自动关键词提取
当配置 自动关键词提取(auto_keywords) 数量大于 0 时,系统会为每个chunk自动生成指定数量的关键词。
关键词提取的prompt (rag/prompts/keyword_prompt.md
) 如下:
## RoleYou are a text analyzer.## TaskExtract the most important keywords/phrases of a given piece of text content.## Requirements- Summarize the text content, and give the top {{ topn }} important keywords/phrases.- The keywords MUST be in the same language as the given piece of text content.- The keywords are delimited by ENGLISH COMMA.- Output keywords ONLY.---## Text Content{{ content }}
其中 content 是 Chunk 的文本,topn 是用户配置的关键词数量。
3.2 自动问题提取
当配置 自动问题提取(auto_questions) 数量大于 0 时,系统会为每个chunk自动生成指定数量的问题。
问题提取的prompt (rag/prompts/question_prompt.md
) 如下:
## RoleYou are a text analyzer.## TaskPropose {{ topn }} questions about a given piece of text content.## Requirements- Understand and summarize the text content, and propose the top {{ topn }} important questions.- The questions SHOULD NOT have overlapping meanings.- The questions SHOULD cover the main content of the text as much as possible.- The questions MUST be in the same language as the given piece of text content.- One question per line.- Output questions ONLY.---## Text Content{{ content }}
其中 content 也是 Chunk 的文本,topn 是用户配置的问题数量。
4. Embedding计算阶段
Embedding计算是将文本转换为向量表示的关键步骤,这些向量将用于后续的语义检索。Ragflow 采用了标题和内容加权融合的embedding策略,具体实现位于 rag/svr/task_executor.py
的 embedding
函数中。
Ragflow 计算 Chunk 的 embedding 时,计算策略的伪代码如下:
content = AI提取的问题 if exists(AI提取的问题) else 原始Chunk文本final_embeding = encode_as_embeding(Chunk所在文档的标题文本) * 0.1 + encode_as_embeding(content) * 0.9
说明:每个 Chunk 的 embedding 向量为 文档标题文本的 embedding 向量乘以 0.1,和 Chunk 的内容文本的 embedding 向量乘以 0.9 的和。其中 Chunk 的内容文本优先设置为对该Chunk的AI提取出的问题,如果没有指定自动问题提取数量,则问题为空,此时才会使用原始的 Chunk 文本。
具体源码的注释如下:
async def embedding(docs, mdl, parser_config=None, callback=None): \"\"\" 为文档chunks计算embedding向量 Args: docs: chunk列表,每个chunk是一个字典 mdl: embedding模型对象 parser_config: 解析器配置,包含embedding权重等参数 callback: 进度回调函数 Returns: tk_count: 总token消耗数量 vector_size: embedding向量的维度 \"\"\" if parser_config is None: parser_config = {} # 第一步:准备每个Chunk的标题文本和内容文本 tts, cnts = [], [] # tts=titles标题列表, cnts=contents内容列表 for d in docs: # 标题文本:使用文档名称,所有chunk共享同一个文档标题 tts.append(d.get(\"docnm_kwd\", \"Title\")) # 内容文本:优先使用AI生成的问题,如果没有则使用原始chunk内容 c = \"\\n\".join(d.get(\"question_kwd\", [])) # 将AI生成的问题列表合并为一个字符串 if not c: # 如果没有AI问题,则使用原始内容 c = d[\"content_with_weight\"] c = re.sub(r\"</?(table|td|caption|tr|th)( [^]{0,12})?>\", \" \", c) if not c: c = \"None\" cnts.append(c) # 第二步:计算标题embedding向量(只计算一次,然后复制给相同文档的所有chunk) tk_count = 0 if len(tts) == len(cnts): # 只对第一个标题进行embedding编码 vts, c = await trio.to_thread.run_sync(lambda: mdl.encode(tts[0: 1])) # 将标题向量复制给所有chunk(因为同一文档的所有chunk共享标题) tts = np.concatenate([vts for _ in range(len(tts))], axis=0) tk_count += c # 第三步:分批计算内容embedding向量(避免内存溢出) cnts_ = np.array([]) # 存储所有内容向量 for i in range(0, len(cnts), EMBEDDING_BATCH_SIZE): async with embed_limiter: # 对当前批次的内容进行embedding vts, c = await trio.to_thread.run_sync( lambda: mdl.encode([ truncate(c, mdl.max_length-10)for c in cnts[i: i + EMBEDDING_BATCH_SIZE] ]) ) # 拼接当前批次的向量到总向量数组 if len(cnts_) == 0: cnts_ = vts else: cnts_ = np.concatenate((cnts_, vts), axis=0) tk_count += c callback(prog=0.7 + 0.2 * (i + 1) / len(cnts), msg=\"\") cnts = cnts_ # 将批量计算出的 content embeding 结果赋值给cnts # 第四步:加权融合标题向量和内容向量 # 获取标题权重配置,默认为0.1(即10%) filename_embd_weight = parser_config.get(\"filename_embd_weight\", 0.1) if not filename_embd_weight: # 处理None值的情况 filename_embd_weight = 0.1 title_w = float(filename_embd_weight) # 标题权重 logging.info(\"标题权重 title_w: %s, 内容权重 1-title_w: %s\", title_w, 1-title_w) # 加权融合公式:final_vector = title_weight * title_vector + content_weight * content_vector # 如果标题和内容向量数量一致,则进行融合;否则只使用内容向量 vects = (title_w * tts + (1 - title_w) * cnts) if len(tts) == len(cnts) else cnts # 第五步:将最终embedding向量存储到每个chunk中 assert len(vects) == len(docs) # 确保向量数量与chunk数量一致 vector_size = 0 for i, d in enumerate(docs): v = vects[i].tolist() # 将numpy数组转换为Python列表,便于JSON序列化 vector_size = len(v) # 记录向量维度 # 存储向量到chunk中,字段名格式为 q_{维度}_vec(如q_768_vec、q_1024_vec) d[\"q_%d_vec\" % len(v)] = v return tk_count, vector_size # 返回token消耗和向量维度
5. ES存储阶段
最终处理后的每个Chunk数据会被存入 Doc Store 中,默认是 ES (rag\\utils\\es_conn.py)。
最终 ES Chunk 结构示例
{ \"id\": \"abc123...\", \"doc_id\": \"doc_001\", \"kb_id\": \"kb_001\", \"docnm_kwd\": \"技术文档.pdf\", \"title_tks\": \"技术 文档\", \"title_sm_tks\": \"技 术 文 档\", \"content_with_weight\": \"这是一段技术内容...\", \"content_ltks\": \"这是 一段 技术 内容\", \"content_sm_ltks\": \"这 是 一 段 技 术 内 容\", \"important_kwd\": [\"技术\", \"内容\", \"文档\"], \"important_tks\": \"技术 内容 文档\", \"question_kwd\": [\"什么是技术?\", \"如何使用?\"], \"question_tks\": \"什么 是 技术 如何 使用\", \"q_768_vec\": [0.1, 0.2, ..., 0.8], // 768维embedding向量 \"page_num_int\": [1], \"position_int\": [(1, 100, 500, 200, 300)], \"create_time\": \"2024-01-01 12:00:00\", \"create_timestamp_flt\": 1704067200.0}