2026年7月9日 周四晚上19:30,报名腾讯会议了解“如何构建自进化的动态知识库(Brain)”(限30人)
免费POC, 零成本试错
FDE知识库

FDE知识库

学习大模型的前沿技术与行业落地应用


收藏

使用大语言模型从零构建知识图谱(中)

发布日期:2025-01-23 21:25:13 浏览次数: 3446
作者:智见AGI

微信搜一搜,关注“智见AGI”

推荐语

这是关于大语言模型构建知识图谱的实用指南,不容错过!

核心内容:
1. 自定义流程构建知识图谱的方法
2. 不同方案的性能对比与选择
3. 本地计算机配置对模型选择的影响

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家


通过创建一个自定义流程来自动上传业务数据


在这一节,我会带你创建一个自定义流程,通过大语言模型自动生成节点定义、关系和 Cypher 查询,基于数据集进行操作。这种方法也适用于其他 DataFrame,同时该方法也能够自动识别其 Schema。


需要注意的是,这种方法在性能上会是个问题,尤其是与 Langchain 的 LLMGraphTransformer 相比,我将在下一节中进行介绍。而本节主要帮助你理解如果从零开始构建该过程,从原理出发,帮助你有机会设计自己的 Graph-Builder。实际上,目前所谓最佳方法的主要限制来自于它对数据的天然含义和模式高度敏感。因此,需要跳出固有的思维模式就显得至关重要,这样才能够帮助你从零开始设计 GraphRAG,或利用现有的,最佳实践的 GraphRAG 来满足你的业务需求。


现在,让我们深入研究,设置我们将在接下来的练习中使用的大语言模型。你可以使用 Langchain 所支持的任何大语言模型,只要其性能能够满足你真是的业务需要。


这里我们有两个可选的免费方案:DeepSeek-V3(注册后可获得 10 元的额度,有效期一个月)和 Ollama(可以让你轻松的在本地运行开源模型)。对于这两种方案我都进行了测试,尽管 DeepSeek-V3 提供了和 GPT-4o 类似的性能,我仍然推荐你选择 Ollama 进行学习,这样,你可以更深入的了解从模型下载到运行的整个过程。


在 Ollama 示例中,我们将使用 Qwen2.5-Coder:7B,它针对代码任务进行了微调,并在代码生成、推理和修复代码错误方面表现出色。根据你本地计算机的配置来决定是否使用更高参数量的版本,如 14B 或 32B。


让我们从初始化模型开始:


llm = OllamaLLM(model="qwen2.5-coder:latest")


让我们开始提取数据集的结构,并定义节点及其属性:


node_structure = "\n".join([    f"{col}: {', '.join(map(str, movies[col].unique()[:3]))}..."     for col in movies.columns])print(node_structure)


对于数据集中的每一列(例如:电影类型、导演),我们来展示一些样本值。这将帮助大语言模型理解数据格式以及每一列的典型值。


Release Year: 1907, 1908, 1909...Title: Daniel boone, Laughing gas, The adventures of dollie...Origin/Ethnicity: American...Director: Wallace mccutcheon and ediwin s. porter, Edwin stanton porter, D. w. griffith...Cast: William craven, florence lawrence, Bertha regustus, edward boulden, Arthur v. johnson, linda arvidson...Genre: Biographical, Comedy, Drama...Plot: Boone's daughter befriends an indian maiden as boone and his companion start out on a hunting expedition. while he is away, boone's cabin is attacked by the indians, who set it on fire and abduct boone's daughter. boone returns, swears vengeance, then heads out on the trail to the indian camp. his daughter escapes but is chased. the indians encounter boone, which sets off a huge fight on the edge of a cliff. a burning arrow gets shot into the indian camp. boone gets tied to the stake and tortured. the burning arrow sets the indian camp on fire, causing panic. boone is rescued by his horse, and boone has a knife fight in which he kills the indian chief.[2], The plot is that of a black woman going to the dentist for a toothache and being given laughing gas. on her way walking home, and in other situations, she can't stop laughing, and everyone she meets "catches" the laughter from her, including a vendor and police officers., On a beautiful summer day a father and mother take their daughter dollie on an outing to the river. the mother refuses to buy a gypsy's wares. the gypsy tries to rob the mother, but the father drives him off. the gypsy returns to the camp and devises a plan. they return and kidnap dollie while her parents are distracted. a rescue crew is organized, but the gypsy takes dollie to his camp. they gag dollie and hide her in a barrel before the rescue party gets to the camp. once they leave the gypsies and escapes in their wagon. as the wagon crosses the river, the barrel falls into the water. still sealed in the barrel, dollie is swept downstream in dangerous currents. a boy who is fishing in the river finds the barrel, and dollie is reunited safely with her parents...


生成节点


接下来,我们使用大语言模型的提示词模板来引导模型如何提取节点及其属性。让我们先看看完整的代码:


# 设置日志logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)
def validate_node_definition(node_def: Dict) -> bool:    """验证节点结构定义"""    if not isinstance(node_def, dict):        return False    return all(        isinstance(v, dict) and all(isinstance(k, str) for k in v.keys())        for v in node_def.values()    )
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))def get_node_definitions(chain, structure: str, example: Dict) -> Dict[str, Dict[str, str]]:    """使用重试逻辑来获取节点定义"""    try:        # 从大语言模型获得响应        response = chain.invoke({"structure": structure, "example": example})                # 解析响应        node_defs = ast.literal_eval(response)                # 验证结构        if not validate_node_definition(node_defs):            raise ValueError("无效的节点结构定")                    return node_defs            except (ValueError, SyntaxError) as e:        logger.error(f"解析节点定义时出错: {e}")        raise
# 更新节点定义模板node_example = {    "NodeLabel1": {"property1": "row['property1']", "property2": "row['property2']"},    "NodeLabel2": {"property1": "row['property1']", "property2": "row['property2']"},    "NodeLabel3": {"property1": "row['property1']", "property2": "row['property2']"},}
define_nodes_prompt = PromptTemplate(    input_variables=["example", "structure"],    template=("""        分析以下数据集结构并提取节点的实体标签及其属性。\n        节点属性应基于数据集列和它们的值。\n        返回的结果应为一个字典,其中键是节点标签,值是节点属性。\n\n        示例: {example}\n\n                数据集结构:\n{structure}\n\n                      确保包括所有可能的节点标签及其属性。\n        如果某个属性可以是其自己的节点,请将其作为单独的节点标签。\n        请不要使用三重反引号标识代码块,只需返回元组的列表。\n        仅返回包含节点标签和属性的字典,不要包含任何其他文本或引号。        """    ),)
# 带有错误处理机制的执行过程try:    node_chain = define_nodes_prompt | llm
   node_definitions = get_node_definitions(node_chain, structure=node_structure, example=node_example)    logger.info(f"节点定义: {node_definitions}")except Exception as e:    logger.error(f"获取节点定义失败: {e}")    raise


在这个代码片段中,我们首先使用 logging 库设置日志记录, logging 是一个 Python 模块,用于跟踪执行过程中的事件(如错误或状态更新):


logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)


我们使用 basicConfig 配置日志记录,以显示 INFO 级别或更高的消息,并初始化日志记录器实例,我将在代码中用它来记录消息。


这个步骤其实不是必需的,你也可以用 print 语句来代替它。然而,这是一个良好的工程实践。


接下来,我将创建一个函数来验证大语言模型生成的节点:


def validate_node_definition(node_def: Dict) -> bool:    """验证节点结构定义"""    if not isinstance(node_def, dict):        return False    return all(        isinstance(v, dict) and all(isinstance(k, str) for k in v.keys())        for v in node_def.values()    )


该函数的输入是一个字典,其中键是节点标签(例如:Movie),值是属性的字典(例如:title、year)。


首先,函数检查 node_def 是否是一个字典,并验证字典中的每个值是否也是字典,并且这些字典中的所有键是否都是字符串。如果结构有效,则返回 True 。


接下来,创建一个函数来调用 LLM 链并实际生成节点:


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))def get_node_definitions(chain, structure: str, example: Dict) -> Dict[str, Dict[str, str]]:    """获取带有重试逻辑的节点定义"""    try:        # 从大语言模型获取响应        response = chain.invoke({"structure": structure, "example": example})                # 解析响应        node_defs = ast.literal_eval(response)                # 验证结构        if not validate_node_definition(node_defs):            raise ValueError("无效的节点结构定义")                    return node_defs


如果你不熟悉 Python 中的装饰器,可能会好奇 @retry(...) 这部分是做什么的,可以将其看作是一个包装函数,围绕着实际的 get_node_definitions 函数。在这种情况下,我调用了 retry 装饰器,如果发生错误,它会自动重试该函数。


● stop_after_attempt(3) : 最多重试 3 次。


● wait_exponential : 在重试之间增加延迟的时长(例如:4 秒、8 秒、16 秒等等)。

函数的输入是:


●chain : LangChain 管道(提示 + LLM)。我会在稍后定义这个管道。


●structure : 数据集结构(列和示例值)。


● example : 用于引导 LLM 的示例节点定义。


接下来,chain.invoke 将结构和示例发送给 LLM,并接收一个字符串作为响应。ast.literal_eval 将字符串响应转换成 Python 字典。


我使用 validate_node_definition 检查解析后的字典是否符合正确的格式,如果结构无效,它会引发 ValueError 。


except (ValueError, SyntaxError) as e:    logger.error(f"Error parsing node definitions: {e}")    raise


如果响应无法解析或验证,会记录错误信息,该函数会抛出异常。


接下来,我们为 LLM 提供一个提示词模板,以引导其完成节点生成任务:


define_nodes_prompt = PromptTemplate(    input_variables=["example", "structure"],    template=("""        分析以下数据集结构并提取节点的实体标签及其属性。\n        节点属性应基于数据集列和它们的值。\n        返回的结果应为一个字典,其中键是节点标签,值是节点属性。\n\n        示例: {example}\n\n                数据集结构:\n{structure}\n\n                      确保包括所有可能的节点标签及其属性。\n        如果某个属性可以是其自己的节点,请将其作为单独的节点标签。\n        请不要使用三重反引号标识代码块,只需返回元组的列表。\n        仅返回包含节点标签和属性的字典,不要包含任何其他文本或引号。    """),)


请注意,我提供了本节开始时定义的节点结构,以及如何生成节点字典的示例:


node_example = {    "NodeLabel1": {"property1": "row['property1']", "property2": "row['property2']"},    "NodeLabel2": {"property1": "row['property1']", "property2": "row['property2']"},    "NodeLabel3": {"property1": "row['property1']", "property2": "row['property2']"},}


在示例中,键是节点标签(例如:Movie、Director),值是映射到数据集列的属性字典(例如:row[’property1’] )。


接下来,让我们执行链:


try:    node_chain = define_nodes_prompt | llm    node_definitions = get_node_definitions(node_chain, structure=node_structure, example=node_example)    logger.info(f"节点定义: {node_definitions}")except Exception as e:    logger.error(f"获取节点定义失败: {e}")    raise


在 LangChain 中,我们使用结构化提示词 | LLM | … 来创建一个链,将提示词模板与 LLM 结合,形成一个管道。我们使用 get_node_definitions 来获取并验证节点定义。

如果过程中出现失败,错误会被记录,并且程序会引发异常。


如果过程成功,它将生成类似于以下内容的结果:


INFO:__main__:Node Definitions: {'Movie': {'Release Year': "row['Release Year']", 'Title': "row['Title']"}, 'Director': {'Name': "row['Director']"}, 'Cast': {'Actor': "row['Cast']"}, 'Genre': {'Type': "row['Genre']"}, 'Plot': {'Description': "row['Plot']"}}


生成关系


一旦节点被定义,我们就可以识别它们之间的关系。接下来,我们来看看完整的代码是怎样的:


class RelationshipIdentifier:    """识别图数据库中节点之间的关系。"""        RELATIONSHIP_EXAMPLE = [        ("NodeLabel1", "RelationshipLabel", "NodeLabel2"),        ("NodeLabel1", "RelationshipLabel", "NodeLabel3"),        ("NodeLabel2", "RelationshipLabel", "NodeLabel3"),    ]
   PROMPT_TEMPLATE = PromptTemplate(    input_variables=["structure", "node_definitions", "example"],    template="""        考虑以下数据集结构:\n{structure}\n\n
       考虑以下节点定义:\n{node_definitions}\n\n
       根据数据集结构和节点定义,识别节点之间的关系(边)。\n        以三元组的形式返回关系,其中每个三元组包含起始节点标签、关系标签和结束节点标签,每个三元组是一个元组。\n        请仅返回元组列表。请不要使用三重反引号标识代码块,只返回元组列表。\n\n
       示例:\n{example}        """)
   def __init__(self, llm: Any, logger: logging.Logger = None):        self.llm = llm        self.logger = logger or logging.getLogger(__name__)        self.chain = self.PROMPT_TEMPLATE | self.llm
   def validate_relationships(self, relationships: List[Tuple]) -> bool:        """验证关系结构"""        return all(            isinstance(rel, tuple) and            len(rel) == 3 and            all(isinstance(x, str) for x in rel)            for rel in relationships        )
   @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))    def identify_relationships(self, structure: str, node_definitions: Dict) -> List[Tuple]:        """识别关系并应用重试逻辑"""        try:            response = self.chain.invoke({                "structure": structure,                "node_definitions": str(node_definitions),                "example": str(self.RELATIONSHIP_EXAMPLE)            })                        relationships = ast.literal_eval(response)                        if not self.validate_relationships(relationships):                raise ValueError("无效的关系结构")                            self.logger.info(f"已验证 {len(relationships)} 个关系")            return relationships                    except Exception as e:            self.logger.error(f"验证关系时出现错误:{e}")            raise
   def get_relationship_types(self) -> List[str]:        """提取唯一的关系类型。"""        return list(set(rel[1] for rel in self.identify_relationships()))
# 用法identifier = RelationshipIdentifier(llm=llm)relationships = identifier.identify_relationships(node_structure, node_definitions)print("关系:", relationships)


由于这段代码需要进行比节点生成更多的操作,我们将代码组织在一个类中 —— RelationshipIdentifier —— 以封装所有关系提取、验证和日志记录的逻辑。我们使用类似的逻辑,因此我们提供一个关系示例:


RELATIONSHIP_EXAMPLE = [    ("NodeLabel1", "RelationshipLabel", "NodeLabel2"),    ("NodeLabel1", "RelationshipLabel", "NodeLabel3"),    ("NodeLabel2", "RelationshipLabel", "NodeLabel3"),]


在这里,每个关系都是一个元组,包含以下内容:


●起始节点标签:源节点的标签(例如:Movie)。


●关系标签:连接类型(例如:DIRECTED_BY)。


●结束节点标签:目标节点的标签(例如:Director)。


接下来,我们定义实际的提示词模板:


PROMPT_TEMPLATE = PromptTemplate(    input_variables=["structure", "node_definitions", "example"],    template="""        考虑以下数据集结构:\n{structure}\n\n
       考虑以下节点定义:\n{node_definitions}\n\n
       根据数据集结构和节点定义,识别节点之间的关系(边)。\n        以三元组的形式返回关系,其中每个三元组包含起始节点标签、关系标签和结束节点标签,每个三元组是一个元组。\n        请仅返回元组列表。请不要使用三重反引号标识代码块,只返回元组列表。\n\n
       示例:\n{example}        """)


在这种情况下,我们有三个输入变量:


●structure:数据集结构,列出了列和示例值。我在本节开始时定义了它。


● node_definitions :节点标签及其属性的字典。这些节点是在上一节中由 LLM 生成的。


● example :三元组格式的示例关系。

接下来,我将使用者三个属性初始化类:


def __init__(self, llm: Any, logger: logging.Logger = None):    self.llm = llm    self.logger = logger or logging.getLogger(__name__)    self.chain = self.PROMPT_TEMPLATE | self.llm


●llm :用于处理提示的语言模型(例如:GPT-4o-mini)。


● logger :可选参数,用于记录进度和错误(如果未提供,则默认为标准日志记录器)。


●self.chain :将提示词模板与 LLM 结合,创建一个可重用的管道。


类似之前的做法,我们创建一个方法来验证生成的关系:


def validate_relationships(self, relationships: List[Tuple]) -> bool:    """验证关系结构。"""    return all(        isinstance(rel, tuple) and         len(rel) == 3 and         all(isinstance(x, str) for x in rel)        for rel in relationships    )


该方法检查每个项目是否是元组,确保每个元组包含三个元素,并且所有元素都是字符串(例如:节点标签或关系类型)。最后,如果满足这些条件,则返回 TRUE ,否则返回 FALSE 。


接下来,我们创建一个方法来调用链并生成关系:


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))def identify_relationships(self, structure: str, node_definitions: Dict) -> List[Tuple]:    """识别关系并应用重试逻辑。"""    try:        response = self.chain.invoke({            "structure": structure,             "node_definitions": str(node_definitions),             "example": str(self.RELATIONSHIP_EXAMPLE)        })                relationships = ast.literal_eval(response)                if not self.validate_relationships(relationships):            raise ValueError("无效的关系结构")                    self.logger.info(f"已验证 {len(relationships)} 个关系")        return relationships


我们再次使用 retry 装饰器来在失败时重新尝试 LLM 链,并以类似于节点生成时的方式调用链。


此外,我们使用 ast.literal_eval 将 LLM 的字符串输出转换成 Python 列表,并使用 validate_relationships 来确保输出格式正确。


except Exception as e:    self.logger.error(f"Error identifying relationships: {e}")    raise


如果该方法失败,它会记录错误并最多重试 3 次。


最后一个方法返回唯一的关系标签(例如:DIRECTED_BY、ACTED_IN):


def get_relationship_types(self) -> List[str]:    """Extract unique relationship types."""    return list(set(rel[1] for rel in self.identify_relationships()))


它调用 identify_relationships 方法来获取关系列表。然后,它提取每个元组中的第二个元素(关系标签),使用 set 来去除重复项,并将结果转换回列表。


现在,终于到了生成关系的时候了:


identifier = RelationshipIdentifier(llm=llm)relationships = identifier.identify_relationships(node_structure, node_definitions)print("Relationships:", relationships)


如果 LLM 在 3 次尝试内成功,它将返回一个类似以下内容的关系列表,以元组格式表示:


INFO:__main__:Identified 4 relationshipsRelationships: [('Movie', 'Directed By', 'Director'), ('Movie', 'Starring', 'Cast'), ('Movie', 'Has Genre', 'Genre'), ('Movie', 'Contains Plot', 'Plot')]


生成 Cypher 查询


在节点和关系定义完成后,我创建了 Cypher 查询将它们加载到 Neo4j 中。这个过程遵循与节点生成和关系生成类似的逻辑。然而,我们增加了几个额外的步骤来进行验证,因为生成的输出将用于将数据加载到我们的知识图谱中。因此,我们需要尽可能提高成功的概率。让我们首先看看完整的代码:


class CypherQueryBuilder:    """构建用于 Neo4j 图数据库的 Cypher 查询。"""
   INPUT_EXAMPLE = """    NodeLabel1: value1, value2    NodeLabel2: value1, value2    """        EXAMPLE_CYPHER = example_cypher = """    CREATE (n1:NodeLabel1 {property1: "row['property1']", property2: "row['property2']"})    CREATE (n2:NodeLabel2 {property1: "row['property1']", property2: "row['property2']"})    CREATE (n1)-[:RelationshipLabel]->(n2);    """
   PROMPT_TEMPLATE = PromptTemplate(    input_variables=["structure", "node_definitions", "relationships", "example"],    template="""        考虑以下节点定义:\n{node_definitions}\n\n        考虑以下关系:\n{relationships}\n\n        生成 Cypher 查询以创建节点和关系,使用下面的节点定义和关系。记得用数据集中的实际数据替换占位符值。\n        包括每个节点的所有属性,按照节点定义,并创建关系。\n        返回一个包含每个查询用分号分隔的单个字符串。\n        请不要在响应中包含任何其他文本或引号。\n        请仅返回包含 Cypher 查询的字符串。请不要使用三重反引号标识代码块。\n\n
       示例输入:\n{input}\n\n
       示例输出Cypher查询:\n{cypher}    """)
   def __init__(self, llm: Any, logger: logging.Logger = None):        self.llm = llm        self.logger = logger or logging.getLogger(__name__)        # self.chain = LLMChain(llm=llm, prompt=self.PROMPT_TEMPLATE)        self.chain = self.PROMPT_TEMPLATE | self.llm
   def validate_cypher_query(self, query: str) -> bool:        """使用 LLM 和正则表达式模式验证 Cypher 查询语法。"""                VALIDATION_PROMPT = PromptTemplate(            input_variables=["query"],            template="""            验证此Cypher查询并返回 TRUE 或 FALSE:                        查询: {query}                        检查规则:            1. 有效的 CREATE 语句            2. 正确的属性格式            3. 有效的关系语法            4. 无缺失的括号            5. 有效的属性名称            6. 有效的关系类型                        如果查询有效,返回 TRUE;如果无效,返回 FALSE。            """        )                try:            # 基本模式验证            basic_valid = all(re.search(pattern, query) for pattern in [                r'CREATE \(',                  r'\{.*?\}',                    r'\)-\[:.*?\]->'            ])                        if not basic_valid:                return False                            #  LLM 验证            validation_chain = VALIDATION_PROMPT | self.llm            result = validation_chain.invoke({"query": query})                        # 解析结果            is_valid = "TRUE" in result.upper()                        if not is_valid:                self.logger.warning(f"LLM 验证查询失败: {query}")                            return is_valid                    except Exception as e:            self.logger.error(f"验证错误: {e}")            return False
   def sanitize_query(self, query: str) -> str:        """清理并格式化 Cypher 查询"""        return (query                .strip()                .replace('\n', ' ')                .replace('  ', ' ')                .replace("'row[", "row['")                .replace("]'", "']"))
   @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))    def build_queries(self, node_definitions: Dict, relationships: List) -> str:        """构建带有重试逻辑的 Cypher 查询。"""        try:            response = self.chain.invoke({                "node_definitions": str(node_definitions),                "relationships": str(relationships),                "input": self.INPUT_EXAMPLE,                "cypher": self.EXAMPLE_CYPHER            })
           # 获取位于三重反引号内的响应。            if '```' in response:                response = response.split('```')[1]
                       # 清理响应            queries = self.sanitize_query(response)                        # 验证查询            if not self.validate_cypher_query(queries):                raise ValueError("无效的 Cypher 查询语法")                            self.logger.info("成功生成 Cypher 查询")            return queries                    except Exception as e:            self.logger.error(f"构建 Cypher 查询出错: {e}")            raise
   def split_queries(self, queries: str) -> List[str]:        """将组合的查询拆分为单独的语句。"""        return [q.strip() for q in queries.split(';') if q.strip()]
# 用法builder = CypherQueryBuilder(llm=llm)cypher_queries = builder.build_queries(node_definitions, relationships)print("Cypher 查询:", cypher_queries)


我们提供一个提示词模板来帮助 LLM:


PROMPT_TEMPLATE = PromptTemplate(    input_variables=["structure", "node_definitions", "relationships", "example"],    template="""        考虑以下节点定义:\n{node_definitions}\n\n        考虑以下关系:\n{relationships}\n\n        生成 Cypher 查询以创建节点和关系,使用下面的节点定义和关系。记得用数据集中的实际数据替换占位符值。\n        包括每个节点的所有属性,按照节点定义,并创建关系。\n        返回一个包含每个查询用分号分隔的单个字符串。\n        请不要在响应中包含任何其他文本或引号。\n        请仅返回包含 Cypher 查询的字符串。请不要使用三重反引号标识代码块。\n\n
       示例输入:\n{input}\n\n
       示例输出Cypher查询:\n{cypher}    """)


现在,我提供了四个变量给提示词模板:


● structure :数据集结构,作为上下文。

●node_definitions :生成的节点及其属性。

●relationships :节点之间生成的关系。

●example : 用于格式参考的示例查询。


def __init__(self, llm: Any, logger: logging.Logger = None):    self.llm = llm    self.logger = logger or logging.getLogger(__name__)    self.chain = self.PROMPT_TEMPLATE | self.llm


我们以与关系类相同的方式初始化该类。


接下来,我定义了一个验证方法来检查生成的输出:


def validate_cypher_query(self, query: str) -> bool:    """使用 LLM 和正则表达式模式验证 Cypher 查询语法。"""        VALIDATION_PROMPT = PromptTemplate(        input_variables=["query"],        template="""        验证此Cypher查询并返回 TRUE 或 FALSE:                        查询: {query}                        检查规则:            1. 有效的 CREATE 语句            2. 正确的属性格式            3. 有效的关系语法            4. 无缺失的括号            5. 有效的属性名称            6. 有效的关系类型                        如果查询有效,返回 TRUE;如果无效,返回 FALSE。        """    )        try:        # 基本模式验证        basic_valid = all(re.search(pattern, query) for pattern in [            r'CREATE \(',              r'\{.*?\}',                r'\)-\[:.*?\]->'        ])                if not basic_valid:            return False                    # LLM 验证        validation_chain = VALIDATION_PROMPT | self.llm        result = validation_chain.invoke({"query": query})                # 解析结果        is_valid = "TRUE" in result.upper()                if not is_valid:            self.logger.warning(f"LLM 验证查询失败: {query}")                    return is_valid            except Exception as e:        self.logger.error(f"验证错误:{e}")        return False


该方法执行两个验证步骤。首先是使用正则表达式进行基本验证:


basic_valid = all(re.search(pattern, query) for pattern in [    r'CREATE \(',      r'\{.*?\}',        r'\)-\[:.*?\]->'])if not basic_valid:    return False


这确保查询包含必要的 Cypher 语法:


●CREATE :确保节点和关系正在被创建。

●  {.*?} :确保包含属性。

● -: .*?→ :确保关系格式正确。


然后,它使用 LLM 执行高级验证:


validation_chain = VALIDATION_PROMPT | self.llmresult = validation_chain.invoke({"query": query})is_valid = "TRUE" in result.upper()


验证在提示中指定,我要求 LLM 确保以下几点:


1. 有效的 CREATE 语句

2. 正确的属性格式

3. 有效的关系语法

4. 无缺失的括号

5.有效的属性名称

6.有效的关系类型


到目前为止一切看上去工作的都还不错,这里,让我再添加一个方法,进一步清理生成的输出:


def sanitize_query(self, query: str) -> str:    """清理并格式化 Cypher 查询。"""    return (query            .strip()            .replace('\n', ' ')            .replace('  ', ' ')            .replace("'row[", "row['")            .replace("]'", "']"))


我将移除不必要的空格以及换行符(\n),并修复与数据集引用相关的潜在格式问题(例如:row[’property1’])。


请根据你所使用的大语言模型考虑更新此方法,较小参数量的模型可能需要更多的数据清理操作。


接下来,我来定义一个查询调用方法:


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))    def build_queries(self, node_definitions: Dict, relationships: List) -> str:        """构建带有重试逻辑的 Cypher 查询。"""        try:            response = self.chain.invoke({                "node_definitions": str(node_definitions),                "relationships": str(relationships),                "input": self.INPUT_EXAMPLE,                "cypher": self.EXAMPLE_CYPHER            })
           # 获取位于三重反引号内的响应            if '```' in response:                response = response.split('```')[1]
                       # 清理响应            queries = self.sanitize_query(response)                        # 验证查询            if not self.validate_cypher_query(queries):                raise ValueError("无效的 Cypher 查询语法")                            self.logger.info("成功生成 Cypher 查询")            return queries                    except Exception as e:            self.logger.error(f"构建 Cypher 查询时出错: {e}")            raise


这个方法与关系构建器类中的方法类似,唯一的不同之处是:


if '```' in response:    response = response.split('```')[1]


在这里,LLM 可能会提供额外的 Markdown 格式来指定它是一个代码块。如果 LLM 的响应中存在这种格式,我只会提取三重反引号内的代码。


接下来,我定义一个方法,将单一的 Cypher 查询字符串拆分成单独的语句:


def split_queries(self, queries: str) -> List[str]:    """将组合的查询拆分为单独的语句"""    return [q.strip() for q in queries.split(';') if q.strip()]


例如,以下 Cypher 查询:


CREATE (n1:Movie {title: "Inception"}); CREATE (n2:Director {name: "Nolan"});


这将转换成以下形式:


["CREATE (n1:Movie {title: 'Inception'})", "CREATE (n2:Director {name: 'Nolan'})"]


这将非常有用,因为可以遍历查询列表。


最后,初始化类并生成 Cypher 查询:


builder = CypherQueryBuilder(llm=llm)cypher_queries = builder.build_queries(node_definitions, relationships)print("Cypher 查询:", cypher_queries)


成功时,输出将如下所示:


INFO:__main__:Successfully generated Cypher queriesCypher Queries: CREATE (m:Movie {Release_Year: "row['Release Year']", Title: "row['Title']"}) CREATE (d:Director {Name: "row['Director']"}) CREATE (c:Cast {Actor: "row['Cast']"}) CREATE (g:Genre {Type: "row['Genre']"}) CREATE (p:Plot {Description: "row['Plot']"}) CREATE (m)-[:Directed_By]->(d) CREATE (m)-[:Starring]->(c) CREATE (m)-[:Has_Genre]->(g) CREATE (m)-[:Contains_Plot]->(p)


最后,遍历数据集,并为每一行执行生成的 Cypher 查询。


logs = ""total_rows = len(df)
def sanitize_value(value):    if isinstance(value, str):        return value.replace('"', '')    return str(value)
for index, row in tqdm(df.iterrows(),                      total=total_rows,                      desc="正在加载数据到 Neo4j",                      position=0,                      leave=True):        # 将占位符替换为实际的值    cypher_query = cypher_queries    for column in df.columns:        cypher_query = cypher_query.replace(            f"row['{column}']",            f'{sanitize_value(row[column])}'        )        try:        # 执行查询并更新进度        conn.execute_query(cypher_query)    except Exception as e:        logs += f"在行 {index+1}: {str(e)} 出现错误\n"


请注意,我定义了一个空字符串变量 logs ,用于捕获潜在的失败。我还添加了一个清理函数,用于传递给每个行输入的值:


def sanitize_value(value):    if isinstance(value, str):        return value.replace('"', '')    return str(value)


将防止包含双引号的字符串破坏查询语法。


接下来,我们来遍历数据集:


for index, row in tqdm(df.iterrows(),                       total=total_rows,                      desc="正在加载数据到 Neo4j",                      position=0,                      leave=True):        # 将占位符替换为实际的值    cypher_query = cypher_queries    for column in df.columns:        cypher_query = cypher_query.replace(            f"row['{column}']",             f'{sanitize_value(row[column])}'        )        try:        # 执行查询并更新进度        conn.execute_query(cypher_query)    except Exception as e:        logs += f"在行 {index+1}: {str(e)} 出现错误\n"


正如我在练习开始时提到的,我使用 tqdm 为进度条添加了一个漂亮的外观,以可视化的方式显示处理了多少行数据。我传递了 df.iterrows() 来遍历 DataFrame,提供索引和行数据。total=total_rows 由 tqdm 用于计算进度。添加 desc=”正在加载数据到 Neo4j” 来为进度条提供标签。最后,position=0, leave=True 确保进度条在控制台中保持可见。


接下来,我将像 row[’column_name’] 这样的占位符替换成实际的数据集值,将每个值传递给 sanitize_value 函数,并执行查询。


让我们检查一下数据集是否已上传。切换到 Neo4j,并运行以下 Cypher 查询:


MATCH p=(m:Movie)-[r]-(n)RETURN pLIMIT 100;


在我的机器上,LLM 生成了以下图表:



这与我们手动上传的知识图谱非常相似。对于一个简单的 LLM 来说,这还不赖对吧。虽然这需要相当多的编码工作,但我们现在可以将其重用于多个数据集,更重要的是,可以将其作为基础,创建更复杂的 LLM 图形构建器。


在我提供的示例中,还没有通过提供实体、关系和属性来帮助 LLM。然而,考虑将它们作为示例来提高 LLM 的性能。此外,更现代化的方法利用思维链来提出额外的节点和关系。这使得模型能够顺序推理并进一步改进结果。另一种策略是提供行样本,以更好的适应每行中提供的值。

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询

扫码登录
登录即表示您同意《53AI网站服务协议》
服务协议

欢迎您使用【53AI 官方网站】(以下简称“本网站”或“我们”)。本《会员服务协议》(以下简称“本协议”)是您(以下简称“会员”或“用户”)与【深圳市博思协创网络科技有限公司】之间关于注册、登录及使用本网站会员服务所订立的法律协议。

在您注册或登录前,请务必审慎阅读、充分理解各条款内容,特别是免除或限制责任的条款、知识产权条款、争议解决条款等。此类条款将以加粗形式提示您注意。 当您通过微信公众号授权、手机验证码验证或其他方式成功登录本网站时,即视为您已完全理解并同意接受本协议的全部内容。

一、 定义

本网站:指由【深圳市博思协创网络科技有限公司】运营的,域名为【53ai.com】的网站及相关移动端页面。

会员服务:指本网站向注册会员提供的知识库文章查阅、内容检索及其他相关增值服务。

知识库内容:指本网站发布的包括但不限于文字、图表、数据、研究报告、行业分析等数字化内容资源。

二、 账号注册与登录

登录方式:本网站支持以下登录方式,您可根据实际情况选择:

微信公众号授权登录:您同意将您的微信OpenID信息授权给本网站,用于创建或关联会员账号。

手机验证码登录:您需提供真实有效的手机号码,并通过短信验证码完成身份验证与登录/注册。

账号安全:您的账号仅限您本人使用,禁止赠与、借用、租用、转让或售卖。因您保管不善导致的账号被盗、密码泄露等损失,由您自行承担。

实名认证:根据相关法律法规要求,我们可能要求您在特定功能下完成实名认证。如您拒绝提供,可能无法使用部分或全部服务。

未成年人保护:若您未满18周岁,请在法定监护人的陪同下阅读本协议,并在征得监护人同意后使用本服务。

三、 服务内容与规范

知识库查阅权限:会员登录后,有权按照其会员等级对应的权限范围,在线浏览、检索本网站知识库中的相关文章及内容。

服务变更:我们有权根据业务发展需要,调整、变更或终止部分服务内容,并将以网站公告、公众号消息等方式提前通知。

禁止行为:您在使用服务时不得实施以下行为:

利用技术手段批量爬取、下载、转存知识库内容;

将知识库内容用于商业目的或未经授权地向第三方传播;

干扰本网站正常运行或侵犯其他用户合法权益;

发布违法违规信息或从事违反公序良俗的活动。

四、 知识产权声明

权利归属:本网站知识库中的排版设计、软件代码等内容的知识产权均归【公司全称】或原权利人所有,受《中华人民共和国著作权法》等法律保护。

有限许可:本网站授予会员一项非独占、不可转让、不可转授权的普通许可,仅限于个人学习、研究之目的在线查阅知识库内容。

侵权追责:未经书面许可,任何单位或个人不得以任何形式复制、转载、摘编、镜像、汇编或以其他方式使用上述内容。一经发现,我们保留追究其法律责任的权利。

五、 个人信息保护

我们重视对您个人信息的保护。关于我们如何收集、使用、存储和保护您的个人信息,请单独阅读 《隐私政策》。

您通过微信公众号授权或手机号验证所提供的信息,我们将严格按照《个人信息保护法》的规定处理,仅用于身份识别、服务提供及安全验证等必要用途。

您可以随时通过网站设置或联系客服行使查阅、更正、删除个人信息及撤回授权同意的权利。

六、 免责声明

内容准确性:知识库内容仅供参考,不构成专业建议。我们不对其完整性、准确性、时效性作任何明示或暗示的保证,您应自行判断并承担使用风险。

不可抗力:因自然灾害、政策法规变化、网络故障、第三方平台接口异常(如微信接口维护、运营商短信通道故障)等不可抗力导致的服务中断或延迟,我们不承担违约责任。

第三方链接:本网站可能包含指向第三方网站的链接,该等网站的内容和服务不受我们控制,请您自行甄别风险。

七、 违约责任

如您违反本协议约定,我们有权视情节采取警告、限制功能、暂停服务、注销账号等措施,并保留要求赔偿损失的权利。

如因您的违约行为导致我们遭受行政处罚、第三方索赔或商誉损失,您应承担全部赔偿责任(包括但不限于罚款、赔偿金、律师费、公证费等)。

八、 法律适用与争议解决

本协议的订立、执行和解释均适用中华人民共和国大陆地区法律。

因本协议产生的或与本协议有关的任何争议,双方应友好协商解决;协商不成的,任何一方均可向【公司所在地】有管辖权的人民法院提起诉讼。

九、 其他

本协议构成双方就本服务达成的完整协议,取代此前任何口头或书面约定。

本协议任一条款被认定为无效或不可执行的,不影响其他条款的效力。

我们对本协议享有最终解释权,并在法律允许的范围内保留随时修改的权利。修改后的协议一经公布即生效,继续使用服务即视为同意修订内容。


已查阅