硅谷 AI 圈悄悄流行的一种新职业,叫 FDE,其关键是寻找并解决现场数据之外的问题
2026/5/14 18:28:03
> github链接:https://github.com/hzqjgthy/LangGraph_TL (求star)本文档总结了 7 个脚本中关于 LangGraph 和 LangChain 流式输出的核心知识点。
所有脚本都使用了以下基础配置:
fromlangchain.chat_modelsimportinit_chat_modelfromdotenvimportload_dotenvimportos load_dotenv(override=True)# 初始化模型llm=init_chat_model(model="deepseek-chat",model_provider="deepseek",api_key=os.getenv("DEEPSEEK_API_KEY"),base_url=os.getenv("DEEPSEEK_URL"),temperature=0,)| 脚本 | 行数 | 核心功能 | 关键知识点 |
|---|---|---|---|
| test_29.py | 45 | OpenWeather API 测试 | API 调用基础、JSON 解析 |
| test_30.py | 242 | LangGraph Agent 基础 | create_react_agent、工具集成、非流式输出 |
| test_31.py | 43 | LangChain 流式输出 | astream 方法、chunk 累加 |
| test_32.py | 266 | LangGraph 同步流式输出 | values/updates 模式 |
| test_33.py | 245 | LangGraph 异步流式输出 | 异步 astream |
| test_34.py | 245 | messages 模式流式输出 | 增量 token 处理 |
| test_35.py | 254 | astream_events 事件流 | 事件驱动流式输出 |
功能:查询城市天气信息
代码要点:
defget_weather(loc):url="https://api.openweathermap.org/data/2.5/weather"params={"q":loc,# 城市名称(英文)"appid":"YOUR_API_KEY","units":"metric",# 摄氏度"lang":"zh_cn"# 简体中文}response=requests.get(url,params=params)returnjson.dumps(response.json())注意事项:
使用@tool装饰器定义工具,并使用 Pydantic 模型定义参数:
classWeatherLoc(BaseModel):location:str=Field(description="城市名称")@tool(args_schema=WeatherLoc)defget_weather(location):"""查询当前天气"""# 实现代码使用 SQLAlchemy 定义数据模型:
fromsqlalchemyimportcreate_engine,Column,Integer,String,Floatfromsqlalchemy.ormimportsessionmaker,declarative_base Base=declarative_base()classWeather(Base):__tablename__='weather_11'city_id=Column(Integer,primary_key=True)city_name=Column(String(50))temperature=Column(Float)# ... 其他字段数据库连接:
DATABASE_URI='mysql+pymysql://user:password@host:3306/database?charset=utf8mb4'engine=create_engine(DATABASE_URI)Base.metadata.create_all(engine)Session=sessionmaker(bind=engine)fromlanggraph.prebuiltimportcreate_react_agent tools=[fetch_real_time_info,get_weather,insert_weather_to_db,query_weather_from_db]graph=create_react_agent(llm,tools=tools)png_bytes=graph.get_graph(xray=True).draw_mermaid_png()withopen("graph_30.png","wb")asf:f.write(png_bytes)response=graph.invoke({"messages":["北京今天的天气怎么样?"]})print(response["messages"][-1].content)异步流式输出:
asyncdefstream_function():chunks=[]asyncforchunkinllm.astream("你好,请你详细的介绍一下你自己。"):chunks.append(chunk)print(chunk.content,end="|",flush=True)# chunk 可以累加combined=chunks[0]+chunks[1]+chunks[2]print(combined)asyncio.run(stream_function())关键点:
astream()方法进行异步流式输出+操作符进行累加asyncio.run()来执行异步函数LangGraph 提供了 5 种流式输出模式:
| 模式 | 返回内容 | 使用场景 |
|---|---|---|
| values | 每个步骤后的完整状态 | 需要完整上下文 |
| updates | 每个节点的增量更新 | 按节点处理 |
| messages | 增量 token 流 | 实时文本输出 |
| debug | 详细调试信息 | 调试程序 |
| custom | 自定义流 | 高级定制 |
特点:返回每个步骤后的完整状态
defprint_stream(stream):forsub_streaminstream:# sub_stream 是字典,包含 messages 字段message=sub_stream["messages"][-1]message.pretty_print()input_message={"messages":["你好,南京现在的天气怎么样?"]}print_stream(graph.stream(input_message,stream_mode="values"))输出结构:
{"messages":[HumanMessage(...),AIMessage(...),ToolMessage(...)]}特点:返回每个节点的增量更新
defprint_stream_updates(stream):forsub_streaminstream:# sub_stream 结构: {节点名称: {messages: [消息]}}fornode_name,node_datainsub_stream.items():print(f"---{node_name.upper()}节点 ---")if"messages"innode_data:formessageinnode_data["messages"]:message.pretty_print()print_stream_updates(graph.stream(input_message,stream_mode="updates"))输出结构:
{"agent":{"messages":[AIMessage(...)]},"tools":{"messages":[ToolMessage(...)]}}节点类型:
agent: LLM 的决策或响应tools: 工具执行结果asyncdefstream_function():asyncforchunkingraph.astream(input={"messages":["你好,成都的天气怎么样?"]},stream_mode="values"):message=chunk["messages"][-1].pretty_print()asyncio.run(stream_function())asyncdefstream_function_2():inputs={"messages":[("human","你好,乌鲁木齐的天气怎么样?")]}asyncforchunkingraph.astream(inputs,stream_mode="updates"):fornode,valuesinchunk.items():print(f"接收到的更新节点: '{node}'")message=values["messages"][0]message.pretty_print()asyncio.run(stream_function_2())同步 vs 异步:
stream): 阻塞式,适合简单脚本astream): 非阻塞,适合高并发场景特点:记录每个消息中的增量 token,实现逐字输出
asyncdefstream_function():asyncformsg,metadataingraph.astream({"messages":["你好,帮我查询一下数据库中北京的天气数据"]},stream_mode="messages"):# 只输出非 HumanMessage 的内容ifmsg.contentandnotisinstance(msg,HumanMessage):print(msg.content,end="|",flush=True)# 处理 AIMessageChunkifisinstance(msg,AIMessageChunk):iffirst:gathered=msg first=Falseelse:gathered=gathered+msg# 累加 chunk# 输出工具调用信息ifmsg.tool_call_chunks:print(gathered.tool_calls)asyncio.run(stream_function())适用场景:
特点:事件驱动的流式输出,提供更细粒度的控制
asyncdefstream_function():asyncforeventingraph.astream_events({"messages":["北京的天气怎么样"]},version="v2"):kind=event["event"]print(f"{kind}:{event['name']}----------------{event['data']}")asyncio.run(stream_function())asyncdefstream_function():asyncforeventingraph.astream_events({"messages":["北京的天气怎么样"]},version="v2"):kind=event["event"]# 过滤聊天模型流事件ifkind=="on_chat_model_stream":chunk=event["data"]["chunk"]# 输出文本内容ifchunk.content:print(chunk.content,end="",flush=True)# 输出工具调用信息elifchunk.tool_calls:fortool_callinchunk.tool_calls:iftool_call.get('name'):print(f"\n[调用工具:{tool_call['name']}]\n")asyncio.run(stream_function())事件类型:
on_chat_model_stream: 聊天模型输出流on_tool_start: 工具开始执行on_tool_end: 工具执行结束优势:
需求:查询多个城市天气并存储到数据库
response=graph.invoke({"messages":["帮我查一下北京、上海、哈尔滨三个城市的天气,并存储到数据库"]})Agent 执行流程:
get_weather工具 3 次insert_weather_to_db工具 3 次存储数据需求:从数据库读取天气数据并进行对比分析
response=graph.invoke({"messages":["帮我分析一下数据库中北京和哈尔滨城市天气的信息,做详细对比"]})Agent 执行流程:
query_weather_from_db查询北京天气query_weather_from_db查询哈尔滨天气需求:获取最新的互联网信息
response=graph.invoke({"messages":["你知道 Claude 3.5 发布的 computer use 吗?请用中文回复"]})Agent 执行流程:
fetch_real_time_info工具搜索| 场景 | 推荐模式 | 理由 |
|---|---|---|
| 简单问答 | values | 完整上下文,易于调试 |
| UI 实时显示 | messages | 逐字输出,用户体验好 |
| 调试工具调用 | updates | 清晰区分 agent 和 tools |
| 复杂事件处理 | astream_events | 精细控制每个事件 |
| 性能要求高 | 异步 (astream) | 非阻塞,高并发 |
@tool(args_schema=QueryWeatherSchema)defquery_weather_from_db(city_name:str):session=Session()try:# 执行查询weather_data=session.query(Weather).filter(Weather.city_name==city_name).first()ifweather_data:return{"city_name":weather_data.city_name,"temperature":weather_data.temperature,# ... 其他字段}else:return{"messages":[f"未找到城市 '{city_name}' 的天气信息。"]}exceptExceptionase:return{"messages":[f"查询失败,错误原因:{e}"]}finally:session.close()# 确保关闭会话关键点:
try-except-finally确保资源释放merge()实现插入或更新rollback()print(..., end="", flush=True)| 消息类型 | 用途 | 来源 |
|---|---|---|
HumanMessage | 用户输入 | 用户 |
AIMessage | 模型响应 | LLM |
ToolMessage | 工具执行结果 | 工具 |
AIMessageChunk | 模型流式输出片段 | LLM (流式) |
OpenWeather API 只支持英文城市名称。对于中国城市:
使用环境变量:
fromdotenvimportload_dotenv load_dotenv(override=True)api_key=os.getenv("OPENWEATHER_API_KEY").env文件示例:
OPENWEATHER_API_KEY=your_key_here DEEPSEEK_API_KEY=your_key_here DEEPSEEK_URL=https://api.deepseek.com确保:
flush=True参数print()的缓冲IPython.display检查:
test_29.py理解 API 调用基础test_30.py理解 Agent 的构建和工具集成test_31.py理解基础流式输出test_32.py和test_33.py对比同步/异步流式输出test_34.py理解 messages 模式的实时输出test_35.py掌握事件驱动的高级流式输出本系列脚本从基础的 API 调用到复杂的 Agent 流式输出,全面展示了 LangGraph 的核心功能:
✅工具集成:如何定义和使用工具
✅数据库操作:SQLAlchemy ORM 的最佳实践
✅流式输出:5 种不同模式的适用场景
✅异步编程:提升并发性能
✅事件驱动:精细控制 Agent 执行流程
掌握这些知识点后,可以构建功能强大的 LLM 应用,实现复杂的多步骤推理、工具调用和实时交互。