微信扫码
添加专属顾问
async def send(self,message: AgentMessage,recipient: Agent,reviewer: Optional[Agent] = None,request_reply: Optional[bool] = True,is_recovery: Optional[bool] = False,silent: Optional[bool] = False,is_retry_chat: bool = False,last_speaker_name: Optional[str] = None,) -> None:"""Send a message to recipient agent.Args:message(AgentMessage): the message to be sent.recipient(Agent): the recipient agent.reviewer(Agent): the reviewer agent.request_reply(bool): whether to request a reply.is_recovery(bool): whether the message is a recovery message.Returns:None"""
async def receive(self,message: AgentMessage,sender: Agent,reviewer: Optional[Agent] = None,request_reply: Optional[bool] = None,silent: Optional[bool] = False,is_recovery: Optional[bool] = False,is_retry_chat: bool = False,last_speaker_name: Optional[str] = None,) -> None:"""Receive a message from another agent.Args:message(AgentMessage): the received message.sender(Agent): the sender agent.reviewer(Agent): the reviewer agent.request_reply(bool): whether to request a reply.silent(bool): whether to be silent.is_recovery(bool): whether the message is a recovery message.Returns:None"""
回答生成
async def generate_reply(self,received_message: AgentMessage,sender: Agent,reviewer: Optional[Agent] = None,rely_messages: Optional[List[AgentMessage]] = None,is_retry_chat: bool = False,last_speaker_name: Optional[str] = None,**kwargs,) -> AgentMessage:"""Generate a reply based on the received messages.Args:received_message(AgentMessage): the received message.sender: sender of an Agent instance.reviewer: reviewer of an Agent instance.rely_messages: a list of messages received.Returns:AgentMessage: the generated reply. If None, no reply is generated."""
# 资源基础类class Resource(ABC, Generic[P]):"""Resource for the agent."""# 数据库资源对象class DBResource(Resource[P], Generic[P]):#知识资源对象(将召回对象作为资源绑定)class RetrieverResource(Resource[ResourceParameters]):#知识库空间资源对象(将DBGPT的知识库空间作为资源对象)class KnowledgeSpaceRetrieverResource(RetrieverResource):# 资源包(将多个资源变成一个资源包的方式绑定引用)class ResourcePack(Resource[PackResourceParameters]):# 内置工具资源class ToolPack(ResourcePack):# 插件工具资源包,可加载Autogpt插件class PluginToolPack(ToolPack):class AutoGPTPluginToolPack(ToolPack):# 内置工具定义和使用方法def list_dbgpt_support_models(model_type: Annotated[str, Doc("The model type, LLM(Large Language Model) and EMBEDDING).")] = "LLM",) -> str:...def get_current_host_cpu_status() -> str:...description="Baidu search and return the results as a markdown string. Please set ""number of results not less than 8 for rich search results.",)def baidu_search(query: Annotated[str, Doc("The search query.")],num_results: Annotated[int, Doc("The number of search results to return.")] = 8,) -> str:...
llm_client = OpenAILLMClient(model_alias="gpt-3.5-turbo")context: AgentContext = AgentContext(conv_id="test456")agent_memory = AgentMemory()tools = ToolPack([simple_calculator, count_directory_files])prompt_template: PromptTemplate = prompt_service.get_template(prompt_code=record.prompt_template)await ToolAssistantAgent().bind(context) #agent 运行上下文 会话id、应用名、推理参数等.bind(LLMConfig(llm_client=llm_client)) #当前agent使用的模型服务.bind(agent_memory) # 绑定当前agent的记忆对象.bind(prompt_template) # 绑定Agent的prompt 覆盖角色定义 暂时依赖Prompt模块,后续改造为面向API.bind(tools)# 绑定当前agent要使用的资源.build() #Agent准备检查和预加载等工作
# 默认短期记忆 默认使用 ShortTermMemory(buffer_size=5) 内存队列作为存储agent_memory = AgentMemory(gpts_memory=self.memory)# 短期记忆class ShortTermMemory(Memory, Generic[T])# 长期记忆class LongTermMemory(Memory, Generic[T])embedding_factory = EmbeddingFactory.get_instance(CFG.SYSTEM_APP)embedding_fn = embedding_factory.create(model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL])vstore_name = f"_chroma_agent_memory_{dbgpts_name}_{conv_id}"Just use chroma store nowvector_store_connector = VectorStoreConnector(vector_store_type=CFG.VECTOR_STORE_TYPE,vector_store_config=VectorStoreConfig(name=vstore_name, embedding_fn=embedding_fn),)memory = HybridMemory[AgentMemoryFragment].from_chroma(vstore_name=vstore_name,embeddings=embedding_fn,)# 感知记忆class SensoryMemory(Memory, Generic[T])# 混合记忆class HybridMemory(Memory, Generic[T])# 增强短期记忆class EnhancedShortTermMemory(ShortTermMemory[T])
需要按会话id进行初始化和关闭:
self.memory.init({conv_id})try:# 这里开始一个Agent的对话await user_proxy.initiate_chat(recipient=tool_engineer,reviewer=user_proxy,message="Calculate the product of 10 and 99",)finally:await self.memory.clear({conv_id}))## 外部通过集体记忆对象的通道获取Agent的对话消息,支持流式输出async def chat_messages(self, conv_id: str, user_code: str = None, system_app: str = None,):while True:queue = self.memory.queue(conv_id)if not queue:breakitem = await queue.get()if item == "[DONE]":queue.task_done()breakelse:yield itemawait asyncio.sleep(0.005)
# 参考这个Acitonclass StartAppAction(Action[LinkAppInput]):async def run(self,ai_message: str,resource: Optional[AgentResource] = None,rely_action_out: Optional[ActionOutput] = None,need_vis_render: bool = True,**kwargs,) -> ActionOutput:conv_id = kwargs.get("conv_id")user_input = kwargs.get("user_input")paren_agent = kwargs.get("paren_agent")init_message_rounds = kwargs.get("init_message_rounds")# TODO 这里放应用启动前的逻辑代码from dbgpt.serve.agent.agents.controller import multi_agentsawait multi_agents.agent_team_chat_new(new_user_input if new_user_input else user_input,conv_id,gpts_app,paren_agent.memory,False,link_sender=paren_agent,app_link_start=True,init_message_rounds=init_message_rounds,)return ActionOutput(is_exe_success=True, content="", view=None, have_retry=False)
# 参考这个Actionclass LinkAppAction(Action[LinkAppInput]):async def run(self,ai_message: str,resource: Optional[AgentResource] = None,rely_action_out: Optional[ActionOutput] = None,need_vis_render: bool = True,**kwargs,) -> ActionOutput:# TODO 这里根据模型输出解析出下一步要走到的Agent角色名称role = "xxxx"# 当前Agent返回时指定下一个发言者信息return ActionOutput(is_exe_success=True,content=json.dumps(app_link_param, ensure_ascii=False),view=await self.render_protocal.display(content=app_link_param),next_speakers=[role],)
self._render_protocol = VisChart()view = await self.render_protocol.display(chart=json.loads(model_to_json(param)), data_df=data_df)
# 基类 和接口class LLMStrategy:# 默认使用当前模型服的默认模型async def next_llm(self, excluded_models: Optional[List[str]] = None):## 优先级策略的模型选择策略实现class LLMStrategyPriority(LLMStrategy):# 按配置优先级进行选择和重试async def next_llm(self, excluded_models: Optional[List[str]] = None) -> str:"""Return next available llm model name."""try:if not excluded_models:excluded_models = []all_models = await self._llm_client.models()if not self._context:raise ValueError("No context provided for priority strategy!")priority: List[str] = json.loads(self._context)can_uses = self._excluded_models(all_models, excluded_models, priority)if can_uses and len(can_uses) > 0:return can_uses[0].modelelse:raise ValueError("No model service available!")except Exception as e:logger.error(f"{self.type} get next llm failed!{str(e)}")raise ValueError(f"Failed to allocate model service,{str(e)}!")
class DataScientistAgent(ConversableAgent):"""Data Scientist Agent."""profile: ProfileConfig = ProfileConfig(name=DynConfig("Edgar",category="agent",key="dbgpt_agent_expand_dashboard_assistant_agent_profile_name",),role=DynConfig("DataScientist",category="agent",key="dbgpt_agent_expand_dashboard_assistant_agent_profile_role",),goal=DynConfig("Use correct {{dialect}} SQL to analyze and resolve user ""input targets based on the data structure information of the ""database given in the resource.",category="agent",key="dbgpt_agent_expand_dashboard_assistant_agent_profile_goal",),constraints=DynConfig(["Please ensure that the output is in the required format. ""Please ensure that each analysis only outputs one analysis ""result SQL, including as much analysis target content as possible.","If there is a recent message record, pay attention to refer to ""the answers and execution results inside when analyzing, ""and do not generate the same wrong answer.Please check carefully ""to make sure the correct SQL is generated. Please strictly adhere ""to the data structure definition given. The use of non-existing ""fields is prohibited. Be careful not to confuse fields from ""different tables, and you can perform multi-table related queries.","If the data and fields that need to be analyzed in the target are in ""different tables, it is recommended to use multi-table correlation ""queries first, and pay attention to the correlation between multiple ""table structures.","It is prohibited to construct data yourself as query conditions. ""Only the data values given by the famous songs in the input can ""be used as query conditions.","Please select an appropriate one from the supported display methods ""for data display. If no suitable display type is found, ""use 'response_table' as default value. Supported display types: \n""{{ display_type }}",],category="agent",key="dbgpt_agent_expand_dashboard_assistant_agent_profile_constraints",),desc=DynConfig("Use database resources to conduct data analysis, analyze SQL, and provide ""recommended rendering methods.",category="agent",key="dbgpt_agent_expand_dashboard_assistant_agent_profile_desc",),)
async def thinking(self,messages: List[AgentMessage],sender: Optional[Agent] = None,prompt: Optional[str] = None,) -> Tuple[Optional[str], Optional[str]]:
def get_or_build_agent_memory(self, conv_id: str, dbgpts_name: str) -> AgentMemory:from dbgpt.agent.core.memory.hybrid import HybridMemoryfrom dbgpt.configs.model_config import EMBEDDING_MODEL_CONFIGfrom dbgpt.rag.embedding.embedding_factory import EmbeddingFactorymemory_key = f"{dbgpts_name}_{conv_id}"if memory_key in self.agent_memory_map:return self.agent_memory_map[memory_key]# embedding_factory = EmbeddingFactory.get_instance(CFG.SYSTEM_APP)# embedding_fn = embedding_factory.create(# model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL]# )# vstore_name = f"_chroma_agent_memory_{dbgpts_name}_{conv_id}"# Just use chroma store now# vector_store_connector = VectorStoreConnector(# vector_store_type=CFG.VECTOR_STORE_TYPE,# vector_store_config=VectorStoreConfig(# name=vstore_name, embedding_fn=embedding_fn# ),# )# memory = HybridMemory[AgentMemoryFragment].from_chroma(# vstore_name=vstore_name,# embeddings=embedding_fn,# )agent_memory = AgentMemory(gpts_memory=self.memory)self.agent_memory_map[memory_key] = agent_memoryreturn agent_memory
class SqlInput(BaseModel):"""SQL input model."""display_type: str = Field(...,description="The chart rendering method selected for SQL. If you don’t know ""what to output, just output 'response_table' uniformly.",)sql: str = Field(..., description="Executable sql generated for the current target/problem")thought: str = Field(..., description="Summary of thoughts to the user")class ChartAction(Action[SqlInput]):"""Chart action class."""def __init__(self, **kwargs):"""Chart action init."""super().__init__(**kwargs)self._render_protocol = VisChart()def out_model_type(self):"""Return the output model type."""return SqlInputasync def run(self,ai_message: str,resource: Optional[AgentResource] = None,rely_action_out: Optional[ActionOutput] = None,need_vis_render: bool = True,**kwargs,) -> ActionOutput:"""Perform the action."""try:param: SqlInput = self._input_convert(ai_message, SqlInput)except Exception as e:logger.exception(f"{str(e)}! \n {ai_message}")return ActionOutput(is_exe_success=False,content="Error:The answer is not output in the required format.",)try:if not self.resource_need:raise ValueError("The resource type is not found!")if not self.render_protocol:raise ValueError("The rendering protocol is not initialized!")db_resources: List[DBResource] = DBResource.from_resource(self.resource)if not db_resources:raise ValueError("The database resource is not found!")db = db_resources[0]data_df = await db.query_to_df(param.sql)view = await self.render_protocol.display(chart=json.loads(model_to_json(param)), data_df=data_df)return ActionOutput(is_exe_success=True,content=model_to_json(param),view=view,resource_type=self.resource_need.value,resource_value=db._db_name,)except Exception as e:logger.exception("Check your answers, the sql run failed!")return ActionOutput(is_exe_success=False,content=f"Error:Check your answers, the sql run failed!Reason:{str(e)}",)
class XXXAgent(ConversableAgent):......# 为Action准备的额外执行参数def prepare_act_param(self, received_message: Optional[AgentMessage], sender: Agent,rely_messages: Optional[List[AgentMessage]] = None,**kwargs) -> Dict[str, Any]:historical_dialogues = kwargs.get("historical_dialogues", None)return {"user_input": received_message.content,"conv_id": self.agent_context.conv_id,"paren_agent": self,"rely_messages": rely_messages,"historical_dialogues": historical_dialogues,}
# 资源加载方法,此处会默认会将资源包通过资源类的方法转成资源输入给LLMasync def load_resource(self, question: str, is_retry_chat: bool = False, **kwargs):logger.info(f"DomainApi load_resource:{question}")class XXXAction(Action[xxInput]):async def run(self,ai_message: str,resource: Optional[AgentResource] = None,rely_action_out: Optional[ActionOutput] = None,need_vis_render: bool = True,**kwargs,) -> ActionOutput:...return ActionOutput(is_exe_success=False, # 提示当前Agent进展失败content=json.dumps(intent.to_dict(), ensure_ascii=False), # 问题内容view=intent.ask_user if intent.ask_user else ai_message, # 问题展示效果(可以配合GptVis像用户发起类似动态表单的消息)have_retry=False, # 并主动向用户发起提问ask_user=True)
manager = AutoPlanChatManager()manager = (await manager.bind(context).bind(agent_memory).bind(llm_config).build())manager.hire(employees)user_proxy: UserProxyAgent = (await UserProxyAgent().bind(context).bind(agent_memory).build())await user_proxy.initiate_chat(recipient=manager,message=user_query,is_retry_chat=is_retry_chat,last_speaker_name=last_speaker_name,message_rounds=init_message_rounds,**ext_info,)
class AutoPlanChatManager(ManagerAgent):"""A chat manager agent that can manage a team chat of multiple agents."""
class PlannerAgent(ConversableAgent):"""Planner Agent.
class AWELBaseManager(ManagerAgent, ABC):"""AWEL base manager."""
## Agent相关算子### Agent Flow触发器,无实际逻辑,Flow的特性必须从触发器开始class AgentDummyTrigger(Trigger):### Agent算子容器,拥有一致的输入输出,可以实现Agent Flow的自由拼接class AWELAgentOperator(MixinLLMOperator, MapOperator[AgentGenerateContext, AgentGenerateContext]):## Agent Flow特性算子### 实现Agent Flow分支的算子class AgentBranchOperator(BranchOperator[AgentGenerateContext, AgentGenerateContext]):### 实现Agent Flow分支合并的算子class AgentBranchJoinOperator(BranchJoinOperator[AgentGenerateContext]):
# 实际Agent在Flow里的绑定节点(把Agent作为Agent算子容器的资源)class AWELAgent(BaseModel):# Agent的绑定资源,将Agent的绑定资源作为Agent资源节点的资源节点### Agent资源class AWELAgentResource(AgentResource):"""AWEL Agent Resource."""### Agent知识库资源class AWELAgentKnowledgeResource(AgentResource):### Agent的Prompt资源class AgentPrompt(BaseModel):### Agent的模型配置资源class AWELAgentConfig(LLMConfig):
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2026-07-04
ThinkParse 1.1.0 开源发布:把文档解析,做成可扩展的企业级服务
2026-07-04
Agent 工程终于有脚手架了, Google开源一个开发agent的工具
2026-07-03
用云新范式:Qoder Cloud Agents × Alibaba Cloud Skills
2026-07-03
Ornith-1.0 发布: 新一代 Agentic Coding 之王,MIT 开源
2026-07-02
Meta把内部设计系统开源了,支撑内部13000+应用,专为Agent调优
2026-07-02
别再把 AI 当搜索引擎了,这 20 个操作让它替你干活
2026-07-02
ollama v0.31.1发布:Apple Silicon上Gemma 4提速近90%,默认开启无感升级
2026-07-01
在 OpenCode 中接入本地模型:Ollama 部署与配置完全指南
2026-04-09
2026-04-18
2026-04-18
2026-06-22
2026-05-10
2026-05-06
2026-05-31
2026-05-20
2026-04-21
2026-04-21
2026-06-16
2026-05-30
2026-05-16
2026-04-22
2026-04-21
2026-04-15
2026-04-09
2026-04-01
欢迎您使用【53AI 官方网站】(以下简称“本网站”或“我们”)。本《会员服务协议》(以下简称“本协议”)是您(以下简称“会员”或“用户”)与【深圳市博思协创网络科技有限公司】之间关于注册、登录及使用本网站会员服务所订立的法律协议。
在您注册或登录前,请务必审慎阅读、充分理解各条款内容,特别是免除或限制责任的条款、知识产权条款、争议解决条款等。此类条款将以加粗形式提示您注意。 当您通过微信公众号授权、手机验证码验证或其他方式成功登录本网站时,即视为您已完全理解并同意接受本协议的全部内容。
一、 定义
本网站:指由【深圳市博思协创网络科技有限公司】运营的,域名为【53ai.com】的网站及相关移动端页面。
会员服务:指本网站向注册会员提供的知识库文章查阅、内容检索及其他相关增值服务。
知识库内容:指本网站发布的包括但不限于文字、图表、数据、研究报告、行业分析等数字化内容资源。
二、 账号注册与登录
登录方式:本网站支持以下登录方式,您可根据实际情况选择:
微信公众号授权登录:您同意将您的微信OpenID信息授权给本网站,用于创建或关联会员账号。
手机验证码登录:您需提供真实有效的手机号码,并通过短信验证码完成身份验证与登录/注册。
账号安全:您的账号仅限您本人使用,禁止赠与、借用、租用、转让或售卖。因您保管不善导致的账号被盗、密码泄露等损失,由您自行承担。
实名认证:根据相关法律法规要求,我们可能要求您在特定功能下完成实名认证。如您拒绝提供,可能无法使用部分或全部服务。
未成年人保护:若您未满18周岁,请在法定监护人的陪同下阅读本协议,并在征得监护人同意后使用本服务。
三、 服务内容与规范
知识库查阅权限:会员登录后,有权按照其会员等级对应的权限范围,在线浏览、检索本网站知识库中的相关文章及内容。
服务变更:我们有权根据业务发展需要,调整、变更或终止部分服务内容,并将以网站公告、公众号消息等方式提前通知。
禁止行为:您在使用服务时不得实施以下行为:
利用技术手段批量爬取、下载、转存知识库内容;
将知识库内容用于商业目的或未经授权地向第三方传播;
干扰本网站正常运行或侵犯其他用户合法权益;
发布违法违规信息或从事违反公序良俗的活动。
四、 知识产权声明
权利归属:本网站知识库中的排版设计、软件代码等内容的知识产权均归【公司全称】或原权利人所有,受《中华人民共和国著作权法》等法律保护。
有限许可:本网站授予会员一项非独占、不可转让、不可转授权的普通许可,仅限于个人学习、研究之目的在线查阅知识库内容。
侵权追责:未经书面许可,任何单位或个人不得以任何形式复制、转载、摘编、镜像、汇编或以其他方式使用上述内容。一经发现,我们保留追究其法律责任的权利。
五、 个人信息保护
我们重视对您个人信息的保护。关于我们如何收集、使用、存储和保护您的个人信息,请单独阅读 《隐私政策》。
您通过微信公众号授权或手机号验证所提供的信息,我们将严格按照《个人信息保护法》的规定处理,仅用于身份识别、服务提供及安全验证等必要用途。
您可以随时通过网站设置或联系客服行使查阅、更正、删除个人信息及撤回授权同意的权利。
六、 免责声明
内容准确性:知识库内容仅供参考,不构成专业建议。我们不对其完整性、准确性、时效性作任何明示或暗示的保证,您应自行判断并承担使用风险。
不可抗力:因自然灾害、政策法规变化、网络故障、第三方平台接口异常(如微信接口维护、运营商短信通道故障)等不可抗力导致的服务中断或延迟,我们不承担违约责任。
第三方链接:本网站可能包含指向第三方网站的链接,该等网站的内容和服务不受我们控制,请您自行甄别风险。
七、 违约责任
如您违反本协议约定,我们有权视情节采取警告、限制功能、暂停服务、注销账号等措施,并保留要求赔偿损失的权利。
如因您的违约行为导致我们遭受行政处罚、第三方索赔或商誉损失,您应承担全部赔偿责任(包括但不限于罚款、赔偿金、律师费、公证费等)。
八、 法律适用与争议解决
本协议的订立、执行和解释均适用中华人民共和国大陆地区法律。
因本协议产生的或与本协议有关的任何争议,双方应友好协商解决;协商不成的,任何一方均可向【公司所在地】有管辖权的人民法院提起诉讼。
九、 其他
本协议构成双方就本服务达成的完整协议,取代此前任何口头或书面约定。
本协议任一条款被认定为无效或不可执行的,不影响其他条款的效力。
我们对本协议享有最终解释权,并在法律允许的范围内保留随时修改的权利。修改后的协议一经公布即生效,继续使用服务即视为同意修订内容。