2026年5月7日 周四晚上19:30,来了解“企业AI训练师:从个人提效到构建企业AI生产力”(限30人)
免费POC, 零成本试错
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


我要投稿

看 AgentRun 如何玩转记忆存储,最佳实践来了!

发布日期:2026-05-06 18:42:32 浏览次数: 1519
作者:阿里云云原生

微信搜一搜,关注“阿里云云原生”

推荐语

阿里云AgentRun通过三种记忆存储能力,让智能体真正记住用户需求,打造个性化交互体验。

核心内容:
1. AgentRun平台集成表格存储提供的三大记忆能力解析
2. 从零开始配置记忆存储的完整操作指南
3. 代码示例演示会话历史、长期记忆和会话状态的实际应用

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家
阿里云 AgentRun 是一个以高代码为核心的一站式 Agentic AI 基础设施平台。秉持生态开放和灵活组装的理念,为企业级 Agent 应用提供从开发、部署到运维的全生命周期管理。
AgentRun 通过集成表格存储(Tablestore),为智能体提供三种持久化记忆能力:在同一对话中维持上下文的会话历史、跨会话保留用户偏好等结构化信息的长期记忆,以及可直接读写的会话状态。本文介绍如何创建并配置记忆存储,并通过可运行的代码示例演示三种记忆类型的使用方式。

记忆类型




Cloud Native

快速使用




Cloud Native

步骤一:创建记忆存储

1. 登录函数计算控制台https://account.aliyun.com/login/login.htm?oauth_callback=https%3A%2F%2Ffcnext.console.aliyun.com%2F&lang=zh,在左侧菜单栏单击函数智能 > 智能体 AgentRun,进入 AgentRun 控制台。

重要:如果进入 AgentRun 控制台后,右上角有体验新版字样,单击切换到新版控制台。

2. 在左侧菜单栏单击记忆,然后单击添加记忆存储,填写相关配置项:
3. 单击开始部署,等待记忆存储部署完成。

步骤二:创建 Agent 并体验记忆功能

1. 在左侧菜单栏单击 Agent 运行时,然后单击创建 Agent,选择快速创建,填写相关配置项:
2. 单击创建 Agent,等待 Agent 启动完成。
3. 在模型对比测试窗口中:
  • 输入“我叫小明,最近在学习向量数据库”,观察 Agent 回复说明:函数计算遵循 Serverless(无服务器)架构,只有在请求到达时才创建实例,并能及时释放实例帮助用户节省成本。因此,首次提问时,会有较长的等待时间。)
  • 在同一对话中继续提问“我叫什么?”,验证会话历史(Agent 能记住本轮对话内容)
  • 新建会话,提问“我叫什么?”,验证长期记忆(Agent 能跨会话记住用户信息)

代码集成




Cloud Native

环境准备

pip3 install alibabacloud_agentrun20250910 openai tablestore agentrun-sdk agentrun-mem0ai google-adk litellm langchain-openai

配置环境变量请勿将密钥硬编码在代码中

# 阿里云账号凭据export ALIBABA_CLOUD_ACCESS_KEY_ID="your-access-key-id"export ALIBABA_CLOUD_ACCESS_KEY_SECRET="your-access-key-secret"# Agent调用配置(在控制台Agent运行时 > 详情 > 概览 > 访问信息中获取)export AGENT_ENDPOINT="https://{ACCOUNT_ID}.agentrun-data.{REGION}.aliyuncs.com/agent-runtimes/{AGENT_NAME}/endpoints/Default/invocations"# Agent调用凭证(在控制台凭证管理中获取)export AGENT_API_KEY="your-agent-api-key"# 记忆存储名称(步骤一创建后获取)export MEMORY_COLLECTION_NAME="mem-xxxx"# DashScope API Key(LangChain 和 Google ADK 示例使用)export DASHSCOPE_API_KEY="your-dashscope-api-key"# Tablestore 配置(会话状态示例使用)export TABLESTORE_ENDPOINT="https://{INSTANCE}.{REGION}.ots.aliyuncs.com"export TABLESTORE_INSTANCE="your-instance-name"

会话历史

三轮对话测试:第 1 轮告知姓名和职业,第 2 轮直接问“我是谁”,验证 Agent 是否记住了上文;第 3 轮继续追问,验证完整上下文是否贯穿始终。每次运行生成新的 session_id,互不干扰。

import osimport reimport threadingimport timeimport uuidfrom openai import OpenAI# ── 配置 ──────────────────────────────────────────────────────────────────────AGENT_ENDPOINT = os.environ["AGENT_ENDPOINT"]AGENT_API_KEY = os.environ["AGENT_API_KEY"]MODEL = os.getenv("MODEL""qwen3.5-plus")# 从 Endpoint URL 解析账号 ID 和 Agent 名称。# Endpoint 格式:https://{ACCOUNT_ID}.agentrun-data.{REGION}.aliyuncs.com/agent-runtimes/{AGENT_NAME}/endpoints/Default/invocations# 注意:这里使用账号 ID 仅适合单用户演示场景。在多用户应用中,# user_id 应代表终端用户(如应用内的用户 ID),而非开发者账号。_account_match = re.match(r"https://(\d+)\.agentrun-data\.", AGENT_ENDPOINT)USER_ID = _account_match.group(1if _account_match else "default_user"_agent_match = re.search(r"/agent-runtimes/([^/]+)/endpoints/", AGENT_ENDPOINT)AGENT_ID = _agent_match.group(1if _agent_match else "default_agent"client = OpenAI(    base_url=f"{AGENT_ENDPOINT}/openai/v1",    api_key=AGENT_API_KEY,)# ── 对话函数 ──────────────────────────────────────────────────────────────────def _spinner(stop_event: threading.Event):    """在等待首个 Token 期间显示转圈动画。"""    frames = ['⠋''⠙''⠹''⠸''⠼''⠴''⠦''⠧''⠇''⠏']    i = 0    while not stop_event.is_set():        print(f'\r助手: 正在思考... {frames[i % len(frames)]}', end='', flush=True)        time.sleep(0.1)        i += 1    print('\r' + ' ' * 35 + '\r', end='', flush=True)def chat(session_id: str, history: list, message: str) -> str:    """向 Agent 发送消息,通过 session_id 和消息历史维持会话上下文。    history 为本轮之前的对话历史,格式为        [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}, ...]    只保留 user/assistant 的文本内容(不含 tool_calls),历史消息与本轮消息    一起发送,Agent 可从上下文中直接读取之前的对话内容。    session_id 通过请求头 X-AgentRun-Conversation-ID 传递,AgentRun 用其    关联 Tablestore 中的会话记录(用 extra_body 传递无效,服务端不读请求体)。    """    print(f"用户: {message}")    # 启动转圈动画(覆盖冷启动和 MCP 工具调用的等待时间)    stop_event = threading.Event()    spinner = threading.Thread(target=_spinner, args=(stop_event,), daemon=True)    spinner.start()    stream = client.chat.completions.create(        model=MODEL,        messages=history + [{"role""user""content": message}],        stream=True,        extra_headers={            "X-AgentRun-Conversation-ID": session_id,            "X-AgentRun-User-ID": USER_ID,            "X-AgentRun-Agent-ID": AGENT_ID,        },    )    full_reply = [ ]    first_token = True    for chunk in stream:        if chunk.choices and chunk.choices[0].delta.content:            token = chunk.choices[0].delta.content            if first_token:                stop_event.set()                spinner.join()                print("助手: ", end="", flush=True)                first_token = False            print(token, end="", flush=True)            full_reply.append(token)    stop_event.set()    spinner.join()    print("\n")    return "".join(full_reply)# ── 主程序 ────────────────────────────────────────────────────────────────────def main():    # 每次运行生成新的 session_id,避免与上次运行的历史混淆。    # 在实际应用中,session_id 通常由应用层管理(如绑定用户的对话 ID)。    session_id = str(uuid.uuid4())    history = [ ]  # 维护对话历史(只含 user/assistant 文本消息)    print("═" * 50)    print("示例:多轮对话(会话历史)")    print("═" * 50)    print("提示:首次请求函数计算需要创建实例(冷启动),可能需要较长等待时间,请耐心等待。")    # 第一轮:自我介绍    print("\n[第 1 轮]")    q1 = "我叫小明,我是一名数据工程师,最近在研究 Tablestore"    r1 = chat(session_id, history, q1)    history.extend([        {"role""user""content": q1},        {"role""assistant""content": r1},    ])    # 第二轮:测试 Agent 是否记住了第一轮的内容    print("[第 2 轮]")    q2 = "我叫什么名字?我从事什么工作?我最近在研究什么?"    r2 = chat(session_id, history, q2)    history.extend([        {"role""user""content": q2},        {"role""assistant""content": r2},    ])    # 第三轮:继续深入话题    print("[第 3 轮]")    q3 = "针对我的工作,你有什么 Tablestore 使用建议?"    chat(session_id, history, q3)if __name__ == "__main__":    main()

说明推荐使用流式模式(stream=True):Agent 在处理消息时会调用 MCP 记忆工具(读写 Tablestore),流式模式可避免客户端因等待 MCP 工具调用而超时。

将上述代码保存为 conversation_history.py 并运行:

python3 conversation_history.py

长期记忆

方式一:通过 MCP 集成

在记忆详情的长期记忆 >  MCP 集成启动服务配置,启动后获取 MCP 服务 URL(SSE),配置到支持 MCP 工具调用的 Agent 中,即可自动提取、更新、删除长期记忆。

说明通过控制台使用 Agent 运行时,可在创建 Agent 配置记忆时开启 MCP(未提示表示已开启),Agent 会在每轮对话中自动调用 MCP 工具检索和更新长期记忆。对话中出现可提取的用户信息时(如:“我喜欢喝咖啡”),Agent 会自动存入向量库供后续跨 session 使用。

方式二:直接操作 mem0 向量存储

与方式一不同,方式二需要由代码显式调用 memory.add() 写入记忆—— mem0 不会自动监听对话,也不会自动判断“哪些内容值得记录”。这种方式适合在自定义 Agent 框架或数据管道中,由业务逻辑精确控制记忆的写入时机和内容。

本示例写入一段用户描述,mem0 内部调用 LLM 将其拆解为多条结构化记忆(如“名叫 Alice”、“喜欢喝咖啡”),然后用三个不同角度的语义查询验证检索效果。

import osimport timefrom agentrun.memory_collection import MemoryCollection# ── 配置 ──────────────────────────────────────────────────────────────────────MEMORY_COLLECTION_NAME = os.environ["MEMORY_COLLECTION_NAME"]# agentrun-sdk 自动从以下环境变量读取凭证(无需在代码中显式传入):#   ALIBABA_CLOUD_ACCESS_KEY_ID / ALIBABA_CLOUD_ACCESS_KEY_SECRET#   AGENTRUN_REGION(默认 cn-hangzhou)# ── 初始化 mem0 客户端 ────────────────────────────────────────────────────────# to_mem0_memory() 自动完成两件事:#   1. 从 AgentRun 控制平面读取记忆存储配置(Tablestore 实例、集合、向量维度等)#   2. 使用读取到的配置初始化 agentrun-mem0ai Memory 客户端memory = MemoryCollection.to_mem0_memory(MEMORY_COLLECTION_NAME)# ── 核心操作 ──────────────────────────────────────────────────────────────────def add_memories(user_id: str):    """向记忆存储中添加用户信息。    memory.add() 支持两种输入:      - str:直接写入一段描述文本,mem0 内部会自动提炼为结构化记忆      - list[dict]:完整的对话消息列表(role/content),mem0 从中提取用户偏好    本示例使用文本输入演示基本用法。    """    print(f"\n[添加记忆] user_id={user_id}")    result = memory.add(        "我叫 Alice,是一名 Python 工程师,平时喜欢喝咖啡,最近在研究向量数据库。",        user_id=user_id,        metadata={"source_app""openmemory""mcp_client""python_sdk"},    )    results = result.get("results", [ ])    print(f"  写入 {len(results)} 条记忆:")    for i, res in enumerate(results, 1):        print(f"    {i}. [{res.get('event')}{res.get('memory''')}")def search_memories(user_id: str, query: str):    """从记忆存储中检索与查询语义最相关的记忆。"""    print(f"\n[检索记忆] 查询: {query!r}")    results = memory.search(query, user_id=user_id)    hits = results.get("results", [ ])    if not hits:        print("  (未找到相关记忆)")        return    for i, hit in enumerate(hits, 1):        score = hit.get("score"0)        content = hit.get("memory""")        print(f"  {i}. [{score:.4f}{content}")# ── 主程序 ────────────────────────────────────────────────────────────────────def main():    print("═" * 55)    print("长期记忆示例 —— 直接操作 mem0 向量存储")    print("═" * 55)    print("提示:首次请求函数计算需要创建实例(冷启动),可能需要较长等待时间,请耐心等待。")    user_id = "alice-demo"    # 写入记忆    add_memories(user_id)    # 等待 Tablestore 向量索引刷新(首次写入后搜索需要短暂延迟)    print("\n等待向量索引刷新...")    time.sleep(3)    # 语义检索:不同的查询角度    print()    print("──" * 27)    search_memories(user_id, "她喜欢什么饮料?")    search_memories(user_id, "她是做什么工作的?")    search_memories(user_id, "她最近在研究什么技术?")if __name__ == "__main__":    main()

将上述代码保存为 long_term_memory.py 并运行:

python3 long_term_memory.py

方式三:LangChain 生态集成

以旅行规划助手为例,演示在 LangChain 对话循环中集成 mem0 长期记忆的完整模式:每轮对话开始时先检索与用户问题相关的历史记忆并注入 prompt,生成回复后再将本轮对话内容写回 mem0,使记忆在对话过程中持续积累。多轮后可观察到 Agent 能主动引用早先提到的旅行偏好。

import osimport threadingimport timefrom typing import Listfrom langchain_core.messages import SystemMessage, HumanMessage, BaseMessagefrom langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholderfrom langchain_openai import ChatOpenAIfrom agentrun.memory_collection import MemoryCollection# ── 配置 ──────────────────────────────────────────────────────────────────────MEMORY_COLLECTION_NAME = os.environ["MEMORY_COLLECTION_NAME"]DASHSCOPE_API_KEY = os.environ["DASHSCOPE_API_KEY"]# ── 初始化 LLM 和 mem0 客户端 ─────────────────────────────────────────────────# 通过 DashScope OpenAI 兼容接口调用大模型llm = ChatOpenAI(    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",    api_key=DASHSCOPE_API_KEY,    model="qwen-max",)# 初始化 mem0 客户端(自动从 AgentRun 控制平面读取 Tablestore 配置)mem0 = MemoryCollection.to_mem0_memory(MEMORY_COLLECTION_NAME)# ── Prompt 模板 ───────────────────────────────────────────────────────────────prompt = ChatPromptTemplate.from_messages([    SystemMessage(content="""你是一位专业的旅行规划助手,能够根据用户的偏好和历史对话提供个性化的旅行建议。如果有相关记忆上下文,请在回答中加以参考,让建议更符合用户需求。"""),    MessagesPlaceholder(variable_name="context"),    ("human""{input}"),])# ── 核心函数 ──────────────────────────────────────────────────────────────────def _spinner(stop_event: threading.Event):    """在等待 LLM 推理期间显示转圈动画。"""    frames = ['⠋''⠙''⠹''⠸''⠼''⠴''⠦''⠧''⠇''⠏']    i = 0    while not stop_event.is_set():        print(f'\r助手: 正在思考... {frames[i % len(frames)]}', end='', flush=True)        time.sleep(0.1)        i += 1    print('\r' + ' ' * 35 + '\r', end='', flush=True)def retrieve_context(query: str, user_id: str) -> List[BaseMessage]:    """从 mem0 检索与用户问题相关的记忆,作为 LangChain 消息列表注入 prompt。"""    try:        memories = mem0.search(query, user_id=user_id)        memory_list = memories.get("results", [ ])        if not memory_list:            return [ ]        serialized = " ".join(m["memory"for m in memory_list)        print(f"  [mem0 检索到 {len(memory_list)} 条相关记忆]")        return [SystemMessage(content=f"用户相关信息:{serialized}")]    except Exception as e:        print(f"  [mem0 检索失败: {e}]")        return [ ]def generate_response(user_input: str, context: List[BaseMessage]) -> str:    """携带记忆上下文调用 LLM 生成回复。"""    chain = prompt | llm    response = chain.invoke({"context": context, "input": user_input})    return response.contentdef save_interaction(user_id: str, user_input: str, assistant_response: str) -> int:    """将本轮对话写回 mem0,供后续对话检索使用。返回新增记忆条数。"""    interaction = [        {"role""user""content": user_input},        {"role""assistant""content": assistant_response},    ]    try:        result = mem0.add(interaction, user_id=user_id, metadata={"source_app""openmemory""mcp_client""python_sdk"})        return len(result.get("results", [ ]))    except Exception as e:        print(f"  [mem0 保存失败: {e}]")        return 0def chat_turn(user_input: str, user_id: str) -> str:    """单轮对话:检索记忆 → 生成回复 → 保存记忆。"""    context = retrieve_context(user_input, user_id)    # 用一个 spinner 同时覆盖 LLM 推理和 mem0 保存,全部完成后再打印回复    stop_event = threading.Event()    spinner = threading.Thread(target=_spinner, args=(stop_event,), daemon=True)    spinner.start()    response = generate_response(user_input, context)    count = save_interaction(user_id, user_input, response)    stop_event.set()    spinner.join()    print(f"助手: {response}\n")    if count:        print(f"  [mem0 新增 {count} 条记忆]\n")    return response# ── 主程序 ────────────────────────────────────────────────────────────────────def main():    print("═" * 60)    print("旅行规划助手(LangChain + mem0 长期记忆)")    print("═" * 60)    print("提示:首次请求函数计算需要创建实例(冷启动),可能需要较长等待时间,请耐心等待。")    print("输入 /quit 退出\n")    user_id = "travel-user-demo"    while True:        try:            user_input = input("你: ").strip()        except (EOFError, KeyboardInterrupt):            break        if not user_input or user_input == "/quit":            break        response = chat_turn(user_input, user_id)if __name__ == "__main__":    main()

将上述代码保存为 langchain_memory.py 并运行:

python3 langchain_memory.py

会话状态

方式一:通过 Tablestore SDK 查询会话数据

AgentRun 将每次对话的会话元数据写入 session 表,消息内容写入 message 表。本示例直接扫描这两张表,列出最近的会话记录,并读取其中一个会话的完整对话内容。适用于数据分析、会话审计等只读场景。

import jsonimport osfrom datetime import datetimeimport tablestore as ots# ── 配置 ──────────────────────────────────────────────────────────────────────ENDPOINT = os.environ["TABLESTORE_ENDPOINT"]INSTANCE = os.environ["TABLESTORE_INSTANCE"]ACCESS_KEY_ID = os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"]ACCESS_KEY_SECRET = os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"]client = ots.OTSClient(ENDPOINT, ACCESS_KEY_ID, ACCESS_KEY_SECRET, INSTANCE)# ── 查询函数 ──────────────────────────────────────────────────────────────────def list_sessions(user_id: str = None, limit: int = 10) -> list[dict]:    """查询 session 表中的会话元数据列表。"""    if user_id:        min_pk = [("user_id", user_id), ("session_id", ots.INF_MIN)]        max_pk = [("user_id", user_id), ("session_id", ots.INF_MAX)]    else:        min_pk = [("user_id", ots.INF_MIN), ("session_id", ots.INF_MIN)]        max_pk = [("user_id", ots.INF_MAX), ("session_id", ots.INF_MAX)]    _, _, rows, _ = client.get_range(        "session", ots.Direction.FORWARD,        min_pk, max_pk,        columns_to_get=[ ], max_version=1, limit=limit,    )    sessions = [ ]    for row in rows:        pks = dict(row.primary_key)        attrs = {c[0]: c[1for c in row.attribute_columns}        # update_time 单位为微秒        update_time_us = int(attrs.get("update_time"0))        sessions.append({            "user_id": pks["user_id"],            "session_id": pks["session_id"],            "agent_id": attrs.get("agent_id"""),            "update_time": datetime.fromtimestamp(update_time_us / 1e6).strftime("%Y-%m-%d %H:%M:%S"),        })    return sessionsdef get_session_messages(session_id: str) -> list[dict]:    """查询 message 表中指定会话的所有对话消息。"""    min_pk = [("session_id", session_id), ("create_time", ots.INF_MIN), ("message_id", ots.INF_MIN)]    max_pk = [("session_id", session_id), ("create_time", ots.INF_MAX), ("message_id", ots.INF_MAX)]    _, _, rows, _ = client.get_range(        "message", ots.Direction.FORWARD,        min_pk, max_pk,        columns_to_get=[ ], max_version=1,    )    messages = [ ]    for row in rows:        attrs = {c[0]: c[1for c in row.attribute_columns}        if "content" in attrs:            try:                messages.extend(json.loads(attrs["content"]))            except json.JSONDecodeError:                pass    return messages# ── 主程序 ────────────────────────────────────────────────────────────────────def main():    print("═" * 55)    print("AgentRun 会话状态 —— Tablestore 数据查询示例")    print("═" * 55)    # ── 1. 列出所有会话 ───────────────────────────────────────────────────────    print("\n[会话列表]")    sessions = list_sessions(limit=5)    if not sessions:        print("  (暂无会话记录)")        return    for s in sessions:        print(f"  session_id : {s['session_id']}")        print(f"  agent_id   : {s['agent_id']}")        print(f"  更新时间   : {s['update_time']}")        print()    # ── 2. 读取最近一个会话的消息 ─────────────────────────────────────────────    latest = sessions[-1]    sid = latest["session_id"]    print(f"\n[会话详情] session_id: {sid}")    messages = get_session_messages(sid)    if not messages:        print("  (该会话暂无消息)")    else:        for msg in messages:            role = msg.get("role""?")            content = msg.get("content""")[:100]            ellipsis = "..." if len(msg.get("content""")) > 100 else ""            print(f"  [{role}{content}{ellipsis}")if __name__ == "__main__":    main()

将上述代码保存为 session_state.py 并运行:

python3 session_state.py

方式二:Google ADK 集成(OTSSessionService)

演示如何将 Tablestore 作为 Google ADK Agent 的会话状态后端。通过 OTSSessionService,ADK Agent 的每次对话都会自动持久化到 Tablestore,进程重启后可继续之前的会话,无需重新建立上下文。示例包含一个能查询天气的工具,用于验证 Agent 的工具调用和会话记录是否正常持久化。

from __future__ import annotationsimport asyncioimport osimport sysimport threadingimport timefrom typing import Anyfrom google.adk.agents import Agentfrom google.adk.models.lite_llm import LiteLlmfrom google.adk.runners import Runnerfrom google.genai import typesfrom agentrun.conversation_service import SessionStorefrom agentrun.conversation_service.adapters import OTSSessionService# ── 配置 ──────────────────────────────────────────────────────────────────────MEMORY_COLLECTION_NAME = os.environ.get("MEMORY_COLLECTION_NAME""")DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY""")if not MEMORY_COLLECTION_NAME:    print("ERROR: 请设置环境变量 MEMORY_COLLECTION_NAME")    sys.exit(1)APP_NAME = "adk-chat-demo"USER_ID = "demo_user"SESSION_FILE = ".adk_session_id"  # 本地持久化 session ID,删除此文件可开始新会话# ── 工具定义 ──────────────────────────────────────────────────────────────────def _spinner(stop_event: threading.Event):    """在等待 Agent 推理期间显示转圈动画。"""    frames = ['⠋''⠙''⠹''⠸''⠼''⠴''⠦''⠧''⠇''⠏']    i = 0    while not stop_event.is_set():        print(f'\rAgent: 正在思考... {frames[i % len(frames)]}', end='', flush=True)        time.sleep(0.1)        i += 1    print('\r' + ' ' * 35 + '\r', end='', flush=True)def get_weather(city: str) -> dict[strAny]:    """查询指定城市的天气信息。"""    data = {        "北京": {"weather""晴""temperature""5~15°C"},        "上海": {"weather""多云""temperature""12~20°C"},        "杭州": {"weather""阴""temperature""10~18°C"},    }    return data.get(city, {"error""暂无该城市数据"})# ── Step 1: 初始化 SessionStore ───────────────────────────────────────────────# SessionStore.from_memory_collection() 从 AgentRun 控制平面读取记忆存储配置,# 自动获取 Tablestore 实例和 endpoint,无需手动配置 OTS 连接参数。store = SessionStore.from_memory_collection(MEMORY_COLLECTION_NAME)store.init_tables()store.init_search_index()# ── Step 2: 创建 OTSSessionService ───────────────────────────────────────────session_service = OTSSessionService(session_store=store)# ── Step 3: 创建 Agent + Runner ───────────────────────────────────────────────# 通过 LiteLLM 调用 DashScope OpenAI 兼容接口custom_model = LiteLlm(    model="openai/qwen-max",    api_key=DASHSCOPE_API_KEY,    api_base="https://dashscope.aliyuncs.com/compatible-mode/v1",)agent = Agent(    name="smart_assistant",    model=custom_model,    instruction="你是一个友好的中文智能助手,用户问天气时调用 get_weather 工具。",    tools=[get_weather],)runner = Runner(    agent=agent,    app_name=APP_NAME,    session_service=session_service,)# ── Step 4: 对话(自动持久化到 Tablestore) ───────────────────────────────────async def chat(session_id: str, text: str) -> str:    """发送消息并返回 Agent 回复。"""    content = types.Content(        role="user",        parts=[types.Part(text=text)],    )    reply_parts: list[str] = [ ]    async for event in runner.run_async(        user_id=USER_ID,        session_id=session_id,        new_message=content,    ):        if event.is_final_response() and event.content and event.content.parts:            for part in event.content.parts:                if part.text:                    reply_parts.append(part.text)    return "\n".join(reply_parts)# ── 主程序 ────────────────────────────────────────────────────────────────────async def main() -> None:    # 从本地文件读取上次的 session ID,尝试恢复会话;找不到则创建新会话    session = None    if os.path.exists(SESSION_FILE):        saved_id = open(SESSION_FILE).read().strip()        session = await session_service.get_session(            app_name=APP_NAME, user_id=USER_ID, session_id=saved_id        )        if session:            print(f"继续之前的会话: {session.id}")    if session is None:        session = await session_service.create_session(            app_name=APP_NAME,            user_id=USER_ID,            state={"user:language""zh-CN"},        )        open(SESSION_FILE, "w").write(session.id)        print(f"新会话已创建: {session.id}")    print("输入 /quit 退出,/new 开始新会话\n")    while True:        try:            user_input = input("你: ").strip()        except (EOFError, KeyboardInterrupt):            break        if not user_input:            continue        if user_input == "/quit":            break        if user_input == "/new":            session = await session_service.create_session(                app_name=APP_NAME,                user_id=USER_ID,                state={"user:language""zh-CN"},            )            open(SESSION_FILE, "w").write(session.id)            print(f"新会话已创建: {session.id}\n")            continue        stop_event = threading.Event()        spinner = threading.Thread(target=_spinner, args=(stop_event,), daemon=True)        spinner.start()        reply = await chat(session.id, user_input)        stop_event.set()        spinner.join()        print(f"Agent: {reply}\n")if __name__ == "__main__":    asyncio.run(main())

将上述代码保存为 adk_session.py 并运行:

python3 adk_session.py

管理记忆存储




Cloud Native

查询记忆存储的配置详情,包括关联的 Tablestore 实例名称、向量集合、向量维度、嵌入模型和长期记忆提取模型,以及当前账号下的所有记忆存储列表。适用于排查配置问题或了解记忆存储的底层资源信息。

import osfrom alibabacloud_agentrun20250910.client import Clientfrom alibabacloud_tea_openapi import models as open_api_models# ── 配置 ──────────────────────────────────────────────────────────────────────ACCESS_KEY_ID = os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"]ACCESS_KEY_SECRET = os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"]MEMORY_COLLECTION_NAME = os.environ["MEMORY_COLLECTION_NAME"]REGION = os.getenv("AGENTRUN_REGION""cn-hangzhou")config = open_api_models.Config(    access_key_id=ACCESS_KEY_ID,    access_key_secret=ACCESS_KEY_SECRET,    endpoint=f"agentrun.{REGION}.aliyuncs.com",)client = Client(config)# ── 工具函数 ──────────────────────────────────────────────────────────────────def get_memory_collection(name: str) -> dict:    """查询记忆存储的详细配置。"""    resp = client.get_memory_collection(name)    return resp.body.data.to_map()def list_memory_collections() -> list:    """列出当前账号下的所有记忆存储。"""    from alibabacloud_agentrun20250910 import models as ar_models    req = ar_models.ListMemoryCollectionsRequest()    resp = client.list_memory_collections(req)    return [item.to_map() for item in (resp.body.data.items or [ ])]# ── 主程序 ────────────────────────────────────────────────────────────────────def main():    # ── 查询单个记忆存储 ──────────────────────────────────────────────────────    print("═" * 55)    print(f"查询记忆存储:{MEMORY_COLLECTION_NAME}")    print("═" * 55)    info = get_memory_collection(MEMORY_COLLECTION_NAME)    print(f"名称:{info.get('memoryCollectionName')}")    print(f"ID:{info.get('memoryCollectionId')}")    print(f"会话历史:{'已启用' if info.get('enableConversationHistory'else '未启用'}")    print(f"会话状态:{'已启用' if info.get('enableConversationState'else '未启用'}")    vector_cfg = info.get("vectorStoreConfig", {})    print(f"\n向量数据库:{vector_cfg.get('provider')}")    vector_detail = vector_cfg.get("config", {})    print(f"  实例:{vector_detail.get('instanceName')}")    print(f"  集合:{vector_detail.get('collectionName')}")    print(f"  向量维度:{vector_detail.get('vectorDimension')}")    embedder_cfg = info.get("embedderConfig", {})    embedder_detail = embedder_cfg.get("config", {})    print(f"\n嵌入模型:{embedder_detail.get('model')}")    print(f"模型服务:{embedder_cfg.get('modelServiceName')}")    llm_cfg = info.get("llmConfig", {})    llm_detail = llm_cfg.get("config", {})    print(f"\n长期记忆提取模型:{llm_detail.get('model')}")    print(f"模型服务:{llm_cfg.get('modelServiceName')}")    # ── 列出所有记忆存储 ──────────────────────────────────────────────────────    print("\n" + "═" * 55)    print("当前账号下的所有记忆存储")    print("═" * 55)    collections = list_memory_collections()    if not collections:        print("(无)")    for c in collections:        status = c.get("status""")        print(f"  - {c.get('memoryCollectionName')}  [{status}]")if __name__ == "__main__":    main()

将上述代码保存为 manage_memory.py 并运行:

python3 manage_memory.py

相关文档:

  • 什么是 AgentRun

    https://help.aliyun.com/zh/functioncompute/fc/what-is-agentrun

  • 通过代码创建 Agent(高代码)

    https://help.aliyun.com/zh/functioncompute/fc/create-agent-by-code-high-code

  • BrowserTool 浏览器

    https://help.aliyun.com/zh/functioncompute/fc/sandbox-browsertool

  • Agent Memory SDK

    https://help.aliyun.com/zh/tablestore/use-cases/tablestore-agent-memory-sdk

  • AgentRun 客户钉钉群群号:134570017218,如有技术问题或合作意向,欢迎联系我们。

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询