从 LangChain 链到有状态 Agent

在上一篇文章中,我们探讨了 LangChain 如何通过链式调用来抽象 LLM 工作流。然而,一个关键的问题逐渐浮出水面:传统的链式抽象本质上是无状态的 DAG(有向无环图)

当你需要构建一个真正的 Agent——它需要在思考与行动之间循环迭代、根据中间结果动态选择下一步、在多个”专家”之间来回切换——链式抽象就不够用了。你很快会发现自己不得不写复杂的 while 循环和条件分支,而框架提供的封装反而成了限制。

LangGraph 正是为解决这一痛点而生的。它的核心思想很简单:将 Agent 工作流建模为一个有状态的图(Stateful Graph),其中节点代表计算步骤,边代表控制流,状态在图的执行过程中持续传递和更新。这与传统的状态机模型高度一致,但专门为 LLM 应用做了优化。

LangGraph 核心概念

State:一切围绕状态

在 LangGraph 中,State 是第一公民。它通常是一个 TypedDict(或 Pydantic 模型),定义了图中每个节点共享的数据结构:

1
2
3
4
5
6
7
from typing import TypedDict, Annotated, List
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
messages: Annotated[List, add_messages] # 对话消息,使用 add_messages 归并
next_step: str # 下一次要执行的动作
final_answer: str # 最终输出

这里的 Annotated 注解是关键机制——它定义了状态字段如何被更新。add_messages 是 LangGraph 内置的归并器(reducer),它告诉图引擎”这个字段不是覆盖,而是追加新的消息”。你也可以定义自定义 reducer 来实现更复杂的状态归并逻辑。

Nodes:计算单元

每个 Node 是一个接受 State、返回部分 State 更新的函数。Node 可以是一次 LLM 调用、一次工具执行、一段任意 Python 逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def agent_node(state: AgentState) -> dict:
"""Agent 思考节点:调用 LLM 决定下一步"""
response = llm.invoke(state["messages"])
return {"messages": [response]}

def tool_node(state: AgentState) -> dict:
"""工具执行节点:执行模型请求的 tool_call"""
tool_calls = state["messages"][-1].tool_calls
results = []
for tc in tool_calls:
result = execute_tool(tc["name"], tc["args"])
results.append(
ToolMessage(content=str(result), tool_call_id=tc["id"])
)
return {"messages": results}

Edges:控制流

  • 普通边(Normal Edge):固定地从一个节点指向下一个节点。
  • 条件边(Conditional Edge):根据当前 State 动态路由到不同节点——这是实现 Agent 循环的关键。
1
2
3
4
5
6
def should_continue(state: AgentState) -> str:
"""条件判断:继续调用工具,还是输出最终答案"""
last_message = state["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "tools"
return "__end__"

条件边的路由函数接收 State,返回下一个节点的名称。返回 "__end__" 表示图执行结束。

Graph 类型

StateGraph

StateGraph 是最通用的图类型,你需要显式定义 State 的结构。它适合大多数 Agent 场景——需要对状态进行精细控制的工作流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from langgraph.graph import StateGraph, END

graph = StateGraph(AgentState)

# 添加节点
graph.add_node("agent", agent_node)
graph.add_node("tools", tool_node)

# 添加边
graph.set_entry_point("agent")
graph.add_conditional_edges(
"agent",
should_continue,
{"tools": "tools", "__end__": END}
)
graph.add_edge("tools", "agent") # 工具执行后回到 agent 继续思考

app = graph.compile()

MessageGraph

MessageGraphStateGraph 的一个便捷封装,其 State 只有一个 messages 字段。对于简单的对话式 Agent,可以省去自定义 State 结构:

1
2
3
4
5
6
from langgraph.graph import MessageGraph

graph = MessageGraph()
graph.add_node("agent", agent_node)
graph.add_node("tools", tool_node)
# ... 边的配置与 StateGraph 一致

Checkpointing:持久化与对话历史

LangGraph 的 Checkpointing 机制是区别于 LangChain 的基石特性。它为图执行提供快照和恢复能力,使得:

  1. 多轮对话:用户每次输入后,图从上次的 checkpoint 恢复继续执行,而不是从头开始。
  2. 中断恢复:图在执行过程中可以暂停(如等待用户输入),之后从断点继续。
  3. 时间旅行:可以回溯到任意历史状态,支持分支探索。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

# 使用 thread_id 区分不同的对话
config = {"configurable": {"thread_id": "conversation-1"}}

# 第一轮
result = app.invoke(
{"messages": [HumanMessage(content="帮我查一下北京的天气")]},
config=config
)

# 第二轮:自动加载之前的状态
result = app.invoke(
{"messages": [HumanMessage(content="那上海呢?")]},
config=config # 相同的 thread_id
)

生产环境中,可以将 MemorySaver 替换为 SqliteSaver 或自定义的持久化后端,实现服务重启后的恢复。

Streaming:流式输出

LangGraph 原生支持多种流式模式:

1
2
3
4
5
6
7
8
9
10
# 逐事件流式输出
for event in app.stream(inputs, config=config):
for node_name, output in event.items():
print(f"[{node_name}]: {output}")

# 逐 token 流式输出(需要模型支持)
for event in app.astream_events(inputs, config=config, version="v2"):
if event["event"] == "on_chat_model_stream":
content = event["data"]["chunk"].content
print(content, end="", flush=True)

stream 返回每个 Node 执行完成后的状态快照;astream_events 则提供更细粒度的事件流,适合构建实时 UI。

Human-in-the-Loop:人工介入

在实际应用中,某些关键决策需要人工确认。LangGraph 提供了优雅的 interrupt 机制:

1
2
3
4
5
6
7
8
from langgraph.checkpoint import interrupt

def sensitive_action_node(state: AgentState) -> dict:
"""敏感操作前,暂停等待人工批准"""
approval = interrupt(f"即将执行:{state['pending_action']},是否继续?")
if approval.get("decision") != "approved":
return {"messages": [AIMessage(content="操作已取消")]}
return execute_sensitive_action(state)

当图执行到 interrupt 时,会暂停并将控制权交还给调用方。调用方通过新的 invoke 调用,传入 Command(resume=...) 来继续执行:

1
2
# 图暂停后,人工审核通过,继续执行
app.invoke(Command(resume={"decision": "approved"}), config=config)

完整示例:带工具的循环 Agent

下面是一个完整的、可以实际运行的 Agent 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, ToolMessage, AIMessage
import json

# 1. 定义 State
class AgentState(TypedDict):
messages: Annotated[List, add_messages]

# 2. 定义工具
def get_weather(city: str) -> str:
"""查询天气"""
weather_data = {"北京": "晴 25°C", "上海": "多云 28°C", "深圳": "阵雨 30°C"}
return weather_data.get(city, "未知城市")

tools = [get_weather]
llm = ChatOpenAI(model="gpt-4o").bind_tools(tools)

# 3. 定义节点
def call_model(state: AgentState) -> dict:
response = llm.invoke(state["messages"])
return {"messages": [response]}

def call_tools(state: AgentState) -> dict:
last_message = state["messages"][-1]
results = []
for tc in last_message.tool_calls:
if tc["name"] == "get_weather":
result = get_weather(**tc["args"])
results.append(
ToolMessage(content=result, tool_call_id=tc["id"])
)
return {"messages": results}

# 4. 路由函数
def should_continue(state: AgentState) -> str:
last_message = state["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "tools"
return "__end__"

# 5. 构建图
graph = StateGraph(AgentState)
graph.add_node("agent", call_model)
graph.add_node("tools", call_tools)
graph.set_entry_point("agent")
graph.add_conditional_edges("agent", should_continue, {
"tools": "tools", "__end__": END
})
graph.add_edge("tools", "agent")

app = graph.compile(checkpointer=MemorySaver())

# 6. 运行
config = {"configurable": {"thread_id": "demo-1"}}
result = app.invoke(
{"messages": [HumanMessage(content="北京今天天气怎么样?")]},
config=config
)
print(result["messages"][-1].content)

LangGraph vs LangChain Agents:如何选择

维度 LangChain AgentExecutor LangGraph
控制粒度 黑盒循环,只能通过配置调整 显式图结构,完全可控
状态管理 简单的消息列表 自定义 State,支持持久化
复杂流程 单一 agent + tools 多 Agent、子图、条件分支
人工介入 不支持 原生 interrupt 支持
调试 依赖 LangSmith 图可视化 + checkpoint 回溯
学习曲线 中等

简单的工具调用 Agent直接用 LangChain 的 create_openai_functions_agent 即可;需要循环推理、多步骤规划、人工审核、多 Agent 协作的场景,LangGraph 是当前最佳选择。

结语

LangGraph 将 Agent 开发从”配置链参数”提升到了”设计状态机”的抽象层次。这种转变反映了 AI 应用从简单的”输入-输出”调用,向复杂的”感知-推理-行动”自主系统的演进趋势。掌握了 StateGraph、条件边、Checkpointing 和 Streaming 这四个核心概念,你就已经具备了构建生产级 AI Agent 的基础能力。在下一篇文章中,我们将深入多 Agent 协作——看看 LangGraph 如何让多个专用 Agent 像一个团队一样协同工作。