AI 故障排障 Agent:从人工诊断到多源数据自动推理的工程实践
2026/6/26 1:03:32 网站建设 项目流程

AI 故障排障 Agent:从人工诊断到多源数据自动推理的工程实践

一、排障效率的天花板:当资深工程师成为瓶颈

一次 P0 故障的排障过程暴露了传统模式的深层问题:交易服务响应超时,值班工程师花了 15 分钟确认现象,20 分钟逐个服务排查日志,10 分钟对比监控指标,最终在数据库侧找到根因——慢查询锁表。整个排障过程 45 分钟,其中 80% 的时间花在"找数据"而非"做判断"。

更深层的问题是知识断层:能快速排障的资深工程师只有 3 个,他们轮岗时排障效率直接腰斩。新人面对故障,不知道该看哪些指标、查哪些日志、按什么顺序排查,只能靠"猜"和"试"。

AI 排障 Agent 的核心目标:将排障经验从"人脑中的隐性知识"转化为"可执行的显性流程",让 Agent 自动完成数据收集、关联分析和推理判断,输出根因假设和处置建议,工程师只需做最终决策。

二、AI 排障 Agent 的架构与推理链路

graph TB subgraph 感知层 A1[告警事件输入] A2[人工触发排障] A3[定时巡检触发] end subgraph 规划层 PL[排障规划器<br/>根据故障类型选择排障流程] KB[知识库<br/>排障SOP+历史案例] end subgraph 执行层 E1[指标查询工具<br/>Prometheus API] E2[日志查询工具<br/>Elasticsearch API] E3[链路查询工具<br/>Jaeger API] E4[K8s查询工具<br/>kubectl/API Server] E5[数据库查询工具<br/>慢查询+锁状态] end subgraph 推理层 R1[数据关联分析<br/>时序+拓扑+因果] R2[假设生成<br/>基于规则+历史案例] R3[假设验证<br/>查询验证性数据] R4[置信度评估<br/>多证据交叉验证] end subgraph 输出层 O1[根因报告<br/>含推理过程] O2[处置建议<br/>含风险评级] O3[自动执行<br/>低风险操作] end A1 --> PL A2 --> PL A3 --> PL PL --> KB PL --> E1 PL --> E2 PL --> E3 PL --> E4 PL --> E5 E1 --> R1 E2 --> R1 E3 --> R1 E4 --> R1 E5 --> R1 R1 --> R2 R2 --> R3 R3 --> R4 R4 --> O1 R4 --> O2 R4 --> O3

AI 排障 Agent 采用 ReAct(Reasoning + Acting)模式:先根据故障类型规划排障步骤,然后逐步执行工具调用获取数据,根据数据推理生成根因假设,再调用工具验证假设,循环迭代直到置信度达标。这与传统规则引擎的区别在于:规则引擎是"if-then"的静态匹配,Agent 是"观察-推理-行动"的动态循环。

三、AI 排障 Agent 的生产级代码实现

3.1 排障 Agent 核心框架

#!/usr/bin/env python3 """AI 排障 Agent:基于 ReAct 模式的自动故障诊断""" import json import time from datetime import datetime, timedelta from typing import Dict, List, Optional, Any from dataclasses import dataclass, field from enum import Enum import logging logger = logging.getLogger(__name__) class DiagnosisStatus(Enum): """诊断状态""" PLANNING = "planning" # 规划排障步骤 EXECUTING = "executing" # 执行工具调用 REASONING = "reasoning" # 推理分析 VERIFYING = "verifying" # 验证假设 COMPLETED = "completed" # 诊断完成 FAILED = "failed" # 诊断失败 @dataclass class Hypothesis: """根因假设""" id: str description: str # 假设描述 confidence: float # 置信度 0.0-1.0 evidence: List[str] # 支持证据 verification_queries: List[Dict] # 验证该假设需要的数据查询 verified: bool = False verification_result: Optional[str] = None @dataclass class DiagnosisStep: """诊断步骤记录""" step_id: int action: str # 执行的动作 tool: str # 使用的工具 query: Dict # 查询参数 result: Any # 查询结果 observation: str # 观察结论 timestamp: datetime = field(default_factory=datetime.utcnow) @dataclass class DiagnosisReport: """诊断报告""" incident_id: str start_time: datetime end_time: Optional[datetime] status: DiagnosisStatus steps: List[DiagnosisStep] hypotheses: List[Hypothesis] root_cause: Optional[Hypothesis] remediation: List[Dict] # 处置建议 summary: str class DiagnosisAgent: """排障诊断 Agent""" def __init__(self, tools: Dict[str, Any], knowledge_base: Any, max_iterations: int = 10, confidence_threshold: float = 0.8): """ Args: tools: 可用工具集合 {tool_name: tool_instance} knowledge_base: 排障知识库 max_iterations: 最大推理迭代次数 confidence_threshold: 根因确认的置信度阈值 """ self.tools = tools self.kb = knowledge_base self.max_iterations = max_iterations self.confidence_threshold = confidence_threshold def diagnose(self, incident_id: str, alert_info: Dict) -> DiagnosisReport: """ 执行故障诊断 Args: incident_id: 故障ID alert_info: 告警信息 {alert_name, service, severity, labels} """ report = DiagnosisReport( incident_id=incident_id, start_time=datetime.utcnow(), end_time=None, status=DiagnosisStatus.PLANNING, steps=[], hypotheses=[], root_cause=None, remediation=[], summary="" ) try: # 阶段1: 规划排障步骤 plan = self._plan_diagnosis(alert_info) # 阶段2: 执行排障循环(ReAct模式) for iteration in range(self.max_iterations): report.status = DiagnosisStatus.EXECUTING # 选择下一步动作 next_action = self._select_next_action( plan, report.steps, report.hypotheses ) if not next_action: break # 执行工具调用 step = self._execute_action( iteration + 1, next_action, alert_info ) report.steps.append(step) # 推理分析 report.status = DiagnosisStatus.REASONING new_hypotheses = self._reason( step, report.hypotheses, alert_info ) report.hypotheses.extend(new_hypotheses) # 验证假设 report.status = DiagnosisStatus.VERIFYING self._verify_hypotheses(report.hypotheses) # 检查是否找到高置信度根因 best = self._get_best_hypothesis(report.hypotheses) if best and best.confidence >= self.confidence_threshold: report.root_cause = best break # 生成诊断报告 report.status = DiagnosisStatus.COMPLETED report.end_time = datetime.utcnow() report.remediation = self._generate_remediation( report.root_cause ) report.summary = self._generate_summary(report) except Exception as e: logger.error("诊断过程异常: %s", e, exc_info=True) report.status = DiagnosisStatus.FAILED report.end_time = datetime.utcnow() report.summary = f"诊断失败: {str(e)}" return report def _plan_diagnosis(self, alert_info: Dict) -> List[Dict]: """根据告警类型规划排障步骤""" alert_name = alert_info.get("alert_name", "") service = alert_info.get("service", "") # 从知识库获取排障SOP sop = self.kb.get_sop(alert_name) if sop: return sop.steps # 通用排障流程:从外到内逐层排查 return [ {"tool": "prometheus", "action": "check_service_metrics", "params": {"service": service, "metrics": [ "error_rate", "latency_p99", "qps" ]}}, {"tool": "k8s", "action": "check_pod_status", "params": {"service": service}}, {"tool": "elasticsearch", "action": "search_error_logs", "params": {"service": service, "level": "ERROR", "window": "30m"}}, {"tool": "jaeger", "action": "check_trace_errors", "params": {"service": service, "window": "30m"}}, {"tool": "prometheus", "action": "check_infra_metrics", "params": {"service": service, "metrics": [ "cpu", "memory", "disk", "network" ]}}, {"tool": "database", "action": "check_slow_queries", "params": {"service": service, "window": "30m"}}, ] def _select_next_action(self, plan: List[Dict], completed_steps: List[DiagnosisStep], hypotheses: List[Hypothesis]) -> Optional[Dict]: """选择下一步排障动作""" completed_actions = { (s.tool, s.action) for s in completed_steps } # 优先执行验证假设的查询 for hypothesis in hypotheses: if not hypothesis.verified and hypothesis.verification_queries: query = hypothesis.verification_queries[0] action_key = (query.get("tool", ""), query.get("action", "")) if action_key not in completed_actions: return query # 否则按计划顺序执行 for step in plan: action_key = (step.get("tool", ""), step.get("action", "")) if action_key not in completed_actions: return step return None def _execute_action(self, step_id: int, action: Dict, alert_info: Dict) -> DiagnosisStep: """执行工具调用""" tool_name = action.get("tool", "") action_name = action.get("action", "") params = action.get("params", {}) # 合并告警信息到查询参数 params.update({ "namespace": alert_info.get("namespace", "default"), "cluster": alert_info.get("cluster", "") }) tool = self.tools.get(tool_name) if not tool: return DiagnosisStep( step_id=step_id, action=action_name, tool=tool_name, query=params, result=None, observation=f"工具 {tool_name} 不可用" ) # 执行工具调用,捕获异常 try: result = tool.execute(action_name, params) observation = self._interpret_result( tool_name, action_name, result ) except Exception as e: result = None observation = f"工具调用失败: {str(e)}" return DiagnosisStep( step_id=step_id, action=action_name, tool=tool_name, query=params, result=result, observation=observation ) def _reason(self, step: DiagnosisStep, existing: List[Hypothesis], alert_info: Dict) -> List[Hypothesis]: """根据新数据推理生成根因假设""" new_hypotheses = [] service = alert_info.get("service", "") # 基于工具调用结果生成假设 if step.tool == "prometheus" and step.action == "check_service_metrics": result = step.result or {} error_rate = result.get("error_rate", 0) latency = result.get("latency_p99", 0) if error_rate > 0.05: new_hypotheses.append(Hypothesis( id=f"hyp-{len(existing)+len(new_hypotheses)+1}", description=f"服务 {service} 错误率异常: {error_rate:.2%}", confidence=0.5, evidence=[f"错误率 {error_rate:.2%} 超过阈值5%"], verification_queries=[ {"tool": "elasticsearch", "action": "search_error_logs", "params": {"service": service, "level": "ERROR"}} ] )) if latency > 1.0: new_hypotheses.append(Hypothesis( id=f"hyp-{len(existing)+len(new_hypotheses)+2}", description=f"服务 {service} 延迟异常: P99={latency:.2f}s", confidence=0.4, evidence=[f"P99延迟 {latency:.2f}s 超过阈值1s"], verification_queries=[ {"tool": "jaeger", "action": "check_trace_errors", "params": {"service": service}}, {"tool": "database", "action": "check_slow_queries", "params": {"service": service}} ] )) elif step.tool == "database" and step.action == "check_slow_queries": result = step.result or {} slow_queries = result.get("slow_queries", []) if slow_queries: top_query = slow_queries[0] new_hypotheses.append(Hypothesis( id=f"hyp-{len(existing)+len(new_hypotheses)+1}", description=f"数据库慢查询: {top_query.get('query', '')[:100]}", confidence=0.7, evidence=[ f"慢查询耗时 {top_query.get('duration_ms', 0)}ms", f"影响行数 {top_query.get('rows_examined', 0)}" ], verification_queries=[ {"tool": "database", "action": "check_lock_status", "params": {"service": service}} ] )) # 更新已有假设的置信度 for hyp in existing: if step.observation and any( e in step.observation for e in hyp.evidence ): hyp.confidence = min(hyp.confidence + 0.1, 1.0) return new_hypotheses def _verify_hypotheses(self, hypotheses: List[Hypothesis]): """验证假设:检查验证查询的结果是否支持假设""" for hyp in hypotheses: if hyp.verified: continue # 如果验证查询已执行且有结果,更新置信度 if hyp.verification_result: hyp.verified = True # 验证通过增加置信度,验证不通过降低置信度 if "confirmed" in hyp.verification_result: hyp.confidence = min(hyp.confidence + 0.2, 1.0) else: hyp.confidence = max(hyp.confidence - 0.3, 0.0) def _get_best_hypothesis(self, hypotheses: List[Hypothesis]) -> Optional[Hypothesis]: """获取最高置信度的假设""" if not hypotheses: return None return max(hypotheses, key=lambda h: h.confidence) def _generate_remediation(self, root_cause: Optional[Hypothesis]) -> List[Dict]: """生成处置建议""" if not root_cause: return [{"action": "manual_investigation", "description": "未找到高置信度根因,需人工排查", "risk": "high"}] # 从知识库获取处置方案 remediation = self.kb.get_remediation(root_cause.description) if remediation: return remediation # 通用处置建议 return [ {"action": "investigate", "description": f"根因: {root_cause.description}", "risk": "medium"}, {"action": "escalate", "description": "建议升级到相关团队处理", "risk": "low"} ] def _generate_summary(self, report: DiagnosisReport) -> str: """生成诊断摘要""" duration = (report.end_time - report.start_time).total_seconds() steps_count = len(report.steps) summary = ( f"故障诊断完成,耗时 {duration:.1f}s," f"执行 {steps_count} 步排查。" ) if report.root_cause: summary += ( f"根因: {report.root_cause.description}," f"置信度 {report.root_cause.confidence:.0%}。" ) else: summary += "未找到高置信度根因,需人工介入。" return summary def _interpret_result(self, tool_name: str, action_name: str, result: Any) -> str: """解释工具调用结果""" if result is None: return "查询无结果" if isinstance(result, dict): # 提取关键信息 keys = list(result.keys())[:5] return f"查询返回 {len(result)} 个字段: {', '.join(keys)}" return str(result)[:200]

3.2 工具集实现示例

#!/usr/bin/env python3 """排障工具集:封装各数据源的查询接口""" import requests from typing import Any, Dict, Optional import logging logger = logging.getLogger(__name__) class PrometheusTool: """Prometheus 指标查询工具""" def __init__(self, base_url: str = "http://prometheus:9090"): self.base_url = base_url def execute(self, action: str, params: Dict) -> Dict: """执行 Prometheus 查询""" if action == "check_service_metrics": return self._check_service_metrics(params) elif action == "check_infra_metrics": return self._check_infra_metrics(params) else: return {"error": f"未知动作: {action}"} def _check_service_metrics(self, params: Dict) -> Dict: """查询服务级指标""" service = params.get("service", "") result = {} # 错误率 error_rate_query = ( f'sum(rate(http_requests_total{{service="{service}",' f'status=~"5.."}}[5m]))' f' / sum(rate(http_requests_total{{service="{service}"}}[5m]))' ) result["error_rate"] = self._query_scalar(error_rate_query) # P99延迟 latency_query = ( f'histogram_quantile(0.99,' f' sum(rate(http_request_duration_seconds_bucket' f'{{service="{service}"}}[5m])) by (le))' ) result["latency_p99"] = self._query_scalar(latency_query) # QPS qps_query = ( f'sum(rate(http_requests_total{{service="{service}"}}[5m]))' ) result["qps"] = self._query_scalar(qps_query) return result def _check_infra_metrics(self, params: Dict) -> Dict: """查询基础设施指标""" service = params.get("service", "") result = {} cpu_query = ( f'sum(rate(container_cpu_usage_seconds_total' f'{{pod=~"{service}-.*"}}[5m]))' ) result["cpu"] = self._query_scalar(cpu_query) memory_query = ( f'sum(container_memory_working_set_bytes' f'{{pod=~"{service}-.*"}})' ) result["memory"] = self._query_scalar(memory_query) return result def _query_scalar(self, query: str) -> float: """执行 PromQL 查询并返回标量值""" try: resp = requests.get( f"{self.base_url}/api/v1/query", params={"query": query}, timeout=10 ) data = resp.json() if data["status"] == "success" and data["data"]["result"]: return float(data["data"]["result"][0]["value"][1]) except Exception as e: logger.warning("Prometheus查询失败: %s", e) return 0.0 class ElasticsearchTool: """Elasticsearch 日志查询工具""" def __init__(self, base_url: str = "http://elasticsearch:9200"): self.base_url = base_url def execute(self, action: str, params: Dict) -> Dict: if action == "search_error_logs": return self._search_error_logs(params) return {"error": f"未知动作: {action}"} def _search_error_logs(self, params: Dict) -> Dict: """搜索错误日志""" service = params.get("service", "") level = params.get("level", "ERROR") window = params.get("window", "30m") query = { "query": { "bool": { "must": [ {"term": {"service": service}}, {"term": {"level": level}}, {"range": {"@timestamp": {"gte": f"now-{window}"}}} ] } }, "size": 20, "sort": [{"@timestamp": {"order": "desc"}}] } try: resp = requests.post( f"{self.base_url}/app-logs-*/_search", json=query, timeout=15 ) data = resp.json() hits = data.get("hits", {}).get("hits", []) return { "total": data.get("hits", {}).get("total", {}).get("value", 0), "logs": [ hit["_source"] for hit in hits[:10] ] } except Exception as e: logger.warning("ES查询失败: %s", e) return {"total": 0, "logs": []}

四、AI 排障 Agent 的局限性与工程妥协

4.1 推理链路的可靠性

Agent 的推理质量取决于每一步工具调用的结果。如果 Prometheus 查询超时、ES 返回不完整数据,推理链路就会断裂或产生错误假设。解决方案:每个工具调用设置超时和重试,对异常结果做降级处理(如查询失败时跳过该步骤,继续其他推理),同时限制最大迭代次数防止死循环。

4.2 知识库的维护成本

排障 SOP 和历史案例需要持续维护。新服务上线时没有历史案例,Agent 只能走通用流程,效率较低。建议:每次人工排障后,将排障过程和结论自动入库,形成案例积累。同时定期由资深工程师审核知识库,淘汰过时的 SOP。

4.3 自动执行的风险

Agent 输出处置建议后,是否允许自动执行?低风险操作(如扩容、重启非核心 Pod)可以自动执行,但高风险操作(如数据库切换、配置变更)必须人工确认。建议设置操作风险评级,高风险操作即使 Agent 有高置信度,也需要人工审批后才能执行。

4.4 禁用场景

以下场景不适合 AI 排障 Agent:第一,安全事件排查(如入侵检测),Agent 的工具调用可能破坏取证现场;第二,涉及敏感数据的故障(如用户隐私泄露),Agent 的日志记录可能违反合规要求;第三,基础设施级故障(如机房断电),Agent 本身可能无法运行。

五、总结

AI 排障 Agent 将故障诊断从"人工逐项排查"升级为"自动推理循环"。通过 ReAct 模式,Agent 自动规划排障步骤、调用工具获取数据、推理生成根因假设、验证假设直到找到高置信度根因。但推理链路的可靠性依赖工具调用的稳定性,知识库需要持续维护,自动执行必须限制在低风险操作。务实的落地路径:先从高频故障类型(如服务超时、Pod 重启)入手,积累排障案例,逐步扩展覆盖范围。让排障从"靠经验"变成"靠流程",让开发少背锅。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询