为什么需要多 Agent

单个 LLM Agent 的能力是有限的。当任务变得复杂——比如需要同时进行深入的技术调研、竞品分析和市场预测——一个 Agent 往往难以面面俱到。模型注意力会被分散,工具列表会膨胀到难以管理,Prompt 会变得臃肿不堪。

多 Agent 架构的核心理念与人类团队协作如出一辙:将一个大型任务分解为子任务,分配给具有不同专长的 Agent,通过结构化的通信机制协调它们的输出。每个 Agent 专注于自己擅长的领域,拥有独立的工具集和专属 Prompt,从而在各自子任务上表现得更加精准和高效。

LangGraph 凭借其图结构、子图组合、灵活的状态管理能力,成为构建多 Agent 系统的理想框架。本文将探讨几种主流的多 Agent 协作模式及其 LangGraph 实现。

多 Agent 架构模式

Supervisor 模式(监督者模式)

这是最经典的层级化结构。一个 Supervisor Agent 负责接收用户任务、拆解为子任务、分配给 Worker Agent、整合输出结果。

1
用户请求 → Supervisor → [研究员, 分析师, 写手] → 汇总 → 最终输出

在 LangGraph 中,Supervisor 本身就是一个 Agent 节点,它通过条件边动态路由到不同的 Worker Agent:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from typing import Literal

class SupervisorState(TypedDict):
messages: Annotated[List, add_messages]
task_delegated: str
research_result: str
analysis_result: str
writing_result: str

# Supervisor 根据状态决定下一个 Worker
def supervisor_router(state: SupervisorState) -> Literal["researcher", "analyst", "writer", "__end__"]:
last_message = state["messages"][-1]
# 解析 Supervisor 的指令(如 "DELEGATE: researcher")
if "DELEGATE: researcher" in last_message.content:
return "researcher"
elif "DELEGATE: analyst" in last_message.content:
return "analyst"
elif "DELEGATE: writer" in last_message.content:
return "writer"
return "__end__"

Supervisor 模式的优势在于结构清晰、易于控制和调试,适合任务可预先拆解的批处理式工作流。

层级化模式(Hierarchical)

当 Supervisor 本身也变得臃肿时,可以引入中间管理层:

1
2
3
4
用户请求 → 主 Supervisor
├── 技术 Supervisor → [前端 Agent, 后端 Agent, 数据库 Agent]
├── 产品 Supervisor → [需求 Agent, 设计 Agent, 竞品 Agent]
└── 组长 Agent(汇总)

LangGraph 通过**子图(Subgraph)**机制天然支持这种嵌套:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langgraph.graph import StateGraph

# 技术团队的子图
tech_team_graph = StateGraph(TeamState)
tech_team_graph.add_node("frontend_agent", frontend_node)
tech_team_graph.add_node("backend_agent", backend_node)
tech_team_graph.add_node("db_agent", db_node)
# ... 内部边和条件边
tech_team = tech_team_graph.compile()

# 主图组合子图
main_graph = StateGraph(MainState)
main_graph.add_node("tech_team", tech_team) # 将整个子图作为一个节点
main_graph.add_node("product_team", product_team)
main_graph.add_node("lead_agent", lead_node)
# ... 顶层边和条件边

子图对外表现为一个黑盒节点——接收共享的 State,内部独立执行,完成后将结果写回 State。这种组合模式让复杂系统的模块化成为可能。

Swarm/Collaborative 模式(群体协作模式)

与层级模式不同,Swarm 模式没有固定的调度者。每个 Agent 地位平等,通过消息传递直接通信:

1
2
3
4
5
6
7
8
class SwarmState(TypedDict):
messages: Annotated[List, add_messages]
# 每个 Agent 的工作区
engineer_output: str
reviewer_output: str
# 共识状态
consensus_reached: bool
iteration_count: int

Swarm 模式的一个典型实现是辩论协议:多个 Agent 各自给出方案,互相评审、修订,经过多轮迭代达成共识。这种模式在需要多方视角的任务(如代码审查、风险评估、创意生成)中表现出色。

状态管理与 Schema 设计

多 Agent 系统的状态管理是最大的工程挑战。一个精心设计的 State Schema 可以显著降低复杂度:

消息归并策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langgraph.graph.message import add_messages

class MultiAgentState(TypedDict):
# 全局消息历史:所有 Agent 共享,add_messages 自动归并
messages: Annotated[List, add_messages]

# 各 Agent 的独立输出:覆盖式归并(默认行为)
researcher_report: str
analyst_insights: str

# 任务队列:自定义 reducer
task_queue: Annotated[List[str], task_queue_reducer]

# 元数据
current_agent: str
round_count: int

共享状态 vs 隔离状态

共享区域(如 messages):适合需要全局可见的数据,如 Agent 间的通信消息、最终输出。

隔离区域(如 researcher_report):适合各 Agent 的内部工作成果,避免相互干扰。当一个 Agent 需要另一个 Agent 的输出时,通过显式的数据传递来实现,而不是随意读取。

关键设计原则

  1. 最小化共享状态:只共享必要的控制信息,Agent 的工作产物尽可能隔离。
  2. 使用自定义 Reducer:默认的覆盖式归并不一定适合所有字段。例如,对于任务列表,你可能需要追加而非覆盖。
  3. 状态字段命名统一:避免 researcher_output / researcher_result / research_findings 同时存在的情况,统一命名约定。
  4. 将状态作为契约文档:State 的定义就是各 Agent 之间的接口规范,团队成员应该先就 State Schema 达成共识。

Map-Reduce 并行执行

当需要对多个数据源执行同类型分析时,Map-Reduce 模式可以显著提升效率:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from langgraph.graph import StateGraph, END
from langgraph.constants import Send

class MapReduceState(TypedDict):
sources: List[str] # 待分析的数据源列表
individual_analyses: Annotated[List, analysis_combiner] # 各源的分析结果
final_summary: str # 汇总结果

def mapper_router(state: MapReduceState):
"""为每个数据源生成一个并行任务"""
return [Send("analyze_single", {"source": src}) for src in state["sources"]]

def analyze_single(state: dict) -> dict:
"""单个数据源的分析节点"""
analysis = llm.invoke(f"分析以下数据源:{state['source']}")
return {"individual_analyses": [analysis]}

def summarizer(state: MapReduceState) -> dict:
"""汇总所有分析结果"""
all_analyses = "\n---\n".join(state["individual_analyses"])
summary = llm.invoke(f"基于以下分析生成综合报告:\n{all_analyses}")
return {"final_summary": summary.content}

graph = StateGraph(MapReduceState)
graph.add_node("analyze_single", analyze_single)
graph.add_node("summarizer", summarizer)
graph.set_entry_point("analyze_single") # 先进入,由 mapper_router 扇出
graph.add_conditional_edges("analyze_single", ..., path_map={"summarizer": "summarizer"})
graph.add_edge("summarizer", END)

Send 是 LangGraph 的扇出(fan-out)机制——一个节点可以发送多个并行调用到同一个目标节点,每个调用获得不同的参数。这对于”对 10 个新闻源分别做情感分析”这类任务特别有效。

错误处理与重试策略

在多 Agent 系统中,错误会通过图边传播。LangGraph 提供了多层次的处理机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from langgraph.errors import GraphRecursionError

# 1. 节点级 try/except
def robust_agent_node(state: AgentState) -> dict:
max_retries = 3
for attempt in range(max_retries):
try:
response = llm.invoke(state["messages"])
return {"messages": [response]}
except Exception as e:
if attempt == max_retries - 1:
# 最终失败,交由 fallback 处理
return {
"messages": [AIMessage(content="处理失败,请稍后重试")],
"error": str(e)
}

# 2. 图级的递归限制
# 防止 Agent 陷入无限工具调用循环
app = graph.compile(
checkpointer=checkpointer,
interrupt_before=["sensitive_operations"],
)
app = app.with_config({"recursion_limit": 25})

常见的重试策略包括:指数退避(exponential backoff)、熔断(circuit breaking)、超时限制(timeout wrapping)。LangGraph 不内置这些,但可以在节点函数中自由实现。

调试与可观测性

当图变得复杂(10+ 节点,多层嵌套),调试就成为必须直面的挑战。除了本文已述的记录和回调外,两种关键方法是:

图可视化

1
2
3
4
# 生成 Mermaid 图
from langgraph.graph import StateGraph
# ... 构建 graph ...
print(graph.get_graph().draw_mermaid())

LangSmith 追踪

启用 LangSmith 集成后,每一步的执行日志、token 消耗、延迟指标都会自动记录到仪表盘中。设置 LANGCHAIN_TRACING_V2=true 环境变量后,无需额外代码即可获得完整的执行追踪。

生产环境考量

并发控制

LangGraph 的并行节点执行受到 Python GIL 的限制。对于 CPU 密集型任务,考虑使用 asyncio 的事件循环,或者将重计算节点部署为独立的微服务,通过 HTTP 调用集成。

超时管理

在节点内部实现超时控制:

1
2
3
4
5
6
7
8
9
10
11
import asyncio

async def agent_node_with_timeout(state: AgentState) -> dict:
try:
response = await asyncio.wait_for(
llm.ainvoke(state["messages"]),
timeout=30.0 # 30 秒超时
)
return {"messages": [response]}
except asyncio.TimeoutError:
return {"messages": [AIMessage(content="请求超时,请简化问题后重试")]}

成本控制

多 Agent 系统的一个隐性成本是模型调用次数的膨胀。每个 Agent 的每次推理都消耗 token。优化策略包括:使用更小的模型处理简单任务(如路由、分类);对重复查询实施缓存;限制最大推理轮数;设置每轮 token 预算。

结语

多 Agent 协作是 AI 应用从”单兵作战”走向”团队协作”的关键跃迁。LangGraph 通过 StateGraph、子图组合、Send 扇出等机制,为这种跃迁提供了坚实的工程基础。

在实际项目中,建议从最简单的 Supervisor 模式开始——它能覆盖 60% 以上的多 Agent 需求。当任务间的依赖关系变得复杂时,再引入子图和 Map-Reduce。记住,多 Agent 系统的复杂度是指数级增长的,每次添加新的 Agent 之前,先问自己:这个任务的哪个维度需要独立的专业能力?如果答案不清晰,可能只需要优化现有 Agent 的 Prompt 和工具集。