2026年7月9日 周四晚上19:30,报名腾讯会议了解“如何构建自进化的动态知识库(Brain)”(限30人)
免费POC, 零成本试错
FDE知识库

FDE知识库

学习大模型的前沿技术与行业落地应用


收藏

从10万份文档中更快、更准确地找到信息,还能理解语义!试试ElasticSearch+RAG

发布日期:2024-06-28 08:05:22 浏览次数: 2858
作者:活水智能

微信搜一搜,关注“活水智能”


目前,高效搜索和分析大量文档仍然是一项非常耗时的任务。

法律文件更是如此,因为精确性和全面性至关重要。

本文将探讨如何使用 ElasticSearch 和大模型相关技术检索增强生成(RAG)处理和检索超过10万份德语法律文档的信息。

(编者注:Elasticsearch 是一个开源的全文搜索引擎,其每个字段均可被索引,可以在极短的时间内存储、搜索和分析TB级数据)

处理海量文本的挑战

法律文件非常复杂,包含复杂的细节和特定术语。主要难点在于创建一个高效的系统,能够处理庞大的法律文档库,并快速提供准确、相关的结果。

这个问题以前也有解决方法。然而问题主要出在分块策略上。

之前使用的是langchain的递归分块策略。这是一种不错的策略,但在巨量文档面前,它无法保持语义完整性。

让我们来看一个使用句子转换器进行文本分割的示例代码。

from langchain.text_splitter import SentenceTransformersTokenTextSplitter
text_splitter = SentenceTransformersTokenTextSplitter(
                                      tokens_per_chunk = 480,
                                      chunk_overlap = 50,
                                      model_name = "intfloat/multilingual-e5-large-instruct"
                                      )
texts = text_splitter.split_documents(data)

虽然分块重叠是一种有效的保持上下文的方法,但它也存在一些缺点和潜在的问题需要考虑:

需要更多储存空间。重叠的分块意味着某些文本部分会在多个分块中重复。这种冗余增加了对储存空间的需求,特别是在处理非常大的文档库时。

增加处理时间。更多的数据需要处理会导致计算开销增加。每个分块都需要处理和索引,这可能会减慢整个系统的速度。

上下文碎片化。虽然重叠有助于保持上下文,但对于上下文逻辑严密或复杂的文本来说,这可能还不够。重要信息可能仍然会在分块之间碎片化,导致理解或检索不完整。

由于我们有超过10万份文档,每份文档多达1000多页,这种策略并不适用。

解决方案

由于数据存储在云端的不同文件夹中,每个文件夹中有大量文件。

一个一个下载将耗费大量时间。因此,我选择了并发下载(concurrent futures),可以同时下载多个文件。

from concurrent import futures
from concurrent.futures import ProcessPoolExecutor

def download_parallel_multiprocessing():
   with ProcessPoolExecutor() as executor:
       future_to_key = {executor.submit(download_one_file, key): key for key in list_key_to_download}

       for future in futures.as_completed(future_to_key):
           key = future_to_key[future]
           exception = future.exception()

           if not exception:
               yield key, future.result()
           else:
               yield key, exception

for key, result in download_parallel_multiprocessing():
#     print(f"{key}: {result}")
   pass

这些文件下载后存储在临时文件夹中以供使用。接下来,我们定义了用于嵌入的模型。

由于这里使用的是德语,需要选择一个多语言编码器模型。因此,我们选择了intfloat/multilingual-e5-large。

from langchain_community.llms import WatsonxLLM
# from dotenv import load_dotenv
import os
from ibm_watson_machine_learning.metanames import GenTextParamsMetaNames as GenParams

# Model Declaration

api_key = "Your Key Here"
ibm_cloud_url = "Write your URL"
project_id = "The project id"

# Params Declaration
params = {
   GenParams.DECODING_METHOD: "sample",
   GenParams.MIN_NEW_TOKENS: 50,
   GenParams.MAX_NEW_TOKENS: 430,
   GenParams.RANDOM_SEED: 42,
   GenParams.TEMPERATURE: 0,
   GenParams.TOP_K: 20,
   GenParams.TOP_P:1
}
# Model Configuration
llm = WatsonxLLM(
           model_id='intfloat/multilingual-e5-large',
           url=ibm_cloud_url,
           apikey=api_key,
           project_id=project_id,
           params=params,
           )

过滤异常文件和去重

以下代码过滤了用户不需要的异常文件。它会在文件名中查找特定模式,然后在移除该模式后存储调整后的文件名。如果文件存在,则放入,表明存在重复文件。

import re

def matching_files_with_names(file_names):
   matching_files = []
   for file_name in file_names:
       # Check if the file name contains a pattern
       pattern_match = re.search(r'\[\d+\]', file_name)
       if pattern_match:
           # Remove the pattern and check if the resulting name exists in the list
           pattern_removed_name = re.sub(r'\[\d+\].', '', file_name)
           if pattern_removed_name in file_names and pattern_removed_name != file_name:
               matching_files.append([file_name, pattern_removed_name])
   return matching_files

matching_files = matching_files_with_names(matching_pairs)

在此之后,剩下超过102,000个文件需要处理。

提取元数据

下一步是预处理文件并将其分解成较小的块,以便后续处理。

由于文件内容是xml格式,实际的文本内容是html格式。

我们可以根据标题将文档分块。这样每个标题和文本都保持完整,从而保持块的语义含义。

第一步是加载和读取xml文件,并提取所有元数据。我们使用for循环遍历所有文件。

import lxml
import pickle
from lxml import etree
from langchain.docstore.document import Document

dir_list_pre = os.listdir("temp_pre/")
# destination_path = "temp_post/"
full_doc = []

for file in tqdm(list(matching_pairs_og.keys())):
   file_path = "temp_pre/" + file
   with open(file_path, "r") as f:
       xml_content = f.read()

   # Parse the XML
   try:
       with open(file_path, "rb") as f:
           xml_content_tag_reader = f.read()
       parser = etree.XMLParser(recover=True)
       root = etree.fromstring(xml_content_tag_reader, parser)
       # Get the ElementTree object
       tree = root.getroottree()
       #Extracting metadata except that of text
       meta_tags = {tree.getpath(d): d.text for d in root.iterdescendants() if '/txt' not in tree.getpath(d)}

       soup = BeautifulSoup(xml_content, "xml")

       text_tags = soup.find_all('txt')
       html_content = ''.join([' ' + tag.get_text() for tag in text_tags])

       if len(html_content)>0:

           html_soup = BeautifulSoup(html_content, "html.parser")

           # Find all <p> tags containing <strong> tags
           paragraphs_with_strong = html_soup.find_all(lambda tag: tag.name == 'p' and tag.strong)

           # Iterate through each <p> tag containing <strong> tags
           for p_tag in paragraphs_with_strong:
               strong_tag = p_tag.strong
               if strong_tag:
                   strong_text = strong_tag.text.strip()
                   # Check if the text starts with a digit followed by a period and a space
                   if re.match(r'^\d+\. ', strong_text):
                       # Find the previous sibling header tag
                       previous_header = p_tag.find_previous_sibling(lambda tag: tag.name.startswith('h') and len(tag.name) == 2)
                       if previous_header:
                           # Determine the level of the next header
                           next_header_level = 'h6'
                           # Replace the <p> tag with the next level header
                           new_header_tag = html_soup.new_tag(next_header_level)
                           new_header_tag.string = strong_text  # Retain the text content within <strong>
                           p_tag.replace_with(new_header_tag)

           # Get the modified HTML
           modified_html = str(html_soup)

           headers_to_split_on = [
               ("h1", "Header 1"),
               ("h2", "Header 2"),
               ("h3", "Header 3"),
               ("h4", "Header 4"),
               ("h5", "Header 5"),
               ("h6", "Header 6")
           ]

           html_splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
           html_header_splits = html_splitter.split_text(modified_html)
           for i, split in enumerate(html_header_splits):
               tokens = llm.get_num_tokens(split.page_content)
#                 html_header_splits[i].metadata.update(meta_tags)
               html_header_splits[i].metadata["token_count"] = tokens
               html_header_splits[i].metadata["source"] = file
               html_header_splits[i].metadata["index"] = i

           full_doc.append(html_header_splits)

       else:
           
           # Getting text from the file
           soup = BeautifulSoup(xml_content, "xml")
           text_tags = soup.find_all('txtascii')
           html_content = ''.join([' ' + tag.get_text() for tag in text_tags])
               
           lc_doc = Document(page_content=html_content)
           tokens = llm.get_num_tokens(html_content)
#             lc_doc.metadata["tags"] = meta_tags
           lc_doc.metadata["token_count"] = tokens
           lc_doc.metadata["source"] = file
           lc_doc.metadata["index"] = 1
           full_doc.append(lc_doc)
           
   except lxml.etree.XSLTApplyError as e:
       continue
   
# Save the list to a file
with open('full_xml.pkl', 'wb') as file:
   pickle.dump(full_doc, file)
   
#Upload the file to COS
cos_client.upload_file('./full_xml.pkl', bucket_name, 'full_xml.pkl')
# cos_client.Object(bucket_name, item_name).put(Body=filelike_object)

一旦完成第一步后,使用lxml中的etree调用解析器函数,提取所有元数据。

with open(file_path, "rb") as f:
           xml_content_tag_reader = f.read()
       parser = etree.XMLParser(recover=True)
       root = etree.fromstring(xml_content_tag_reader, parser)
       # Get the ElementTree object
       tree = root.getroottree()
       #Extracting metadata except that of text
       meta_tags = {tree.getpath(d): d.text for d in root.iterdescendants() if '/txt' not in tree.getpath(d)}

提取txt标签

下一步是读取内容并从xml内容中提取所有txt标签。对于每一页,有两个xml文件,一个包含html标签,一个包含高级元数据。

因此,检查是否有html_content可用,在这种情况下,使用Langchain的HTMLHeaderTextSplitter。

该模块根据提供的分块定义(在本例中为header_to_split_on),将文档分成较小的块。

from langchain_text_splitters import HTMLHeaderTextSplitter
headers_to_split_on = [
   ("h1", "Header 1"),
   ("h2", "Header 2"),
   ("h3", "Header 3"),
   ("h4", "Header 4"),
   ("h5", "Header 5"),
   ("h6", "Header 6")
]

html_splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
html_header_splits = html_splitter.split_text(modified_html)

然而,我注意到即使分块后,块的大小仍然太大。深入研究后,我发现<p>下的子标题标记在下,但可以用作子标题。

因此,下一步的自适应策略是进入数据中查找这种模式,并将标签更改为标题,从而定义新的标题,并将块分成更小的部分,同时保持语义。

# Find all <p> tags containing <strong> tags
paragraphs_with_strong = html_soup.find_all(lambda tag: tag.name == 'p' and tag.strong)

# Iterate through each <p> tag containing <strong> tags
for p_tag in paragraphs_with_strong:
   strong_tag = p_tag.strong
   if strong_tag:
       strong_text = strong_tag.text.strip()
       # Check if the text starts with a digit followed by a period and a space
       if re.match(r'^\d+\. ', strong_text):
           # Find the previous sibling header tag
           previous_header = p_tag.find_previous_sibling(lambda tag: tag.name.startswith('h') and len(tag.name) == 2)
           if previous_header:
               # Determine the level of the next header
               next_header_level = 'h6'
               # Replace the <p> tag with the next level header
               new_header_tag = html_soup.new_tag(next_header_level)
               new_header_tag.string = strong_text  # Retain the text content within <strong>
               p_tag.replace_with(new_header_tag)

一旦完成(由于文件数量庞大,这花费了一个多小时),我添加了几个属性,如文件名、令牌大小和指示文件第n个块的索引。

我的目标是追溯到文件,以便我可以提取RAG模型选择的块的文件名和其他元数据。

一旦这些处理完成,就将其存储为pickle文件,因为这些文件存储在临时文件夹中,一旦内核关闭,这些文件将被删除。

将文本分成块

预处理完成后,我注意到有些文件没有分割成标签,因为langchain工具无法识别它们为标题。

因此,下一步是通过编写代码将页面分成不同的块。

def extract_headers_and_content(html):
   html_soup = BeautifulSoup(html, 'html.parser')
   headers = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']
   result = []
   current_metadata = {}

   for tag in html_soup.find_all(headers):
       # Update the current metadata hierarchy based on the current tag
       level = headers.index(tag.name)
       current_metadata = {h: current_metadata[h] for h in headers[:level] if h in current_metadata}
       current_metadata[tag.name] = tag.get_text(strip=True)
       
       # Collect the content under the current header
       content = tag.find_next_sibling()
       content_text = ""
       while content and content.name not in headers:
           content_text += content.get_text(strip=True) + " "
           content = content.find_next_sibling()

       if content_text.strip():
           result.append({'page_content': content_text.strip(), 'metadata': current_metadata.copy()})

   return result

exception_file_chunking = []
for key, value in tqdm(full_doc_v2.items()):
   extracted_data = extract_headers_and_content(value)
   extracted_data_v2 = [i for i in extracted_data if i['page_content'] != '']
   for idx, i in enumerate(extracted_data_v2):
       i['metadata']['source'] = key
       tokens = llm.get_num_tokens(i['page_content'])
       i['metadata']['token_count'] = tokens
       i['metadata']['index'] = idx
   exception_file_chunking.append(extracted_data_v2)

上述代码手动提取元数据,查找标题,并根据下一个兄弟节点(相同的下一个标题),仅提取该特定标签的信息。

这样,总块数超过了200万个。

将标签命名为字符串

下一步是将标签重命名为字符串。

#Renaming the keys in the dictionary

for k, v in tqdm(tags.items()):
   new_dict = {}
   for key, value in v.items():
       if key == '/Segmente/Segment':
           continue
       parts = key.rsplit('/', 2)
       text = '_'.join([part.lower() for part in parts[-2:]])
       new_dict[text] = value
   tags[k] = new_dict

文本向量化

最后一步调用Hugging Face的嵌入模型进行向量化。具体操作如下:

from langchain.embeddings import HuggingFaceEmbeddings

model_name = "intfloat/multilingual-e5-large"
model_kwargs = {'device': 'cuda'}
encode_kwargs = {'normalize_embeddings': False}
hf = HuggingFaceEmbeddings(
   model_name=model_name,
   model_kwargs=model_kwargs,
   encode_kwargs=encode_kwargs
)

创建客户端

接下来是创建一个Elastic Search客户端,用于与 Elastic Search 服务进行交互。

from elasticsearch import Elasticsearch
from langchain_elasticsearch import ElasticsearchStore

# Create a custom Elasticsearch client
es_client = Elasticsearch(
   es_url,
   basic_auth=(es_user, es_password),
   verify_certs=False,
   request_timeout = 1200
)

elastic_vector_search = ElasticsearchStore(
   index_name="index_e5_multiprocessing_htmlsplitter_gpu_test_v2",
   es_connection=es_client,
   embedding=hf,
)

db = ElasticsearchStore.from_documents(
  documents=texts,
   embedding=hf,
   index_name="index_e5_multiprocessing-v6",
   strategy=ElasticsearchStore.ApproxRetrievalStrategy(
       hybrid = True
   ),
   es_connection=es_client,
)

至此,最终处理和分块完成。

我们使用BERT Score和余弦相似度来获取评估指标。

结果显示,平均BERT F1得分接近0.75,而余弦相似度接近0.9。表明语义准确性和相关性都很高。

结 论

为超过10万份文件构建RAG模型是一项具有挑战性但回报丰厚的工作。

在整个项目中,我们发现单一的分块方法并不适用。这促使我们开发了定制的分块策略,以高效处理文件。

通过提取元数据、自动删除重复文件以及从各种标签中细致地提取文本,我们确保了处理数据的质量。

结果令人鼓舞,BERT得分为0.75,余弦相似度为0.9。语义准确性和相关性都很高。

这些文件通过弹性混合搜索,从而有效地检索并提升了RAG模型的性能。


整个过程表明处理大规模数据时适应性和创新的重要性。

类似项目可以参考,释放庞大且多样的数据集的潜力,从而获得有意义的洞察。


53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询

扫码登录
登录即表示您同意《53AI网站服务协议》
服务协议

欢迎您使用【53AI 官方网站】(以下简称“本网站”或“我们”)。本《会员服务协议》(以下简称“本协议”)是您(以下简称“会员”或“用户”)与【深圳市博思协创网络科技有限公司】之间关于注册、登录及使用本网站会员服务所订立的法律协议。

在您注册或登录前,请务必审慎阅读、充分理解各条款内容,特别是免除或限制责任的条款、知识产权条款、争议解决条款等。此类条款将以加粗形式提示您注意。 当您通过微信公众号授权、手机验证码验证或其他方式成功登录本网站时,即视为您已完全理解并同意接受本协议的全部内容。

一、 定义

本网站:指由【深圳市博思协创网络科技有限公司】运营的,域名为【53ai.com】的网站及相关移动端页面。

会员服务:指本网站向注册会员提供的知识库文章查阅、内容检索及其他相关增值服务。

知识库内容:指本网站发布的包括但不限于文字、图表、数据、研究报告、行业分析等数字化内容资源。

二、 账号注册与登录

登录方式:本网站支持以下登录方式,您可根据实际情况选择:

微信公众号授权登录:您同意将您的微信OpenID信息授权给本网站,用于创建或关联会员账号。

手机验证码登录:您需提供真实有效的手机号码,并通过短信验证码完成身份验证与登录/注册。

账号安全:您的账号仅限您本人使用,禁止赠与、借用、租用、转让或售卖。因您保管不善导致的账号被盗、密码泄露等损失,由您自行承担。

实名认证:根据相关法律法规要求,我们可能要求您在特定功能下完成实名认证。如您拒绝提供,可能无法使用部分或全部服务。

未成年人保护:若您未满18周岁,请在法定监护人的陪同下阅读本协议,并在征得监护人同意后使用本服务。

三、 服务内容与规范

知识库查阅权限:会员登录后,有权按照其会员等级对应的权限范围,在线浏览、检索本网站知识库中的相关文章及内容。

服务变更:我们有权根据业务发展需要,调整、变更或终止部分服务内容,并将以网站公告、公众号消息等方式提前通知。

禁止行为:您在使用服务时不得实施以下行为:

利用技术手段批量爬取、下载、转存知识库内容;

将知识库内容用于商业目的或未经授权地向第三方传播;

干扰本网站正常运行或侵犯其他用户合法权益;

发布违法违规信息或从事违反公序良俗的活动。

四、 知识产权声明

权利归属:本网站知识库中的排版设计、软件代码等内容的知识产权均归【公司全称】或原权利人所有,受《中华人民共和国著作权法》等法律保护。

有限许可:本网站授予会员一项非独占、不可转让、不可转授权的普通许可,仅限于个人学习、研究之目的在线查阅知识库内容。

侵权追责:未经书面许可,任何单位或个人不得以任何形式复制、转载、摘编、镜像、汇编或以其他方式使用上述内容。一经发现,我们保留追究其法律责任的权利。

五、 个人信息保护

我们重视对您个人信息的保护。关于我们如何收集、使用、存储和保护您的个人信息,请单独阅读 《隐私政策》。

您通过微信公众号授权或手机号验证所提供的信息,我们将严格按照《个人信息保护法》的规定处理,仅用于身份识别、服务提供及安全验证等必要用途。

您可以随时通过网站设置或联系客服行使查阅、更正、删除个人信息及撤回授权同意的权利。

六、 免责声明

内容准确性:知识库内容仅供参考,不构成专业建议。我们不对其完整性、准确性、时效性作任何明示或暗示的保证,您应自行判断并承担使用风险。

不可抗力:因自然灾害、政策法规变化、网络故障、第三方平台接口异常(如微信接口维护、运营商短信通道故障)等不可抗力导致的服务中断或延迟,我们不承担违约责任。

第三方链接:本网站可能包含指向第三方网站的链接,该等网站的内容和服务不受我们控制,请您自行甄别风险。

七、 违约责任

如您违反本协议约定,我们有权视情节采取警告、限制功能、暂停服务、注销账号等措施,并保留要求赔偿损失的权利。

如因您的违约行为导致我们遭受行政处罚、第三方索赔或商誉损失,您应承担全部赔偿责任(包括但不限于罚款、赔偿金、律师费、公证费等)。

八、 法律适用与争议解决

本协议的订立、执行和解释均适用中华人民共和国大陆地区法律。

因本协议产生的或与本协议有关的任何争议,双方应友好协商解决;协商不成的,任何一方均可向【公司所在地】有管辖权的人民法院提起诉讼。

九、 其他

本协议构成双方就本服务达成的完整协议,取代此前任何口头或书面约定。

本协议任一条款被认定为无效或不可执行的,不影响其他条款的效力。

我们对本协议享有最终解释权,并在法律允许的范围内保留随时修改的权利。修改后的协议一经公布即生效,继续使用服务即视为同意修订内容。


已查阅