LangGraph v1.x 实战源码面试手册

版本基准:langgraph==1.2.0,Python 版本要求 >=3.10
适用对象:希望系统掌握 LangGraph Graph API、状态管理、持久化、人机交互、LLM/工具调用、多智能体编排,以及面试复习的 Python 开发者。
更新时间:2026-05-15。

目录

1. 环境准备与核心概念

1.1 安装

1
2
3
python -m venv .venv
source .venv/bin/activate # Windows PowerShell: .\.venv\Scripts\Activate.ps1
pip install -U "langgraph==1.2.0" "langchain>=1.0" "langchain-core>=1.0"

如果要运行 LLM 示例,还需要安装对应模型 Provider:

1
2
3
# OpenAI 示例
pip install -U langchain-openai
export OPENAI_API_KEY="sk-..." # Windows PowerShell: $env:OPENAI_API_KEY="sk-..."

1.2 LangGraph 的核心抽象

抽象 作用 常见 API
State 图执行期间共享的状态结构,通常用 TypedDict、dataclass 或 Pydantic model 定义 class State(TypedDict): ...
Reducer 决定同一个状态 key 如何合并多次更新;未声明 reducer 时默认覆盖 Annotated[list[str], operator.add]
Node 一个可执行单元,读取 state,返回局部状态更新,也可以返回 Command 控制流 builder.add_node("name", fn)
Edge 节点间的静态流转关系 add_edge(START, "node")
Conditional Edge 根据函数返回值动态路由 add_conditional_edges("router", route_fn, mapping)
Send 动态 fan-out,每个分支可携带自己的局部 state Send("worker", {"item": item})
Command 在节点内同时更新 state 并跳转,或用于 interrupt 后恢复 Command(update=..., goto=...)Command(resume=...)
Checkpointer 短期记忆,按 thread_id 保存每个 super-step 的 checkpoint InMemorySaver、Postgres saver
Store 长期记忆,跨 thread 保存任意业务记忆 InMemoryStoreBaseStore
Runtime Context 每次运行注入的只读上下文,比如 user_id、租户、请求参数 StateGraph(..., context_schema=Context)

1.3 最小心智模型

LangGraph 不是“链式调用框架”,而是一个状态化图运行时:

  1. 你定义 state schema,说明图中允许读写哪些 key。
  2. 你把 Python 函数或 Runnable 注册为节点。
  3. 你用边描述节点执行顺序,用条件边或 Command 描述动态控制流。
  4. compile() 将 builder 编译成可执行图。
  5. 运行时按 Pregel/Bulk Synchronous Parallel 风格执行:一个 super-step 中被调度的节点并行执行,节点写出的更新在 step 结束后统一合并到 channel。
  6. 如果启用 checkpointer,每个 super-step 边界会保存 checkpoint,从而支持中断恢复、短期记忆、回放与 time travel。

2. 示例调用:从基础流程到多智能体编排

本章所有非 LLM 示例都可以直接本地运行。LLM 示例需要有效 API key;如果不想访问云模型,可替换为支持 tool calling 的本地模型封装。

2.1 基础顺序流程:StateGraph、节点与边

场景:把输入文本依次经过“清洗”和“加后缀”两个节点。

依赖

1
pip install -U "langgraph==1.2.0"

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from typing_extensions import TypedDict

from langgraph.graph import END, START, StateGraph


class TextState(TypedDict):
# 图中共享状态。默认 reducer 是覆盖:节点返回的新 text 会覆盖旧 text。
text: str


def normalize(state: TextState) -> dict:
"""节点 1:读取 state,返回局部更新。"""
return {"text": state["text"].strip().lower()}


def append_suffix(state: TextState) -> dict:
"""节点 2:继续更新同一个 key。"""
return {"text": state["text"] + " | processed"}


builder = StateGraph(TextState)
builder.add_node("normalize", normalize)
builder.add_node("append_suffix", append_suffix)

builder.add_edge(START, "normalize")
builder.add_edge("normalize", "append_suffix")
builder.add_edge("append_suffix", END)

graph = builder.compile()

result = graph.invoke({"text": " Hello LangGraph "})
print(result)

核心参数说明

参数/方法 说明
StateGraph(TextState) 创建图 builder,TextState 是全局状态结构
add_node(name, fn) 注册节点,节点函数第一个参数是 state
add_edge(START, "normalize") 定义入口边
compile() 编译为可执行图;编译前只是 builder,不能执行
invoke(input) 同步执行图并返回最终 state

预期输出

1
{'text': 'hello langgraph | processed'}

结果解读normalizeappend_suffix 都写 text,因为没有声明 reducer,后一个节点对 text 的更新会覆盖前一个值。

常见坑点

  • 节点返回值必须是 dict、Command 或符合 LangGraph 更新协议的对象;返回裸字符串会触发 InvalidUpdateError
  • 至少要有从 START 出发的入口边,否则编译校验会失败。

2.2 状态管理与 Reducer:追加而不是覆盖

场景:多个节点都向 logs 写入日志,希望保留所有日志。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import operator
from typing import Annotated
from typing_extensions import TypedDict

from langgraph.graph import END, START, StateGraph


class AuditState(TypedDict):
task: str
# operator.add 表示同一个 key 的多次更新会做列表拼接,而不是覆盖。
logs: Annotated[list[str], operator.add]


def receive(state: AuditState) -> dict:
return {"logs": [f"receive task={state['task']}"]}


def validate(state: AuditState) -> dict:
return {"logs": ["validate ok"]}


def finish(state: AuditState) -> dict:
return {"logs": ["finish"]}


builder = StateGraph(AuditState)
builder.add_node("receive", receive)
builder.add_node("validate", validate)
builder.add_node("finish", finish)
builder.add_edge(START, "receive")
builder.add_edge("receive", "validate")
builder.add_edge("validate", "finish")
builder.add_edge("finish", END)

graph = builder.compile()
print(graph.invoke({"task": "invoice-001", "logs": []}))

预期输出

1
{'task': 'invoice-001', 'logs': ['receive task=invoice-001', 'validate ok', 'finish']}

核心参数说明

写法 含义
Annotated[list[str], operator.add] logs 声明 reducer,合并时调用 operator.add(old, update)
{"logs": ["..."]} 节点只返回本次增量,不需要手动读取旧日志

结果解读:LangGraph 内部会把 schema key 转换成 channel。没有 reducer 的 key 通常使用 LastValue,带 reducer 的 key 会使用聚合 channel,因此能处理多节点写入。

常见坑点

  • reducer 必须是确定性的;不要在 reducer 内访问网络、随机数或数据库。
  • reducer 的左右参数类型要兼容。比如 list + str 会失败,节点应返回列表增量。

2.3 条件分支:add_conditional_edges

场景:根据订单金额进入普通审核或高风险审核。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from typing import Literal
from typing_extensions import TypedDict

from langgraph.graph import END, START, StateGraph


class OrderState(TypedDict):
order_id: str
amount: float
decision: str


def classify(state: OrderState) -> dict:
return {"decision": "pending"}


def route(state: OrderState) -> Literal["normal", "risk"]:
return "risk" if state["amount"] >= 1000 else "normal"


def normal_review(state: OrderState) -> dict:
return {"decision": "approved_by_normal_review"}


def risk_review(state: OrderState) -> dict:
return {"decision": "manual_risk_review_required"}


builder = StateGraph(OrderState)
builder.add_node("classify", classify)
builder.add_node("normal_review", normal_review)
builder.add_node("risk_review", risk_review)
builder.add_edge(START, "classify")
builder.add_conditional_edges(
"classify",
route,
{
"normal": "normal_review",
"risk": "risk_review",
},
)
builder.add_edge("normal_review", END)
builder.add_edge("risk_review", END)

graph = builder.compile()

print(graph.invoke({"order_id": "A-1", "amount": 300, "decision": ""}))
print(graph.invoke({"order_id": "A-2", "amount": 5000, "decision": ""}))

预期输出

1
2
{'order_id': 'A-1', 'amount': 300, 'decision': 'approved_by_normal_review'}
{'order_id': 'A-2', 'amount': 5000, 'decision': 'manual_risk_review_required'}

结果解读route 不更新 state,只返回路由标签;mapping 决定标签对应哪个节点。

常见坑点

  • 条件函数返回值要和 mapping key 对齐。
  • 如果条件函数直接返回节点名,可以不传 mapping,但面试或团队项目中建议显式 mapping,便于读图和重构。

2.4 循环流程:重试直到满足条件

场景:模拟一个“草稿评分”流程,分数不足时继续修改,最多执行若干轮。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from typing import Literal
from typing_extensions import TypedDict

from langgraph.graph import END, START, StateGraph


class DraftState(TypedDict):
draft: str
score: int
attempts: int


def write_or_revise(state: DraftState) -> dict:
attempts = state["attempts"] + 1
draft = state["draft"] + f" -> revision-{attempts}"
score = min(100, state["score"] + 35)
return {"draft": draft, "score": score, "attempts": attempts}


def should_continue(state: DraftState) -> Literal["revise", "done"]:
if state["score"] >= 80:
return "done"
return "revise"


builder = StateGraph(DraftState)
builder.add_node("write_or_revise", write_or_revise)
builder.add_edge(START, "write_or_revise")
builder.add_conditional_edges(
"write_or_revise",
should_continue,
{
"revise": "write_or_revise",
"done": END,
},
)

graph = builder.compile()

result = graph.invoke(
{"draft": "v0", "score": 0, "attempts": 0},
config={"recursion_limit": 10},
)
print(result)

预期输出

1
{'draft': 'v0 -> revision-1 -> revision-2 -> revision-3', 'score': 100, 'attempts': 3}

核心参数说明

参数 说明
recursion_limit 最大图 step 数限制,用于防止错误条件造成无限循环
"revise": "write_or_revise" 条件边指回自身,形成循环

常见坑点

  • 循环必须有明确退出条件。
  • recursion_limit 不是业务重试次数,而是图执行 step 上限;节点数越多,消耗 step 越快。

2.5 并行 Fan-out/Fan-in:Send 与 Map-Reduce

场景:把一批主题分发给多个 worker 并行处理,再汇总结果。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import operator
from typing import Annotated
from typing_extensions import TypedDict

from langgraph.graph import END, START, StateGraph
from langgraph.types import Send


class OverallState(TypedDict):
subjects: list[str]
summaries: Annotated[list[str], operator.add]
final_report: str


class WorkerState(TypedDict):
subject: str


def dispatch(state: OverallState):
# 动态创建多个 worker 任务,每个任务获得自己的局部输入。
return [Send("summarize_one", {"subject": subject}) for subject in state["subjects"]]


def summarize_one(state: WorkerState) -> dict:
return {"summaries": [f"{state['subject']}: key insight"]}


def reduce_report(state: OverallState) -> dict:
return {"final_report": "\n".join(state["summaries"])}


builder = StateGraph(OverallState)
builder.add_node("summarize_one", summarize_one)
builder.add_node("reduce_report", reduce_report)
builder.add_conditional_edges(START, dispatch)
builder.add_edge("summarize_one", "reduce_report")
builder.add_edge("reduce_report", END)

graph = builder.compile()

result = graph.invoke(
{
"subjects": ["state", "checkpoint", "interrupt"],
"summaries": [],
"final_report": "",
}
)
print(result["final_report"])

预期输出

1
2
3
state: key insight
checkpoint: key insight
interrupt: key insight

结果解读dispatch 返回多个 Send 对象,LangGraph 会调度多个 summarize_one 任务。summaries 使用 operator.add 聚合多个 worker 的返回值,最后进入 reduce_report

常见坑点

  • fan-out 的汇总 key 必须有 reducer,否则多个 worker 同时写同一个 key 会互相覆盖或触发冲突。
  • 并行分支不要依赖执行顺序;如果顺序重要,给结果带 index,reduce 时排序。

2.6 人机交互:interrupt + Command(resume=…)

场景:生成内容后暂停给人工审核,人工返回修改后的文本。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.types import Command, interrupt


class ReviewState(TypedDict):
generated_text: str


def draft(state: ReviewState) -> dict:
return {"generated_text": "原始草稿:LangGraph 可以编排智能体。"}


def human_review(state: ReviewState) -> dict:
# interrupt 的参数会暴露给调用方,常用来传递审核说明和当前内容。
edited_text = interrupt(
{
"instruction": "请审核并编辑 generated_text",
"content": state["generated_text"],
}
)
# 恢复执行时,Command(resume=...) 的值会成为 interrupt(...) 的返回值。
return {"generated_text": edited_text}


builder = StateGraph(ReviewState)
builder.add_node("draft", draft)
builder.add_node("human_review", human_review)
builder.add_edge(START, "draft")
builder.add_edge("draft", "human_review")
builder.add_edge("human_review", END)

checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "review-thread-001"}}

first = graph.invoke({"generated_text": ""}, config=config)
print("paused:", first)

final = graph.invoke(
Command(resume="终稿:LangGraph 适合编排可持久化、可中断的智能体工作流。"),
config=config,
)
print("final:", final)

预期输出示意

1
2
paused: {'generated_text': '原始草稿:LangGraph 可以编排智能体。', '__interrupt__': (...)}
final: {'generated_text': '终稿:LangGraph 适合编排可持久化、可中断的智能体工作流。'}

核心参数说明

参数/对象 说明
interrupt(payload) 暂停当前图执行,并把 payload 返回给调用方
Command(resume=value) 恢复中断,value 成为 interrupt() 的返回值
checkpointer 人机交互必须依赖 checkpoint 保存暂停点
thread_id 恢复时必须使用同一个 thread_id

关键规则

  • interrupt 不等于 Python 的 input()。恢复时节点会从头重新执行,不是从 interrupt 那一行继续。
  • 不要把 interrupt 包在 try/except 中吞掉异常。
  • 同一个节点里多个 interrupt 的顺序必须稳定,恢复值按顺序匹配。
  • interrupt payload 和 resume value 最好保持 JSON 可序列化。

2.7 短期记忆:InMemorySaver 与 thread_id

场景:同一个会话 thread 多次调用时保留消息历史,不同 thread 隔离。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import operator
from typing import Annotated
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph


class ChatState(TypedDict):
messages: Annotated[list[str], operator.add]


def bot(state: ChatState) -> dict:
last_user_msg = state["messages"][-1]
return {"messages": [f"bot: 收到 {last_user_msg}"]}


builder = StateGraph(ChatState)
builder.add_node("bot", bot)
builder.add_edge(START, "bot")
builder.add_edge("bot", END)

graph = builder.compile(checkpointer=InMemorySaver())

thread_a = {"configurable": {"thread_id": "user-a-session-1"}}
thread_b = {"configurable": {"thread_id": "user-b-session-1"}}

print(graph.invoke({"messages": ["user: hi"]}, config=thread_a))
print(graph.invoke({"messages": ["user: remember me"]}, config=thread_a))
print(graph.invoke({"messages": ["user: new session"]}, config=thread_b))

预期输出示意

1
2
3
{'messages': ['user: hi', 'bot: 收到 user: hi']}
{'messages': ['user: hi', 'bot: 收到 user: hi', 'user: remember me', 'bot: 收到 user: remember me']}
{'messages': ['user: new session', 'bot: 收到 user: new session']}

结果解读thread_a 第二次调用会读取上一次 checkpoint 中的 state,并把新输入按 reducer 合并进去;thread_b 使用不同 thread_id,所以状态隔离。

常见坑点

  • 启用 checkpointer 后要传 {"configurable": {"thread_id": "..."}}
  • InMemorySaver 适合开发和测试,进程重启即丢失;生产应使用 Postgres、MongoDB、Redis 等持久化 saver。

2.8 长期记忆:InMemoryStore + context_schema

场景:跨多个 thread 保存用户偏好,比如“用户喜欢喝咖啡”。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import uuid
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.runtime import Runtime
from langgraph.store.memory import InMemoryStore


class MemoryState(TypedDict):
input: str
answer: str


class Context(TypedDict):
user_id: str


def remember_or_answer(
state: MemoryState,
runtime: Runtime[Context],
) -> dict:
user_id = runtime.context["user_id"]
namespace = (user_id, "preferences")
text = state["input"]

if "喜欢" in text:
runtime.store.put(
namespace,
str(uuid.uuid4()),
{"preference": text},
)
return {"answer": "已记录你的偏好。"}

memories = runtime.store.search(namespace, limit=5)
preferences = [item.value["preference"] for item in memories]
return {"answer": f"我记得这些偏好:{preferences}"}


builder = StateGraph(MemoryState, context_schema=Context)
builder.add_node("memory", remember_or_answer)
builder.add_edge(START, "memory")
builder.add_edge("memory", END)

graph = builder.compile(
checkpointer=InMemorySaver(),
store=InMemoryStore(),
)

config_1 = {"configurable": {"thread_id": "thread-1"}}
config_2 = {"configurable": {"thread_id": "thread-2"}}
context = {"user_id": "user-001"}

print(graph.invoke({"input": "我喜欢手冲咖啡", "answer": ""}, config=config_1, context=context))
print(graph.invoke({"input": "你记得我什么?", "answer": ""}, config=config_2, context=context))

预期输出示意

1
2
{'input': '我喜欢手冲咖啡', 'answer': '已记录你的偏好。'}
{'input': '你记得我什么?', 'answer': "我记得这些偏好:['我喜欢手冲咖啡']"}

结果解读:checkpointer 管理 thread 内状态,store 管理跨 thread 的长期记忆。这里 thread-1 写入的偏好,可以被 thread-2 读取,因为 store namespace 使用的是同一个 user_id

常见坑点

  • context_schema 是 v1 推荐的运行上下文方式;旧的 config_schema 已被弃用。
  • 长期记忆要设计 namespace,例如 (tenant_id, user_id, "preferences"),避免不同用户或租户串数据。
  • 不要把全部长期记忆无脑塞进 prompt,应搜索、过滤、摘要后使用。

2.9 LLM 与工具集成:ToolNode + tools_condition

场景:LLM 判断是否需要调用工具,工具执行后结果回到 LLM,直到无需工具为止。

依赖

1
2
pip install -U "langgraph==1.2.0" "langchain>=1.0" langchain-openai
export OPENAI_API_KEY="sk-..."

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from langchain.chat_models import init_chat_model
from langchain.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode, tools_condition


@tool
def multiply(a: int, b: int) -> int:
"""Multiply two integers."""
return a * b


tools = [multiply]

# 可替换为支持 tool calling 的模型,例如 OpenAI、Anthropic 或兼容接口模型。
llm = init_chat_model("openai:gpt-4.1-mini", temperature=0)
llm_with_tools = llm.bind_tools(tools)


def call_model(state: MessagesState) -> dict:
messages = [
SystemMessage(content="You are a careful math assistant. Use tools when needed."),
*state["messages"],
]
response = llm_with_tools.invoke(messages)
return {"messages": [response]}


builder = StateGraph(MessagesState)
builder.add_node("llm", call_model)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "llm")

# tools_condition 会检查最后一条 AIMessage 是否包含 tool_calls。
# 如果有,路由到 tools;如果没有,路由到 END。
builder.add_conditional_edges("llm", tools_condition)
builder.add_edge("tools", "llm")

graph = builder.compile()

result = graph.invoke(
{"messages": [HumanMessage(content="What is 17 * 23?")]}
)

for message in result["messages"]:
print(type(message).__name__, message.content)

预期输出解读

  • 第一轮 LLM 通常返回带 tool_callsAIMessage
  • ToolNode 执行 multiply(a=17, b=23),产出 ToolMessage
  • 第二轮 LLM 读取工具结果,返回自然语言答案,例如 17 * 23 = 391

核心参数说明

对象 说明
MessagesState 内置消息状态,通常包含 messages,适合聊天和工具调用
@tool 把 Python 函数声明成可被模型调用的工具
llm.bind_tools(tools) 把工具 schema 绑定给支持 tool calling 的模型
ToolNode(tools) 根据最后一条 AIMessage 的 tool_calls 执行对应工具
tools_condition 预置条件路由函数,有工具调用则到 tools,否则结束

常见坑点

  • 模型必须支持 tool calling;不支持时可能只输出工具名文本而不产生 tool_calls
  • 工具 docstring 和类型注解会影响工具 schema,建议写清楚参数含义。
  • 不要在工具中执行不受控的 eval、shell 或数据库写操作;生产环境要加权限和审计。

2.10 多智能体编排:Router/Supervisor 模式

场景:一个 supervisor 根据任务类型,把请求交给 researcher 或 coder,再统一收口。

这个示例不依赖真实 LLM,便于本地运行;实际项目中可把 supervisorresearchercoder 替换成 LLM 节点或子图。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict

from langgraph.graph import END, START, StateGraph
from langgraph.types import Command


class AgentState(TypedDict):
task: str
route: str
artifacts: Annotated[list[str], operator.add]
final_answer: str


def supervisor(state: AgentState) -> Command[Literal["researcher", "coder"]]:
task = state["task"].lower()
if "代码" in task or "code" in task:
return Command(update={"route": "coder"}, goto="coder")
return Command(update={"route": "researcher"}, goto="researcher")


def researcher(state: AgentState) -> dict:
return {
"artifacts": [
"researcher: 梳理背景、术语、约束与参考资料。"
]
}


def coder(state: AgentState) -> dict:
return {
"artifacts": [
"coder: 设计接口、实现步骤、测试策略。"
]
}


def synthesize(state: AgentState) -> dict:
return {
"final_answer": (
f"route={state['route']}\n"
+ "\n".join(state["artifacts"])
)
}


builder = StateGraph(AgentState)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("coder", coder)
builder.add_node("synthesize", synthesize)

builder.add_edge(START, "supervisor")
builder.add_edge("researcher", "synthesize")
builder.add_edge("coder", "synthesize")
builder.add_edge("synthesize", END)

graph = builder.compile()

print(
graph.invoke(
{
"task": "请给出 LangGraph 多智能体代码设计",
"route": "",
"artifacts": [],
"final_answer": "",
}
)["final_answer"]
)

预期输出

1
2
route=coder
coder: 设计接口、实现步骤、测试策略。

结果解读supervisor 返回 Command(update=..., goto=...),在同一个节点内完成状态更新和动态路由。researchercoder 共享同一全局 state,但只通过 artifacts 输出结构化结果,最后由 synthesize 汇总。

常见坑点

  • 如果节点返回 Command(goto=...),不要再给该节点额外添加静态出边,否则动态目标和静态目标可能都会执行。
  • Command[Literal["a", "b"]] 的类型标注会参与图校验,Literal 中声明的目标节点必须已经注册。
  • 多智能体不是越多越好。建议按“能力边界”和“上下文隔离”拆分,而不是按人设拆分。
  • 多智能体共享 state 时要限制写入 key,避免多个 agent 互相覆盖。

3. 源码梳理:核心设计与执行链路

本节基于 LangGraph v1.x 主线源码阅读,重点关注 libs/langgraph/langgraphlibs/checkpoint/langgraph/checkpoint

3.1 整体架构

LangGraph 可以分成四层:

层级 责任 代表模块/类
图构建层 接收 state schema、节点、边、条件边,做静态校验 langgraph.graph.state.StateGraph
状态与通道层 把 schema key 转换成 channel,决定状态如何读写与合并 BaseChannelLastValueBinaryOperatorAggregateEphemeralValue
执行引擎层 调度节点,按 super-step 执行、合并更新、处理流式输出 PregelPregelNodeChannelWriteChannelRead
持久化层 保存 checkpoint、pending writes、thread 状态和历史 BaseCheckpointSaverInMemorySaver、Postgres/Mongo/Redis saver

核心思想是“节点不直接修改全局 state”。节点只返回更新,运行时在 step 边界统一写入 channel。这个设计让并行、重试、checkpoint、time travel 和 human-in-the-loop 都能建立在同一套状态机制上。

3.2 核心模块划分

3.2.1 状态定义模块

入口:langgraph.graph.state

关键职责:

  • 读取 TypedDict、dataclass、Pydantic model 等 schema 的类型注解。
  • 识别 Annotated[T, reducer]
  • 把每个 state key 转换成对应 channel。
  • 区分普通 channel 与 managed value。

简化理解:

1
2
3
4
5
State schema
-> type hints
-> key -> channel
-> nodes read selected channels
-> node updates write back to channels

默认情况下,普通 key 类似“最后一次写入获胜”。带 reducer 的 key 则使用聚合逻辑,例如列表追加、集合合并、计数累加。

3.2.2 图构建模块

入口:StateGraph

核心数据结构:

字段 作用
nodes 节点名到节点定义的映射
edges 静态边集合
branches 条件边/分支定义
channels state key 对应的 channel
managed 由运行时管理的值
schemas 每个 schema 对应的 key/channel 信息
waiting_edges 多起点等待边,用于部分 fan-in 场景

StateGraph 是 builder,不负责真正执行。源码注释明确指出,必须调用 .compile() 生成可执行图后,才能使用 invoke()stream()ainvoke() 等方法。

3.2.3 执行引擎模块

入口:langgraph.pregel.main.Pregel

执行模型参考 Pregel/Bulk Synchronous Parallel:

  1. Plan:根据上一个 step 更新过的 channel,选择本轮要执行的 actor,也就是 PregelNode
  2. Execution:并行执行所有被选中的节点。节点在这一阶段看不到同 step 其他节点刚写出的更新。
  3. Update:收集所有节点写出的更新,统一更新 channel。
  4. 重复执行,直到没有节点可调度、到达 END,或超过递归/step 限制。

这个模型解释了两个重要现象:

  • 并行节点的输出要靠 reducer 合并,不能假设执行顺序。
  • checkpoint 保存发生在 super-step 边界,因此恢复点也是 step 级别,不是任意代码行级别。

3.2.4 检查点模块

入口:langgraph.checkpoint.base

BaseCheckpointSaver 的职责是按 thread_id 保存和读取 checkpoint。checkpoint 中包含:

  • channel values:各 channel 当前值。
  • channel versions:channel 版本,用于判断哪些节点需要被触发。
  • versions seen:节点已经看过的 channel 版本。
  • pending sends/writes:尚待调度或待提交的写入。
  • metadata 和 parent config:支持历史、回放和 time travel。

短期记忆使用 checkpointer。长期记忆使用 store。两者经常同时出现,但职责不同。

3.3 关键核心类与方法

StateGraph

作用:图的构建器。

常用方法:

方法 作用
__init__(state_schema, context_schema=None, input_schema=None, output_schema=None) 定义图的状态、上下文、输入输出 schema
add_node(name, action, ...) 注册节点,支持 retry、cache、timeout、metadata 等参数
add_edge(start, end) 添加静态边
add_conditional_edges(source, path, path_map=None) 添加条件边
set_entry_point(node) / set_finish_point(node) 设置入口/终点的便捷方法
compile(checkpointer=None, store=None, ...) 编译为 CompiledStateGraph

设计要点:

  • StateGraph 本身不执行,只记录图结构。
  • v1 推荐使用 context_schema 注入运行上下文,config_schema 已弃用。
  • add_node 可接收普通函数、异步函数或 Runnable。

CompiledStateGraph

作用:可执行图,实现 LangChain Runnable 接口。

常用能力:

方法 作用
invoke(input, config=None, context=None) 同步执行
stream(input, config=None, stream_mode=...) 流式执行
ainvoke(...) / astream(...) 异步执行
get_state(config) 获取 thread 当前状态
get_state_history(config) 获取 checkpoint 历史
update_state(config, values, as_node=...) 手动更新状态,常用于调试和人工修复

源码上,CompiledStateGraph 继承 Pregel,并在编译时把 builder 中的节点、边、分支挂载成 Pregel 可调度的 actor/channel。

Node / PregelNode

LangGraph 用户层看到的是“节点函数”,底层会包装为 PregelNode

  • 订阅某些 input channels。
  • 读取 state 和 runtime context。
  • 调用用户函数/Runnable。
  • 把返回值转换成 channel writes。
  • 支持 retry、timeout、cache、error handler 等执行策略。

节点函数常见签名:

1
2
3
def node(state: State) -> dict: ...
def node(state: State, runtime: Runtime[Context]) -> dict: ...
def node(state: State, runtime: Runtime[Context], store: BaseStore) -> dict: ...

Channel

Channel 是 state key 的运行时承载结构。它不仅保存值,也定义如何接受更新。

常见 channel:

Channel 用途
LastValue 默认行为,保留最后一次写入
BinaryOperatorAggregate 使用 reducer 聚合多次更新
EphemeralValue 临时触发类 channel,常用于 START 或分支触发
Topic pub/sub 风格,多值通信

BaseChannel 关键方法:

方法 作用
get() 读取当前值
update(values) 用一批更新修改当前值
checkpoint() 返回可序列化快照
from_checkpoint(checkpoint) 从快照恢复 channel

Checkpointer

Checkpointer 负责将 thread 状态持久化。compile(checkpointer=...) 后,每次执行都应传:

1
config = {"configurable": {"thread_id": "some-thread-id"}}

生产建议:

  • 开发测试用 InMemorySaver
  • 服务端生产用 Postgres、MongoDB 或 Redis saver。
  • 对安全敏感场景,确认 serializer 策略,不要随意启用 pickle fallback。

Command

Command 是控制流原语,常见用途:

用途 示例
节点内更新并跳转 Command(update={"x": 1}, goto="next")
子图跳回父图 Command(goto="parent_node", graph=Command.PARENT)
interrupt 恢复 graph.invoke(Command(resume=value), config=config)

注意:如果节点返回 Command(goto=...),应避免再为该节点配置静态出边,否则可能产生多目标执行。

3.4 核心执行链路

3.4.1 图的编译流程

1
2
3
4
5
6
7
8
9
10
11
12
StateGraph(...)
-> _add_schema(state/input/output schema)
-> add_node / add_edge / add_conditional_edges
-> compile(...)
-> validate nodes/edges/interrupt points
-> 创建 CompiledStateGraph
-> attach_node(START)
-> attach_node(each user node)
-> attach_edge(static edges)
-> attach_branch(conditional edges)
-> validate compiled graph
-> Runnable: invoke/stream/ainvoke/astream

关键点:

  • 编译会把用户层节点转换成 Pregel actor。
  • 条件边会被转换成分支写入逻辑。
  • state key 会映射到 channel,节点更新会写入对应 channel。
  • checkpointer、store、cache、debug 等运行时能力在 compile 阶段注入。

3.4.2 状态流转机制

1
2
3
4
5
6
7
8
9
input dict
-> 写入 START/input channel
-> 触发入口节点
-> 节点读取 state snapshot
-> 节点返回 partial update
-> ChannelWrite 将 update 转换为 channel updates
-> step 结束时 channel.update(values)
-> reducer/LastValue 决定最终 state
-> 下一个 step 根据更新过的 channel 调度节点

为什么节点只返回 partial update?

  • 降低节点耦合,不需要每个节点复制完整 state。
  • 支持多个节点并行写不同 key。
  • 支持 reducer 聚合同一 key 的多次更新。
  • 便于 checkpoint 记录增量与恢复。

3.4.3 消息传递逻辑

在聊天/工具调用场景,MessagesState 本质上仍是 state schema,只是围绕 messages 做了适合消息列表的 reducer。

典型链路:

1
2
3
4
5
6
7
8
9
HumanMessage
-> llm node
-> AIMessage(content or tool_calls)
-> tools_condition
-> no tool_calls: END
-> has tool_calls: ToolNode
-> ToolMessage
-> llm node
-> final AIMessage

ToolNode 不负责“思考”,只负责执行模型已请求的工具调用。是否调用工具由模型输出的 tool_calls 决定。

3.4.4 持久化与恢复链路

1
2
3
4
5
6
7
8
9
10
invoke(input, config={"configurable": {"thread_id": "t1"}})
-> checkpointer 读取 t1 最新 checkpoint
-> 合并本次输入
-> 执行一个或多个 super-step
-> 每个 step 边界保存 checkpoint
-> 如果 interrupt,保存暂停状态并返回 __interrupt__
-> invoke(Command(resume=value), same config)
-> 从 checkpoint 恢复
-> 重新执行 interrupt 所在节点
-> interrupt(...) 返回 resume value

人机交互的关键是“同一个 thread_id + checkpointer + Command(resume=…)”。

3.4.5 异常处理流程

常见异常来源:

  • 节点返回值不是合法 update。
  • 多个并行节点写同一 key,但该 key 没有 reducer。
  • 条件边返回未知路由。
  • 循环超过 recursion_limit
  • checkpointer 缺少 thread_id
  • async 节点超时或外部 API 失败。

处理策略:

  • 对可恢复外部错误使用 retry_policy
  • 对慢请求使用 timeout,优先异步节点。
  • 对业务失败返回结构化错误 state,而不是直接抛异常。
  • 对人工审批、危险工具调用使用 interrupt
  • 用 LangSmith 或 stream_mode="updates" 观察节点级状态更新。

4. 高频面试题与参考答案

4.1 基础概念

1. LangGraph 解决了什么问题?

LangGraph 用于构建长生命周期、可持久化、可中断、可循环和可分支的智能体工作流。它比普通 chain 更适合复杂控制流,比如多轮工具调用、人工审批、多智能体协作、失败恢复和状态回放。

2. LangGraph 和 LangChain 是什么关系?

LangChain 提供模型、工具、agent 高层抽象和生态集成;LangGraph 是底层编排运行时,负责状态图、执行调度、checkpoint、人机交互等。LangChain v1 的高层 agent 能力建立在 LangGraph 之上,但复杂编排通常直接使用 LangGraph。

3. StateGraph 的 state 是什么?

state 是图执行期间的共享数据结构,定义节点能读写哪些字段。节点读取完整或部分 state,返回 partial update,由 LangGraph 合并回全局 state。

4. START 和 END 是真实节点吗?

它们是特殊标记。START 表示入口触发点,END 表示终止位置。用户不需要为它们实现函数。

5. 节点函数应该返回什么?

通常返回 dict,表示对 state 的局部更新;也可以返回 Command,用于同时更新 state 和控制跳转;map-reduce 条件边可返回 Send 列表。

6. 为什么 LangGraph 要显式定义边?

边让执行路径可读、可视化、可校验,也让运行时能确定节点触发关系。复杂 agent 的问题通常不是“能不能调用模型”,而是“什么时候调用谁、失败如何恢复、状态怎么流转”,边正是描述这些规则的核心。

4.2 架构原理

7. LangGraph 的执行模型是什么?

底层采用类似 Pregel 的 super-step 模型。每一轮先计划要运行的节点,再并行执行这些节点,最后统一合并它们写出的 channel 更新。更新在本 step 内对其他节点不可见,只在下一 step 生效。

8. 为什么说 LangGraph 是状态机也是数据流图?

它像状态机,因为执行路径由节点和边决定;也像数据流图,因为节点通过 state/channel 读写数据,channel 更新触发后续节点。

9. compile() 做了什么?

compile() 会校验图结构,把节点包装为 PregelNode,把 state schema 转成 channel,把静态边和条件边挂载到执行图,并注入 checkpointer、store、cache、interrupt 配置,最终生成可执行的 CompiledStateGraph

10. CompiledStateGraph 和 StateGraph 的区别?

StateGraph 是构建器,只记录结构;CompiledStateGraph 是可执行对象,实现 Runnable 接口,支持 invokestreamainvokeastream、状态读取和 checkpoint 恢复。

11. PregelNode 是什么?

PregelNode 是底层 actor。它订阅 channel,读取状态,执行用户函数或 Runnable,然后把返回值转换成 channel writes。

12. Channel 的作用是什么?

Channel 是 state key 的运行时容器,负责保存值、接收更新、执行合并逻辑和生成 checkpoint。reducer 本质上会影响 channel 的更新策略。

4.3 状态机制

13. 默认 reducer 是什么?

默认行为是覆盖,也就是后写入的值替换旧值。适合 statusfinal_answer 这类单值字段。

14. 什么时候需要自定义 reducer?

当多个节点可能写同一个 key,或者你希望累积历史时需要 reducer。典型例子是 messageslogsartifacts、并行 worker 的结果列表。

15. reducer 有哪些最佳实践?

保持纯函数、确定性、类型稳定;不要在 reducer 中做 I/O;并行结果需要顺序时携带 index 并在汇总节点排序。

16. 节点能不能直接修改 state?

不建议,也不应该依赖原地修改。节点应返回 partial update,由 LangGraph 统一合并。原地修改会让状态变化难以追踪,也可能破坏 checkpoint 和并行语义。

17. 如何避免多个节点写同一字段冲突?

要么设计不同输出 key,要么为该 key 声明 reducer,要么让多个节点写入独立中间字段后由汇总节点合并。

18. MessagesState 适合什么场景?

适合聊天、工具调用和 agent loop。它围绕消息列表提供了常用的消息累积语义,能自然接入 ToolNodetools_condition 和聊天模型。

4.4 控制流机制

19. add_edge 和 add_conditional_edges 有什么区别?

add_edge 是静态跳转;add_conditional_edges 根据运行时 state 计算下一步。前者适合固定流程,后者适合分类、路由、循环退出和动态分支。

20. Command 和 conditional edge 如何选择?

只需要路由时用条件边;需要在同一个节点里“更新 state + 跳转”时用 Command。使用 Command(goto=...) 时不要再混用静态出边。

21. Send 解决什么问题?

Send 用于动态 fan-out。它可以在运行时根据列表长度创建多个任务,并为每个任务传入不同局部 state,适合 map-reduce、批处理和并行检索。

22. 如何实现循环?

用条件边指回前面的节点,并提供明确退出条件。运行时可设置 recursion_limit 防止无限循环。

23. recursion_limit 是什么?

它限制图执行的最大 step 数,不是节点内业务循环次数。复杂图中一次业务循环可能经过多个节点,因此要按图结构估算。

4.5 持久化与记忆

24. checkpointer 和 store 的区别?

checkpointer 是短期记忆,按 thread 保存图状态和执行历史,用于恢复、中断、回放;store 是长期记忆,跨 thread 保存业务数据,如用户偏好、画像、项目知识。

25. thread_id 为什么重要?

checkpointer 以 thread_id 作为主键读取和保存 checkpoint。没有 thread_id,就无法区分会话,也无法正确恢复 interrupt。

26. InMemorySaver 能不能用于生产?

不建议。它适合开发、测试和 demo,进程重启数据会丢失。生产应使用持久化后端,如 Postgres、MongoDB 或 Redis。

27. 如何设计长期记忆 namespace?

按隔离维度设计 tuple,例如 (tenant_id, user_id, "preferences")(org_id, project_id, "facts")。namespace 必须防止跨用户、跨租户串数据。

28. 为什么不能把所有长期记忆都放进 prompt?

会增加 token 成本、降低相关性,甚至引入过期或冲突信息。应按 query 检索、过滤、摘要,只注入当前任务需要的记忆。

4.6 人机交互

29. interrupt 的工作原理是什么?

节点调用 interrupt(payload) 时,LangGraph 暂停执行,把 payload 返回给调用方,并通过 checkpointer 保存状态。恢复时调用方传入 Command(resume=value)interrupt() 会返回这个 value。

30. interrupt 恢复后从哪里继续?

从包含 interrupt 的节点开头重新执行,而不是从 interrupt 那一行继续。因此 interrupt 之前的副作用可能重复发生。

31. interrupt 有哪些规则?

必须启用 checkpointer;恢复时使用同一 thread_id;不要在 try/except 中吞掉 interrupt;同一节点多个 interrupt 的顺序要稳定;payload 尽量 JSON 可序列化。

32. 人工审批工具调用怎么做?

在执行危险工具前插入 review 节点,展示工具名、参数和风险说明,通过 interrupt 等待人工批准。批准后继续执行,拒绝则返回安全的 state 更新或路由到终止节点。

4.7 LLM 与工具集成

33. ToolNode 做什么?

ToolNode 读取最后一条 AIMessage 的 tool_calls,找到对应工具并执行,然后把结果包装成 ToolMessage 写回 messages。

34. tools_condition 做什么?

它检查最后一条消息是否包含工具调用。有则路由到 tools 节点;没有则路由到 END,是 ReAct 风格工具循环的常用条件边。

35. 为什么模型明明输出工具名却没有执行工具?

多数情况下是模型没有产生标准 tool_calls,只是输出了普通文本。需要使用支持 tool calling 的模型,并通过 bind_tools(tools) 绑定工具 schema。

36. 工具设计有哪些注意点?

函数名、类型注解和 docstring 要清晰;参数尽量结构化;危险操作要权限控制和人工审批;工具返回值要简洁,避免把大量原始数据塞回上下文。

37. create_react_agent 在 v1 中还推荐吗?

LangGraph v1 迁移文档说明,旧的 langgraph.prebuilt.create_react_agent 已进入弃用路径,推荐使用 LangChain v1 的 create_agent;如果需要细粒度编排,则直接用 StateGraph + ToolNode + tools_condition

4.8 多智能体设计

38. 多智能体系统什么时候有必要?

当任务天然存在不同能力边界、上下文边界或权限边界时才有必要。例如研究、代码、审查、执行工具分别由不同节点或子图处理。

39. Supervisor 模式是什么?

Supervisor 是一个中心调度节点,负责观察任务状态并决定下一个 agent。优点是可控、易审计;缺点是中心节点容易成为复杂度瓶颈。

40. Network 模式是什么?

多个 agent 之间可以互相 handoff,没有唯一中心。优点是灵活;缺点是更难控制循环、权限和全局状态一致性。

41. 多智能体共享 state 有什么风险?

多个 agent 可能覆盖彼此输出、泄露不该看到的上下文,或形成循环。建议限制每个 agent 的输入输出 schema,使用汇总节点统一合并。

42. 子图有什么价值?

子图能封装复杂流程,让一个 agent 或能力单元作为父图中的节点复用。它也能隔离内部状态,只暴露必要输入输出。

43. handoff 如何实现?

可以用条件边或 Command(goto=...)。跨子图跳到父图节点时可使用 Command.PARENT,但父图共享 key 需要 reducer 处理更新。

4.9 性能优化

44. LangGraph 性能瓶颈通常在哪里?

常见瓶颈是 LLM 调用延迟、工具 I/O、过大的消息历史、长期记忆检索不精准、并行 fan-out 数量过大,以及 checkpoint 后端写入延迟。

45. 如何优化 token 成本?

控制 messages 长度,对历史做摘要;长期记忆按需检索;工具返回结构化摘要;多智能体间传递 artifacts 而不是完整对话。

46. 如何优化并行任务?

使用 Send 做 fan-out;限制并发和批大小;为外部 API 设置 timeout/retry;结果汇总 key 使用 reducer,并在 reduce 节点排序和裁剪。

47. checkpoint 会不会影响性能?

会。checkpoint 带来持久化和恢复能力,也增加序列化与存储开销。生产中应选择合适后端、控制 state 体积,并避免把大文件或不可序列化对象放入 state。

48. 什么数据不适合放进 state?

大文件、数据库连接、模型对象、临时 socket、不可序列化对象和敏感明文。它们应放在外部存储或运行时依赖中,state 只保存引用或摘要。

4.10 问题排查与生产实践

49. InvalidUpdateError 常见原因是什么?

节点返回了非 dict/非 Command 的值,或者返回 key 不在 state schema 中。修复方式是明确 state schema,并让节点只返回合法 partial update。

50. 条件边路由不到节点怎么办?

检查条件函数返回值、mapping key、目标节点名是否一致。建议用 Literal[...] 给路由函数标注返回类型,减少拼写错误。

51. 图意外多执行了一个节点,可能是什么原因?

常见原因是节点既返回 Command(goto=...),又定义了静态出边。LangGraph 会执行动态目标,也可能继续执行静态边目标。

52. 为什么恢复 interrupt 后副作用重复了?

因为恢复时节点从头执行。把外部副作用放在 interrupt 之后,或用幂等键、事务、状态标记防重复。

53. 如何观察每个节点的输出?

使用 graph.stream(..., stream_mode="updates") 查看节点级更新;复杂项目建议接入 LangSmith 观察 trace、状态变化和 token 成本。

54. 生产如何处理异常?

对外部依赖使用 retry/timeout;对业务失败写入结构化错误 state;危险操作走人工审批;保留 checkpoint 便于恢复;通过日志和 trace 定位失败节点。

55. 如何做测试?

把节点函数当普通函数做单元测试;对条件路由做参数化测试;对图做端到端 invoke 测试;对 interrupt 测试暂停和恢复;对 checkpointer 测试同 thread 累积和不同 thread 隔离。

56. 面试中如何一句话总结 LangGraph?

LangGraph 是一个面向智能体的状态图运行时,用显式 state、节点、边和 checkpoint,把 LLM 应用从简单链式调用扩展到可循环、可分支、可中断、可恢复的生产级工作流。

5. 工程实践清单

5.1 建模建议

  • 先设计 state schema,再写节点;schema 是团队协作契约。
  • 节点保持小而清晰:一个节点完成一个稳定职责。
  • 中间结果用结构化字段,不要全部塞进 messages
  • 对会被并行写入的 key 必须声明 reducer。
  • 对外部副作用使用幂等 ID,尤其是 interrupt 之前或 retry 节点中。

5.2 记忆建议

  • thread 内上下文用 checkpointer。
  • 跨 thread 用户画像、偏好、项目知识用 store。
  • namespace 包含租户和用户维度。
  • 定期清理、摘要或压缩长期记忆。
  • 敏感信息加密或脱敏,不要直接写入可被模型完整读取的 state。

5.3 LLM/工具建议

  • 工具函数加类型注解和清晰 docstring。
  • 工具返回尽量短,必要时返回引用 ID。
  • 对写操作、支付、删除、发消息等工具加入人工审批。
  • 选择真正支持 tool calling 的模型。
  • 给 agent loop 设置最大步数和失败出口。

5.4 多智能体建议

  • 先用单图和少量节点解决问题,不要过早拆多 agent。
  • 拆分依据是能力、权限、上下文边界,而不是“角色名字”。
  • supervisor 输出结构化 route 和理由,便于审计。
  • worker 输出 artifacts,最终由 synthesize 节点统一生成答案。
  • 对循环 handoff 设置最大轮数和死循环检测。

6. 参考资料

__END__