How do you build stable, iterable AI agents? This deep dive unpacks the Harness Engineering methodology and walks you through building the Claude Code architecture step by step. Core topics: 1. The five core components of Claude Code: the main control loop, tool dispatch, context management, and more. 2. A 23-module architecture design implementing a commercial-grade AI agent framework. 3. Open-source code demonstrating how the components work together.
If you want to make an agent tool like Claude Code stable, evaluable, and iterable, Harness Engineering is a layer you cannot skip. This long-form article lays out essentially the entire engineering methodology.
As of early 2026, Claude Code had reportedly passed $1B in annualized revenue within six months of launch[2]. That is not because its prompts are better. It is because Anthropic built the right execution harness around the right model: a streaming agent loop, a permission-governed tool dispatch system, and a context management layer that stays focused across ultra-long sessions. That harness is reproducible, and reproducing it is exactly what this article does.
Claude Code is built from five core components working in concert:
1. A single-threaded main loop that drives the model through cycles of perception, reasoning, and tool execution, feeding results back into context until the task reaches a terminal state
2. A typed tool-dispatch registry that maps tool names to handlers — bash, read, write, grep, glob — each with a strict input schema constraining both what the model can express and what the tool must execute
3. A context management layer combining on-demand skill injection, three-tier session compression, and disk-persisted memory, to keep reasoning coherent across sessions that exceed the model's context window
4. A rule-based permission governance system with three evaluation layers — always-deny, always-allow, and user-gated approval — backed by a lifecycle event bus that lets external hooks observe and intercept every tool call
5. A multi-agent coordination layer supporting subagent context isolation, asynchronous teammate delegation, FSM-managed inter-agent protocols, and git-worktree isolation for parallel task execution without file-level conflicts
And there is more inside each of these components…
In this post we will build and test each Claude Code component one by one and see how they work together to outperform other agent frameworks.
All code is available in the GitHub repo:
Related link: GitHub - FareedKhan-dev/claude-code-from-scratch — the 23 components of the Claude Code architecture[3]
Our codebase is structured as follows…
claude-code-from-scratch/
│
├── core.py # Shared foundation — client, tools, dispatch, permissions
├── 01_perception_action_loop.py # The minimal while loop — core agent pattern
├── s02_tool_use.py # Tool dispatch map — name → handler registry
├── s03_todo_write.py # TodoWrite — plan before execution
├── s04_subagent.py # Subagent spawner — isolated child context
...
├── s10_team_protocols.py # FSM protocol — IDLE→REQUEST→WAIT→RESPOND
├── s11_autonomous_agents.py # Agents self-assign tasks from shared board
├── s12_worktree_task_isolation.py # Git worktree per parallel task
│
├── s13_streaming.py # Real-time token streaming
...
├── s21_mcp_runtime.py # Official MCP SDK — auto-register external tools
│
├── s22_production_mailbox.py # Redis pub/sub — replaces JSONL mailboxes
├── s23_worktree_advanced.py # Full worktree lifecycle — edge cases handled
...
└── skills/ # Agentic skills — loaded on demand by s05
I have split each component of the Claude Code architecture into a separate script so we can run and test each one in isolation.
Harness Engineering is the methodology of building the runtime environment around an AI model, rather than focusing only on the model itself. The model handles reasoning and decision-making; the harness handles execution, constraints, and plumbing. A well-designed harness gives the model only the tools it actually needs and defines precisely how those tools may be used.
If I had to distill Harness Engineering into four core principles, they would be:
1. The model is the sole source of decisions. The harness never branches on model output; it only executes what the model requests.
2. Tools are the only interface between model and world. Every action, from reading a file to spawning a subagent, goes through a typed, schema-validated tool call.
3. Context is a managed resource. What the model sees each turn is deliberately curated, compressed, and injected — never blindly accumulated.
4. Permissions are declarative, not procedural. What is allowed, blocked, or approval-gated is defined in configuration, not scattered across conditional logic.
Claude Code is not an agent framework. It is a harness — one of the most carefully engineered harnesses deployed in production. Anthropic did not write logic that decides when to read a file or when to run tests. They gave Claude the tools to do those things and trusted the model to decide when it needs them.
The Claude Code architecture follows the Harness Engineering principles in several ways:
The main loop is stateless and generic — it runs identically whether the task is a one-line fix or a multi-hour refactor, because all task-specific intelligence lives in the model
The tool registry is the only extension point — adding a capability to Claude Code means registering a new tool with a name, a description, and an input schema
Context is actively managed at roughly 92% window usage — older conversation turns are summarized and persisted to disk, keeping the model's working memory focused on the current task
Permission governance runs as a pre-execution layer — every tool call passes rule evaluation before the tool executes, making safety a structural property rather than a model behavior.
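That pre-execution permission layer can be sketched as a declarative rule table evaluated before dispatch. The rule format and the `check_permission` name below are inventions of this sketch, not Claude Code's actual config schema:

```python
# Minimal sketch of a declarative pre-execution permission layer.
# Rule format and names are illustrative, not Claude Code internals.
import fnmatch

RULES = [
    {"action": "deny",  "pattern": "bash:rm -rf*"},  # always refuse
    {"action": "allow", "pattern": "read:*"},        # always permit
    {"action": "ask",   "pattern": "bash:*"},        # user-gated approval
]

def check_permission(tool_name: str, arg: str) -> str:
    """Return 'deny', 'allow', or 'ask' for a tool call; first match wins."""
    call = f"{tool_name}:{arg}"
    for rule in RULES:
        if fnmatch.fnmatch(call, rule["pattern"]):
            return rule["action"]
    return "ask"  # default to user approval for unmatched calls
```

The key property is that the loop consults this table before `dispatch[tool_name](input)` runs, so safety does not depend on the model behaving.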
The agent loop is the single architectural primitive everything else is built on. Before tools, before permissions, before multi-agent coordination, there is a loop that calls the model, observes what it wants to do, executes it, and feeds the result back.
Each session in this stage adds one mechanism to the loop without changing the loop itself.
The most fundamental principle of any agent system is the perceive-act-observe loop.
The agent receives a task and attempts a solution using tools,
observes the result, and decides whether to continue or stop — all driven by the model, not by code.
This is not a retry loop or a fallback mechanism. It is the core reasoning engine. In Claude Code this is the nO main loop, and the same loop runs whether you ask Claude to fix a one-line bug or refactor an entire codebase. The code never changes. Only what the model decides to do inside it changes.
To build the most basic form of Claude Code on top of Anthropic's models, we first have to initialize the client and model.
# Import necessary libraries
import os
from typing import Any, Dict, List

from anthropic import Anthropic

# API key (required)
# Get yours at: https://console.anthropic.com/
ANTHROPIC_API_KEY = "sk-ant-xxx"

# Model ID (required)
MODEL_ID = "claude-sonnet-4-6"

# Initialize the Anthropic client
client = Anthropic(base_url="https://api.anthropic.com", api_key=ANTHROPIC_API_KEY)
MODEL = MODEL_ID

# The system prompt is the foundation of the agent's behavior; it sets the stage for how the model approaches tasks
DEFAULT_SYSTEM = f"You are a coding agent at {os.getcwd()}. Use tools to solve tasks. Act, don't explain."
Since we are rebuilding Claude, we use Anthropic's models, but you can swap in any model you like via litellm — my GitHub codebase is flexible enough to support any model provider.
The system prompt is the foundation of the agent's behavior. In most situations it does little on its own, but it is essential for setting how the model approaches tasks.
As mentioned earlier, Claude is built around tools, so we need to define some basic tools for our agent to interact with the world. These tools become the interface through which the model can act and gather information.
BASIC_TOOLS = [
    {
        "name": "bash",
        "description": "Run a shell command.",
        "input_schema": {
            "type": "object",
            "properties": {"command": {"type": "string"}},
            "required": ["command"],
        },
    },
]
Here we define a single tool named bash that lets the model execute shell commands. The tool has a name, a description, and an input schema specifying the expected format of its input.
BASIC_DISPATCH: dict = {
    "bash": lambda inp: run_bash(inp["command"]),
}
Dispatch is the mechanism that connects the model's tool calls to actual code execution. It is a dictionary mapping tool names to handler functions. For a large architecture like Claude Code, with its many tools, this matters: it separates tool definitions from their implementations and makes it easy to add new tools without touching the core loop.
def dispatch_tools(response_content: list, dispatch: dict) -> List[Dict[str, Any]]:
    """
    Executes all tool_use blocks from a model's response and collects the results.

    Args:
        response_content (list): The `content` list from an Anthropic Message object.
        dispatch (dict): The dispatch map to use for routing tool calls.

    Returns:
        list: A list of `tool_result` dictionaries ready to be sent back to the model.
    """
    results = []
    for block in response_content:
        if block.type != "tool_use":
            continue
        tool_name = block.name
        tool_input = block.input
        tool_use_id = block.id
        handler = dispatch.get(tool_name)
        # Log the tool call for user visibility.
        first_val = str(list(tool_input.values())[0])[:80] if tool_input else ""
        print(f"\033[33m[{tool_name}] {first_val}...\033[0m")  # Yellow text
        if handler:
            try:
                output = handler(tool_input)
            except Exception as e:
                output = f"Error during tool execution: {e}"
        else:
            output = f"Error: Unknown tool '{tool_name}'"
        print(str(output)[:300])  # Print a preview of the output.
        results.append({
            "type": "tool_result",
            "tool_use_id": tool_use_id,
            "content": str(output),
        })
    return results
dispatch_tools is a helper that takes the model's response content, identifies any tool calls, executes them using the provided dispatch map, and collects the results. It also logs each call so you can see which tools are being invoked and what they return.
def agent_loop(messages: List[Dict[str, Any]], dispatch: Dict) -> None:
    """
    Runs the core agent interaction loop until the model provides a final answer.

    This function mutates the `messages` list in place, appending each new
    assistant response and the results of any tool calls.

    Args:
        messages (list): The conversation history, which will be updated.
        dispatch (dict): A map of tool names to their handler functions.
    """
    while True:
        # 1. Call the LLM with the current conversation history and available tools.
        print("\n\033[36m> Thinking...\033[0m")
        response = client.messages.create(
            model=MODEL,
            system=DEFAULT_SYSTEM,
            messages=messages,
            tools=BASIC_TOOLS,
            max_tokens=8000,
        )
        # Append the assistant's entire response (including any tool calls) to the history.
        messages.append({"role": "assistant", "content": response.content})

        # 2. Check if the loop should terminate.
        # If the stop reason is not 'tool_use', the model has provided its final answer.
        if response.stop_reason != "tool_use":
            break

        # 3. If the model wants to use tools, execute them.
        # The `dispatch_tools` function finds all tool_use blocks and runs them.
        results = dispatch_tools(response.content, dispatch)
        # Append the tool results to the history as a new "user" message.
        # This informs the model of the outcome of its requested actions.
        messages.append({"role": "user", "content": results})
Our agent loop is a simple while loop that keeps going until the model indicates it has reached a final answer.
Let's run this component on a simple task and see how it works.
# Initialize the conversation history list.
history: List[Dict[str, Any]] = []

# Loop to continuously accept user input.
while True:
    try:
        # Prompt the user for a query with a colored prompt.
        query = input("\033[36ms01 >> \033[0m")
    except (EOFError, KeyboardInterrupt):
        # Allow the user to exit gracefully with Ctrl+D or Ctrl+C.
        print("\nExiting.")
        break

    # Provide a simple way to exit the chat.
    if query.strip().lower() in ("q", "exit", ""):
        break

    # Add the user's query to the conversation history.
    history.append({"role": "user", "content": query})

    # Start the agent loop with the current history.
    # The `agent_loop` function will handle the back-and-forth with the model.
    agent_loop(history, BASIC_DISPATCH)

    # After the loop finishes, print the final text response from the assistant.
    last_message = history[-1]
    print("\n\033[32mFinal Answer:\033[0m")
    for block in last_message.get("content", []):
        if block.type == "text":
            print(block.text)
    print()
In this code we initialize an empty conversation history and enter a loop that keeps prompting the user for input. Each user query is appended to the history, and then agent_loop handles the interaction with the model. When the loop finishes, we print the assistant's final answer.
s01 >> List all Python files in the current directory.
> Thinking...
[Tool Call: bash]
Command: ls *.py
[Result]
bash: ls: command not found
> Thinking...
[Tool Call: bash]
Command: find . -name "*.py" -type f
[Result]
./main.py
./utils.py
./agent.py
Final Answer:
- main.py
- utils.py
- agent.py
I ran this in a directory containing a few Python files. On its first attempt the agent used the wrong command and got an error. It then self-corrected, used a proper find command, and successfully retrieved the list of Python files — demonstrating the perceive-act-observe loop and real error recovery.
In Claude Code's internal architecture, the tool registry is one of the most-studied components among engineers who have reverse-engineered its execution traces.
Claude Code ships with 18 registered tools, such as bash, read, write, edit, glob, grep, WebFetch, AskUserQuestion, TodoWrite, and more.
The elegance of this system is not the number of tools, but that adding a new tool requires zero changes to the core loop.
The dispatch map is the architectural pattern that makes this possible.
A dispatch map is a dictionary connecting the action the model wants to perform to the code that performs it. The loop is entirely unaware of which tools exist — it only knows how to call dispatch[tool_name](input).
This separation means Claude Code could have 18 tools, 30 tools, or 5 tools, and the loop itself would never change. The same principle is what makes Claude Code extensible via MCP: new tools are just new registry entries, whether they are local Python functions or remote servers.
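To make the extension point concrete, here is a hypothetical `line_count` tool — the name, handler, and description below are invented for illustration. Registration is one schema entry plus one dispatch entry; the loop is untouched:

```python
# Hypothetical tool to illustrate the registry-only extension point.
def run_line_count(path: str) -> str:
    """Handler follows the standard contract: dict-in, string-out, never raises."""
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            return str(sum(1 for _ in f))
    except OSError as e:
        return f"Error: {e}"

LINE_COUNT_TOOL = {
    "name": "line_count",
    "description": "Count the lines in a file. Use when only the count is needed, not the content.",
    "input_schema": {
        "type": "object",
        "properties": {"path": {"type": "string"}},
        "required": ["path"],
    },
}

# Registering it is pure data manipulation: append the schema to the tools
# list and add one dispatch entry — the agent loop never changes, e.g.:
#   tools = tools + [LINE_COUNT_TOOL]
#   dispatch["line_count"] = lambda inp: run_line_count(inp["path"])
```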
EXTENDED_DISPATCH: dict = {
    "bash": lambda inp: run_bash(inp["command"]),
    "read": lambda inp: run_read(inp["path"], inp.get("start_line"), inp.get("end_line")),
    "write": lambda inp: run_write(inp["path"], inp["content"]),
    "grep": lambda inp: run_grep(inp["pattern"], inp.get("path", ".")),
    "glob": lambda inp: run_glob(inp["pattern"]),
    "revert": lambda inp: run_revert(inp["path"]),
}
Each key is exactly the name the model will use in its tool call. Each value is the Python function that executes it. When Claude decides to read a file, the loop performs one dictionary lookup and calls the handler. No conditionals, no class hierarchies, no framework routing logic. The entire dispatch mechanism is one line: output = handler(tool_input).
Tool definitions matter just as much. They are what the model reads to decide which tool to call — the description field is not documentation, it is instruction.
EXTENDED_TOOLS = BASIC_TOOLS + [
    {
        "name": "read",
        "description": "Read a file and return numbered lines. Use when you need to inspect file content or reference specific line numbers. Returns up to 50,000 characters. Use start_line/end_line for large files.",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "start_line": {"type": "integer"},
                "end_line": {"type": "integer"},
            },
            "required": ["path"],
        },
    },
    {
        "name": "write",
        "description": "Write content to a file. Automatically snapshots the previous content so you can revert. Creates parent directories if needed.",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "content": {"type": "string"},
            },
            "required": ["path", "content"],
        },
    },
    {
        "name": "grep",
        "description": "Search for a regex pattern across files. Returns file paths and line numbers of matches.",
        "input_schema": {
            "type": "object",
            "properties": {
                "pattern": {"type": "string"},
                "path": {"type": "string"},
                "recursive": {"type": "boolean"},
            },
            "required": ["pattern"],
        },
    },
    {
        "name": "glob",
        "description": "Find files matching a glob pattern, e.g. '**/*.py'. Returns sorted list of matching paths.",
        "input_schema": {
            "type": "object",
            "properties": {"pattern": {"type": "string"}},
            "required": ["pattern"],
        },
    },
    {
        "name": "revert",
        "description": "Restore a file to its state before the last write call. Use when a write operation produced incorrect results.",
        "input_schema": {
            "type": "object",
            "properties": {"path": {"type": "string"}},
            "required": ["path"],
        },
    },
]
A poorly written description causes the model to pick the wrong tool. If grep says "search files" and bash says "run commands", the model will reach for bash for every search operation, because the description does not constrain it precisely enough.
Claude Code's internal tool descriptions are very specific about when each tool should be used, and that specificity is what produces consistent, predictable tool selection across millions of executions.
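The contrast is easy to show side by side. Both description strings below are invented for illustration, not Claude Code's actual wording:

```python
# Invented example: a vague description invites the model to fall back to bash.
VAGUE_GREP = {"name": "grep", "description": "Search files."}

# A precise description states when to use the tool and what it returns,
# which is what actually constrains the model's choice.
PRECISE_GREP = {
    "name": "grep",
    "description": (
        "Search for a regex pattern across files. Use instead of bash for any "
        "text search. Returns file paths and line numbers of matches."
    ),
}
```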
The handler functions themselves follow a consistent contract: they take an input dict, return a string, and never raise into the loop. Errors are returned as strings, not thrown.
import subprocess

def run_read(path: str, start_line: int = None, end_line: int = None) -> str:
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            lines = f.readlines()
        s = (start_line or 1) - 1
        e = end_line or len(lines)
        numbered = "".join(f"{s+1+i:4d}\t{l}" for i, l in enumerate(lines[s:e]))
        return numbered[:50000] or "(empty file)"
    except FileNotFoundError:
        return f"Error: file not found: {path}"
    except Exception as e:
        return f"Error reading {path}: {e}"

def run_grep(pattern: str, path: str = ".", recursive: bool = True) -> str:
    try:
        flags = ["-r"] if recursive else []
        r = subprocess.run(["grep", "-n", *flags, pattern, path],
                           capture_output=True, text=True, timeout=30)
        return ((r.stdout + r.stderr).strip() or "(no matches)")[:10000]
    except Exception as e:
        return f"Error: {e}"
Note that run_read returns numbered lines. That is deliberate — when the model later calls write to modify a file, it references line numbers from the earlier read. Numbered output lets Claude say precisely "replace lines 45 to 67". Claude Code's read tool works exactly the same way.
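The write tool's description also promises automatic snapshots. The repo implements this in core.py; a minimal sketch of how write-with-snapshot and revert could work, with an in-memory `_SNAPSHOTS` store invented for this example (the actual implementation may differ):

```python
from pathlib import Path

# Sketch of write-with-snapshot and revert. _SNAPSHOTS is an in-memory
# store invented for this example; the repo's core.py may differ.
_SNAPSHOTS: dict = {}

def run_write(path: str, content: str) -> str:
    p = Path(path)
    if p.exists():
        # Save the previous content so a later revert can restore it.
        _SNAPSHOTS[path] = p.read_text(encoding="utf-8")
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(content, encoding="utf-8")
    return f"Wrote {len(content)} characters to {path}"

def run_revert(path: str) -> str:
    if path not in _SNAPSHOTS:
        return f"Error: no snapshot for {path}"
    Path(path).write_text(_SNAPSHOTS.pop(path), encoding="utf-8")
    return f"Reverted {path} to its previous content"
```

Like every other handler, both functions return errors as strings rather than raising into the loop.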
s02 >> Find all TODO comments in the codebase and show me which files they are in.
> Thinking...
[grep] TODO
./agents/s03_todo_write.py:45: # TODO: add priority levels to tasks
./agents/s09_agent_teams.py:112: # TODO: replace JSONL mailboxes with Redis
> Thinking...
Final Answer:
Found 2 TODO comments across 2 files:
- s03_todo_write.py line 45: add priority levels to tasks
- s09_agent_teams.py line 112: replace JSONL mailboxes with Redis
The model chose grep over bash because the tool descriptions were precise enough to make the right choice obvious. That is the dispatch map working exactly as intended. The intelligence of tool selection lives entirely in the model — the harness only has to describe the tools well enough for the model to match intent to capability.
One of the most revealing findings from reverse-engineering Claude Code's execution traces is what Claude does on complex tasks before it writes a single line of code or reads a single file. It calls TodoWrite. Every time.
Plan before acting; act only once the plan is committed.
This is not accidental. Anthropic observed that without an explicit planning mechanism, models drift on multi-step tasks.
The model starts executing, hits an interesting-looking intermediate result, follows it, and surfaces twenty minutes later having done something adjacent to — but not quite — what was asked.
The TodoWrite tool solves this at the architectural level: not by making the model smarter, but by giving it a commitment device it is held accountable to throughout execution.
Claude Code injects the current todo state as a system reminder after every tool call. The model does not forget what it planned to do, because the plan is continuously re-injected into its context. This is what lets Claude Code reliably complete tasks spanning dozens of tool calls without losing track of the goal.
import json
from pathlib import Path

TODO_FILE = Path(".agent_todo.json")

def todo_write(tasks: list) -> str:
    data = [{"id": i, "task": t, "status": "pending"} for i, t in enumerate(tasks)]
    TODO_FILE.write_text(json.dumps(data, indent=2))
    return "Plan written:\n" + "\n".join(f"  [{i}] {t}" for i, t in enumerate(tasks))

def todo_read() -> str:
    if not TODO_FILE.exists():
        return "(no plan)"
    data = json.loads(TODO_FILE.read_text())
    return "\n".join(
        f"  [{t['id']}] [{t['status']:12s}] {t['task']}" for t in data
    )

def todo_update(index: int, status: str) -> str:
    data = json.loads(TODO_FILE.read_text())
    data[index]["status"] = status
    TODO_FILE.write_text(json.dumps(data, indent=2))
    return f"Task {index} marked {status}"
The three tools work as a unit. todo_write commits the full plan up front. todo_update marks off each step as the agent moves. todo_read lets the model inspect its own progress at any time.
Together they create an external working memory that keeps execution honest — the model cannot silently skip a step, because every step has a status that persists across turns.
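Claude Code re-injects the plan after every tool call as a system reminder. A minimal sketch of that reinjection, assuming tool results are collected as a list of content blocks; the helper name `with_plan_reminder` is this sketch's invention, not Claude Code's internal API:

```python
# Sketch: append the current plan as a reminder block to each batch of tool
# results, so the model re-reads its own plan on every turn.
def with_plan_reminder(results: list, plan: str) -> list:
    """Return tool results with the current plan appended as a text block."""
    if not results:
        return results
    return results + [{
        "type": "text",
        "text": f"<system-reminder>Current plan:\n{plan}</system-reminder>",
    }]
```

In the loop, the tool-result message would then become `{"role": "user", "content": with_plan_reminder(results, todo_read())}`.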
The system prompt is updated to force planning.
SYSTEM = (
    f"You are a coding agent at {os.getcwd()}. "
    "Before working on any multi-step task, ALWAYS call todo_write first "
    "to write your complete plan. Execute each step in order. "
    "Call todo_update after completing each step."
)
The word ALWAYS is load-bearing here. Without it, the model sometimes plans. With it, the model plans consistently. System prompt instructions phrased as strong imperatives produce far more reliable behavior than suggestions or recommendations.
s03 >> Refactor the run_bash function to add configurable timeout and output truncation, then verify nothing breaks.
> Thinking...
[todo_write]
Plan written:
[0] Read the current run_bash implementation in core.py
[1] Add configurable timeout parameter with 120s default
[2] Add output truncation at 50,000 characters
[3] Write the updated implementation back to core.py
[4] Run the test suite to verify no regressions
[read] core.py (lines 45-72)
[todo_update] 0 → done
> Thinking...
[write] core.py
[todo_update] 1 → done
[todo_update] 2 → done
[todo_update] 3 → done
> Thinking...
[bash] python -m pytest tests/test_agents_smoke.py -q
44 passed in 2.1s
[todo_update] 4 → done
Final Answer:
Refactor complete. Added 120s configurable timeout and 50,000 character output
truncation to run_bash. All 44 smoke tests passing. No regressions.
The model committed to five steps before touching a single file. It executed them in order. It verified its work. That is not the model being inherently careful — it is the harness giving the model a structure in which careful execution is the path of least resistance.
Claude Code's execution traces reveal something interesting about how it handles large-codebase exploration.
When asked to understand a new repository, Claude does not read files directly into the main conversation.
It spawns three parallel exploration subagents, each with a different focus, each running in complete isolation from the main context. The main conversation receives three clean summaries.
It never sees the dozens of intermediate file reads, grep outputs, and directory listings that produced them.
This is subagent context isolation, the pattern that lets Claude Code work on arbitrarily large codebases without the main conversation window filling with noise. Every intermediate result irrelevant to the final answer stays inside the subagent and is discarded on completion. The parent pays only for the context it actually needs.
Isolation is achieved by giving each subagent a completely separate messages[] list. Apart from the final text response the child returns, there is no shared state between parent and child.
SUBAGENT_SYSTEM = (
    f"You are a subagent working on a specific subtask at {os.getcwd()}. "
    "Complete your task thoroughly and summarise your findings clearly. "
    "Your output will be the only thing the parent agent sees."
)

def spawn_subagent(prompt: str) -> str:
    """Run a fresh agent loop with completely isolated context."""
    print(f"\033[35m  [subagent spawned] {prompt[:60]}\033[0m")
    # Completely fresh messages list - no shared state with parent
    sub_messages = [{"role": "user", "content": prompt}]
    while True:
        resp = client.messages.create(
            model=MODEL,
            system=SUBAGENT_SYSTEM,
            messages=sub_messages,
            tools=EXTENDED_TOOLS,
            max_tokens=8000,
        )
        sub_messages.append({"role": "assistant", "content": resp.content})
        if resp.stop_reason != "tool_use":
            break
        results = dispatch_tools(resp.content, EXTENDED_DISPATCH)
        sub_messages.append({"role": "user", "content": results})
    # Extract only the final text - discard all intermediate tool calls
    result = "".join(
        b.text for b in sub_messages[-1]["content"] if hasattr(b, "text")
    )
    print(f"\033[35m  [subagent done] {result[:80]}\033[0m")
    return result
The subagent runs exactly the same agent loop as the parent. It has access to exactly the same tools. The only differences are that its messages[] list starts empty and its system prompt focuses it on a bounded task. When it finishes, everything it accumulated — every file read, every grep output, every intermediate reasoning step — is discarded. Only the final summary returns to the parent.
It is registered as a tool so the model can decide when to use it.
SUBAGENT_TOOLS = EXTENDED_TOOLS + [
    {
        "name": "spawn_subagent",
        "description": (
            "Spawn an isolated subagent to handle a subtask in a fresh context. "
            "Use for exploration tasks, risky operations, or any work that would "
            "generate large amounts of intermediate output that the main conversation "
            "does not need to see. Returns only the subagent's final summary."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "prompt": {"type": "string"}
            },
            "required": ["prompt"],
        },
    },
]

SUBAGENT_DISPATCH = {
    **EXTENDED_DISPATCH,
    "spawn_subagent": lambda inp: spawn_subagent(inp["prompt"]),
}
Let's test this…
s04 >> Analyse the entire codebase and give me a complete summary of the architecture.
> Thinking...
[spawn_subagent] Explore all Python files in the agents/ directory.
Map their imports, their exported functions, and what each one adds
over the previous session file.
[subagent] > Thinking...
[subagent] [glob] agents/**/*.py → 25 files found
[subagent] [read] agents/core.py
[subagent] [read] agents/s01_agent_loop.py
[subagent] [read] agents/s02_tool_use.py
... (22 more reads, all inside the subagent)
[subagent done] core.py exports client, tools, and dispatch maps.
Each session imports from core and adds one mechanism...
> Thinking...
Final Answer:
The codebase has a single shared foundation (core.py) that is imported by
23 session files. Each session adds exactly one mechanism without duplicating
any tool code. The architecture follows a strict separation between the shared
foundation and the session-specific logic layered on top of it.
The parent conversation contains one tool call. The subagent ran twenty-five. The parent's context grew by exactly one summary paragraph. Without subagent isolation, the parent would have accumulated all twenty-five file reads, and its subsequent reasoning would have been reasoning about file contents rather than about architecture.
Isolation keeps the main agent's reasoning at the right level of abstraction.
Stage three is about cognitive infrastructure: the agent moves beyond single-session execution, loading domain knowledge only when it is needed,
compressing conversation history before it degrades reasoning quality, and persisting task state to disk so work survives process restarts. This is where Claude Code's skills system, its compressor wU2, and its long-term memory files come from.
One of the most expensive mistakes in Harness Engineering is stuffing everything the model might need into the system prompt.
A system prompt containing PDF-processing guides, code-review methodology, deployment checklists, and security-audit frameworks burns thousands of tokens on every API call, the vast majority irrelevant to what the model is currently doing.
Claude Code solves this with progressive disclosure, the same pattern that makes its skills system one of the architecturally cleanest components.
The system prompt contains only a one-line description of each available skill. When the model recognises that the current task needs domain expertise, it calls load_skill(), and the full instructions are injected directly into the conversation via the tool result, at the moment they are needed. The model pays the context cost only when the knowledge is actually relevant. Install a hundred skills and the system prompt grows by a hundred lines, not a hundred pages.
Skill files follow a consistent format: a metadata header for discovery, plus the full procedural instructions the model reads and applies.
skills/
├── agent-builder/
│ └── SKILL.md
├── code-review/
│ └── SKILL.md
└── pdf/
└── SKILL.md
---
name: code-review
description: Use when asked to review code, audit a file for bugs,
  check quality, or suggest improvements.
---

# Code Review Skill

## Review process
1. Read the full file before commenting
2. Categorise issues: BUG / SECURITY / PERF / STYLE / SUGGEST
3. Cite file and line number for every finding
4. End with a deploy-readiness summary
...
The discovery mechanism scans the skills directory at startup, reads only the metadata header of each SKILL.md, and builds a lightweight registry that goes into the system prompt.
SKILLS_DIR = Path(__file__).parent.parent / "skills"

def discover_skills() -> dict[str, str]:
    skills = {}
    for skill_dir in sorted(SKILLS_DIR.iterdir()):
        skill_md = skill_dir / "SKILL.md"
        if skill_dir.is_dir() and skill_md.exists():
            lines = skill_md.read_text(encoding="utf-8").splitlines()
            desc = ""
            in_frontmatter = False
            for line in lines:
                if line.strip() == "---":
                    in_frontmatter = not in_frontmatter
                    continue
                # Pull the description field out of the frontmatter header.
                if in_frontmatter and line.strip().startswith("description:"):
                    desc = line.split(":", 1)[1].strip()[:100]
                    break
            skills[skill_dir.name] = desc or "No description."
    return skills

def load_skill(name: str) -> str:
    skill_path = SKILLS_DIR / name / "SKILL.md"
    if not skill_path.exists():
        return f"Error: skill '{name}' not found. Use list_skills to see available."
    return f"=== SKILL: {name} ===\n{skill_path.read_text(encoding='utf-8')}"
The system prompt references every available skill but loads none of them.
_skill_list = "\n".join(
    f"  - {n}: {d}" for n, d in discover_skills().items()
)

SYSTEM = (
    f"You are a coding agent at {os.getcwd()}.\n"
    "When a task requires specialised knowledge, call load_skill(name) "
    "to get full instructions before starting. Do NOT guess.\n\n"
    f"Available skills:\n{_skill_list}"
)
Let's test the skills approach…
s05 >> Review agents/core.py for bugs, security issues, and code quality problems.
> Thinking...
[load_skill] code-review
=== SKILL: code-review ===
# Code Review Skill
## Review process
1. Read the full file before commenting...
> Thinking...
[read] agents/core.py
> Thinking...
[grep] def run_bash
agents/core.py:58
[grep] subprocess.run
agents/core.py:62: r = subprocess.run(command, shell=True, ...
> Thinking...
Final Answer:
[SECURITY] core.py:62
Issue: shell=True with unsanitised input enables command injection.
Why: Any string passed to run_bash is executed directly by the shell.
A prompt injection could construct a command like: echo hi; rm -rf /
Fix: Validate against the _ALWAYS_BLOCK list before execution,
or use shell=False with shlex.split() for non-pipeline commands.
[PERF] core.py:78
Issue: run_grep spawns a new subprocess for every call with no caching.
Why: Repeated grep calls on the same pattern/path redo filesystem traversal.
Fix: Add an LRU cache keyed on (pattern, path) for repeated queries.
[STYLE] core.py:91
Issue: run_read returns an empty string for empty files rather than
a descriptive message.
Fix: Return "(empty file)" to give the model a clear signal.
Summary: 1 security issue (blocking), 1 performance issue, 1 style issue.
Not safe to deploy without addressing the security finding.
The model loaded the skill, applied its structured methodology, and produced findings with file and line references. Without the skill, the model would still review code — but inconsistently, with no enforced categories and no deploy-readiness summary. The skill does not make the model smarter. It makes the model's output consistent and structured across every code review it performs.
Every long-running session hits the same problem. The context window fills with tool outputs, intermediate results, and conversation turns that were relevant ten minutes ago but are now just noise.
Claude Code's compressor, wU2, triggers automatically at roughly 92% context-window usage.
It does not discard history — it summarises it, preserving the information while dramatically shrinking the token footprint. The summary is then written to a persistent Markdown file on disk, so the agent's memory survives session restarts.
The implementation processes history in three explicit tiers. The most recent messages are kept verbatim because they carry the active reasoning context. Older messages are collapsed into a single summary block via a dedicated compression API call. That summary is written to .agent_memory.md so the next session can load it and continue without starting from scratch.
from datetime import datetime
from pathlib import Path

COMPRESS_THRESHOLD = 40_000  # ~10k tokens estimated
KEEP_RECENT = 6              # keep last N messages verbatim
MEMORY_FILE = Path(".agent_memory.md")

def _estimate_size(messages: list) -> int:
    total = 0
    for msg in messages:
        content = msg.get("content", "")
        if isinstance(content, str):
            total += len(content)
        elif isinstance(content, list):
            for block in content:
                if isinstance(block, dict):
                    total += len(str(block.get("text", "") or block.get("content", "")))
                elif hasattr(block, "text"):
                    total += len(block.text or "")
    return total

def _summarize(messages: list) -> str:
    text = "\n\n".join(
        f"[{m['role']}]: " + (
            m["content"] if isinstance(m["content"], str)
            else " ".join(
                b.get("text", "") if isinstance(b, dict) else getattr(b, "text", "")
                for b in (m["content"] if isinstance(m["content"], list) else [])
            )
        )
        for m in messages
    )
    resp = client.messages.create(
        model=MODEL,
        system="Summarise this conversation. Keep all important decisions, "
               "code changes, file paths, and context. Be concise but complete.",
        messages=[{"role": "user", "content": f"Summarise:\n\n{text[:20000]}"}],
        max_tokens=2000,
    )
    return "".join(b.text for b in resp.content if hasattr(b, "text"))
The compression function is invoked after each agent response based on the measured context size, not on a timer.
def maybe_compress(messages: list) -> bool:
    if _estimate_size(messages) < COMPRESS_THRESHOLD:
        return False
    if len(messages) <= KEEP_RECENT:
        return False
    old_messages = messages[:-KEEP_RECENT]
    recent = messages[-KEEP_RECENT:]
    print("\033[90m  [compress] context large - summarising older messages...\033[0m")
    summary = _summarize(old_messages)
    # Write to disk - survives session restarts
    MEMORY_FILE.write_text(
        f"# Agent Memory\nGenerated: {datetime.now().isoformat()}\n\n{summary}\n"
    )
    # Replace old messages with one summary block
    messages.clear()
    messages.append({
        "role": "user",
        "content": f"[Context summary from previous conversation]\n\n{summary}",
    })
    messages.append({
        "role": "assistant",
        "content": "Understood. I have the full context from our previous work.",
    })
    messages.extend(recent)
    print(f"\033[90m  [compress] {len(old_messages)} messages → 1 summary "
          f"| saved to {MEMORY_FILE}\033[0m")
    return True
At session startup, the agent checks for an existing memory file and loads it before the first user message.
history = []
if MEMORY_FILE.exists():
    mem = MEMORY_FILE.read_text()
    print(f"\033[90m  [memory] loaded from {MEMORY_FILE}\033[0m")
    history = [
        {"role": "user", "content": f"[Loaded memory]\n\n{mem}"},
        {"role": "assistant", "content": "Memory loaded. Ready to continue."},
    ]
s06 >> Add error handling to all tool functions in core.py, then write tests for each one.
> Thinking...
[read] agents/core.py
[todo_write]
[0] Read all tool function signatures in core.py
[1] Add try/except to run_bash, run_read, run_write, run_grep, run_glob
[2] Ensure all errors are returned as strings, not raised
[3] Write smoke tests for each tool in tests/
[4] Run the test suite to verify
[read] agents/core.py (lines 45-180)
[todo_update] 0 → done
> Thinking...
[write] agents/core.py
[todo_update] 1 → done
[todo_update] 2 → done
> Thinking...
[write] tests/test_tools.py
[todo_update] 3 → done
> Thinking...
[bash] python -m pytest tests/test_tools.py -v
test_run_bash_ok PASSED
test_run_bash_blocks_dangerous PASSED
test_run_read_existing_file PASSED
test_run_read_missing_file PASSED
test_run_write_and_revert PASSED
test_run_grep_pattern PASSED
5 passed in 0.8s
[todo_update] 4 → done
[compress] context large - summarising older messages...
[compress] 18 messages → 1 summary | saved to .agent_memory.md
Final Answer:
Added try/except error handling to all 5 tool functions. All errors now return
descriptive strings. 5 new tests written and passing.
After a long run of reads, writes, and tests, compression triggers automatically. Eighteen accumulated messages — file contents, test output, intermediate reasoning — collapse into a single summary block. The next time this session starts, it loads that summary and continues with full context of what was done, without paying for eighteen turns of history on every subsequent API call.
Context compression keeps the conversation window manageable. But it solves a different problem from task tracking. Compression is about what the model remembers.
The task graph is about what the agent — across sessions, across restarts, and eventually across multiple agents working in parallel — has committed to doing.
Claude Code's TodoWrite system is session-scoped. Close the terminal and the plan is gone. The task graph in this session extends it into a persistent, dependency-aware structure. Every task carries an ID, a description, a status, a priority, and an explicit list of upstream task IDs that must complete before it becomes available.
The graph lives in .agent_tasks.json and survives everything: process crashes, session restarts, and machine reboots.
This is the foundation stage four's multi-agent systems build on. When multiple agents run in parallel, they all read and write the same task graph. The dependency system guarantees they never execute a task before its prerequisites are done, and the atomic claim mechanism in stage 4 guarantees no two agents claim the same task simultaneously.
import json
import threading
import uuid
from pathlib import Path

TASKS_FILE = Path(".agent_tasks.json")
_TASKS_LOCK = threading.Lock()

def _load() -> list:
    if TASKS_FILE.exists():
        return json.loads(TASKS_FILE.read_text())
    return []

def _save(tasks: list):
    TASKS_FILE.write_text(json.dumps(tasks, indent=2))

def run_task_create(description: str, depends_on: list = None,
                    priority: str = "medium") -> str:
    with _TASKS_LOCK:
        tasks = _load()
        task = {
            "id": uuid.uuid4().hex[:8],
            "description": description,
            "status": "pending",
            "priority": priority,
            "depends_on": depends_on or [],
            "result": "",
        }
        tasks.append(task)
        _save(tasks)
    return f"created task {task['id']}: {description}"

def run_task_next() -> str:
    """Return the next unblocked pending task - respects dependency chain."""
    tasks = _load()
    done_ids = {t["id"] for t in tasks if t["status"] == "done"}
    for t in sorted(tasks, key=lambda x: {"high": 0, "medium": 1, "low": 2}[x["priority"]]):
        if t["status"] != "pending":
            continue
        if all(dep in done_ids for dep in t.get("depends_on", [])):
            return f"[{t['id']}] [{t['priority']}] {t['description']}"
    return "(no unblocked tasks available)"

def run_task_update(task_id: str, status: str, result: str = "") -> str:
    with _TASKS_LOCK:
        tasks = _load()
        for t in tasks:
            if t["id"].startswith(task_id):
                t["status"] = status
                if result:
                    t["result"] = result
                _save(tasks)
                return f"task {t['id']} → {status}"
    return f"Error: task '{task_id}' not found"
The thread lock around every read-write operation is essential. In stage 4, multiple agents will call _load() and _save() concurrently.
Without the lock, two agents can read the same state simultaneously, each modify it independently, and the second write silently overwrites the first agent's changes. The lock makes every task state transition atomic.
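Stage 4's atomic claim builds on the same lock. A minimal sketch, assuming the same .agent_tasks.json layout; the function name `run_task_claim` and the `claimed_by` field are inventions of this sketch:

```python
import json
import threading
from pathlib import Path

TASKS_FILE = Path(".agent_tasks.json")
_TASKS_LOCK = threading.Lock()

def run_task_claim(task_id: str, agent_name: str) -> str:
    """Atomically claim a pending task so exactly one agent can win it."""
    with _TASKS_LOCK:
        tasks = json.loads(TASKS_FILE.read_text()) if TASKS_FILE.exists() else []
        for t in tasks:
            if t["id"].startswith(task_id):
                if t["status"] != "pending":
                    return f"Error: task {t['id']} already {t['status']}"
                # The read-modify-write happens entirely inside the lock,
                # so a second claimant sees 'in_progress' and fails cleanly.
                t["status"] = "in_progress"
                t["claimed_by"] = agent_name
                TASKS_FILE.write_text(json.dumps(tasks, indent=2))
                return f"task {t['id']} claimed by {agent_name}"
    return f"Error: task '{task_id}' not found"
```

Note that a threading lock only serialises agents within one process; agents running as separate processes would need file-level locking instead.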
s07 >> We need to migrate the codebase to use async tool implementations throughout.
Break this down into tasks, identify dependencies, and start working through them.
> Thinking...
[task_create] Audit all sync tool functions in core.py
priority: high
[task_create] Write async versions of run_bash, run_read, run_write
depends_on: [audit_task_id], priority: high
[task_create] Write async versions of run_grep, run_glob
depends_on: [audit_task_id], priority: medium
[task_create] Update ASYNC_DISPATCH map with new handlers
depends_on: [bash_read_write_task_id, grep_glob_task_id]
priority: high
[task_create] Update s18_parallel_tools.py to use ASYNC_DISPATCH
depends_on: [dispatch_task_id], priority: medium
[task_create] Run full test suite to verify async implementations
depends_on: [s18_task_id], priority: high
> Thinking...
[task_next]
[a3f2c891] [high] Audit all sync tool functions in core.py
[read] agents/core.py (lines 1-100)
[task_update] a3f2c891 → done
> Thinking...
[task_next]
[b7d1e445] [high] Write async versions of run_bash, run_read, run_write
[read] agents/core.py (lines 55-120)
[write] agents/core.py
[task_update] b7d1e445 → done
> Thinking...
[task_next]
[c9f3a112] [medium] Write async versions of run_grep, run_glob
[write] agents/core.py
[task_update] c9f3a112 → done
> Thinking...
[task_next]
[d2e8b334] [high] Update ASYNC_DISPATCH map with new handlers
[write] agents/core.py
[task_update] d2e8b334 → done
> Thinking...
[task_next]
[e5a1c667] [medium] Update s18_parallel_tools.py to use ASYNC_DISPATCH
[read] agents/s18_parallel_tools.py
[write] agents/s18_parallel_tools.py
[task_update] e5a1c667 → done
> Thinking...
[task_next]
[f6b2d990] [high] Run full test suite to verify async implementations
[bash] python -m pytest tests/ -q
44 passed in 2.3s
[task_update] f6b2d990 → done
Final Answer:
Migration complete. 6 tasks executed in dependency order. All 5 tool functions
now have async counterparts in ASYNC_DISPATCH. s18_parallel_tools.py updated.
44 tests passing. The task graph in .agent_tasks.json shows all tasks done.
The agent first created the full task graph, automatically identifying the dependency chains, then executed tasks in the correct order, never attempting a task before its upstream dependencies were marked done.
The graph persists on disk throughout, meaning that if the process crashes after task 3, a restart finds tasks 1-3 done and continues from task 4 without repeating any work.
That behavior makes the task graph a fundamentally different mechanism from TodoWrite: not just a plan for one session, but persistent project state that survives anything.
Stage four breaks the single-agent ceiling, where one context window and one execution thread are no longer enough: running slow operations on background threads without blocking the main loop, delegating parallel workstreams to persistent specialist agents, managing inter-agent communication with finite state machines, enabling autonomous task claiming without a central coordinator, and isolating parallel file writes at the git-worktree level.
This is where Claude Code's parallel subagent spawning, background execution queue, and task-delegation architecture are rebuilt from first principles.
In Claude Code's internal architecture, the h2A asynchronous queue is one of its most practical performance mechanisms. When Claude runs a test suite, compiles a project, or executes a long database migration, it does not sit idle waiting for the result.
It pushes the operation into the background, keeps planning its next steps, and gets notified when the operation completes. The key property is that the main loop never blocks on I/O.
Without this mechanism, a coding agent can only be as fast as its slowest tool call. A test suite that takes 45 seconds means 45 seconds of silence — no planning, no parallel work, no progress. By decoupling operation execution from the agent's reasoning cycle, background execution removes that ceiling entirely.
The implementation uses one daemon thread per background operation and a shared queue for notifications. When a background operation finishes, it pushes its result onto the queue.
The agent loop drains the queue between turns and injects any completed notifications as user messages, so the model can react to them naturally in its next reasoning cycle.
import os
import threading
import queue
import subprocess

_BG_QUEUE: queue.Queue = queue.Queue()

def run_bash_background(command: str, label: str = "") -> str:
    """Start a shell command in a background daemon thread. Returns immediately."""
    label = label or command[:40]

    def _run():
        print(f"\033[90m  [bg] started: {label}\033[0m")
        try:
            r = subprocess.run(
                command, shell=True, capture_output=True,
                text=True, timeout=300, cwd=os.getcwd()
            )
            output = (r.stdout + r.stderr).strip()[:2000] or "(no output)"
            status = "completed"
        except subprocess.TimeoutExpired:
            output = "Error: operation timed out after 300s"
            status = "timed out"
        except Exception as e:
            output = f"Error: {e}"
            status = "failed"
        # Push result into notification queue - agent will pick it up next turn
        _BG_QUEUE.put(
            f"[Background task '{label}' {status}]\n{output}"
        )
        print(f"\033[90m  [bg] {label} {status}\033[0m")

    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    return f"Background task started: '{label}'. You will be notified when it completes."
In our agent_loop_with_bg() function, we use stream_loop() as the main agent loop, but after each turn we check _BG_QUEUE for any completed background tasks. If there are any, we inject them into the conversation as user messages so the model can react to them on its next turn.
The agent loop is wrapped to drain notifications between turns.
def agent_loop_with_bg(messages: list):
    """Agent loop that injects background task completions between turns."""
    stream_loop(messages, BG_TOOLS, BG_DISPATCH, system=SYSTEM)
    # After each turn, inject any finished background operations
    while not _BG_QUEUE.empty():
        notification = _BG_QUEUE.get_nowait()
        print(f"\033[90m  [bg] notification injected\033[0m")
        messages.append({"role": "user", "content": notification})
        # Let the model react to the notification
        stream_loop(messages, BG_TOOLS, BG_DISPATCH, system=SYSTEM)
我们基本上为模型提供了一个新工具 run_bash_background(),只要它想要启动长时间运行的操作而不阻塞,就可以调用该工具。当后台操作运行时,模型可以继续推理、规划,甚至执行其他任务。
操作完成后,模型会收到包含结果的通知,并可以在下一轮中对其做出反应。
s08 >> Run the full test suite in the background, then while that runs
add docstrings to all functions in core.py that are missing them.
> Thinking...
[bash_background] python -m pytest tests/ -v --tb=short
label: full test suite
Background task started: 'full test suite'. You will be notified when done.
> Thinking...
[read] agents/core.py
[todo_write]
[0] Identify functions missing docstrings in core.py
[1] Add docstrings to run_bash, run_read, run_write
[2] Add docstrings to run_grep, run_glob, run_revert
[3] Add docstrings to dispatch_tools, stream_loop
[4] Verify formatting is consistent
[grep] def [^_]
agents/core.py:58:def run_bash(command: str) -> str:
agents/core.py:78:def run_read(path: str, ...
agents/core.py:98:def run_write(path: str, ...
... (6 more)
[todo_update] 0 → done
> Thinking...
[write] agents/core.py (added docstrings to run_bash, run_read, run_write)
[todo_update] 1 → done
> Thinking...
[write] agents/core.py (added docstrings to run_grep, run_glob, run_revert)
[todo_update] 2 → done
> Thinking...
[write] agents/core.py (added docstrings to dispatch_tools, stream_loop)
[todo_update] 3 → done
[todo_update] 4 → done
[bg] full test suite completed
[Background task 'full test suite' completed]
44 passed in 2.1s
> Thinking...
Final Answer:
Added docstrings to all 8 functions in core.py while the test suite ran
in the background. Test suite completed: 44 passed, no failures.
Both tasks finished concurrently - total wall time matched the test suite
duration, not the sum of both operations.
可以看到,我们的代理在启动 45 秒的测试套件后立即转向添加文档字符串,完成所有文档工作,然后在测试结果到达时收到通知。在阻塞模型中,这两件事只能串行进行,总耗时是两者之和。
使用后台执行时,挂起时间受两个操作中较慢的一个的限制,而不是它们的总和。这正是 Claude Code 在实践中处理长时间运行操作的方式。
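"挂起时间受较慢者约束,而非两者之和"这条性质可以直接测出来。下面是一个独立的小实验(睡眠时长和标签均为演示假设):两个各耗时 0.3 秒的"操作"并行运行,总墙钟时间约 0.3 秒而不是 0.6 秒。

```python
import queue
import threading
import time

notifications: queue.Queue = queue.Queue()

def slow_op(label: str, seconds: float) -> None:
    """模拟一个慢速工具调用,完成后把结果推入通知队列。"""
    time.sleep(seconds)
    notifications.put(f"{label} done")

start = time.monotonic()
threads = [
    threading.Thread(target=slow_op, args=("tests", 0.3), daemon=True),
    threading.Thread(target=slow_op, args=("docs", 0.3), daemon=True),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.monotonic() - start
print(f"elapsed ≈ {elapsed:.2f}s")  # 约 0.3s,而非 0.6s
```

这正是主 loop 不阻塞 I/O 带来的收益:并行度越高,节省的墙钟时间越多。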
Claude Code 的并行子代理系统生成的是临时代理:为一项任务创建,用完即弃。但真正的工程工作需要跨许多任务持续存在的专业性。
文件探索专家、代码编写专家和测试专家都从他们正在使用的代码库积累的上下文中受益。持久的团队成员在多个委派任务中保留该上下文。
每个队友都在后台线程中连续运行,具有定义的专业化和 JSONL 文件作为其收件箱。首席代理将任务写入队友的收件箱文件中。队友读取它,执行完整的代理 loop,并将结果写回领导的收件箱。
通信是完全异步的,领导者可以在队友执行的同时继续工作,并且队友对代码库积累的知识随着其处理的每项任务而增长。
MAILBOX_DIR = Path(".mailboxes")
MAILBOX_DIR.mkdir(exist_ok=True)
TEAMMATES = {
"explorer": (
f"You are an explorer agent specialising in reading and understanding "
f"codebases at {os.getcwd()}. You excel at mapping architecture, finding "
f"patterns, and understanding how components connect. Use read, glob, grep."
),
"writer": (
f"You are a writer agent specialising in creating and editing code at "
f"{os.getcwd()}. You excel at implementing changes cleanly, following "
f"existing patterns, and writing tests. Use write, read, bash."
),
}
在我们的 TEAMMATES 字典中,我们定义了两个专业代理:explorer(探索者)和 writer(编写者),每个代理都有自己的系统提示,使其专注于自己的专长。首席代理可以通过把消息写入队友各自的 JSONL 收件箱文件来委派任务。
def _inbox(name: str) -> Path:
    return MAILBOX_DIR / f"{name}.jsonl"

def _send(to: str, frm: str, body: str):
    with open(_inbox(to), "a") as f:
        f.write(json.dumps({"from": frm, "body": body}) + "\n")

def _receive(name: str) -> list[dict]:
    path = _inbox(name)
    if not path.exists():
        return []
    messages = [json.loads(l) for l in path.read_text().splitlines() if l.strip()]
    path.write_text("")  # clear inbox after reading
    return messages
每个队友在后台线程中运行自己的代理 loop,不断轮询其收件箱。
def _run_teammate(name: str, system: str, stop_event: threading.Event):
    print(f"\033[90m [{name}] ready and listening\033[0m")
    while not stop_event.is_set():
        for msg in _receive(name):
            print(f"\033[35m [{name}] received: {msg['body'][:60]}\033[0m")
            sub_messages = [{"role": "user", "content": msg["body"]}]
            while True:
                resp = client.messages.create(
                    model=MODEL, system=system,
                    messages=sub_messages,
                    tools=EXTENDED_TOOLS, max_tokens=4000,
                )
                sub_messages.append({"role": "assistant", "content": resp.content})
                if resp.stop_reason != "tool_use":
                    break
                results = dispatch_tools(resp.content, EXTENDED_DISPATCH)
                sub_messages.append({"role": "user", "content": results})
            result = "".join(
                b.text for b in sub_messages[-1]["content"]
                if hasattr(b, "text")
            )
            _send(msg["from"], name, result)
            print(f"\033[35m [{name}] result sent to {msg['from']}\033[0m")
        stop_event.wait(timeout=0.5)
队友 loop 不断检查收件箱中是否有新消息。当它收到消息时,它使用完整代理 loop 对其进行处理,生成响应,并将结果发送回发送者。
随着时间的推移,团队成员可以处理多项任务,通过每次交互积累有关代码库的知识。
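整个"请求→响应"握手可以脱离模型调用单独验证。下面的草图沿用正文 `_send`/`_receive` 的语义(向对方收件箱追加 JSONL,读取后清空自己的收件箱),但用一次同步调用来模拟队友线程,目录使用临时目录,均为演示假设:

```python
import json
import tempfile
from pathlib import Path

MAILBOX_DIR = Path(tempfile.mkdtemp())

def send(to: str, frm: str, body: str) -> None:
    with open(MAILBOX_DIR / f"{to}.jsonl", "a") as f:
        f.write(json.dumps({"from": frm, "body": body}) + "\n")

def receive(name: str) -> list[dict]:
    path = MAILBOX_DIR / f"{name}.jsonl"
    if not path.exists():
        return []
    msgs = [json.loads(l) for l in path.read_text().splitlines() if l.strip()]
    path.write_text("")  # 读取后清空,避免重复处理
    return msgs

# 1. 领导者把任务写入 explorer 的收件箱
send("explorer", "lead", "Map the agents/ directory")
# 2. explorer 排空收件箱,处理任务,把结果写回领导者的收件箱
for msg in receive("explorer"):
    send(msg["from"], "explorer", f"done: {msg['body']}")
# 3. 领导者在下一轮排空自己的收件箱,拿到结果
replies = receive("lead")
print(replies[0]["body"])  # done: Map the agents/ directory
```

注意第 2 步里队友回信的地址直接取自消息的 `from` 字段,这使得任意数量的发送者都能复用同一个队友,而无需额外的路由表。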
s09 >> We need to understand the multi-agent architecture in this codebase and then
add proper logging to all inter-agent communication functions.
> Thinking...
[send_to_teammate] explorer
Map all inter-agent communication functions across the agents/ directory.
Find every function that sends or receives messages between agents.
Return file names, function names, and line numbers.
[explorer] > Thinking...
[explorer] [glob] agents/**/*.py
[explorer] [grep] def _send|def _receive|def send_to|mailbox
agents/s09_agent_teams.py:28: def _send(to: str, frm: str, body: str)
agents/s09_agent_teams.py:34: def _receive(name: str)
agents/s09_agent_teams.py:68: def send_to_teammate(name: str, message: str)
agents/s22_production_mailbox.py:45: async def send(self, to: str, message: dict)
agents/s22_production_mailbox.py:51: async def receive(self, name: str, timeout: float)
[explorer] result sent to lead
> Thinking...
[send_to_teammate] writer
Add structured logging to these 5 functions in s09 and s22:
_send, _receive, send_to_teammate (s09)
send, receive (s22)
Log: timestamp, sender, recipient, message length, and status.
[writer] > Thinking...
[writer] [read] agents/s09_agent_teams.py (lines 25-75)
[writer] [read] agents/s22_production_mailbox.py (lines 40-60)
[writer] [write] agents/s09_agent_teams.py
[writer] [write] agents/s22_production_mailbox.py
[writer] [bash] python -m pytest tests/ -q
44 passed in 2.1s
[writer] result sent to lead
> Thinking...
Final Answer:
Explorer mapped 5 inter-agent communication functions across 2 files.
Writer added structured logging to all 5, including timestamp, sender,
recipient, message length, and delivery status. Tests still passing.
Both teammates ran concurrently - total time was bounded by the slower task.
领导者将探索委托给一位专家,将实现委托给另一位专家,这两项任务都没有污染领导者的上下文。explorer 在自己的线程中积累代码库知识,writer 使用该输出进行精确编辑,领导者综合结果。
这是在架构级别而不是提示级别工作的多代理协作。
由于多个代理同时发送和接收消息,不协调的通信会产生竞争条件和死锁。代理可能会在收到第一个请求的响应之前发送第二个请求。两个代理可能会无限期地等待对方。
如果没有控制代理何时可以通信的协议,团队在负载最大的时刻是不可靠的。
Claude Code 通过工具调用的同步性质隐式解决代理间协调问题:模型发出 dispatch_agent 调用,并在继续之前等待结果。
FSM 把这个协议变得显式,并在异步通信的持久队友架构上强制执行。每个代理有四种状态:空闲、请求、等待、响应,外加一条严格的规则:代理在处于等待状态时不得转换为请求状态。这一条规则消除了一整类协调死锁。
from enum import Enum

class AgentState(Enum):
    IDLE = "idle"
    REQUESTING = "requesting"
    WAITING = "waiting"
    RESPONDING = "responding"

class ProtocolAgent:
    def __init__(self, name: str, system: str):
        self.name = name
        self.system = system
        self.state = AgentState.IDLE
        self._inbox = MAILBOX_DIR / f"{name}_proto.jsonl"
        self._lock = threading.Lock()

    def send(self, to: "ProtocolAgent", message: str):
        with self._lock:
            if self.state == AgentState.WAITING:
                return "Error: cannot send while waiting for a response"
            self.state = AgentState.REQUESTING
        with open(to._inbox, "a") as f:
            f.write(json.dumps({
                "from": self.name,
                "body": message,
                "timestamp": datetime.now().isoformat(),
            }) + "\n")
        with self._lock:
            self.state = AgentState.WAITING
        return f"Message sent to {to.name}. Waiting for response."

    def handle(self, message: str) -> str:
        with self._lock:
            self.state = AgentState.RESPONDING
        sub = [{"role": "user", "content": message}]
        while True:
            resp = client.messages.create(
                model=MODEL, system=self.system,
                messages=sub, tools=EXTENDED_TOOLS, max_tokens=4000,
            )
            sub.append({"role": "assistant", "content": resp.content})
            if resp.stop_reason != "tool_use":
                break
            results = dispatch_tools(resp.content, EXTENDED_DISPATCH)
            sub.append({"role": "user", "content": results})
        with self._lock:
            self.state = AgentState.IDLE
        return "".join(b.text for b in sub[-1]["content"] if hasattr(b, "text"))
我们可以通过模拟两个代理相互发送消息并验证它们从未违反协议来测试 FSM。在此测试中,我们尝试在第一条消息仍在等待响应时发送第二条消息,并检查 FSM 是否正确阻止该操作。
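门控规则本身不需要任何模型调用就能验证。下面的 `MiniAgent` 只保留状态机部分(省略了正文中的收件箱与 API 调用;为简化把 REQUESTING→WAITING 合并成一步,属演示取舍):

```python
import threading
from enum import Enum

class State(Enum):
    IDLE = "idle"
    REQUESTING = "requesting"
    WAITING = "waiting"
    RESPONDING = "responding"

class MiniAgent:
    """只含 FSM 门控逻辑的最小代理:WAITING 时拒绝发送第二条请求。"""
    def __init__(self, name: str):
        self.name = name
        self.state = State.IDLE
        self._lock = threading.Lock()

    def send(self, to: str, message: str) -> str:
        with self._lock:
            if self.state == State.WAITING:
                return "Error: cannot send while waiting for a response"
            self.state = State.WAITING  # 演示中把 REQUESTING→WAITING 合并为一步
        return f"sent to {to}"

    def on_response(self) -> None:
        """收到响应后回到 IDLE,才允许发起下一条请求。"""
        with self._lock:
            self.state = State.IDLE

lead = MiniAgent("lead")
print(lead.send("alpha", "run coverage"))   # sent to alpha
print(lead.send("beta", "write tests"))     # Error: 仍在等待 alpha 的响应
lead.on_response()
print(lead.send("beta", "write tests"))     # sent to beta
```

第二次 `send` 被拒绝的位置在架构里,而不是在提示词里,这就是"模型无需对协调进行推理"的含义。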
s10 >> Analyse the test coverage of core.py and then write tests for any
functions that have less than 80% branch coverage.
> Thinking...
[delegate] alpha
Run coverage analysis on core.py. Report which functions have
less than 80% branch coverage and what branches are untested.
[alpha: IDLE → REQUESTING → WAITING]
[alpha] > Thinking...
[alpha] [bash] python -m pytest tests/ --cov=agents/core
--cov-report=term-missing -q
Name Stmts Miss Branch BrPart Cover
agents/core.py 124 18 42 8 71%
Missing branches in: run_revert (new file path),
check_permission (ask_user tier), stream_loop (extra_kwargs)
[alpha: RESPONDING → IDLE]
[alpha → lead] Coverage report: 3 functions below 80%...
> Thinking...
[delegate] beta
Write pytest tests for these 3 missing branches in core.py:
1. run_revert when file was newly created (SNAPSHOTS[path] is None)
2. check_permission when ask_user tier matches (requires input mock)
3. stream_loop when extra_kwargs are passed
[beta: IDLE → REQUESTING → WAITING]
[beta] > Thinking...
[beta] [read] tests/test_agents_smoke.py
[beta] [write] tests/test_core_coverage.py
[beta] [bash] python -m pytest tests/test_core_coverage.py -v
test_run_revert_new_file PASSED
test_check_permission_ask_user_mocked PASSED
test_stream_loop_extra_kwargs PASSED
3 passed in 0.4s
[beta: RESPONDING → IDLE]
[beta → lead] 3 tests written and passing...
> Thinking...
Final Answer:
Alpha identified 3 functions below 80% branch coverage.
Beta wrote targeted tests for all 3 missing branches.
All new tests passing. FSM state transitions completed cleanly -
no agent attempted to send while waiting for a response.
当我运行此测试时,会记录每个状态转换。在收到第一个请求的响应之前,没有代理发送第二个请求。该协议在架构级别强制执行此操作,而不要求模型对协调进行推理 - 模型仅调用 delegate,FSM 处理其余部分。
FSM 协议管理代理之间的通信,但仍然需要领导者来分配工作。对于非常大的工作负载,例如迁移整个代码库、为数百个函数生成文档、在数千个文件上运行分析,领导者本身也会成为瓶颈。自主任务认领则完全去掉了协调器。
每个代理对阶段 3 中的共享任务图运行连续扫描 loop。当它发现未阻塞的待处理任务时,它会使用线程锁以原子方式声明它并开始执行。
锁在这里至关重要:如果没有它,两个同时扫描的代理会发现同一个可用任务,同时认领它并各自执行一遍,既浪费计算,也可能产生相互冲突的结果。
def claim_next_task(agent_id: str) -> dict | None:
    """Atomically find and claim the next available unblocked task."""
    with _TASKS_LOCK:
        tasks = _load()
        done_ids = {t["id"] for t in tasks if t["status"] == "done"}
        # Sort by priority before scanning
        priority_order = {"high": 0, "medium": 1, "low": 2}
        sorted_tasks = sorted(tasks, key=lambda t: priority_order.get(t["priority"], 1))
        for t in sorted_tasks:
            if t["status"] != "pending":
                continue
            # Check all dependencies are satisfied
            if all(dep in done_ids for dep in t.get("depends_on", [])):
                t["status"] = "in_progress"
                t["claimed_by"] = agent_id
                _save(tasks)
                return t
    return None

def run_autonomous_agent(agent_id: str, stop_event: threading.Event):
    system = (
        f"You are autonomous agent {agent_id} at {os.getcwd()}. "
        "Complete assigned tasks thoroughly. Report your results clearly."
    )
    print(f"\033[90m [{agent_id}] scanning task board\033[0m")
    while not stop_event.is_set():
        task = claim_next_task(agent_id)
        if not task:
            stop_event.wait(timeout=1.0)
            continue
        print(f"\033[35m [{agent_id}] claimed [{task['id']}] "
              f"{task['description'][:60]}\033[0m")
        messages = [{"role": "user", "content": task["description"]}]
        try:
            while True:
                resp = client.messages.create(
                    model=MODEL, system=system,
                    messages=messages,
                    tools=EXTENDED_TOOLS, max_tokens=4000,
                )
                messages.append({"role": "assistant", "content": resp.content})
                if resp.stop_reason != "tool_use":
                    break
                results = dispatch_tools(resp.content, EXTENDED_DISPATCH)
                messages.append({"role": "user", "content": results})
            result = "".join(
                b.text for b in messages[-1]["content"]
                if hasattr(b, "text")
            )
            with _TASKS_LOCK:
                tasks = _load()
                for t in tasks:
                    if t["id"] == task["id"]:
                        t["status"] = "done"
                        t["result"] = result
                        _save(tasks)
                        break
            print(f"\033[32m [{agent_id}] completed [{task['id']}]\033[0m")
        except Exception as e:
            with _TASKS_LOCK:
                tasks = _load()
                for t in tasks:
                    if t["id"] == task["id"]:
                        t["status"] = "failed"
                        t["error"] = str(e)
                        _save(tasks)
                        break
            print(f"\033[31m [{agent_id}] failed [{task['id']}]: {e}\033[0m")
在我们的 claim_next_task() 函数中,我们在扫描任务图之前获取锁。在 Claude Code 阶段 4 中,多个代理会同时调用此函数。
锁确保当一个代理找到可用任务并声明它时,在第一个代理释放锁之前,其他代理都无法声明相同的任务。这可以防止竞争条件并确保每项任务仅由一个代理执行。
让我们通过发布几个具有依赖关系的任务,然后启动两个自治代理来执行它们来测试这一点。
s11 >> Post tasks to add type annotations to every function in core.py.
Each tool function gets its own task. Let the autonomous agents handle it.
> Thinking...
[post_task] Add type annotations to run_bash priority: high
[post_task] Add type annotations to run_read priority: high
[post_task] Add type annotations to run_write priority: high
[post_task] Add type annotations to run_grep priority: medium
[post_task] Add type annotations to run_glob priority: medium
[post_task] Add type annotations to run_revert priority: medium
[post_task] Run mypy on core.py to verify all annotations
depends_on: [all 6 annotation tasks] priority: high
> Thinking...
[agent-1] claimed [a1b2c3d4] Add type annotations to run_bash
[agent-2] claimed [e5f6a7b8] Add type annotations to run_read
[agent-1] > Thinking...
[agent-1] [read] agents/core.py (lines 55-72)
[agent-1] [write] agents/core.py
[agent-1] completed [a1b2c3d4]
[agent-2] > Thinking...
[agent-2] [read] agents/core.py (lines 75-95)
[agent-2] [write] agents/core.py
[agent-2] completed [e5f6a7b8]
[agent-1] claimed [c9d0e1f2] Add type annotations to run_write
[agent-2] claimed [g3h4i5j6] Add type annotations to run_grep
... (parallel execution continues)
[agent-1] completed [c9d0e1f2]
[agent-2] completed [g3h4i5j6]
[agent-1] completed [k7l8m9n0] (run_glob)
[agent-2] completed [o1p2q3r4] (run_revert)
[agent-1] scanning... mypy task still blocked (dependencies not all done)
[agent-2] scanning... mypy task still blocked (dependencies not all done)
[agent-1] claimed [s5t6u7v8] Run mypy on core.py (all deps now done)
[agent-1] [bash] python -m mypy agents/core.py --strict
agents/core.py: Success: no issues found in 1 source file
[agent-1] completed [s5t6u7v8]
Final Answer:
7 tasks posted. 2 autonomous agents executed them in parallel respecting
the dependency chain. 6 annotation tasks ran concurrently, mypy verification
ran only after all 6 completed. Total time: 34s vs ~95s sequential.
mypy strict mode passes with no issues.
两个代理在任务可用时立即领取任务,并行工作,并且 mypy 验证任务保持阻塞状态,直到所有六个注释任务都标记为完成。
首次发布之后,领导者没有再分配过任何一个任务。这种模式使大规模自主工作变得易于处理:任务排序的智能存在于依赖图中,而不是协调代理中。
并行代理写入同一目录中的相同文件最终会发生冲突。两个代理同时编辑 core.py 将产生一个损坏的文件,无论每个代理工作得多么仔细。
文件系统不知道代理意图,它只知道写入操作,并且两个并发写入会产生未定义的结果。
Git worktree 为每个代理提供了 repository 的一份完整检出:独立的目录、独立的分支、独立的工作树。并行工作的两个代理实际上是在不同目录中写入不同的文件。
不存在写入冲突的可能性,因为文件本身是相互独立的。当两项任务都完成后,工具会比较每个分支修改的文件,并在合并之前把任何重叠的更改交给人工审核。
def create_worktree(task_id: str) -> tuple[str, str]:
    """Create an isolated git worktree for a task."""
    branch = f"task/{task_id}"
    path = str(Path(os.getcwd()).parent / f".worktree-{task_id[:8]}")
    if Path(path).exists():
        shutil.rmtree(path, ignore_errors=True)
        _git("worktree", "remove", "--force", path)
    # Handle branch conflict from previous run
    if _branch_exists(branch):
        _git("branch", "-D", branch)
    rc, _, err = _git("worktree", "add", "-b", branch, path)
    if rc != 0:
        raise RuntimeError(f"Failed to create worktree: {err}")
    return path, branch

def run_task_in_worktree(task: dict) -> str:
    """Run an agent loop in an isolated worktree directory."""
    task_id = task["id"]
    wt_path, wt_branch = create_worktree(task_id)
    system = (
        f"You are a coding agent working in isolated directory: {wt_path}. "
        f"Task: {task['description']}. "
        "Your changes are on a separate git branch - you cannot affect other agents. "
        "When done, summarise every file you changed."
    )
    messages = [{"role": "user", "content": task["description"]}]
    try:
        while True:
            resp = client.messages.create(
                model=MODEL, system=system,
                messages=messages,
                tools=EXTENDED_TOOLS, max_tokens=8000,
            )
            messages.append({"role": "assistant", "content": resp.content})
            if resp.stop_reason != "tool_use":
                break
            results = []
            for block in resp.content:
                if block.type != "tool_use":
                    continue
                # Route bash and write calls through the worktree directory
                if block.name == "bash":
                    old_cwd = os.getcwd()
                    os.chdir(wt_path)
                    output = run_bash(block.input["command"])
                    os.chdir(old_cwd)
                else:
                    output = EXTENDED_DISPATCH.get(
                        block.name, lambda _: "unknown"
                    )(block.input)
                print(f"\033[33m [{task_id[:6]}][{block.name}] "
                      f"{str(list(block.input.values())[0])[:60]}\033[0m")
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
            messages.append({"role": "user", "content": results})
        return "".join(
            b.text for b in messages[-1]["content"] if hasattr(b, "text")
        )
    finally:
        # Always remove the worktree - even on failure
        _git("worktree", "remove", "--force", wt_path)
        shutil.rmtree(wt_path, ignore_errors=True)
        _git("branch", "-D", wt_branch)
我们可以通过发布两个都修改 core.py 的任务,然后启动两个代理来执行它们来测试这一点。
每个代理将创建自己的 worktree,单独对 core.py 进行更改,当两者都完成时,该工具将检测到两个分支都修改了 core.py 并在合并之前将其标记为供人工审核。
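冲突检测的核心只是一个集合交集:比较两个分支各自修改的文件,重叠的部分交给人工审核。下面用一个纯函数演示检测逻辑本身(`detect_conflicts` 与示例文件名是为演示假设的;实际获取每个分支的改动文件通常可以用 `git diff --name-only` 之类的命令,具体基准分支视仓库而定):

```python
def detect_conflicts(files_a: set[str], files_b: set[str]) -> set[str]:
    """返回两个分支都修改过的文件:这些文件需要人工审核后才能合并。"""
    return files_a & files_b

# 假设的两个并行任务各自修改的文件列表
task_a = {"agents/core.py", "tests/test_core.py"}
task_b = {"agents/core.py", "docs/logging.md"}

overlap = detect_conflicts(task_a, task_b)
print(overlap)  # {'agents/core.py'}:标记为需要人工审核
```

当交集为空时,两个分支可以直接合并;非空时,工具只需把重叠文件列出来,把合并决定留给人。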
s12 >> Run two tasks in parallel:
Task A: Add input validation to all tool functions in core.py
Task B: Add logging statements to all tool functions in core.py
> Thinking...
Running 2 tasks in parallel worktrees...
[task-a] worktree created: .worktree-a1b2c3d4 (branch: task/a1b2c3d4)
[task-b] worktree created: .worktree-e5f6a7b8 (branch: task/e5f6a7b8)
[a1b2c3][read] agents/core.py
[e5f6a7][read] agents/core.py
[a1b2c3][write] agents/core.py (added input validation to run_bash)
[e5f6a7][write] agents/core.py (added logging to run_bash)
[a1b2c3][write] agents/core.py (added input validation to run_read)
[e5f6a7][write] agents/core.py (added logging to run_read)
[a1b2c3][bash] python -m pytest tests/ -q → 44 passed
[e5f6a7][bash] python -m pytest tests/ -q → 44 passed
[task-a] completed
[task-b] completed
Conflict detection:
Both branches modified: agents/core.py
Overlap detected on 1 file - human review required before merging.
Results:
Task A: Added input validation to 6 tool functions. Tests passing.
Task B: Added logging to 6 tool functions. Tests passing.
1 merge conflict to resolve: both tasks modified core.py on different
branches. Changes are compatible but require manual merge review.
Branch task/a1b2c3d4 and task/e5f6a7b8 are ready for review.
两个代理同时读取和修改 core.py,但由于它们在各自的 worktree 中工作,写入不会互相干扰。每个代理针对自己的分支运行测试套件并得到干净的结果。
冲突检测在两个任务完成后运行,并正确识别出:在任一分支合入主分支之前,需要人工审核合并。这是 repo 内并行执行中最强的隔离模型,任务之间的干扰可能性为零。
第五阶段关注"能工作的代理"与"可部署的代理"之间的差距:流式传输让模型输出实时可见,文件工具通过自动快照变得可逆,权限治理通过 YAML 规则系统变得声明式,每个工具调用通过生命周期事件总线变得可观察,对话通过会话持久化得以保留。
这就是 Claude Code 的信任系统、钩子架构和会话持久性的实现之处。
在 Claude Code 中,流式传输不是一项可选功能,而是默认行为:每次交互都会在令牌生成的同时将其流式传输到终端。流式代理与阻塞代理之间的区别,就是"感觉像协作者的工具"与"感觉像批处理作业的工具"之间的区别。对于简短的响应,差距难以察觉。
对于跨越数十个工具调用的长推理链,阻塞代理会静默几分钟,而流代理会实时显示其思维。
从阻塞到流式只是替换一个调用:client.messages.create() 变成 client.messages.stream()。loop 逻辑保持不变,调度逻辑保持不变,只有响应的消费方式发生了变化。
def agent_loop_streaming(messages: list):
    while True:
        with client.messages.stream(
            model=MODEL,
            system=DEFAULT_SYSTEM,
            messages=messages,
            tools=EXTENDED_TOOLS,
            max_tokens=8000,
        ) as stream:
            # Each token printed immediately as it arrives
            for text in stream.text_stream:
                print(text, end="", flush=True)
            response = stream.get_final_message()
        print()
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = dispatch_tools(response.content, EXTENDED_DISPATCH)
        messages.append({"role": "user", "content": results})
最后的 stream.get_final_message() 调用很重要 - 它会阻塞,直到流被完全消耗并返回与 messages.create() 结构相同的完整响应对象。
这意味着无论 loop 使用流式传输还是阻塞式,所有下游逻辑、停止原因检查、工具使用解析、内容块提取的工作原理都是相同的。
我们可以通过重构现有代理 loop 以使用流式传输并验证输出是否正确并且测试套件仍然通过来对此进行测试。
s13 >> Refactor the agent_loop function in s01 to use streaming
and explain what changed and why.
I will start by reading the current implementation...
[read] agents/s01_agent_loop.py
The current s01 uses `client.messages.create()` which blocks until
the model finishes generating the entire response before returning.
Here is what needs to change...
[write] agents/s01_agent_loop.py
The key change is replacing the blocking call with a streaming
context manager. Let me verify the refactor works correctly...
[bash] python -m pytest tests/test_agents_smoke.py::test_import[s01_agent_loop] -v
test_import[s01_agent_loop] PASSED
Final Answer:
Refactor complete. Replaced client.messages.create() with
client.messages.stream(). The loop logic is unchanged - only token
delivery changed from batch to real-time. The test confirms the
module still imports cleanly.
请注意上面逐个令牌流式传输的响应 —“我将从阅读开始”、“当前 s01 使用”、“关键更改是”都逐渐出现在屏幕上。在阻塞模型中,所有这些文本都会在等待数秒后同时出现。这是 Claude Code 用户在每次交互中体验到的行为。
Claude Code 附带了专用文件工具 Read、Write、Edit、Glob、Grep,这并不是因为 bash 无法执行文件操作,而是因为专用工具为模型提供了具有结构化输出的精确语义操作。当模型调用 read 时,它会返回编号行,它可以在后续的 write 中通过编号引用。
当它调用 grep 时,它会返回文件路径和行号,而不是原始终端输出。这种结构允许 Claude 进行精确、有针对性的编辑,而不是覆盖整个文件。
快照机制同样重要。 Claude Code 中的每个 write 调用都会在覆盖之前静默保存以前的文件内容。如果模型的更改破坏了某些内容,revert 一次调用即可恢复原始状态。无需使用 git,无需手动复制文件——harness 会自动处理可逆性。
SNAPSHOTS: dict[str, str | None] = {}

def run_write(path: str, content: str) -> str:
    try:
        if os.path.exists(path):
            with open(path, "r", encoding="utf-8", errors="replace") as f:
                SNAPSHOTS[path] = f.read()
            action = "updated"
        else:
            SNAPSHOTS[path] = None  # None marks file as newly created
            action = "created"
        os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
        return f"{action}: {path} (snapshot saved - use revert to undo)"
    except Exception as e:
        return f"Error writing {path}: {e}"

def run_revert(path: str) -> str:
    if path not in SNAPSHOTS:
        return f"Error: no snapshot for {path}"
    original = SNAPSHOTS.pop(path)
    if original is None:
        # File was newly created - revert means delete it
        os.remove(path)
        return f"reverted: deleted {path} (it was a new file)"
    with open(path, "w", encoding="utf-8") as f:
        f.write(original)
    return f"reverted: {path} restored to previous content"
在我们的 run_write 函数中,我们检查文件是否已经存在。如果是,我们会读取其内容并将其保存在 SNAPSHOTS 字典中,然后再覆盖它。如果该文件不存在,我们通过将其快照设置为 None 将其标记为新创建的。run_revert 函数检查快照,如果是 None,则知道删除该文件;否则,它将恢复原始内容。
run_read 的编号行输出使精确编辑成为可能。
def run_read(path: str, start_line: int = None, end_line: int = None) -> str:
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            lines = f.readlines()
        s = (start_line or 1) - 1
        e = end_line or len(lines)
        numbered = "".join(
            f"{s+1+i:4d}\t{l}" for i, l in enumerate(lines[s:e])
        )
        return numbered[:50000] or "(empty file)"
    except FileNotFoundError:
        return f"Error: file not found: {path}"
为了测试这一点,我们可以运行一系列工具调用来写入文件,验证内容,然后调用恢复并验证原始内容是否已恢复。
s14 >> The run_bash function in core.py is missing output truncation
for commands that produce very large outputs. Fix it.
[read] agents/core.py (lines 55-80)
55 def run_bash(command: str) -> str:
56 if any(d incommandfor d in _ALWAYS_BLOCK):
57 return"Error: dangerous command blocked"
58 try:
59 r = subprocess.run(command, shell=True, cwd=os.getcwd(),
60 capture_output=True, text=True, timeout=120)
61 out = (r.stdout + r.stderr).strip()
62 return out if out else"(no output)" ← no truncation here
63 except subprocess.TimeoutExpired:
64 return"Error: timeout (120s)"
I can see the issue on line 62 - output is returned without truncation.
I will fix this and add a constant for the limit.
[write] agents/core.py
Updated run_bash to return out[:MAX_OUTPUT] if out else "(no output)"
Added MAX_OUTPUT = 50_000 constant at module level.
Let me verify the change looks correct before running tests.
[read] agents/core.py (lines 55-65)
55 MAX_OUTPUT = 50_000
56
57 def run_bash(command: str) -> str:
58      if any(d in command for d in _ALWAYS_BLOCK):
59          return "Error: dangerous command blocked"
60      try:
61          r = subprocess.run(command, shell=True, cwd=os.getcwd(),
62                             capture_output=True, text=True, timeout=120)
63          out = (r.stdout + r.stderr).strip()
64          return out[:MAX_OUTPUT] if out else "(no output)"
65 except subprocess.TimeoutExpired:
[bash] python -m pytest tests/test_agents_smoke.py -q
44 passed in 2.1s
Final Answer:
Fixed. Added MAX_OUTPUT = 50_000 constant and applied truncation on
line 64. Read the result back to confirm the change before running
tests - 44 passing, no regressions.
模型读取带编号行的文件,定位到存在问题的确切行,做出有针对性的修复,读回结果以确认更改,然后运行测试。这正是扩展文件工具按设计协同工作的方式:结构化的读取输出支撑精确的写入操作,自动快照则保证一旦测试失败可以立即恢复。
Claude Code 的权限系统是其被研究最多的架构组件之一。第一次运行 Claude Code 时,它会询问以自动批准模式还是确认模式运行。
该选择直接映射到权限层系统,某些命令始终默默运行,某些命令始终需要明确批准,而某些命令则无论上下文如何都会被彻底阻止。
本节实现与 YAML 配置文件相同的三层模型。安全策略存在于配置中,而不是代码中。更改需要批准的内容是对配置文件的编辑,而不是部署。规则评估作为每个工具调用的预执行包装器运行。
# config/permissions.yaml
always_deny:
- pattern: "rm -rf /"
reason: "Unconditional recursive root delete"
- pattern: "sudo"
reason: "Privilege escalation not allowed"
- pattern: "curl.*\\| *sh|wget.*\\| *sh"
reason: "Pipe-to-shell downloads blocked"
always_allow:
- pattern: "^ls( |$|-)"
reason: "Listing files is always safe"
- pattern: "^git (status|log|diff|show)"
reason: "Read-only git commands are always safe"
- pattern: "^python.*--version|^pip.*--version"
reason: "Version checks are always safe"
ask_user:
- pattern: "^rm "
reason: "File deletion requires confirmation"
- pattern: "^git (commit|push|merge|rebase)"
reason: "Git write operations require confirmation"
- pattern: "^pip install|^npm install"
reason: "Package installation requires confirmation"
- pattern: "\\.env"
reason: "Accessing .env files requires confirmation"
权限检查会在执行之前包装每个调度调用。
RULES = load_rules()
def check_permission(tool_name: str, input_str: str,
                     rules: dict = None) -> tuple[bool, str]:
    if rules is None:
        rules = RULES
    # Tier 1: always deny - checked first, no exceptions
    for rule in rules.get("always_deny", []):
        if re.search(rule["pattern"], input_str, re.IGNORECASE):
            print(f"\033[31m[DENIED] {rule.get('reason')}\033[0m")
            return False, f"Denied: {rule.get('reason')}"
    # Tier 2: always allow - pass silently
    for rule in rules.get("always_allow", []):
        if re.search(rule["pattern"], input_str, re.IGNORECASE):
            return True, "allowed"
    # Tier 3: ask user - pause and prompt
    for rule in rules.get("ask_user", []):
        if re.search(rule["pattern"], input_str, re.IGNORECASE):
            print(f"\n\033[33m[PERMISSION] {tool_name}: {input_str[:100]}")
            print(f"  Reason: {rule.get('reason')}\033[0m")
            try:
                ans = input("  Allow? [y/N] ").strip().lower()
            except (EOFError, KeyboardInterrupt):
                ans = "n"
            return (ans in ("y", "yes")), "user decision"
    # Default: allow if no rule matched
    return True, "no rule matched"
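三层求值顺序可以不经交互地验证。下面的草图把规则内联为字典(省去 YAML 加载),`evaluate` 与正文 check_permission 的层级顺序一致,但为避免交互式 input() 只返回决策标签,属演示简化:

```python
import re

RULES = {
    "always_deny": [{"pattern": r"sudo", "reason": "privilege escalation"}],
    "always_allow": [{"pattern": r"^ls( |$|-)", "reason": "read-only"}],
    "ask_user": [{"pattern": r"^rm ", "reason": "deletion"}],
}

def evaluate(input_str: str, rules: dict = RULES) -> str:
    """按 deny → allow → ask 的顺序求值,未命中任何规则则默认放行。"""
    for rule in rules["always_deny"]:       # 第 1 层:无条件拒绝
        if re.search(rule["pattern"], input_str, re.IGNORECASE):
            return "deny"
    for rule in rules["always_allow"]:      # 第 2 层:静默放行
        if re.search(rule["pattern"], input_str, re.IGNORECASE):
            return "allow"
    for rule in rules["ask_user"]:          # 第 3 层:需要用户确认
        if re.search(rule["pattern"], input_str, re.IGNORECASE):
            return "ask"
    return "allow"                          # 默认:未命中任何规则

print(evaluate("sudo apt install vim"))  # deny:第 1 层先于一切被检查
print(evaluate("ls -la"))                # allow
print(evaluate("rm old.log"))            # ask
print(evaluate("echo hello"))            # allow(默认)
```

层级顺序是安全属性:deny 永远先被检查,所以任何 allow 或 ask 规则都无法"洗白"一条被禁止的命令。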
让我们测试一下这个功能...
s15 >> Install the requests library, then delete the old requirements.txt
and create a new one with all current dependencies.
[bash] pip install requests
[PERMISSION] bash: pip install requests
Reason: Package installation requires confirmation
Allow? [y/N] y
Collecting requests
Downloading requests-2.32.3-py3-none-any.whl (64 kB)
Successfully installed requests-2.32.3
[read] requirements.txt
[bash] rm requirements.txt
[PERMISSION] bash: rm requirements.txt
Reason: File deletion requires confirmation
Allow? [y/N] y
[bash] pip freeze | grep -E "anthropic|python-dotenv|colorama|pyyaml|mcp|redis|requests"
anthropic==0.49.0
colorama==0.4.6
mcp==1.2.0
python-dotenv==1.0.1
PyYAML==6.0.2
redis==5.2.1
requests==2.32.3
[write] requirements.txt
Final Answer:
Installed requests. Deleted old requirements.txt (confirmed by you).
Created new requirements.txt with 7 current dependencies including
the newly installed requests library.
出现了两个权限提示:一个是安装包,一个是删除文件,两者都需要明确批准后执行框架才会执行。read 和 pip freeze 等调用则静默运行,要么命中 always_allow 模式,要么未命中任何规则而走默认放行。这正是 Claude Code 权限系统在确认模式下的行为方式:模型在安全操作上不间断地工作,仅在操作产生实际后果时才暂停。
Claude Code 公开了一个钩子系统,允许用户在工具运行之前、完成之后、发生错误时、会话结束时将自定义逻辑附加到代理生命周期中的任何点。
这就是团队在不修改代理 loop 本身的情况下添加成本跟踪、审核日志记录、自定义审批工作流程以及与外部监控系统集成的方式。
事件总线使可观察性成为执行框架的结构属性,而不是事后补上的附加物。每个重要时刻都会触发一个命名事件,任何注册为处理程序的函数都会收到完整的有效负载。返回 {"block": True} 的 pre_tool_use 钩子可以阻止工具运行,这就是策略执行如何干净地叠加在权限治理之上。
from collections import defaultdict
from datetime import datetime
class EventBus:
    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event: str, handler) -> "EventBus":
        self._handlers[event].append(handler)
        return self  # allow chaining

    def emit(self, event: str, **payload) -> list:
        results = []
        for handler in self._handlers[event]:
            try:
                result = handler(event=event, **payload)
                if result:
                    results.append(result)
            except Exception as e:
                print(f"\033[31m[EventBus] hook error on '{event}': {e}\033[0m")
        return results

bus = EventBus()
三个内置挂钩可满足最常见的生产需求。
_LOG_FILE = ".agent_events.log"
def hook_logger(event, **payload):
    """Write every event to a log file."""
    ts = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    tool = payload.get("tool", "")
    line = f"[{ts}] {event}" + (f" tool={tool}" if tool else "")
    if "input" in payload:
        first = str(list(payload["input"].values())[0])[:60]
        line += f" input={first!r}"
    if "output" in payload:
        line += f" output_len={len(str(payload['output']))}"
    if "error" in payload:
        line += f" error={payload['error']!r}"
    with open(_LOG_FILE, "a") as f:
        f.write(line + "\n")

def hook_stats(event, **payload):
    """Count tool calls per session and print summary at end."""
    if event == "session_start":
        hook_stats._counts = defaultdict(int)
    elif event == "post_tool_use":
        hook_stats._counts[payload.get("tool", "?")] += 1
    elif event == "session_end":
        if getattr(hook_stats, "_counts", None):
            total = sum(hook_stats._counts.values())
            print(f"\033[90m [stats] {total} tool calls: "
                  f"{dict(hook_stats._counts)}\033[0m")
        hook_stats._counts = defaultdict(int)

def hook_timer(event, **payload):
    """Flag tool calls that take longer than 5 seconds."""
    if event == "pre_tool_use":
        hook_timer._start = datetime.now()
    elif event == "post_tool_use" and getattr(hook_timer, "_start", None):
        elapsed = (datetime.now() - hook_timer._start).total_seconds()
        if elapsed > 5:
            print(f"\033[90m [timer] {payload.get('tool')} "
                  f"took {elapsed:.1f}s\033[0m")
        hook_timer._start = None

bus.on("pre_tool_use", hook_logger)
bus.on("post_tool_use", hook_logger)
bus.on("tool_error", hook_logger)
bus.on("session_start", hook_stats)
bus.on("post_tool_use", hook_stats)
bus.on("session_end", hook_stats)
bus.on("pre_tool_use", hook_timer)
bus.on("post_tool_use", hook_timer)
代理 loop 围绕每个工具调用触发事件。
def agent_loop_with_hooks(messages: list):
    bus.emit("session_start")
    try:
        while True:
            with client.messages.stream(...) as stream:
                for text in stream.text_stream:
                    print(text, end="", flush=True)
                response = stream.get_final_message()
            print()
            messages.append({"role": "assistant", "content": response.content})
            if response.stop_reason != "tool_use":
                break
            results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                # Fire pre_tool_use - hooks can block execution
                pre_results = bus.emit("pre_tool_use",
                                       tool=block.name, input=block.input)
                blocked = any(
                    r.get("block") for r in pre_results
                    if isinstance(r, dict)
                )
                if blocked:
                    output = "Blocked by hook"
                else:
                    try:
                        handler = EXTENDED_DISPATCH.get(block.name)
                        output = handler(block.input) if handler else \
                            f"Unknown: {block.name}"
                        bus.emit("post_tool_use",
                                 tool=block.name,
                                 input=block.input,
                                 output=output)
                    except Exception as e:
                        output = f"Error: {e}"
                        bus.emit("tool_error",
                                 tool=block.name, error=str(e))
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
            messages.append({"role": "user", "content": results})
    finally:
        bus.emit("session_end")
在我们的测试中,我们可以验证每个工具调用是否都记录了时间戳和输入预览,计时器是否标记了任何耗时超过 5 秒的调用,以及统计挂钩是否在会话结束时打印了工具使用情况的摘要。
s16 >> Find all functions in core.py that don't have return type annotations
and add them. Then check if any function takes more than 3 parameters.
[pre_tool_use] tool=read → logger ✓ timer started
[read] agents/core.py
[post_tool_use] tool=read output_len=8432 → logger ✓ timer: 0.1s
Scanning for functions without return type annotations...
[pre_tool_use] tool=grep → logger ✓ timer started
[grep] def [^:]+\):$ (functions with no return annotation)
agents/core.py:58:def run_bash(command: str):
agents/core.py:98:def run_write(path: str, content: str):
agents/core.py:145:def dispatch_tools(response_content: list, dispatch: dict):
[post_tool_use] tool=grep output_len=312 → logger ✓ timer: 0.2s
Found 3 functions missing return annotations.
Adding -> str, -> str, -> list respectively...
[pre_tool_use] tool=write → logger ✓ timer started
[write] agents/core.py
[post_tool_use] tool=write output_len=48 → logger ✓ timer: 0.1s
Checking for functions with more than 3 parameters...
[pre_tool_use] tool=bash → logger ✓ timer started
[bash] python -c "import ast, sys; ..."
stream_loop: 5 parameters (messages, tools, dispatch, system, extra_kwargs)
[post_tool_use] tool=bash output_len=89 → logger ✓ timer: 0.3s
[bash] python -m pytest tests/ -q
44 passed in 2.1s
[stats] 6 tool calls: {'read': 1, 'grep': 1, 'write': 1, 'bash': 2, 'glob': 0}
Final Answer:
Added return type annotations to 3 functions. One function (stream_loop)
has 5 parameters - flagged for potential refactor but not changed.
All 44 tests passing.
loop 退出后，会话结束统计挂钩自动打印摘要。每个工具调用都以时间戳和输入预览记录到 .agent_events.log 中。计时器挂钩在每次调用时运行，并标记任何超过 5 秒的调用。
所有这一切都在代理 loop 本身内部没有一行可观察性代码的情况下发生,loop 仅触发事件,钩子决定如何处理它们。这正是 Claude Code 的钩子架构分离关注点的方式。
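为了说明这种"loop 只触发事件、挂钩决定处理方式"的机制，下面是一个最小的事件总线草图。这是一个假设性实现，仅用于演示发布与拦截的模式，并非仓库中 EventBus 类的原始代码：

```python
from collections import defaultdict

class EventBus:
    """最小事件总线草图：挂钩按事件名注册，emit 返回所有挂钩的返回值。"""
    def __init__(self):
        self._hooks = defaultdict(list)

    def on(self, event: str, hook):
        self._hooks[event].append(hook)

    def emit(self, event: str, **payload) -> list:
        # 收集每个挂钩的返回值，供调用方检查（例如 {"block": True}）
        return [hook(**payload) for hook in self._hooks[event]]

bus = EventBus()
# 一个纯观察挂钩（日志），一个可拦截挂钩（阻止 bash）
bus.on("pre_tool_use", lambda tool, input: print(f"[pre_tool_use] tool={tool}"))
bus.on("pre_tool_use", lambda tool, input: {"block": tool == "bash"})

results = bus.emit("pre_tool_use", tool="bash", input={"command": "rm -rf /"})
blocked = any(r.get("block") for r in results if isinstance(r, dict))
print("blocked:", blocked)  # blocked: True
```

代理 loop 只需要在工具执行前调用 emit 并检查返回值，可观察性与策略全部留给挂钩。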
无法恢复的会话就无法托付长时间任务。如果模型进行了 30 分钟的复杂重构而终端关闭，一切都会丢失：不仅是对话，还有导向每个决策的推理上下文。
Claude Code 在您工作时将每条消息、工具调用和结果存储在本地。本节使用三个 REPL 命令实现相同的机制,使持久性可操作。
SESSIONS_DIR = Path(".sessions")
SESSIONS_DIR.mkdir(exist_ok=True)

def new_session() -> dict:
    return {
        "id": uuid.uuid4().hex[:8],
        "created": datetime.now().isoformat(),
        "updated": datetime.now().isoformat(),
        "title": "new session",
        "messages": [],
    }

def save_session(session: dict):
    session["updated"] = datetime.now().isoformat()
    path = SESSIONS_DIR / f"{session['id']}.json"
    # Serialise Anthropic SDK objects to plain dicts
    serialisable = []
    for msg in session["messages"]:
        content = msg.get("content")
        if isinstance(content, list):
            content = [
                b.model_dump() if hasattr(b, "model_dump") else
                b.__dict__ if hasattr(b, "__dict__") else b
                for b in content
            ]
        serialisable.append({"role": msg["role"], "content": content})
    path.write_text(json.dumps(
        {**session, "messages": serialisable}, indent=2
    ))

def load_session(sid: str) -> dict | None:
    path = SESSIONS_DIR / f"{sid}.json"
    return json.loads(path.read_text()) if path.exists() else None
三个 REPL 命令使持久性可操作 - :resume 继续现有会话,:fork 从任何点分支而不影响原始会话,:sessions 列出保存的所有内容。
# Inside the REPL loop
if query == ":sessions":
    for s in list_sessions():
        n = len(s.get("messages", []))
        print(f" \033[36m{s['id']}\033[0m "
              f"{s['updated'][:19]} "
              f"{s['title'][:50]} "
              f"\033[90m({n} msgs)\033[0m")
    continue

if query.startswith(":resume "):
    loaded = load_session(query[8:].strip())
    if loaded:
        session = loaded
        print(f" Resumed: \033[36m{session['id']}\033[0m - "
              f"{session['title']} ({len(session['messages'])} msgs)")
    continue

if query.startswith(":fork "):
    orig = load_session(query[6:].strip())
    if orig:
        session = {
            **orig,
            "id": uuid.uuid4().hex[:8],
            "title": f"fork of {orig['title'][:40]}",
            "created": datetime.now().isoformat(),
            "updated": datetime.now().isoformat(),
        }
        save_session(session)
        print(f" Forked → \033[36m{session['id']}\033[0m")
    continue
可以通过启动会话、进行一些更改,然后恢复会话以验证状态是否完好来对此进行测试。可以通过分叉现有会话、在原始会话和分叉中进行不同的更改并验证两个状态是否独立保留来测试 fork 命令。
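fork 的独立性可以用一个不依赖 SDK 的小例子来验证。下面是一个假设性草图，为了自包含而内联了上文 save/load 逻辑的最小版本（目录名 .sessions_demo 为演示用途）：

```python
import json, uuid
from pathlib import Path
from datetime import datetime

SESSIONS_DIR = Path(".sessions_demo")
SESSIONS_DIR.mkdir(exist_ok=True)

def save(session: dict):
    session["updated"] = datetime.now().isoformat()
    (SESSIONS_DIR / f"{session['id']}.json").write_text(json.dumps(session))

def load(sid: str) -> dict:
    return json.loads((SESSIONS_DIR / f"{sid}.json").read_text())

# 原始会话：一条消息
orig = {"id": uuid.uuid4().hex[:8], "title": "original",
        "messages": [{"role": "user", "content": "hi"}]}
save(orig)

# fork：从磁盘重新加载（得到独立副本），换一个新 id
fork = {**load(orig["id"]), "id": uuid.uuid4().hex[:8],
        "title": f"fork of {orig['title']}"}
save(fork)

# 在 fork 中追加消息，不应影响原始会话
fork["messages"].append({"role": "assistant", "content": "only in fork"})
save(fork)

assert len(load(orig["id"])["messages"]) == 1   # 原始会话未变
assert len(load(fork["id"])["messages"]) == 2   # fork 独立演进
print("fork independent: OK")
```

关键在于 fork 从磁盘 JSON 重新反序列化，因此两个会话不共享任何可变对象。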
s17 >> Add comprehensive error handling to the run_write function
so it handles permission errors, disk full errors, and encoding errors.
New session: a3f2c891
[read] agents/core.py (lines 95-115)
[todo_write]
[0] Read current run_write implementation
[1] Add PermissionError handling
[2] Add OSError (disk full) handling
[3] Add UnicodeEncodeError handling
[4] Write updated implementation
[5] Run tests to verify all error paths
[todo_update] 0 → done
[write] agents/core.py
[todo_update] 1 → done
[todo_update] 2 → done
[todo_update] 3 → done
[todo_update] 4 → done
[bash] python -m pytest tests/test_agents_smoke.py -q
44 passed in 2.1s
[todo_update] 5 → done
Session saved: a3f2c891
--- (terminal closed, session restarted) ---
s17 >> :sessions
a3f2c891 2026-04-01 11:23:45 Add error handling to run_write (14 msgs)
s17 >> :resume a3f2c891
Resumed: a3f2c891 - Add error handling to run_write (14 msgs)
s17 >> The error handling looks good. Now write tests specifically
for each of those three error paths.
[read] agents/core.py (lines 95-130)
[read] tests/test_agents_smoke.py
[write] tests/test_run_write_errors.py
[bash] python -m pytest tests/test_run_write_errors.py -v
test_run_write_permission_error PASSED
test_run_write_disk_full PASSED
test_run_write_encoding_error PASSED
3 passed in 0.3s
Session saved: a3f2c891
s17 >> :fork a3f2c891
Forked → b7e1d334
s17 >> Now in this fork, also add error handling to run_read
using the same patterns.
[read] agents/core.py (lines 75-95)
[write] agents/core.py
[bash] python -m pytest tests/ -q
47 passed in 2.3s
Session saved: b7e1d334
Final Answer:
Original session a3f2c891: run_write has error handling + 3 tests.
Forked session b7e1d334: additionally has run_read error handling.
Both sessions saved independently - you can resume either one.
终端在会话中途关闭，却能准确地从中断处恢复：14 条上下文消息、完整的待办事项状态、已完成操作的全部历史。分叉创建了一个独立分支，可以在不影响原始会话的情况下探索其他更改。Claude Code 本身提供的正是这种能力，本节是让长时间自主会话值得信赖的模式的显式实现。
第六阶段关注性能与控制，让代理从"正确"走向"又快又可控"：用 asyncio.gather 把多工具调用从顺序改为并发，通过中断注入为用户提供实时转向能力，通过提示缓存消除冗余的令牌开销，并通过官方 MCP 运行时把工具注册表开放给任何外部服务器。
这就是 Claude Code 92% 的前缀重用率、并行工具执行和 MCP 支持变得明确且可衡量的地方。
通过执行跟踪分析可以发现，Claude Code 最重要的性能特征之一是：只要调用之间没有依赖，它从不按顺序执行工具调用。
当 Claude 在一轮中返回包含三个 grep 调用和两次读取的响应时，这五个调用同时执行。该回合在最慢的那一次调用的耗时内完成，而不是五次调用耗时之和。
对于涉及数十次读取和搜索的代码库探索任务，这种差异会被显著放大。
该实现需要将同步调度 loop 重构为异步调度。每个工具处理程序都有一个异步包装器。流调用在执行器中运行,以保持异步事件 loop 空闲。 asyncio.gather() 取代了 loop 在工具块上的顺序。
import asyncio

async def _dispatch_one(block) -> tuple[str, str]:
    """Execute one tool block asynchronously."""
    inp = block.input
    name = block.name
    print(f"\033[33m[{name}⟳] {str(list(inp.values())[0])[:80]}\033[0m")
    handler = ASYNC_DISPATCH.get(name)
    if not handler:
        return block.id, f"Unknown tool: {name}"
    try:
        output = await handler(inp)
    except Exception as e:
        output = f"Error: {e}"
    print(f"\033[90m [{name}] done: {output[:80]}\033[0m")
    return block.id, output

async def agent_loop(messages: list):
    while True:
        # Streaming runs in executor - keeps event loop free for concurrent tools
        def _stream():
            with client.messages.stream(
                model=MODEL, system=DEFAULT_SYSTEM,
                messages=messages, tools=EXTENDED_TOOLS, max_tokens=8000,
            ) as stream:
                for text in stream.text_stream:
                    print(text, end="", flush=True)
                return stream.get_final_message()

        response = await asyncio.get_event_loop().run_in_executor(None, _stream)
        print()
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        tool_blocks = [b for b in response.content if b.type == "tool_use"]
        if len(tool_blocks) > 1:
            print(f"\033[90m Running {len(tool_blocks)} tools in parallel...\033[0m")
        # All tool calls at once - not one by one
        pairs = await asyncio.gather(*[_dispatch_one(b) for b in tool_blocks])
        id_to_output = dict(pairs)
        results = [
            {
                "type": "tool_result",
                "tool_use_id": b.id,
                "content": id_to_output[b.id],
            }
            for b in tool_blocks
        ]
        messages.append({"role": "user", "content": results})
异步工具实现使用 asyncio.create_subprocess_shell 而不是 subprocess.run,因此 bash 命令是真正的非阻塞。文件操作在线程执行器中运行,因为它们本质上是同步 I/O。写入操作会获得每个路径的锁定,以防止对同一文件进行两次并行写入。
_write_locks: dict[str, asyncio.Lock] = {}

async def async_bash(command: str) -> str:
    if any(d in command for d in _ALWAYS_BLOCK):
        return "Error: blocked"
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=os.getcwd(),
    )
    stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=120)
    out = (stdout.decode() + stderr.decode()).strip()
    return out[:50000] if out else "(no output)"

async def async_write(path: str, content: str) -> str:
    # Per-file lock prevents concurrent writes to the same path
    if path not in _write_locks:
        _write_locks[path] = asyncio.Lock()
    async with _write_locks[path]:
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, run_write, path, content)
让我们测试一下...
s18 >> Analyse the entire agents/ directory: find all functions that
take more than 3 parameters, all classes defined, and all
imports used across all files. Report everything.
Running 3 tools in parallel...
[grep⟳] def .+:.+,.+,.+,.+ (functions >3 params)
[grep⟳] ^class (class definitions)
[grep⟳] ^import|^from (import statements)
[grep] done: agents/core.py:145:def stream_loop(messages, tools...
[grep] done: agents/s10_team_protocols.py:28:class AgentState
[grep] done: agents/core.py:1:import os
All 3 greps completed in 0.4s (vs ~1.2s sequential)
> Thinking...
Running 5 tools in parallel...
[read⟳] agents/core.py (lines 140-160)
[read⟳] agents/s10_team_protocols.py (lines 25-45)
[read⟳] agents/s16_event_bus.py (lines 1-30)
[bash⟳] grep -c "^import\|^from" agents/*.py
[bash⟳] grep -rn "class " agents/ | wc -l
[read] done: core.py:145 stream_loop has 5 params
[read] done: s10 has AgentState enum + ProtocolAgent class
[read] done: s16 imports: collections, datetime, core
[bash] done: core.py:8 imports, s16_event_bus.py:6 imports...
[bash] done: 4 classes total
All 5 tools completed in 0.6s (vs ~2.1s sequential)
Final Answer:
Functions with >3 parameters: stream_loop (5 params) in core.py.
Classes defined: AgentState, ProtocolAgent (s10), EventBus (s16),
MailboxBackend (s22) - 4 total across the codebase.
Import analysis: core.py uses 8 imports, average across all files is 5.
Total analysis time: ~1s parallel vs ~3.3s sequential - 3x faster.
先是三个 grep 同时运行，随后五个读取和 bash 调用同时运行。模型一次性接收所有结果，在单个推理步骤中综合它们。如果顺序执行，这将是八次单独的工具往返；并发执行只用了两轮。
这种收益在较大的代码库上会急剧放大：Claude Code 在写下一行代码之前，往往要探索数十个文件。
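并发收益可以用一个最小草图验证。这是一个假设性示例，用 asyncio.sleep 模拟工具的 I/O 耗时：五个各耗时 0.1 秒的"工具"经 asyncio.gather 并发执行，总耗时接近单次调用而不是五次之和。

```python
import asyncio, time

async def fake_tool(name: str) -> str:
    await asyncio.sleep(0.1)  # 模拟一次工具调用的 I/O 耗时
    return f"{name}: done"

async def main() -> float:
    start = time.monotonic()
    results = await asyncio.gather(*[fake_tool(f"tool{i}") for i in range(5)])
    elapsed = time.monotonic() - start
    # 并发执行：总耗时接近单次调用的 0.1s，而不是顺序的 0.5s
    print(f"{len(results)} tools in {elapsed:.2f}s")
    return elapsed

elapsed = asyncio.run(main())
```

这正是正文中"该回合在最慢的那一次调用的耗时内完成"的含义。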
Claude Code 允许您在任务执行中按 Ctrl+C 重定向代理，而不丢失它已完成的任何工作。代理不会崩溃：它读取您的中断，总结当前进度，然后等待新指令。这就是 h2A 转向队列：一个与代理 loop 并行运行的异步通道，随时接受来自任何来源的消息。
如果没有这种机制，启动一个长时间运行的任务就是一次性的承诺：你启动它、走开，二十分钟后回来，它要么正确完成，要么没有，而一旦开始你就无法再干预。有了中断注入，您可以在执行中途重定向，补充代理启动时没有的上下文，或者让它停下来总结目前的进展。
interrupt_queue: asyncio.Queue = asyncio.Queue()

SYSTEM = (
    f"You are a coding agent at {os.getcwd()}. "
    "When you receive [INTERRUPT], stop immediately. "
    "Summarise exactly what you have completed so far, "
    "what you were working on, and what remains. "
    "Then wait for new instruction."
)

async def agent_loop(messages: list):
    while True:
        # Check interrupt queue before calling the model
        try:
            interrupt_msg = interrupt_queue.get_nowait()
            print(f"\n\033[31m[INTERRUPT] {interrupt_msg}\033[0m")
            messages.append({"role": "user", "content": interrupt_msg})
        except asyncio.QueueEmpty:
            pass

        def _stream():
            with client.messages.stream(
                model=MODEL, system=SYSTEM,
                messages=messages, tools=EXTENDED_TOOLS, max_tokens=8000,
            ) as stream:
                for text in stream.text_stream:
                    print(text, end="", flush=True)
                return stream.get_final_message()

        response = await asyncio.get_event_loop().run_in_executor(None, _stream)
        print()
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return

        # Check interrupt queue after model responds, before running tools
        try:
            interrupt_msg = interrupt_queue.get_nowait()
            print(f"\n\033[31m[INTERRUPT] Stopping tool execution\033[0m")
            messages.append({"role": "user", "content": interrupt_msg})
            continue  # go back to top - model handles the interrupt
        except asyncio.QueueEmpty:
            pass

        tool_blocks = [b for b in response.content if b.type == "tool_use"]
        pairs = await asyncio.gather(*[_dispatch_one(b) for b in tool_blocks])
        id_to_output = dict(pairs)
        results = [
            {"type": "tool_result", "tool_use_id": b.id, "content": id_to_output[b.id]}
            for b in tool_blocks
        ]
        messages.append({"role": "user", "content": results})

async def input_loop(history: list):
    loop = asyncio.get_event_loop()
    agent_task = None
    while True:
        try:
            query = await loop.run_in_executor(
                None, lambda: input("\033[36ms19 >> \033[0m")
            )
        except KeyboardInterrupt:
            # Ctrl+C at prompt - exit
            print("\n Exiting.")
            if agent_task and not agent_task.done():
                agent_task.cancel()
            break
        query = query.strip()
        if query.lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_task = asyncio.create_task(agent_loop(history))
        try:
            await agent_task
        except KeyboardInterrupt:
            # Ctrl+C while agent runs - inject interrupt
            msg = (
                "[INTERRUPT] User pressed Ctrl+C. "
                "Stop current task. Summarise what you completed "
                "so far and what remains incomplete."
            )
            await interrupt_queue.put(msg)
            print("\033[31m\n Interrupt queued - agent will stop after current tool.\033[0m")
            try:
                await asyncio.wait_for(agent_task, timeout=30)
            except asyncio.TimeoutError:
                print(" Agent did not respond to interrupt within 30s.")
        print()
s19 >> Refactor every session file from s01 to s12 to add type annotations
to all function signatures. Start with s01 and work through them all.
[read] agents/s01_agent_loop.py
[todo_write]
[0] Add type annotations to s01_agent_loop.py
[1] Add type annotations to s02_tool_use.py
[2] Add type annotations to s03_todo_write.py
...
[11] Add type annotations to s12_worktree_task_isolation.py
[12] Run mypy across all 12 files to verify
[write] agents/s01_agent_loop.py
[todo_update] 0 → done
[read] agents/s02_tool_use.py
[write] agents/s02_tool_use.py
[todo_update] 1 → done
[read] agents/s03_todo_write.py
^C ← user pressed Ctrl+C
[INTERRUPT] User pressed Ctrl+C. Stop current task. Summarise
what you completed so far and what remains incomplete.
I have completed type annotations for:
✓ s01_agent_loop.py (task 0)
✓ s02_tool_use.py (task 1)
I was mid-way through s03_todo_write.py (task 2) - the file was
read but not yet written. Tasks 3 through 12 have not been started.
What would you like me to do next?
s19 >> Actually skip s03 for now and jump straight to s04 through s06.
[read] agents/s04_subagent.py
[write] agents/s04_subagent.py
[todo_update] 3 → done
[read] agents/s05_skill_loading.py
[write] agents/s05_skill_loading.py
[todo_update] 4 → done
[read] agents/s06_context_compact.py
[write] agents/s06_context_compact.py
[todo_update] 5 → done
[bash] python -m mypy agents/s04_subagent.py agents/s05_skill_loading.py
agents/s06_context_compact.py --strict
Success: no issues found in 3 source files
Final Answer:
Completed type annotations for s01, s02, s04, s05, s06.
Skipped s03 as requested. s07 through s12 still pending.
mypy strict passes on all completed files.
中断在任务中途到来，代理干净地停下，准确总结已完成与剩余的工作，然后等待。用户随即重定向：跳过 s03，直接处理 s04–s06。代理按新指令继续执行，已完成的工作没有任何丢失。这正是 Claude Code 提供的交互模型：长时间任务始终可控，而不是一经启动就无法干预。
对 Claude Code 执行跟踪的逆向工程显示，所有内部代理调用的提示前缀重用率高达 92%。这并非偶然，而是提示构建方式的结果：稳定的内容排在最前，可变的内容排在最后。
Anthropic 的提示缓存以正常令牌成本约 10% 的价格提供这些稳定前缀。对于在长会话中发起数百次 API 调用的代理来说，这带来非常可观的节省。
系统提示和工具定义是任何代理会话中最稳定的内容,它们在轮流之间不会改变。将它们标记为可缓存意味着第一次调用后的每次调用都会从缓存中获取这些令牌,而不是支付全价。
cache_control 标记告诉 Anthropic 的基础设施在第一次调用时构建 KV 缓存条目,并在共享相同前缀的所有后续调用中提供该条目。
# System prompt as a list of cacheable blocks
SYSTEM_BLOCKS = [
    {
        "type": "text",
        "text": (
            f"You are a coding agent at {os.getcwd()}. "
            "Use tools to solve tasks. Be concise.\n\n"
            "Tool capabilities:\n"
            "- bash: execute shell commands\n"
            "- read: read files with optional line range\n"
            "- write: write files (auto-snapshots for revert)\n"
            "- grep: search file content by regex pattern\n"
            "- glob: find files by glob pattern\n"
            "- revert: restore a file to its pre-write state\n\n"
            "Always verify your work. Check outputs before proceeding."
        ),
        "cache_control": {"type": "ephemeral"},  # cache this block
    }
]

# Last tool in the array gets cache_control - caches everything up to it
CACHED_TOOLS = EXTENDED_TOOLS[:-1] + [
    {
        **EXTENDED_TOOLS[-1],
        "cache_control": {"type": "ephemeral"},
    }
]
令牌使用跟踪器使缓存性能在每次调用时都可见。
class CacheStats:
    def __init__(self):
        self.cache_created = 0
        self.cache_read = 0
        self.uncached = 0
        self.calls = 0

    def record(self, usage):
        self.calls += 1
        self.cache_created += getattr(usage, "cache_creation_input_tokens", 0) or 0
        self.cache_read += getattr(usage, "cache_read_input_tokens", 0) or 0
        self.uncached += getattr(usage, "input_tokens", 0) or 0

    def print_turn(self, usage):
        created = getattr(usage, "cache_creation_input_tokens", 0) or 0
        read = getattr(usage, "cache_read_input_tokens", 0) or 0
        if created > 0:
            print(f"\033[90m [cache] MISS → {created} tokens written\033[0m")
        elif read > 0:
            saved = int(read * 0.9)
            print(f"\033[90m [cache] HIT → {read} tokens read "
                  f"(saved ~{saved} tokens)\033[0m")

    def print_summary(self):
        if not self.calls:
            return
        total_saved = int(self.cache_read * 0.9)
        print(f"\033[90m [cache summary] {self.calls} calls | "
              f"written={self.cache_created} | "
              f"hits={self.cache_read} | "
              f"total saved≈{total_saved} tokens\033[0m")

stats = CacheStats()
s20 >> Read every file in agents/ and produce a one-line summary
of what each one does. Then find which files have the most
tool calls registered in their dispatch map.
[cache] MISS → 1,847 tokens written ← first call builds cache
[glob] agents/*.py
agents/core.py, agents/s01_agent_loop.py ... (24 files)
Running 6 tools in parallel...
[read⟳] agents/core.py
[read⟳] agents/s01_agent_loop.py
[read⟳] agents/s02_tool_use.py
[read⟳] agents/s03_todo_write.py
[read⟳] agents/s04_subagent.py
[read⟳] agents/s05_skill_loading.py
[cache] HIT → 1,847 tokens read (saved ~1,662 tokens)
All 6 reads completed in 0.7s
> Thinking...
Running 6 tools in parallel...
[read⟳] agents/s06_context_compact.py
[read⟳] agents/s07_task_system.py
[read⟳] agents/s08_background_tasks.py
[read⟳] agents/s09_agent_teams.py
[read⟳] agents/s10_team_protocols.py
[read⟳] agents/s11_autonomous_agents.py
[cache] HIT → 1,847 tokens read (saved ~1,662 tokens)
> Thinking...
[grep] DISPATCH.*{ (find dispatch maps)
[cache] HIT → 1,847 tokens read (saved ~1,662 tokens)
[bash] python -c "
import ast, os
for f in sorted(os.listdir('agents')):
if not f.endswith('.py'): continue
src = open(f'agents/{f}').read()
count = src.count('lambda inp:')
if count > 0:
print(f'{f}: {count} tools')
"
s_full.py: 13 tools
s03_todo_write.py: 3 tools
s07_task_system.py: 4 tools
s09_agent_teams.py: 2 tools
[cache] HIT → 1,847 tokens read (saved ~1,662 tokens)
[cache summary] 6 calls | written=1,847 | hits=5 |
total saved≈8,310 tokens
Final Answer:
Summarised all 24 agent files. Files with most registered tools:
s_full.py: 13 tools (all mechanisms combined)
s07_task_system.py: 4 tools
s03_todo_write.py: 3 tools
Cache hit rate: 5/6 calls (83%). ~8,310 tokens saved this session.
第一次调用写入 1,847 个令牌来缓存系统提示和工具定义。随后的每次调用都以 10% 的成本从缓存中读取这些令牌。6 次调用共节省了 8,310 个令牌。在进行数百次调用的完整 Claude Code 会话中，同样的机制产生了执行跟踪中观察到的 92% 前缀重用率。
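上面的节省数字可以直接验算。以本次会话的假设为准：1,847 个前缀令牌、5 次缓存命中、缓存读取约为原价的 10%（即每次命中省约 90% 的前缀成本）：

```python
prefix_tokens = 1847      # 系统提示 + 工具定义构成的缓存前缀
cache_hits = 5            # 首次调用为 MISS，其余 5 次为 HIT
saved_per_hit = int(prefix_tokens * 0.9)   # 每次命中约省 90% 的前缀成本
total_saved = saved_per_hit * cache_hits

print(saved_per_hit)   # 1662
print(total_saved)     # 8310
```

与日志中的 "saved ~1,662 tokens" 和 "total saved≈8,310 tokens" 一致。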
Claude Code 本身支持 MCP - 任何兼容的服务器工具都将成为代理工具注册表中的一等公民。文件系统服务器添加文件工具。 git服务器添加git操作工具。
数据库服务器添加查询工具。该模型以相同的方式调用所有这些内置工具,而不知道工具是本地 Python 函数还是远程服务器进程。
MCP 运行时从 config/mcp_config.yaml 读取服务器配置,在启动时使用官方 MCP Python SDK 连接到每个服务器,调用 list_tools() 来发现每个服务器提供的内容,并在内置工具旁边的调度映射中以前缀名称 mcp__<server>__<tool> 注册所有内容。
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

MCP_SESSIONS: dict[str, ClientSession] = {}
MCP_TOOL_MAP: dict[str, tuple[str, str]] = {}  # prefixed_name → (server, tool)

async def connect_mcp_servers() -> list[dict]:
    """Connect to all configured MCP servers. Returns their tools as Anthropic defs."""
    try:
        with open(Path(__file__).parent.parent / "config" / "mcp_config.yaml") as f:
            config = yaml.safe_load(f)
    except FileNotFoundError:
        print("\033[33mNo MCP config found. Running without MCP servers.\033[0m")
        return []
    mcp_tools = []
    for srv in (config.get("servers") or []):
        name = srv.get("name", "unknown")
        transport = srv.get("transport", "stdio")
        try:
            if transport == "stdio":
                params = StdioServerParameters(
                    command=srv["command"],
                    args=srv.get("args", []),
                )
                read, write = await stdio_client(params).__aenter__()
                session = await ClientSession(read, write).__aenter__()
                await session.initialize()
            else:
                print(f"\033[33m [{name}] transport '{transport}' not supported\033[0m")
                continue
            tool_list = await session.list_tools()
            MCP_SESSIONS[name] = session
            print(f"\033[90m [MCP] connected: {name} "
                  f"({len(tool_list.tools)} tools)\033[0m")
            for tool in tool_list.tools:
                prefixed = f"mcp__{name}__{tool.name}"
                MCP_TOOL_MAP[prefixed] = (name, tool.name)
                mcp_tools.append({
                    "name": prefixed,
                    "description": f"[{name}] {tool.description or tool.name}",
                    "input_schema": tool.inputSchema or {
                        "type": "object", "properties": {}
                    },
                })
        except Exception as e:
            print(f"\033[31m [MCP] failed to connect {name}: {e}\033[0m")
    return mcp_tools

async def call_mcp_tool(prefixed_name: str, arguments: dict) -> str:
    """Route an MCP tool call to the correct server."""
    if prefixed_name not in MCP_TOOL_MAP:
        return f"Error: MCP tool not found: {prefixed_name}"
    server_name, tool_name = MCP_TOOL_MAP[prefixed_name]
    session = MCP_SESSIONS.get(server_name)
    if not session:
        return f"Error: server not connected: {server_name}"
    try:
        result = await session.call_tool(tool_name, arguments)
        parts = []
        for item in (result.content or []):
            if hasattr(item, "text"):
                parts.append(item.text)
            elif hasattr(item, "data"):
                parts.append(f"[binary: {len(item.data)} bytes]")
        return "\n".join(parts)[:50000] or "(no output)"
    except Exception as e:
        return f"Error calling MCP tool: {e}"
调度路由器以与内置工具相同的方式处理 MCP 工具 - 前缀检查是唯一的区别。
async def dispatch_with_mcp(block) -> tuple[str, str]:
    name = block.name
    inp = block.input
    print(f"\033[33m[{name}] {str(list(inp.values())[0])[:80]}\033[0m")
    if name.startswith("mcp__"):
        output = await call_mcp_tool(name, inp)
    else:
        handler = ASYNC_DISPATCH.get(name)
        output = await handler(inp) if handler else f"Unknown: {name}"
    print(output[:200])
    return block.id, output
s21 >> Using the connected filesystem and git MCP servers,
show me the git log for the last 5 commits and then
read the files that were changed in the most recent commit.
[MCP] connected: filesystem (8 tools)
[MCP] connected: git (6 tools)
Total tools: 14 built-in + 14 MCP = 28 available
> Thinking...
[mcp__git__git_log] {"repo_path": ".", "max_count": 5}
commit a3f2c891 Add error handling to run_write (2026-04-01)
commit b7e1d334 Fork: Add error handling to run_read (2026-04-01)
commit c9d0e112 Phase 5 complete: all production hardening sessions (2026-04-01)
commit d2e8b445 Add type annotations to s01-s06 (2026-03-31)
commit e5f6a778 Initial implementation of core.py (2026-03-30)
> Thinking...
[mcp__git__git_diff] {"repo_path": ".", "ref": "HEAD~1"}
diff --git a/agents/core.py b/agents/core.py
+ except PermissionError as e:
+ return f"Error: permission denied writing {path}: {e}"
+ except OSError as e:
+ return f"Error: OS error writing {path}: {e}"
Changed files in most recent commit: agents/core.py
> Thinking...
Running 2 tools in parallel...
[mcp__filesystem__read_file⟳] {"path": "agents/core.py"}
[bash⟳] git show HEAD --stat
[mcp__filesystem__read_file] done: 8,432 chars
[bash] done: core.py | 8 ++++++++
Both tools completed in 0.4s
Final Answer:
Last 5 commits retrieved via MCP git server.
Most recent commit added PermissionError and OSError handling to
run_write in core.py - 8 lines added, 0 removed.
File read via MCP filesystem server: 8,432 characters, confirms
the 3 new except blocks are present and correctly structured.
MCP tools called identically to built-in tools - no difference
in dispatch routing visible to the model.
模型调用 mcp__git__git_log、mcp__git__git_diff 和 mcp__filesystem__read_file 的方式与调用 bash 或 grep 完全相同：相同的工具调用结构、相同的结果注入、相同的 loop。mcp__ 前缀纯粹是调度映射中的路由细节。
从模型的角度来看,当服务器在启动时连接时,工具注册表只是从 14 个工具增长到 28 个工具。
这就是 Claude Code 的 MCP 支持如何工作,外部服务器透明地扩展工具注册表,对代理 loop 或模型的交互模式进行零更改。
第七阶段用生产级替代方案取代教学实现：基于文件的邮箱换成具有即时交付和跨机器支持的 Redis 发布/订阅通道，基础的 worktree 创建升级为能处理真实代码库各种边缘情况的完整生命周期管理器，最终整个系统被组装成一个把所有机制连接在一起的可部署参考实现。
这就是工作原型和可以在生产中运行的系统之间的差距。
第四阶段的 JSONL 邮箱系统作为教学机制是称职的，但存在三个根本性的生产问题。它依赖轮询：队友按计时器检查收件箱文件，在消息发送与接收之间引入了人为延迟。它需要文件锁才能并发访问：两个代理同时写同一个文件会产生损坏的 JSONL。而且它是单机的：邮箱文件驻留在一个文件系统上，分布式部署无从谈起。
Claude Code 的内部代理协调需要即时、无锁且可跨进程边界的消息传递，而 Redis pub/sub 恰好提供这些属性。代理发布到频道，任何订阅者都会在几毫秒内收到，无需轮询 loop、无文件锁、无文件系统依赖。
该实现将两个后端包装在一个公共接口后面,因此第 4 阶段的队友和首席代理逻辑保持完全不变。
import redis.asyncio as aioredis

class MailboxBackend:
    """Common interface - swap backend without changing agent logic."""
    async def send(self, to: str, message: dict): ...
    async def receive(self, name: str, timeout: float = 30.0) -> dict | None: ...
    async def close(self): ...

class RedisMailbox(MailboxBackend):
    """Production backend - instant delivery, cross-machine, no polling."""
    def __init__(self, redis_url: str):
        self.redis = aioredis.from_url(redis_url, decode_responses=True)
        self._pubsubs: dict[str, aioredis.client.PubSub] = {}

    def _channel(self, name: str) -> str:
        return f"agent:{name}:inbox"

    async def send(self, to: str, message: dict):
        payload = json.dumps({
            **message,
            "ts": datetime.now().isoformat(),
        })
        await self.redis.publish(self._channel(to), payload)

    async def receive(self, name: str, timeout: float = 30.0) -> dict | None:
        if name not in self._pubsubs:
            ps = self.redis.pubsub()
            await ps.subscribe(self._channel(name))
            self._pubsubs[name] = ps
        ps = self._pubsubs[name]
        deadline = asyncio.get_event_loop().time() + timeout
        while asyncio.get_event_loop().time() < deadline:
            msg = await ps.get_message(
                ignore_subscribe_messages=True, timeout=0.1
            )
            if msg and msg["type"] == "message":
                try:
                    return json.loads(msg["data"])
                except json.JSONDecodeError:
                    return {"body": msg["data"]}
            await asyncio.sleep(0.05)
        return None

    async def close(self):
        for ps in self._pubsubs.values():
            await ps.unsubscribe()
            await ps.close()
        await self.redis.aclose()
当 Redis 不可用时，我们还需要一个本地开发用的后备方案：进程内的 asyncio.Queue，它提供完全相同的接口，只是没有跨机器支持。
队友 loop 和首席代理逻辑只调用 send() 和 receive()，完全不关心邮箱底层是如何实现的。
class QueueMailbox(MailboxBackend):
    """Fallback backend — in-process asyncio.Queue when Redis unavailable."""
    def __init__(self):
        self._queues: dict[str, asyncio.Queue] = {}

    def _queue(self, name: str) -> asyncio.Queue:
        if name not in self._queues:
            self._queues[name] = asyncio.Queue()
        return self._queues[name]

    async def send(self, to: str, message: dict):
        await self._queue(to).put({
            **message, "ts": datetime.now().isoformat()
        })

    async def receive(self, name: str, timeout: float = 30.0) -> dict | None:
        try:
            return await asyncio.wait_for(
                self._queue(name).get(), timeout=timeout
            )
        except asyncio.TimeoutError:
            return None

    async def close(self):
        pass

async def make_mailbox() -> MailboxBackend:
    """Use Redis if available, fall back to Queue."""
    redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
    try:
        mb = RedisMailbox(redis_url)
        await mb.redis.ping()
        print(f"\033[90m [mailbox] Redis connected at {redis_url}\033[0m")
        return mb
    except Exception as e:
        print(f"\033[33m [mailbox] Redis unavailable ({e}) - using Queue\033[0m")
        return QueueMailbox()
队友 loop 与第四阶段完全相同，邮箱接口隐藏了所有传输细节。
async def teammate_loop(
    name: str, system: str,
    mailbox: MailboxBackend,
    stop: asyncio.Event,
):
    print(f"\033[90m [{name}] ready on {mailbox.__class__.__name__}\033[0m")
    while not stop.is_set():
        msg = await mailbox.receive(name, timeout=2.0)
        if not msg:
            continue
        sender = msg.get("from", "lead")
        body = msg.get("body", "")
        print(f"\n\033[35m [{name}] task from {sender}: {body[:60]}\033[0m")
        # Full agent loop - identical to Phase 4
        sub = [{"role": "user", "content": body}]
        while True:
            resp = await asyncio.get_event_loop().run_in_executor(
                None,
                lambda: client.messages.create(
                    model=MODEL, system=system,
                    messages=sub, tools=EXTENDED_TOOLS, max_tokens=4000,
                )
            )
            sub.append({"role": "assistant", "content": resp.content})
            if resp.stop_reason != "tool_use":
                break
            results = dispatch_tools(resp.content, EXTENDED_DISPATCH)
            sub.append({"role": "user", "content": results})
        result = "".join(
            b.text for b in sub[-1]["content"] if hasattr(b, "text")
        )
        await mailbox.send(sender, {
            "from": name, "to": sender,
            "type": "result", "body": result,
        })
        print(f"\033[35m [{name}] result sent to {sender}\033[0m")
在我们的 teammate_loop() 实现中,代理从其邮箱接收消息,使用模型和工具处理它们,并通过相同的邮箱接口将结果发送回发送者。
主导代理可以将任务发送给队友,而无需关心底层传输,它只需调用 mailbox.send() 并等待响应。
现在我们可以并行运行多个队友,每个队友都在自己的 Redis 通道上……
s22 >> We have two specialists available — an explorer and a writer.
Explore the current test coverage gaps in core.py then
write new tests to close them. Run in parallel.
[mailbox] Redis connected at redis://localhost:6379
Teammates: explorer, writer | backend: RedisMailbox
> Thinking...
[send_to_teammate] explorer
Run pytest with coverage on core.py. Find every function
with less than 100% branch coverage. Report the exact
missing branches with line numbers.
[explorer → redis channel agent:explorer:inbox] ← instant delivery
[explorer] task from lead: Run pytest with coverage on core.py...
[explorer] > Thinking...
[explorer] [bash] python -m pytest tests/ --cov=agents/core
--cov-report=term-missing -q
Name Stmts Miss Branch BrPart Cover Missing
agents/core.py 124 12 42 6 88%
Missing: run_revert:L134, check_permission:L198, stream_loop:L241
[explorer → redis channel agent:lead:inbox] ← instant delivery
Result received in 8.2s (vs ~9s polling average in Phase 4)
> Thinking...
[send_to_teammate] writer
Write pytest tests for these missing branches in agents/core.py:
1. run_revert line 134: file newly created (SNAPSHOTS[path] is None)
2. check_permission line 198: ask_user tier matches
3. stream_loop line 241: extra_kwargs forwarded correctly
Target: bring core.py to 100% branch coverage.
[writer → redis channel agent:writer:inbox] ← instant delivery
[writer] task from lead: Write pytest tests for missing branches...
[writer] > Thinking...
[writer] [read] tests/test_agents_smoke.py
[writer] [read] agents/core.py (lines 130-145)
[writer] [read] agents/core.py (lines 194-205)
[writer] [write] tests/test_core_branches.py
[writer] [bash] python -m pytest tests/test_core_branches.py -v
test_run_revert_new_file_deleted PASSED
test_check_permission_ask_user_approved PASSED
test_stream_loop_extra_kwargs_forwarded PASSED
3 passed in 0.4s
[writer] [bash] python -m pytest tests/ --cov=agents/core
--cov-report=term-missing -q
agents/core.py 124 0 42 0 100%
[writer → redis channel agent:lead:inbox] ← instant delivery
Result received in 23.4s
Final Answer:
Explorer identified 3 missing branches in core.py (88% coverage).
Writer closed all 3 with targeted tests - core.py now at 100% branch coverage.
Redis delivery latency: <10ms per message vs ~500ms polling in Phase 4.
Both teammates ran on separate Redis channels - zero file locking,
zero polling overhead, ready for multi-machine deployment.
延迟差异在输出中清晰可见：相比 JSONL 方案 500 毫秒的轮询间隔，Redis 在 10 毫秒内送达消息。对于包含数十条代理间消息的会话，这能节省可观的实际耗时。更重要的是，无论两个代理运行在同一台机器上还是集群中的不同机器上，Redis 后端的工作方式完全相同，而 JSONL 方案根本做不到这一点。
第四阶段的基本 worktree 实现只负责创建和删除 worktree，但在生产使用会遇到的每个边缘情况上都会失败。带有未提交更改的仓库在创建 worktree 时行为不同。
上一次崩溃运行遗留的同名分支会导致 git worktree add 失败。处于分离 HEAD 状态的仓库无法创建新分支。两个代理在不同分支中修改同一文件会产生双方都无法察觉的合并冲突。
Claude Code 通过使用文件快照而非 worktree 避开了大多数此类问题。但 worktree 提供更强的隔离：一个代理在物理上就碰不到另一个代理的文件，这使它成为大型并行任务的正确选择。生产级 worktree 管理器在任何任务执行开始之前就系统地处理每个边缘情况。
def _git(*args, cwd=None) -> tuple[int, str, str]:
    result = subprocess.run(
        ["git", *args], capture_output=True, text=True,
        cwd=cwd or os.getcwd(),
    )
    return result.returncode, result.stdout.strip(), result.stderr.strip()

def check_git_state() -> dict:
    """Full pre-flight check before any worktree operations."""
    _, branch, _ = _git("symbolic-ref", "--short", "HEAD")
    _, dirty, _ = _git("status", "--porcelain")
    _, wt_list, _ = _git("worktree", "list", "--porcelain")
    return {
        "branch": branch or "(detached)",
        "is_dirty": bool(dirty),
        "is_detached": not branch,
        "worktree_count": wt_list.count("worktree "),
    }

def prune_stale_worktrees() -> int:
    """Remove worktrees whose directories no longer exist."""
    _, out, _ = _git("worktree", "list", "--porcelain")
    pruned = 0
    current_path = None
    for line in out.splitlines():
        if line.startswith("worktree "):
            current_path = line[9:]
        elif line == "prunable" and current_path:
            print(f"\033[90m [worktree] pruning stale: {current_path}\033[0m")
            _git("worktree", "remove", "--force", current_path)
            pruned += 1
    _git("worktree", "prune")
    return pruned
我们还需要为 worktree 创建过程包一层安全包装器，处理每种边缘情况，并在无法继续时给出清晰的错误消息。
这对于良好的用户体验至关重要,因为如果模型尝试创建 worktree 并且由于边缘情况而无提示地失败,代理将感到困惑并可能使任务失败,而没有任何错误指示。
def create_worktree_safe(task_id: str) -> tuple[str, str]:
    """
    Safe worktree creation with full edge case handling.
    Returns (path, branch) or raises RuntimeError with a clear message.
    """
    state = check_git_state()
    # Refuse detached HEAD - cannot create branches from here
    if state["is_detached"]:
        raise RuntimeError(
            "Repository is in detached HEAD state. "
            "Run 'git checkout main' before creating worktrees."
        )
    # Warn about dirty state - worktree branches from HEAD, not working tree
    if state["is_dirty"]:
        print(
            "\033[33m [worktree] Warning: uncommitted changes detected. "
            "Worktree will branch from HEAD, not your working tree.\033[0m"
        )
    branch = f"task/{task_id}"
    path = str(Path(os.getcwd()).parent / f".worktree-{task_id[:8]}")
    # Clean up path if it exists from a previous crashed run
    if Path(path).exists():
        print(f"\033[90m [worktree] removing stale path: {path}\033[0m")
        _git("worktree", "remove", "--force", path)
        shutil.rmtree(path, ignore_errors=True)
    # Resolve branch name conflict
    rc, _, _ = _git("show-ref", "--verify", f"refs/heads/{branch}")
    if rc == 0:
        print(f"\033[90m [worktree] branch exists, deleting: {branch}\033[0m")
        rc2, _, err = _git("branch", "-D", branch)
        if rc2 != 0:
            # Branch checked out elsewhere - use alternate name
            branch = f"task/{task_id}-{uuid.uuid4().hex[:4]}"
            print(f"\033[90m [worktree] using alternate branch: {branch}\033[0m")
    rc, _, err = _git("worktree", "add", "-b", branch, path)
    if rc != 0:
        raise RuntimeError(f"git worktree add failed: {err}")
    print(f"\033[90m [worktree] created: {path} (branch: {branch})\033[0m")
    return path, branch
In the same way, we add conflict detection that runs once the parallel tasks have finished, checking whether any of them modified overlapping files. If they did, we can either fail fast with a clear message, or warn the model and let it decide how to proceed.
def detect_conflicts(completed_tasks: list[dict]) -> list[str]:
    """Check if parallel tasks modified overlapping files."""
    changed: dict[str, set] = {}
    for task in completed_tasks:
        branch = task.get("branch")
        tid = task.get("id")
        if not branch:
            continue
        _, out, _ = _git("diff", "--name-only", "HEAD", branch)
        if out:
            changed[tid] = set(out.splitlines())
    conflicts = []
    task_ids = list(changed.keys())
    for i in range(len(task_ids)):
        for j in range(i + 1, len(task_ids)):
            overlap = changed[task_ids[i]] & changed[task_ids[j]]
            if overlap:
                conflicts.append(
                    f"Tasks {task_ids[i]} and {task_ids[j]} "
                    f"both modified: {', '.join(sorted(overlap)[:5])}"
                )
    return conflicts
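The pairwise set-intersection at the heart of this check can be exercised on its own, without git. Here is a minimal sketch with hypothetical per-task changed-file sets (the task IDs and file names are made up for illustration), mirroring the same logic:

```python
from itertools import combinations

# Hypothetical per-task changed-file sets, as `git diff --name-only` would produce
changed = {
    "a1b2c3": {"agents/core.py", "tests/test_core.py"},
    "e5f6a7": {"agents/core.py"},
    "c9d0e1": {"docs/readme.md"},
}

# Same pairwise intersection detect_conflicts performs over completed tasks
conflicts = [
    f"Tasks {a} and {b} both modified: {', '.join(sorted(changed[a] & changed[b]))}"
    for a, b in combinations(changed, 2)
    if changed[a] & changed[b]
]

print(conflicts)  # only the a1b2c3 / e5f6a7 pair overlaps, on agents/core.py
```

Only pairs with a non-empty intersection are reported, so the common case of fully disjoint parallel tasks produces an empty list and merging can proceed automatically.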
To run a task in an isolated worktree with full lifecycle management, we wrap the entire execution in a try/finally block, so the worktree is always cleaned up even when the task fails with an exception.
async def run_task_in_worktree_safe(task: dict) -> dict:
    """Run a task in an isolated worktree with full lifecycle management."""
    task_id = task["id"]
    wt_path = None
    wt_branch = None
    try:
        wt_path, wt_branch = create_worktree_safe(task_id)
        task["branch"] = wt_branch
        system = (
            f"You are a coding agent in isolated worktree: {wt_path}. "
            f"Task: {task['description']}. "
            "Your changes are on a separate branch. "
            "Summarise every file you modified when done."
        )
        messages = [{"role": "user", "content": task["description"]}]
        while True:
            resp = await asyncio.get_event_loop().run_in_executor(
                None,
                lambda: client.messages.create(
                    model=MODEL, system=system,
                    messages=messages,
                    tools=EXTENDED_TOOLS, max_tokens=8000,
                )
            )
            messages.append({"role": "assistant", "content": resp.content})
            if resp.stop_reason != "tool_use":
                break
            results = []
            for block in resp.content:
                if block.type != "tool_use":
                    continue
                if block.name == "bash":
                    # Safe under asyncio: no await between chdir and restore,
                    # so parallel tasks cannot interleave here
                    old = os.getcwd()
                    os.chdir(wt_path)
                    output = run_bash(block.input["command"])
                    os.chdir(old)
                else:
                    handler = EXTENDED_DISPATCH.get(block.name)
                    output = handler(block.input) if handler else "unknown"
                print(f"\033[33m  [{task_id[:6]}][{block.name}] "
                      f"{str(list(block.input.values())[0])[:60]}\033[0m")
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
            messages.append({"role": "user", "content": results})
        task["status"] = "done"
        task["result"] = "".join(
            b.text for b in messages[-1]["content"] if hasattr(b, "text")
        )
    except RuntimeError as e:
        task["status"] = "failed"
        task["error"] = str(e)
        print(f"\033[31m  [{task_id}] failed: {e}\033[0m")
    finally:
        # Always clean up - even on exception
        if wt_path and Path(wt_path).exists():
            _git("worktree", "remove", "--force", wt_path)
            shutil.rmtree(wt_path, ignore_errors=True)
        if wt_branch:
            _git("branch", "-D", wt_branch)
        print(f"\033[90m  [{task_id[:6]}] worktree cleaned up\033[0m")
    return task
Let's look at an execution trace of two parallel tasks...
s23 >> Run two tasks in parallel isolated worktrees:
Task A: Add input validation to run_bash — reject empty commands
and commands over 10,000 characters.
Task B: Add a call counter to run_bash that tracks how many
times it has been called per session.
Pre-flight check:
Branch: main | Dirty: no | Worktrees: 1 (main only)
Pruned: 0 stale worktrees
[worktree] created: .worktree-task-a1b2 (branch: task/a1b2c3d4)
[worktree] created: .worktree-task-e5f6 (branch: task/e5f6a7b8)
Running 2 tasks in parallel...
[a1b2c3][read] agents/core.py (lines 55-80)
[e5f6a7][read] agents/core.py (lines 55-80)
[a1b2c3][write] agents/core.py (added empty command check line 57)
[e5f6a7][write] agents/core.py (added _call_count = 0 line 54)
[a1b2c3][write] agents/core.py (added length check line 59)
[e5f6a7][write] agents/core.py (added _call_count += 1 line 63)
[a1b2c3][bash] python -m pytest tests/ -q → 44 passed
[e5f6a7][bash] python -m pytest tests/ -q → 44 passed
[a1b2c3] worktree cleaned up
[e5f6a7] worktree cleaned up
Conflict detection:
Tasks a1b2c3d4 and e5f6a7b8 both modified: agents/core.py
Overlap on 1 file - human review required before merging.
Results:
Task A (branch: task/a1b2c3d4):
Added validation for empty commands (returns error string).
Added validation for commands over 10,000 chars (returns error string).
44 tests passing on this branch.
Task B (branch: task/e5f6a7b8):
Added module-level _call_count = 0 counter.
Increments on every run_bash call.
44 tests passing on this branch.
Both branches ready for review. Conflict: both modified core.py
on lines 54-65. Changes are compatible but require manual merge.
Branches preserved for inspection - run 'git diff task/a1b2c3d4
task/e5f6a7b8' to see the overlap before merging.
The pre-flight check runs before any worktree is created. The two tasks execute fully in parallel in isolated directories. Both run the test suite independently and get clean results, because each worktree only sees its own branch's changes. The conflict detector runs after completion and correctly identifies the overlap.
The branches are preserved rather than deleted, so the merge can happen deliberately. This is production-grade worktree management: every edge case handled, every failure mode accounted for, and cleanup guaranteed no matter what happens during execution.
With 23 individual sessions implemented, one file answers the question every engineer asks after reading the separate stages: what does it look like when everything runs at once?
The combined file wires together every mechanism from stages 2 through 4 on top of the shared foundation from stage 1: todo planning, task-graph dependency tracking, subagent context isolation, on-demand skill loading, three-tier context compression, background task execution, persistent agent teammates, FSM communication protocols, and git worktree isolation, all active simultaneously.
The file I built is 280 lines, because core.py handles everything shared. Each mechanism contributes only its unique logic.
from core import (
    client, MODEL, DEFAULT_SYSTEM,
    EXTENDED_TOOLS, EXTENDED_DISPATCH,
    run_bash, run_read, run_write, run_grep, run_glob, run_revert,
    load_rules, check_permission, stream_loop, dispatch_tools,
)

# Every mechanism wired in - each delegates to core for tool execution
ALL_DISPATCH = {
    **EXTENDED_DISPATCH,
    "todo_write": lambda i: todo_write(i["tasks"]),
    "todo_read": lambda i: todo_read(),
    "todo_update": lambda i: todo_update(i["index"], i["status"]),
    "spawn_subagent": lambda i: spawn_subagent(i["prompt"]),
    "list_skills": lambda i: list_skills(),
    "load_skill": lambda i: load_skill(i["name"]),
    "task_create": lambda i: task_create(i["description"], i.get("depends_on"), i.get("priority", "medium")),
    "task_list": lambda i: task_list(),
    "task_update": lambda i: task_update(i["task_id"], i["status"], i.get("result", "")),
    "task_next": lambda i: task_next(),
    "bash_background": lambda i: bash_background(i["command"], i.get("label", "")),
    "send_to_teammate": lambda i: send_to_teammate(i["name"], i["message"]),
    "list_teammates": lambda i: "\n".join(f"  {n}" for n in TEAMMATE_SYSTEMS),
}
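Resolving a tool_use block from the model against this map is a single dictionary lookup. A minimal sketch of that dispatch step, where `ToolUseBlock` and the toy handlers are stand-ins for the SDK's content block and the real handlers:

```python
# Hypothetical stand-in for an SDK tool_use content block
class ToolUseBlock:
    def __init__(self, name: str, input: dict):
        self.name, self.input = name, input

# A toy dispatch map in the same shape as ALL_DISPATCH
dispatch = {
    "todo_read": lambda i: "[] (no todos)",
    "load_skill": lambda i: f"Loaded skill: {i['name']}",
}

def dispatch_one(block) -> str:
    # Same pattern the main loop uses: look up by name,
    # fall back to an error string for unknown tools
    handler = dispatch.get(block.name)
    return handler(block.input) if handler else f"unknown tool: {block.name}"

print(dispatch_one(ToolUseBlock("load_skill", {"name": "debugging"})))
# → Loaded skill: debugging
```

Because every handler takes one dict and returns one string, new mechanisms plug in by adding a key, never by touching the loop.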
We can simulate a complex session that exercises every mechanism, for example:
The agent creates a new "debugging" skill on demand, covering systematic debugging methodology.
The agent spawns a subagent to analyse phantom dependencies in the task-list JSON.
The agent runs the full test suite as a background task while writing new tests.
full >> We need to add a new skill called "debugging" that covers
systematic debugging methodology. Create it, then use it
to debug why the task_next function sometimes returns
a blocked task.
Memory loaded from .agent_memory.md
Teammates: explorer, writer | skills: agent-builder, code-review, pdf
> Thinking...
[load_skill] agent-builder
Loaded skill: agent-builder (harness design patterns)
> Thinking...
[todo_write]
[0] Create skills/debugging/SKILL.md with systematic methodology
[1] Load the new debugging skill
[2] Read the task_next function implementation
[3] Identify the bug using the debugging methodology
[4] Write a fix and verify with tests
[write] skills/debugging/SKILL.md
[todo_update] 0 → done
[load_skill] debugging
Loaded skill: debugging (systematic debugging methodology)
[todo_update] 1 → done
> Thinking...
[read] agents/s07_task_system.py (lines 60-85)
  60  def task_next() -> str:
  61      tasks = _load()
  62      done_ids = {t["id"] for t in tasks if t["status"] == "done"}
  63      for t in sorted(tasks, key=lambda x: ...):
  64          if t["status"] != "pending":
  65              continue
  66          if all(dep in done_ids for dep in t.get("depends_on", [])):
  67              return f"[{t['id']}] {t['description']}"
  68      return "(no unblocked tasks)"
[todo_update] 2 → done
> Thinking...
[spawn_subagent]
Read .agent_tasks.json and find any task whose depends_on
list contains an ID that does not exist in the task list.
These phantom dependencies will always block the task.
[subagent] [bash] cat .agent_tasks.json
[subagent] Found: task f6b2d990 depends_on ["x9y8z7w6"]
but x9y8z7w6 does not exist in the task list.
This task will never unblock.
[todo_update] 3 → done
> Thinking...
[compress] context large - summarising older messages...
[compress] 22 messages → 1 summary | saved to .agent_memory.md
[write] agents/s07_task_system.py
Added validation in task_create: if a depends_on ID does not
exist in the current task list, return an error instead of
creating the task with a phantom dependency.
[bash_background] python -m pytest tests/ -v --tb=short
label: full test suite
Background started: 'full test suite'. Notified when done.
[write] tests/test_phantom_dependency.py
[todo_update] 4 → done
[bg] full test suite completed
46 passed in 2.3s (44 original + 2 new phantom dependency tests)
Final Answer:
Created skills/debugging/SKILL.md with systematic debugging methodology.
Used the skill to identify a phantom dependency bug in task_next -
a task referenced a non-existent dependency ID and would never unblock.
Fixed task_create to validate depends_on IDs at creation time.
2 new tests added. 46 total passing.
Context compressed mid-session - memory persisted to .agent_memory.md.
All mechanisms active: skills, subagent, background tasks, compression.
Every mechanism makes a visible contribution: skill loading shapes the debugging approach, subagent isolation handles the JSON analysis without polluting the main context, background execution runs the test suite while the agent writes new tests, and context compression triggers automatically as the session grows.
At this point we have built a complete Claude Code style harness, from the minimal agent loop all the way to a production-grade multi-agent system with streaming, parallel execution, prompt caching, Redis mailboxes, permission governance, session persistence, and the official MCP runtime.
The architecture is clean, non-duplicated, and well tested. There is still room to push further.
Parallel subagent spawning. The current subagent implementation is sequential. Refactoring spawn_subagent to use asyncio.gather would let the lead agent dispatch three exploration subagents simultaneously, which is exactly what Claude Code does internally, cutting exploration time by the number of parallel agents.
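A minimal sketch of that refactor, with a stubbed `run_subagent` (the sleep stands in for model latency; a real version would drive the API-backed subagent loop inside an executor):

```python
import asyncio

async def run_subagent(prompt: str) -> str:
    # Stand-in for the real subagent loop - a real version would call
    # the model, as run_task_in_worktree_safe does via run_in_executor
    await asyncio.sleep(0.1)  # simulates model latency
    return f"summary of: {prompt}"

async def spawn_subagents_parallel(prompts: list[str]) -> list[str]:
    # All subagents run concurrently; total wall time is the slowest
    # subagent, not the sum of all of them
    return await asyncio.gather(*(run_subagent(p) for p in prompts))

results = asyncio.run(spawn_subagents_parallel(
    ["explore src/", "explore tests/", "explore docs/"]
))
print(results)
```

`asyncio.gather` preserves input order in its results, so the lead agent can still match each summary back to the prompt that produced it.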
Vector memory store. Our long-term memory is a flat Markdown file. Replacing it with a lightweight vector store such as ChromaDB would let the agent retrieve semantically relevant memories instead of injecting the entire summary into every session, keeping context focused as the project grows.
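A real implementation would use a vector database and model embeddings; the sketch below substitutes a toy bag-of-words cosine similarity purely to illustrate retrieval-based memory (the stored memory strings and the `embed` function are illustrative stand-ins):

```python
import math
from collections import Counter

def embed(text: str) -> Counter:
    # Toy bag-of-words "embedding" - a real store would use a model's
    # dense embedding vectors instead
    return Counter(text.lower().split())

def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[w] * b[w] for w in a)
    norm = (math.sqrt(sum(v * v for v in a.values()))
            * math.sqrt(sum(v * v for v in b.values())))
    return dot / norm if norm else 0.0

memories = [
    "task_next returned a blocked task due to a phantom dependency",
    "the user prefers pytest over unittest",
    "worktree cleanup must run in a finally block",
]

def retrieve(query: str, k: int = 1) -> list[str]:
    # Rank stored memories by similarity and inject only the top-k,
    # instead of the whole memory file
    ranked = sorted(memories, key=lambda m: cosine(embed(query), embed(m)),
                    reverse=True)
    return ranked[:k]

print(retrieve("why is task_next returning a blocked task"))
```

The agent-facing change is small: instead of reading .agent_memory.md wholesale at session start, the harness calls `retrieve(current_task)` and injects only the hits.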
Fine-grained token accounting. The cache statistics tracker counts tokens per session but does not break down cost per task or per tool type. Adding a cost ledger that records the spend of each operation would let a team identify which tool calls are most expensive and optimise accordingly.
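Such a ledger is a few lines on top of the usage numbers each API response already reports. A sketch, with hypothetical per-million-token prices (real values come from the provider's pricing page):

```python
from collections import defaultdict

# Hypothetical prices per million tokens - replace with real rates
PRICE_IN, PRICE_OUT = 3.00, 15.00

ledger: dict[str, dict[str, float]] = defaultdict(
    lambda: {"calls": 0, "in": 0, "out": 0, "usd": 0.0}
)

def record(tool: str, tokens_in: int, tokens_out: int) -> None:
    # One row per tool name, accumulated across the session
    row = ledger[tool]
    row["calls"] += 1
    row["in"] += tokens_in
    row["out"] += tokens_out
    row["usd"] += (tokens_in * PRICE_IN + tokens_out * PRICE_OUT) / 1_000_000

record("bash", 1200, 300)
record("read", 4000, 900)
record("bash", 800, 150)

# Print the most expensive tools first
for tool, row in sorted(ledger.items(), key=lambda kv: -kv[1]["usd"]):
    print(f"{tool:8s} calls={row['calls']} ${row['usd']:.4f}")
```

Hooking `record()` into the dispatch loop, keyed on `block.name`, gives per-tool cost with no change to the tools themselves.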
Webhook-based event bus. The event bus currently fires hooks only in-process. Extending it to forward events to external HTTP endpoints would enable integration with Slack, Datadog, PagerDuty, or any monitoring system without modifying the agent loop.
Evaluation framework. The test suite verifies that the harness works, but it does not measure how well the agent performs on real tasks. Adding an LLM-as-judge evaluation layer that scores agent output for correctness, tool efficiency, and plan adherence would turn the repo into a benchmarkable system, not just a working one.
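The mechanical part of an LLM-as-judge layer is prompt assembly and score parsing. A sketch with the model call stubbed out (the rubric names and JSON shape are assumptions, not a fixed standard):

```python
import json

RUBRIC = ["correctness", "tool_efficiency", "plan_adherence"]

def build_judge_prompt(task: str, transcript: str) -> str:
    # Ask the judge model for strict JSON so scores parse reliably
    return (
        f"Task: {task}\n\nAgent transcript:\n{transcript}\n\n"
        f"Score each dimension 1-5 and reply with JSON only, "
        f"keys: {', '.join(RUBRIC)}."
    )

def parse_scores(reply: str) -> dict[str, int]:
    scores = json.loads(reply)
    assert set(scores) == set(RUBRIC), "judge returned unexpected keys"
    return scores

# Stubbed judge reply - a real run would send build_judge_prompt(...)
# to the model and parse its response
reply = '{"correctness": 5, "tool_efficiency": 3, "plan_adherence": 4}'
scores = parse_scores(reply)
print(sum(scores.values()) / len(scores))  # mean score across the rubric
```

Running this over a fixed task set after every harness change gives a regression signal for agent quality, not just harness correctness.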
Author: Fareed Khan
Reading time: 77 minutes | Published: 2026-04-06
References:
[1] Original article: https://medium.com/@fareedkhandev/d2e8c0da85f0?source=friends_link&sk=f67a164f042bf73b89077b71e8d76370
[2] Claude Code surpassed $1B in annualized revenue within 6 months of launch: https://www.anthropic.com/news/anthropic-acquires-bun-as-claude-code-reaches-usd1b-milestone
[3] GitHub - FareedKhan-dev/claude-code-from-scratch, the 23 components of the Claude Code architecture: https://github.com/FareedKhan-dev/claude-code-from-scratch
[4] Follow me on Medium: https://medium.com/@fareedkhandev
[5] https://levelup.gitconnected.com/building-claude-code-with-harness-engineering-d2e8c0da85f0
[6] https://medium.com/@fareedkhandev/d2e8c0da85f0?source=friends_link&sk=f67a164f042bf73b89077b71e8d76370