为什么需要多 Agent
单个 LLM Agent 的能力是有限的。当任务变得复杂——比如需要同时进行深入的技术调研、竞品分析和市场预测——一个 Agent 往往难以面面俱到。模型注意力会被分散,工具列表会膨胀到难以管理,Prompt 会变得臃肿不堪。
多 Agent 架构的核心理念与人类团队协作如出一辙:将一个大型任务分解为子任务,分配给具有不同专长的 Agent,通过结构化的通信机制协调它们的输出。每个 Agent 专注于自己擅长的领域,拥有独立的工具集和专属 Prompt,从而在各自子任务上表现得更加精准和高效。
LangGraph 凭借其图结构、子图组合、灵活的状态管理能力,成为构建多 Agent 系统的理想框架。本文将探讨几种主流的多 Agent 协作模式及其 LangGraph 实现。
多 Agent 架构模式
Supervisor 模式(监督者模式)
这是最经典的层级化结构。一个 Supervisor Agent 负责接收用户任务、拆解为子任务、分配给 Worker Agent、整合输出结果。
1
| 用户请求 → Supervisor → [研究员, 分析师, 写手] → 汇总 → 最终输出
|
在 LangGraph 中,Supervisor 本身就是一个 Agent 节点,它通过条件边动态路由到不同的 Worker Agent:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| from typing import Literal
class SupervisorState(TypedDict): messages: Annotated[List, add_messages] task_delegated: str research_result: str analysis_result: str writing_result: str
def supervisor_router(state: SupervisorState) -> Literal["researcher", "analyst", "writer", "__end__"]: last_message = state["messages"][-1] if "DELEGATE: researcher" in last_message.content: return "researcher" elif "DELEGATE: analyst" in last_message.content: return "analyst" elif "DELEGATE: writer" in last_message.content: return "writer" return "__end__"
|
Supervisor 模式的优势在于结构清晰、易于控制和调试,适合任务可预先拆解的批处理式工作流。
层级化模式(Hierarchical)
当 Supervisor 本身也变得臃肿时,可以引入中间管理层:
1 2 3 4
| 用户请求 → 主 Supervisor ├── 技术 Supervisor → [前端 Agent, 后端 Agent, 数据库 Agent] ├── 产品 Supervisor → [需求 Agent, 设计 Agent, 竞品 Agent] └── 组长 Agent(汇总)
|
LangGraph 通过**子图(Subgraph)**机制天然支持这种嵌套:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| from langgraph.graph import StateGraph
tech_team_graph = StateGraph(TeamState) tech_team_graph.add_node("frontend_agent", frontend_node) tech_team_graph.add_node("backend_agent", backend_node) tech_team_graph.add_node("db_agent", db_node)
tech_team = tech_team_graph.compile()
main_graph = StateGraph(MainState) main_graph.add_node("tech_team", tech_team) main_graph.add_node("product_team", product_team) main_graph.add_node("lead_agent", lead_node)
|
子图对外表现为一个黑盒节点——接收共享的 State,内部独立执行,完成后将结果写回 State。这种组合模式让复杂系统的模块化成为可能。
Swarm/Collaborative 模式(群体协作模式)
与层级模式不同,Swarm 模式没有固定的调度者。每个 Agent 地位平等,通过消息传递直接通信:
1 2 3 4 5 6 7 8
| class SwarmState(TypedDict): messages: Annotated[List, add_messages] engineer_output: str reviewer_output: str consensus_reached: bool iteration_count: int
|
Swarm 模式的一个典型实现是辩论协议:多个 Agent 各自给出方案,互相评审、修订,经过多轮迭代达成共识。这种模式在需要多方视角的任务(如代码审查、风险评估、创意生成)中表现出色。
状态管理与 Schema 设计
多 Agent 系统的状态管理是最大的工程挑战。一个精心设计的 State Schema 可以显著降低复杂度:
消息归并策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| from langgraph.graph.message import add_messages
class MultiAgentState(TypedDict): messages: Annotated[List, add_messages] researcher_report: str analyst_insights: str task_queue: Annotated[List[str], task_queue_reducer] current_agent: str round_count: int
|
共享状态 vs 隔离状态
共享区域(如 messages):适合需要全局可见的数据,如 Agent 间的通信消息、最终输出。
隔离区域(如 researcher_report):适合各 Agent 的内部工作成果,避免相互干扰。当一个 Agent 需要另一个 Agent 的输出时,通过显式的数据传递来实现,而不是随意读取。
关键设计原则
- 最小化共享状态:只共享必要的控制信息,Agent 的工作产物尽可能隔离。
- 使用自定义 Reducer:默认的覆盖式归并不一定适合所有字段。例如,对于任务列表,你可能需要追加而非覆盖。
- 状态字段命名统一:避免
researcher_output / researcher_result / research_findings 同时存在的情况,统一命名约定。
- 将状态作为契约文档:State 的定义就是各 Agent 之间的接口规范,团队成员应该先就 State Schema 达成共识。
Map-Reduce 并行执行
当需要对多个数据源执行同类型分析时,Map-Reduce 模式可以显著提升效率:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| from langgraph.graph import StateGraph, END from langgraph.constants import Send
class MapReduceState(TypedDict): sources: List[str] individual_analyses: Annotated[List, analysis_combiner] final_summary: str
def mapper_router(state: MapReduceState): """为每个数据源生成一个并行任务""" return [Send("analyze_single", {"source": src}) for src in state["sources"]]
def analyze_single(state: dict) -> dict: """单个数据源的分析节点""" analysis = llm.invoke(f"分析以下数据源:{state['source']}") return {"individual_analyses": [analysis]}
def summarizer(state: MapReduceState) -> dict: """汇总所有分析结果""" all_analyses = "\n---\n".join(state["individual_analyses"]) summary = llm.invoke(f"基于以下分析生成综合报告:\n{all_analyses}") return {"final_summary": summary.content}
graph = StateGraph(MapReduceState) graph.add_node("analyze_single", analyze_single) graph.add_node("summarizer", summarizer) graph.set_entry_point("analyze_single") graph.add_conditional_edges("analyze_single", ..., path_map={"summarizer": "summarizer"}) graph.add_edge("summarizer", END)
|
Send 是 LangGraph 的扇出(fan-out)机制——一个节点可以发送多个并行调用到同一个目标节点,每个调用获得不同的参数。这对于”对 10 个新闻源分别做情感分析”这类任务特别有效。
错误处理与重试策略
在多 Agent 系统中,错误会通过图边传播。LangGraph 提供了多层次的处理机制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| from langgraph.errors import GraphRecursionError
def robust_agent_node(state: AgentState) -> dict: max_retries = 3 for attempt in range(max_retries): try: response = llm.invoke(state["messages"]) return {"messages": [response]} except Exception as e: if attempt == max_retries - 1: return { "messages": [AIMessage(content="处理失败,请稍后重试")], "error": str(e) }
app = graph.compile( checkpointer=checkpointer, interrupt_before=["sensitive_operations"], ) app = app.with_config({"recursion_limit": 25})
|
常见的重试策略包括:指数退避(exponential backoff)、熔断(circuit breaking)、超时限制(timeout wrapping)。LangGraph 不内置这些,但可以在节点函数中自由实现。
调试与可观测性
当图变得复杂(10+ 节点,多层嵌套),调试就成为必须直面的挑战。除了本文已述的记录和回调外,两种关键方法是:
图可视化
1 2 3 4
| from langgraph.graph import StateGraph
print(graph.get_graph().draw_mermaid())
|
LangSmith 追踪
启用 LangSmith 集成后,每一步的执行日志、token 消耗、延迟指标都会自动记录到仪表盘中。设置 LANGCHAIN_TRACING_V2=true 环境变量后,无需额外代码即可获得完整的执行追踪。
生产环境考量
并发控制
LangGraph 的并行节点执行受到 Python GIL 的限制。对于 CPU 密集型任务,考虑使用 asyncio 的事件循环,或者将重计算节点部署为独立的微服务,通过 HTTP 调用集成。
超时管理
在节点内部实现超时控制:
1 2 3 4 5 6 7 8 9 10 11
| import asyncio
async def agent_node_with_timeout(state: AgentState) -> dict: try: response = await asyncio.wait_for( llm.ainvoke(state["messages"]), timeout=30.0 ) return {"messages": [response]} except asyncio.TimeoutError: return {"messages": [AIMessage(content="请求超时,请简化问题后重试")]}
|
成本控制
多 Agent 系统的一个隐性成本是模型调用次数的膨胀。每个 Agent 的每次推理都消耗 token。优化策略包括:使用更小的模型处理简单任务(如路由、分类);对重复查询实施缓存;限制最大推理轮数;设置每轮 token 预算。
结语
多 Agent 协作是 AI 应用从”单兵作战”走向”团队协作”的关键跃迁。LangGraph 通过 StateGraph、子图组合、Send 扇出等机制,为这种跃迁提供了坚实的工程基础。
在实际项目中,建议从最简单的 Supervisor 模式开始——它能覆盖 60% 以上的多 Agent 需求。当任务间的依赖关系变得复杂时,再引入子图和 Map-Reduce。记住,多 Agent 系统的复杂度是指数级增长的,每次添加新的 Agent 之前,先问自己:这个任务的哪个维度需要独立的专业能力?如果答案不清晰,可能只需要优化现有 Agent 的 Prompt 和工具集。