一、什么是人工参与循环?
1.1 通俗解释
想象你在用自动炒菜机:
全自动模式: 机器: 加盐 50 克 结果: 太咸了!但已经没法改了 人工参与模式: 机器: 准备加盐 50 克 机器: 请确认是否继续? 人类: 太多了,改成 10 克 机器: 好的,加盐 10 克 结果: 刚好!Agent 也是一样:
全自动模式: Agent: 调用 API 删除所有文件 结果: 误删了重要文件! 人工参与模式: Agent: 准备调用 API 删除文件 Agent: 请确认是否继续? 人类: 等等,先看看要删哪些 Agent: 显示文件列表 人类: 只删这些临时文件 结果: 安全删除1.2 为什么需要人工参与?
| 场景 | 问题 | 人工参与的好处 |
|---|---|---|
| 敏感操作 | 误删数据、错误转账 | 执行前确认 |
| LLM 不确定 | 输出可能有误 | 人工审核修正 |
| 需要上下文 | 信息不足 | 人工提供补充信息 |
| 合规要求 | 必须人工审批 | 满足法规要求 |
1.3 核心概念
┌─────────────────────────────────────────────────────────────┐ │ 人工参与循环流程 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. Agent 执行到关键节点 │ │ │ │ │ ▼ │ │ 2. 调用 interrupt() 暂停 │ │ │ │ │ ▼ │ │ 3. 等待人工输入 │ │ │ │ │ ▼ │ │ 4. 使用 Command(resume=...) 恢复 │ │ │ │ │ ▼ │ │ 5. 继续执行 │ │ │ └─────────────────────────────────────────────────────────────┘二、核心 API
2.1 interrupt() 函数
作用:暂停执行,等待人工输入
from langgraph.types import interrupt def my_node(state): # 暂停执行,显示信息给人工 user_input = interrupt({ "question": "请确认是否继续?", "data": state["some_data"] }) # user_input 是人工提供的输入 return {"result": user_input}2.2 Command 对象
作用:恢复执行,提供人工输入
from langgraph.types import Command # 恢复执行,提供输入 graph.invoke(Command(resume="用户输入"), config) # 恢复执行,并跳转到特定节点 graph.invoke(Command(goto="another_node", resume="输入"), config)2.3 基本流程
from langgraph.types import interrupt, Command from langgraph.checkpoint.memory import InMemorySaver # 1. 定义节点(包含 interrupt) def human_node(state): value = interrupt({"text": state["some_text"]}) return {"some_text": value} # 2. 编译图(必须使用检查点器) checkpointer = InMemorySaver() graph = workflow.compile(checkpointer=checkpointer) # 3. 执行直到 interrupt config = {"configurable": {"thread_id": "1"}} result = graph.invoke({"some_text": "原始文本"}, config) # 4. 查看中断信息 print(result["__interrupt__"]) # 5. 恢复执行 final = graph.invoke(Command(resume="修改后的文本"), config)三、使用要求
3.1 必须满足的条件
| 条件 | 说明 |
|---|---|
| 检查点器 | 必须使用 checkpointer 保存状态 |
| 线程 ID | 必须指定 thread_id |
| interrupt 位置 | 在需要暂停的节点调用 interrupt() |
| Command 恢复 | 使用 Command(resume=...) 恢复 |
3.2 完整示例
from typing import TypedDict from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import InMemorySaver from langgraph.types import interrupt, Command import uuid # 定义状态 class State(TypedDict): some_text: str # 定义节点 def human_node(state: State): # 暂停,等待人工输入 value = interrupt({ "text_to_revise": state["some_text"] }) return {"some_text": value} # 构建图 graph_builder = StateGraph(State) graph_builder.add_node("human_node", human_node) graph_builder.add_edge(START, "human_node") # 编译(必须使用检查点器) checkpointer = InMemorySaver() graph = graph_builder.compile(checkpointer=checkpointer) # 执行 config = {"configurable": {"thread_id": str(uuid.uuid4())}} # 第一次调用:执行到 interrupt 就暂停 result = graph.invoke({"some_text": "原始文本"}, config) print("中断信息:", result["__interrupt__"]) # 第二次调用:提供人工输入,恢复执行 final = graph.invoke(Command(resume="修改后的文本"), config) print("最终结果:", final) # {'some_text': '修改后的文本'}四、设计模式
4.1 三种常见模式
┌─────────────────────────────────────────────────────────────┐ │ 设计模式 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. 批准或拒绝(Approve/Reject) │ │ - 执行前确认 │ │ - 根据人工决定路由 │ │ │ │ 2. 审查和编辑(Review & Edit) │ │ - 显示内容供人工修改 │ │ - 使用修改后的内容继续 │ │ │ │ 3. 获取输入(Get Input) │ │ - 请求人工提供信息 │ │ - 用于补充上下文 │ │ │ └─────────────────────────────────────────────────────────────┘4.2 模式一:批准或拒绝
场景:执行敏感操作前需要确认
from typing import Literal from langgraph.types import interrupt, Command def human_approval(state: State) -> Command[Literal["approved", "rejected"]]: # 暂停,等待人工批准或拒绝 decision = interrupt({ "question": "是否批准此操作?", "operation": state["pending_operation"] }) # 根据人工决定跳转到不同节点 if decision == "approve": return Command(goto="approved", update={"status": "approved"}) else: return Command(goto="rejected", update={"status": "rejected"})完整示例:
from typing import Literal, TypedDict from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import InMemorySaver from langgraph.types import interrupt, Command import uuid class State(TypedDict): operation: str status: str def prepare_operation(state: State): return {"operation": "删除所有临时文件"} def human_approval(state: State) -> Command[Literal["approved", "rejected"]]: decision = interrupt({ "question": "是否批准此操作?", "operation": state["operation"] }) if decision == "approve": return Command(goto="approved", update={"status": "已批准"}) else: return Command(goto="rejected", update={"status": "已拒绝"}) def approved_node(state: State): print(f"执行操作: {state['operation']}") return state def rejected_node(state: State): print("操作已取消") return state # 构建图 builder = StateGraph(State) builder.add_node("prepare", prepare_operation) builder.add_node("approval", human_approval) builder.add_node("approved", approved_node) builder.add_node("rejected", rejected_node) builder.add_edge(START, "prepare") builder.add_edge("prepare", "approval") builder.add_edge("approved", END) builder.add_edge("rejected", END) checkpointer = InMemorySaver() graph = builder.compile(checkpointer=checkpointer) # 执行 config = {"configurable": {"thread_id": str(uuid.uuid4())}} # 第一次:执行到 interrupt result = graph.invoke({}, config) print("等待批准:", result["__interrupt__"]) # 第二次:批准操作 final = graph.invoke(Command(resume="approve"), config) print("最终状态:", final["status"]) # "已批准"流程图:
准备操作 ──▶ 人工审批 ──┬──▶ 批准 ──▶ 执行操作 │ └──▶ 拒绝 ──▶ 取消操作4.3 模式二:审查和编辑
场景:LLM 生成内容后需要人工修改
def human_edit(state: State): # 暂停,显示内容供人工编辑 result = interrupt({ "task": "请审查并编辑以下内容", "content": state["llm_output"] }) # 使用编辑后的内容 return {"content": result["edited_content"]}完整示例:
from typing import TypedDict from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import InMemorySaver from langgraph.types import interrupt, Command import uuid class State(TypedDict): summary: str def generate_summary(state: State): # 模拟 LLM 生成摘要 return {"summary": "猫坐在垫子上看着星星。"} def human_edit(state: State): result = interrupt({ "task": "请审查并编辑摘要", "generated_summary": state["summary"] }) return {"summary": result["edited_summary"]} def use_summary(state: State): print(f"使用编辑后的摘要: {state['summary']}") return state # 构建图 builder = StateGraph(State) builder.add_node("generate", generate_summary) builder.add_node("edit", human_edit) builder.add_node("use", use_summary) builder.add_edge(START, "generate") builder.add_edge("generate", "edit") builder.add_edge("edit", "use") builder.add_edge("use", END) checkpointer = InMemorySaver() graph = builder.compile(checkpointer=checkpointer) # 执行 config = {"configurable": {"thread_id": str(uuid.uuid4())}} # 第一次:生成并暂停 result = graph.invoke({}, config) print("等待编辑:", result["__interrupt__"]) # 第二次:提供编辑后的内容 final = graph.invoke( Command(resume={"edited_summary": "猫躺在地毯上,平静地凝视夜空。"}), config ) print("最终摘要:", final["summary"])流程图:
LLM 生成 ──▶ 人工编辑 ──▶ 使用编辑结果4.4 模式三:审查工具调用
场景:LLM 调用工具前需要人工确认
from typing import Literal from langgraph.types import interrupt, Command def review_tool_call(state) -> Command[Literal["run_tool", "call_llm"]]: tool_call = state["pending_tool_call"] # 暂停,显示工具调用信息 review = interrupt({ "question": "是否执行此工具调用?", "tool_name": tool_call["name"], "tool_args": tool_call["args"] }) action, data = review if action == "continue": # 批准执行 return Command(goto="run_tool") elif action == "update": # 修改参数后执行 return Command(goto="run_tool", update={"tool_args": data}) elif action == "feedback": # 拒绝,给 LLM 反馈 return Command(goto="call_llm", update={"feedback": data})4.5 模式四:验证人工输入
场景:需要验证人工输入的有效性
def get_valid_age(state: State): prompt = "请输入您的年龄(必须是正整数)" while True: # 暂停,等待输入 user_input = interrupt(prompt) # 验证输入 try: age = int(user_input) if age < 0: raise ValueError("年龄不能为负数") # 输入有效,退出循环 break except (ValueError, TypeError): # 输入无效,更新提示,继续循环 prompt = f"'{user_input}' 无效,请输入正整数" return {"age": age}完整示例:
from typing import TypedDict from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import InMemorySaver from langgraph.types import interrupt, Command import uuid class State(TypedDict): age: int def get_valid_age(state: State): prompt = "请输入您的年龄" while True: user_input = interrupt(prompt) try: age = int(user_input) if age < 0: raise ValueError("年龄不能为负数") break except (ValueError, TypeError): prompt = f"'{user_input}' 无效,请输入正整数" return {"age": age} def report_age(state: State): print(f"用户年龄: {state['age']} 岁") return state # 构建图 builder = StateGraph(State) builder.add_node("get_age", get_valid_age) builder.add_node("report", report_age) builder.add_edge(START, "get_age") builder.add_edge("get_age", "report") builder.add_edge("report", END) checkpointer = InMemorySaver() graph = builder.compile(checkpointer=checkpointer) # 执行 config = {"configurable": {"thread_id": str(uuid.uuid4())}} # 第一次:等待输入 result = graph.invoke({}, config) print(result["__interrupt__"]) # "请输入您的年龄" # 输入无效值 result = graph.invoke(Command(resume="abc"), config) print(result["__interrupt__"]) # "'abc' 无效,请输入正整数" # 再次输入无效值 result = graph.invoke(Command(resume="-10"), config) print(result["__interrupt__"]) # "'-10' 无效,请输入正整数" # 输入有效值 final = graph.invoke(Command(resume="25"), config) print("最终年龄:", final["age"]) # 25五、恢复执行
5.1 如何恢复?
使用Command(resume=...)恢复执行:
from langgraph.types import Command # 简单恢复(提供输入值) graph.invoke(Command(resume="用户输入"), config) # 恢复并跳转 graph.invoke(Command(goto="another_node", resume="输入"), config) # 恢复并更新状态 graph.invoke( Command(goto="next_node", update={"key": "value"}, resume="输入"), config )5.2 恢复原理
中断时: ┌────────────────────────────────────────┐ │ 节点执行到 interrupt() │ │ ↓ │ │ 保存当前状态 │ │ ↓ │ │ 返回 __interrupt__ 信息 │ │ ↓ │ │ 暂停等待 │ └────────────────────────────────────────┘ 恢复时: ┌────────────────────────────────────────┐ │ 接收 Command(resume=...) │ │ ↓ │ │ 重新执行该节点 │ │ ↓ │ │ interrupt() 返回 resume 值 │ │ ↓ │ │ 继续执行 │ └────────────────────────────────────────┘5.3 重要提示
注意: 恢复时,会重新执行整个节点,而不是从中断点继续! 例如: def my_node(state): print("步骤 1") value = interrupt("等待输入") # 中断点 print("步骤 2") return {"value": value} 执行流程: 1. 第一次调用:打印"步骤 1",然后中断 2. 恢复调用:重新打印"步骤 1",然后继续打印"步骤 2" 建议: 将 interrupt 放在节点的开头,避免重复执行副作用六、常见陷阱
6.1 副作用问题
问题:interrupt 之前的代码会重复执行
# 错误示例 def bad_node(state): # 这个操作会执行两次! send_email("开始处理") # 副作用 value = interrupt("等待输入") return {"value": value} # 正确示例 def good_node(state): # 先中断,再执行副作用 value = interrupt("等待输入") # 恢复后才执行 send_email(f"处理完成: {value}") return {"value": value}6.2 子图问题
问题:子图中的 interrupt 需要特殊处理
# 子图中的 interrupt 需要在父图中处理 # 详见官方文档6.3 多个 interrupt
问题:一个节点内有多个 interrupt
def multi_interrupt_node(state): # 第一个 interrupt value1 = interrupt("第一个输入") # 第二个 interrupt value2 = interrupt("第二个输入") return {"value1": value1, "value2": value2} # 恢复时需要按顺序提供输入 # 第一次恢复:提供 value1 # 第二次恢复:提供 value2七、完整示例
7.1 敏感操作审批
from typing import TypedDict, Literal from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import InMemorySaver from langgraph.types import interrupt, Command from langchain_openai import ChatOpenAI import uuid class State(TypedDict): messages: list pending_action: str approved: bool def plan_action(state: State): model = ChatOpenAI(model="gpt-4") # LLM 决定要执行的操作 action = "删除用户数据" return {"pending_action": action} def human_approval(state: State) -> Command[Literal["execute", "cancel"]]: decision = interrupt({ "question": "是否批准此操作?", "action": state["pending_action"], "options": ["approve", "reject"] }) if decision == "approve": return Command(goto="execute", update={"approved": True}) else: return Command(goto="cancel", update={"approved": False}) def execute_action(state: State): print(f"执行操作: {state['pending_action']}") return state def cancel_action(state: State): print("操作已取消") return state # 构建图 builder = StateGraph(State) builder.add_node("plan", plan_action) builder.add_node("approval", human_approval) builder.add_node("execute", execute_action) builder.add_node("cancel", cancel_action) builder.add_edge(START, "plan") builder.add_edge("plan", "approval") builder.add_edge("execute", END) builder.add_edge("cancel", END) checkpointer = InMemorySaver() graph = builder.compile(checkpointer=checkpointer) # 使用 config = {"configurable": {"thread_id": str(uuid.uuid4())}} # 执行到审批点 result = graph.invoke({}, config) print("等待审批:", result["__interrupt__"]) # 批准操作 final = graph.invoke(Command(resume="approve"), config)7.2 LLM 输出审查
from typing import TypedDict from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import InMemorySaver from langgraph.types import interrupt, Command from langchain_openai import ChatOpenAI import uuid class State(TypedDict): user_input: str llm_output: str final_output: str def generate(state: State): model = ChatOpenAI(model="gpt-4") response = model.invoke(state["user_input"]) return {"llm_output": response.content} def review(state: State): result = interrupt({ "task": "请审查 LLM 输出", "output": state["llm_output"], "options": ["approve", "edit", "regenerate"] }) if result["action"] == "approve": return {"final_output": state["llm_output"]} elif result["action"] == "edit": return {"final_output": result["edited_output"]} else: # 重新生成 return Command(goto="generate") # 构建图 builder = StateGraph(State) builder.add_node("generate", generate) builder.add_node("review", review) builder.add_edge(START, "generate") builder.add_edge("generate", "review") builder.add_edge("review", END) checkpointer = InMemorySaver() graph = builder.compile(checkpointer=checkpointer) # 使用 config = {"configurable": {"thread_id": str(uuid.uuid4())}} # 生成并等待审查 result = graph.invoke({"user_input": "写一首诗"}, config) print("等待审查:", result["__interrupt__"]) # 批准输出 final = graph.invoke( Command(resume={"action": "approve"}), config )八、常见问题
Q1: interrupt 和普通输入有什么区别?
| interrupt | 普通输入 |
|---|---|
| 暂停执行 | 不暂停 |
| 需要检查点器 | 不需要 |
| 可以无限期等待 | 必须立即响应 |
| 适合人工审批 | 适合自动流程 |
Q2: 如何获取中断信息?
# 方法 1:从 invoke 结果获取 result = graph.invoke(input, config) print(result["__interrupt__"]) # 方法 2:从 get_state 获取 state = graph.get_state(config) print(state.next) # 下一个要执行的节点Q3: 如何处理超时?
import asyncio async def wait_for_approval(config, timeout=300): # 等待人工审批,最多 5 分钟 try: await asyncio.wait_for( check_approval(config), timeout=timeout ) except asyncio.TimeoutError: # 超时,自动拒绝 graph.invoke(Command(resume="reject"), config)Q4: 如何在 Web 应用中使用?
# 后端 API @app.post("/approve") async def approve(request): thread_id = request.thread_id decision = request.decision config = {"configurable": {"thread_id": thread_id}} result = graph.invoke(Command(resume=decision), config) return {"status": "completed", "result": result}九、API 速查表
9.1 核心 API
| API | 说明 |
|---|---|
interrupt(value) | 暂停执行,等待人工输入 |
Command(resume=...) | 恢复执行,提供输入 |
Command(goto=...) | 恢复执行,跳转到指定节点 |
Command(update=...) | 恢复执行,更新状态 |
9.2 使用步骤
| 步骤 | 说明 |
|---|---|
| 1 | 编译图时添加 checkpointer |
| 2 | 在节点中调用 interrupt() |
| 3 | 使用 thread_id 执行图 |
| 4 | 从结果中获取interrupt信息 |
| 5 | 使用 Command(resume=...) 恢复 |
十、延伸阅读
- LangGraph 官方文档 - 人工参与循环
- LangGraph 教程 - 审查工具调用
- LangGraph 教程 - 持久化
总结
人工参与循环的核心要点:
- interrupt():暂停执行,等待人工输入
- Command:恢复执行,提供人工输入
- 检查点器:必须使用,保存状态
- 线程 ID:必须指定,标识会话
四种设计模式:
- 批准或拒绝:敏感操作前确认
- 审查和编辑:修正 LLM 输出
- 审查工具调用:确认工具参数
- 验证输入:确保输入有效
一句话总结:人工参与循环让 Agent 在关键节点暂停等待人工确认,实现安全可控的自动化流程。