微信扫码
添加专属顾问
导语:最近 GraphRAG 在社区很火,作者亲自体验后,发现了一些可以探讨和改进的地方,本文主要介绍了如何改造 GraphRAG 以支持自定义的 LLM。
01
为什么在 RAG 中引入知识图谱?
相比传统的 RAG 方法,Graph RAG 在处理全局性问题时表现出更好;
02
GraphRAG 改造计划
GraphRAG 目前更像是一个 Demo 产品,想和业务结合现在也没什么可以操作的地方,肯定是需要自定义的。
这篇文章我会首先介绍下如何改造 GraphRAG 以支持自定义的 LLM,同时我把修改 GraphRAG 的代码也开源在 GitHub 上了,也欢迎感兴趣的朋友共同建设...
03
环境准备
3.1 安装依赖
git clone git@github.com:microsoft/graphrag.git
# 先安装pipx
brew install pipx
pipx ensurepath
sudo pipx ensurepath --global # optional to allow pipx actions in global scope. See "Global installation" section below.
# 安装poetry
pipx install poetry
poetry completions zsh > ~/.zfunc/_poetry
mkdir $ZSH_CUSTOM/plugins/poetry
poetry completions zsh > $ZSH_CUSTOM/plugins/poetry/_poetry
poetry install
3.2 项目结构
vector_stores 目录:包含向量数据库的实现。如果要自定义向量存储,需要在这个目录下进行实现。
3.3 运行& Debug 项目
mkdir -p ./ragtest/input
# 这一步可以随便替换成一些其他的文档,小一点的, 这样效率比较开,可以更快的验证下我们的改造结果
curl https://www.gutenberg.org/cache/epub/24022/pg24022.txt > ./ragtest/input/book.txt
初始化项目:
python -m graphrag.index --init --root ./ragtest
对文档进行索引:
python -m graphrag.index --root ./ragtest
进行本地查询:
python -m graphrag.query \
--root ./ragtest \
--method local \
"Who is Scrooge, and what are his main relationships?"
接下来填具体的参数,还有工作目录不要忘了。
04
GraphRAG 支持通义千问
4.1 修改的内容
4.2 支持 Qwen 类型的配置
4.3 使用 Qwen 进行 Index
def _load_qwen_llm(
on_error: ErrorHandlerFn,
cache: LLMCache,
config: dict[str, Any],
azure=False,
):
log.info(f"Loading Qwen completion LLM with config {config}")
return QwenCompletionLLM(config)
def _load_qwen_embeddings_llm(
on_error: ErrorHandlerFn,
cache: LLMCache,
config: dict[str, Any],
azure=False,
):
log.info(f"Loading Qwen embeddings LLM with config {config}")
return DashscopeEmbeddingsLLM(config);
通过兼容原本的方法,到这里索引部分就可以通过 Qwen 完全进行使用了。
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
import asyncio
import json
import logging
from http import HTTPStatus
from typing import Unpack, List, Dict
import dashscope
import regex as re
from graphrag.config import LLMType
from graphrag.llm import LLMOutput
from graphrag.llm.base import BaseLLM
from graphrag.llm.base.base_llm import TIn, TOut
from graphrag.llm.types import (
CompletionInput,
CompletionOutput,
LLMInput,
)
log = logging.getLogger(__name__)
class QwenCompletionLLM(
BaseLLM[
CompletionInput,
CompletionOutput,
]
):
def __init__(self, llm_config: dict = None):
log.info(f"llm_config: {llm_config}")
self.llm_config = llm_config or {}
self.api_key = self.llm_config.get("api_key", "")
self.model = self.llm_config.get("model", dashscope.Generation.Models.qwen_turbo)
# self.chat_mode = self.llm_config.get("chat_mode", False)
self.llm_type = llm_config.get("type", LLMType.StaticResponse)
self.chat_mode = (llm_config.get("type", LLMType.StaticResponse) == LLMType.QwenChat)
async def _execute_llm(
self,
input: CompletionInput,
**kwargs: Unpack[LLMInput],
) -> CompletionOutput:
log.info(f"input: {input}")
log.info(f"kwargs: {kwargs}")
variables = kwargs.get("variables", {})
# 使用字符串替换功能替换占位符
formatted_input = replace_placeholders(input, variables)
if self.chat_mode:
history = kwargs.get("history", [])
messages = [
*history,
{"role": "user", "content": formatted_input},
]
response = self.call_with_messages(messages)
else:
response = self.call_with_prompt(formatted_input)
if response.status_code == HTTPStatus.OK:
if self.chat_mode:
return response.output["choices"][0]["message"]["content"]
else:
return response.output["text"]
else:
raise Exception(f"Error {response.code}: {response.message}")
def call_with_prompt(self, query: str):
print("call_with_prompt {}".format(query))
response = dashscope.Generation.call(
model=self.model,
prompt=query,
api_key=self.api_key
)
return response
def call_with_messages(self, messages: list[dict[str, str]]):
print("call_with_messages {}".format(messages))
response = dashscope.Generation.call(
model=self.model,
messages=messages,
api_key=self.api_key,
result_format='message',
)
return response
# 主函数
async def _invoke_json(self, input: TIn, **kwargs) -> LLMOutput[TOut]:
try:
output = await self._execute_llm(input, **kwargs)
except Exception as e:
print(f"Error executing LLM: {e}")
return LLMOutput[TOut](output=None, json=None)
# 解析output的内容
extracted_jsons = extract_json_strings(output)
if len(extracted_jsons) > 0:
json_data = extracted_jsons[0]
else:
json_data = None
try:
output_str = json.dumps(json_data)
except (TypeError, ValueError) as e:
print(f"Error serializing JSON: {e}")
output_str = None
return LLMOutput[TOut](
output=output_str,
json=json_data
)
def replace_placeholders(input_str, variables):
for key, value in variables.items():
placeholder = "{" + key + "}"
input_str = input_str.replace(placeholder, value)
return input_str
def preprocess_input(input_str):
# 预处理输入字符串,移除或转义特殊字符
return input_str.replace('<', '<').replace('>', '>')
def extract_json_strings(input_string: str) -> List[Dict]:
# 正则表达式模式,用于匹配 JSON 对象
json_pattern = re.compile(r'(\{(?:[^{}]|(?R))*\})')
# 查找所有匹配的 JSON 子字符串
matches = json_pattern.findall(input_string)
json_objects = []
for match in matches:
try:
# 尝试解析 JSON 子字符串
json_object = json.loads(match)
json_objects.append(json_object)
except json.JSONDecodeError:
# 如果解析失败,忽略此子字符串
log.warning(f"Invalid JSON string: {match}")
pass
return json_objects
实现下对应的 Embeding 模型;
"""The EmbeddingsLLM class."""
import logging
log = logging.getLogger(__name__)
from typing import Unpack
from graphrag.llm.base import BaseLLM
from graphrag.llm.types import (
EmbeddingInput,
EmbeddingOutput,
LLMInput,
)
from http import HTTPStatus
import dashscope
import logging
log = logging.getLogger(__name__)
class QwenEmbeddingsLLM(BaseLLM[EmbeddingInput, EmbeddingOutput]):
"""A text-embedding generator LLM using Dashscope's API."""
def __init__(self, llm_config: dict = None):
log.info(f"llm_config: {llm_config}")
self.llm_config = llm_config or {}
self.api_key = self.llm_config.get("api_key", "")
self.model = self.llm_config.get("model", dashscope.TextEmbedding.Models.text_embedding_v1)
async def _execute_llm(
self, input: EmbeddingInput, **kwargs: Unpack[LLMInput]
) -> EmbeddingOutput:
log.info(f"input: {input}")
response = dashscope.TextEmbedding.call(
model=self.model,
input=input,
api_key=self.api_key
)
if response.status_code == HTTPStatus.OK:
res = [embedding["embedding"] for embedding in response.output["embeddings"]]
return res
else:
raise Exception(f"Error {response.code}: {response.message}")4.4 使用 Qwen 进行 Query
query 相比 index 支持了流式的输出内容:
import asyncio
import logging
from http import HTTPStatus
from typing import Any
import dashscope
from tenacity import (
Retrying,
RetryError,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)
from graphrag.query.llm.base import BaseLLMCallback, BaseLLM
from graphrag.query.progress import StatusReporter, ConsoleStatusReporter
log = logging.getLogger(__name__)
class DashscopeGenerationLLM(BaseLLM):
def __init__(
self,
api_key: str | None = None,
model: str | None = None,
max_retries: int = 10,
request_timeout: float = 180.0,
retry_error_types: tuple[type[BaseException]] = (Exception,),
reporter: StatusReporter = ConsoleStatusReporter(),
):
self.api_key = api_key
self.model = model or dashscope.Generation.Models.qwen_turbo
self.max_retries = max_retries
self.request_timeout = request_timeout
self.retry_error_types = retry_error_types
self._reporter = reporter
def generate(
self,
messages: str | list[str],
streaming: bool = False,
callbacks: list[BaseLLMCallback] | None = None,
**kwargs: Any,
) -> str:
try:
retryer = Retrying(
stop=stop_after_attempt(self.max_retries),
wait=wait_exponential_jitter(max=10),
reraise=True,
retry=retry_if_exception_type(self.retry_error_types),
)
for attempt in retryer:
with attempt:
return self._generate(
messages=messages,
streaming=streaming,
callbacks=callbacks,
**kwargs,
)
except RetryError as e:
self._reporter.error(
message="Error at generate()", details={self.__class__.__name__: str(e)}
)
return ""
else:
return ""
async def agenerate(
self,
messages: str | list[str],
streaming: bool = False,
callbacks: list[BaseLLMCallback] | None = None,
**kwargs: Any,
) -> str:
try:
retryer = Retrying(
stop=stop_after_attempt(self.max_retries),
wait=wait_exponential_jitter(max=10),
reraise=True,
retry=retry_if_exception_type(self.retry_error_types),
)
for attempt in retryer:
with attempt:
return await asyncio.to_thread(
self._generate,
messages=messages,
streaming=streaming,
callbacks=callbacks,
**kwargs,
)
except RetryError as e:
self._reporter.error(f"Error at agenerate(): {e}")
return ""
else:
return ""
def _generate(
self,
messages: str | list[str],
streaming: bool = False,
callbacks: list[BaseLLMCallback] | None = None,
**kwargs: Any,
) -> str:
if isinstance(messages, list):
response = dashscope.Generation.call(
model=self.model,
messages=messages,
api_key=self.api_key,
stream=streaming,
incremental_output=streaming,
timeout=self.request_timeout,
result_format='message',
**kwargs,
)
else:
response = dashscope.Generation.call(
model=self.model,
prompt=messages,
api_key=self.api_key,
stream=streaming,
incremental_output=streaming,
timeout=self.request_timeout,
**kwargs,
)
# if response.status_code != HTTPStatus.OK:
# raise Exception(f"Error {response.code}: {response.message}")
if streaming:
full_response = ""
for chunk in response:
if chunk.status_code != HTTPStatus.OK:
raise Exception(f"Error {chunk.code}: {chunk.message}")
decoded_chunk = chunk.output.choices[0]['message']['content']
full_response += decoded_chunk
if callbacks:
for callback in callbacks:
callback.on_llm_new_token(decoded_chunk)
return full_response
else:
if isinstance(messages, list):
return response.output["choices"][0]["message"]["content"]
else:
return response.output["text"]
实现 Query 的 Embedding 对象:
import asyncio
import logging
from typing import Any
import dashscope
from tenacity import (
Retrying,
RetryError,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)
from graphrag.query.llm.base import BaseTextEmbedding
from graphrag.query.progress import StatusReporter, ConsoleStatusReporter
log = logging.getLogger(__name__)
class DashscopeEmbedding(BaseTextEmbedding):
def __init__(
self,
api_key: str | None = None,
model: str = dashscope.TextEmbedding.Models.text_embedding_v1,
max_retries: int = 10,
retry_error_types: tuple[type[BaseException]] = (Exception,),
reporter: StatusReporter = ConsoleStatusReporter(),
):
self.api_key = api_key
self.model = model
self.max_retries = max_retries
self.retry_error_types = retry_error_types
self._reporter = reporter
def embed(self, text: str, **kwargs: Any) -> list[float]:
try:
embedding = self._embed_with_retry(text, **kwargs)
return embedding
except Exception as e:
self._reporter.error(
message="Error embedding text",
details={self.__class__.__name__: str(e)},
)
return []
async def aembed(self, text: str, **kwargs: Any) -> list[float]:
try:
embedding = await asyncio.to_thread(self._embed_with_retry, text, **kwargs)
return embedding
except Exception as e:
self._reporter.error(
message="Error embedding text asynchronously",
details={self.__class__.__name__: str(e)},
)
return []
def _embed_with_retry(self, text: str, **kwargs: Any) -> list[float]:
try:
retryer = Retrying(
stop=stop_after_attempt(self.max_retries),
wait=wait_exponential_jitter(max=10),
reraise=True,
retry=retry_if_exception_type(self.retry_error_types),
)
for attempt in retryer:
with attempt:
response = dashscope.TextEmbedding.call(
model=self.model,
input=text,
api_key=self.api_key,
**kwargs,
)
if response.status_code == 200:
embedding = response.output["embeddings"][0]["embedding"]
return embedding
else:
raise Exception(f"Error {response.code}: {response.message}")
except RetryError as e:
self._reporter.error(
message="Error at embed_with_retry()",
details={self.__class__.__name__: str(e)},
)
return []
运行下 Query 的效果:
4.5 项目中的一些关键节点
4.6 遇到错误怎么办
05
GraphRAG 的核心步骤
使用 Leiden 算法进行检测,得到层次化的社区结构,Leiden 算法帮助我们把大量的文本信息组织成有意义的群组,使得我们可以更容易地理解和处理这些信息。
对于高层社区,递归地利用子社区摘要。
06
小结
业务集成 GraphRAG
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2026-07-01
提升 RAG 准确率全攻略 让你的 AI 知识库 真正靠谱起来!
2026-06-30
教程:如何用AutoRAG + Milvus避免RAG 与Agent 中出现串租问题
2026-06-30
知识库不是文件堆——我把RAG准确率从60%调到了92%
2026-06-30
本体论语义建设新思路,另类RAG来解决检索问题
2026-06-30
别把RAG当架构:Ontology(本体)才是Agent的业务世界
2026-06-29
PixelRAG:伯克利团队颠覆传统 RAG,用截图代替文本检索! 28 天狂揽 3000+ Star!
2026-06-29
腾讯WeKnora开源详解(三):检索引擎与生态集成
2026-06-29
腾讯开源WeKnora详解(二):知识库与对话核心能力
2026-04-06
2026-04-27
2026-04-23
2026-04-20
2026-04-09
2026-04-12
2026-04-22
2026-04-10
2026-05-14
2026-04-30
2026-06-23
2026-06-23
2026-06-15
2026-06-10
2026-06-10
2026-05-20
2026-05-18
2026-05-11
欢迎您使用【53AI 官方网站】(以下简称“本网站”或“我们”)。本《会员服务协议》(以下简称“本协议”)是您(以下简称“会员”或“用户”)与【深圳市博思协创网络科技有限公司】之间关于注册、登录及使用本网站会员服务所订立的法律协议。
在您注册或登录前,请务必审慎阅读、充分理解各条款内容,特别是免除或限制责任的条款、知识产权条款、争议解决条款等。此类条款将以加粗形式提示您注意。 当您通过微信公众号授权、手机验证码验证或其他方式成功登录本网站时,即视为您已完全理解并同意接受本协议的全部内容。
一、 定义
本网站:指由【深圳市博思协创网络科技有限公司】运营的,域名为【53ai.com】的网站及相关移动端页面。
会员服务:指本网站向注册会员提供的知识库文章查阅、内容检索及其他相关增值服务。
知识库内容:指本网站发布的包括但不限于文字、图表、数据、研究报告、行业分析等数字化内容资源。
二、 账号注册与登录
登录方式:本网站支持以下登录方式,您可根据实际情况选择:
微信公众号授权登录:您同意将您的微信OpenID信息授权给本网站,用于创建或关联会员账号。
手机验证码登录:您需提供真实有效的手机号码,并通过短信验证码完成身份验证与登录/注册。
账号安全:您的账号仅限您本人使用,禁止赠与、借用、租用、转让或售卖。因您保管不善导致的账号被盗、密码泄露等损失,由您自行承担。
实名认证:根据相关法律法规要求,我们可能要求您在特定功能下完成实名认证。如您拒绝提供,可能无法使用部分或全部服务。
未成年人保护:若您未满18周岁,请在法定监护人的陪同下阅读本协议,并在征得监护人同意后使用本服务。
三、 服务内容与规范
知识库查阅权限:会员登录后,有权按照其会员等级对应的权限范围,在线浏览、检索本网站知识库中的相关文章及内容。
服务变更:我们有权根据业务发展需要,调整、变更或终止部分服务内容,并将以网站公告、公众号消息等方式提前通知。
禁止行为:您在使用服务时不得实施以下行为:
利用技术手段批量爬取、下载、转存知识库内容;
将知识库内容用于商业目的或未经授权地向第三方传播;
干扰本网站正常运行或侵犯其他用户合法权益;
发布违法违规信息或从事违反公序良俗的活动。
四、 知识产权声明
权利归属:本网站知识库中的排版设计、软件代码等内容的知识产权均归【公司全称】或原权利人所有,受《中华人民共和国著作权法》等法律保护。
有限许可:本网站授予会员一项非独占、不可转让、不可转授权的普通许可,仅限于个人学习、研究之目的在线查阅知识库内容。
侵权追责:未经书面许可,任何单位或个人不得以任何形式复制、转载、摘编、镜像、汇编或以其他方式使用上述内容。一经发现,我们保留追究其法律责任的权利。
五、 个人信息保护
我们重视对您个人信息的保护。关于我们如何收集、使用、存储和保护您的个人信息,请单独阅读 《隐私政策》。
您通过微信公众号授权或手机号验证所提供的信息,我们将严格按照《个人信息保护法》的规定处理,仅用于身份识别、服务提供及安全验证等必要用途。
您可以随时通过网站设置或联系客服行使查阅、更正、删除个人信息及撤回授权同意的权利。
六、 免责声明
内容准确性:知识库内容仅供参考,不构成专业建议。我们不对其完整性、准确性、时效性作任何明示或暗示的保证,您应自行判断并承担使用风险。
不可抗力:因自然灾害、政策法规变化、网络故障、第三方平台接口异常(如微信接口维护、运营商短信通道故障)等不可抗力导致的服务中断或延迟,我们不承担违约责任。
第三方链接:本网站可能包含指向第三方网站的链接,该等网站的内容和服务不受我们控制,请您自行甄别风险。
七、 违约责任
如您违反本协议约定,我们有权视情节采取警告、限制功能、暂停服务、注销账号等措施,并保留要求赔偿损失的权利。
如因您的违约行为导致我们遭受行政处罚、第三方索赔或商誉损失,您应承担全部赔偿责任(包括但不限于罚款、赔偿金、律师费、公证费等)。
八、 法律适用与争议解决
本协议的订立、执行和解释均适用中华人民共和国大陆地区法律。
因本协议产生的或与本协议有关的任何争议,双方应友好协商解决;协商不成的,任何一方均可向【公司所在地】有管辖权的人民法院提起诉讼。
九、 其他
本协议构成双方就本服务达成的完整协议,取代此前任何口头或书面约定。
本协议任一条款被认定为无效或不可执行的,不影响其他条款的效力。
我们对本协议享有最终解释权,并在法律允许的范围内保留随时修改的权利。修改后的协议一经公布即生效,继续使用服务即视为同意修订内容。