Agent Monitoring and Logging: Observability in Production

📊 Complete monitoring stack | Prometheus + Grafana + ELK | Distributed tracing + alert rules | A production-grade practice guide


📖 Why Monitor?

Challenges in Production

Without monitoring, you will run into:

❌ "系统变慢了,但不知道哪里慢" ❌ "用户报错了,但找不到原因" ❌ "API费用突然暴涨,不知道谁在用" ❌ "缓存命中率下降,但没有告警" ❌ "半夜系统挂了,第二天才知道"

The value of monitoring:

  • Locate problems fast - from hours down to minutes
  • Prevent failures - spot abnormal trends early
  • Optimize performance - make data-driven optimization decisions
  • Control costs - monitor API spend in real time
  • Improve user experience - find and fix issues promptly

🏗️ Monitoring Architecture Overview

┌─────────────────────────────────────────────┐
│              Agent Application              │
│ ┌──────────┐ ┌──────────┐ ┌─────────────┐   │
│ │ Metrics  │ │   Logs   │ │   Traces    │   │
│ └────┬─────┘ └────┬─────┘ └──────┬──────┘   │
└──────┼────────────┼──────────────┼──────────┘
       ↓            ↓              ↓
┌──────────────┐ ┌──────────┐ ┌──────────────┐
│  Prometheus  │ │   ELK    │ │   Jaeger/    │
│  (metrics)   │ │  (logs)  │ │   Zipkin     │
│              │ │          │ │  (tracing)   │
└──────┬───────┘ └────┬─────┘ └──────┬───────┘
       ↓              ↓              ↓
┌─────────────────────────────────────────────┐
│              Grafana Dashboard              │
│    (unified visualization and alerting)     │
└─────────────────────────────────────────────┘

The Three Pillars

  1. Metrics - numeric data (QPS, latency, error rate)
  2. Logs - event records (request details, error stack traces)
  3. Traces - the full request path (cross-service call chains)

📈 Metrics: Key Indicator Monitoring

Core Metric Definitions

| Category    | Metric                    | Description        | Alert threshold   |
|-------------|---------------------------|--------------------|-------------------|
| Performance | request_duration_seconds  | Request latency    | P95 > 2s          |
| Traffic     | requests_total            | Total requests     | -                 |
| Errors      | errors_total              | Total errors       | Error rate > 5%   |
| Cache       | cache_hit_rate            | Cache hit rate     | < 60%             |
| Cost        | api_cost_usd              | API spend          | Daily spend > $50 |
| Resources   | active_connections        | Active connections | > 1000            |

Implementation

from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
import functools

# Metric definitions
REQUEST_COUNT = Counter(
    'agent_requests_total',
    'Total number of requests',
    ['method', 'endpoint', 'status', 'tenant_id']
)

REQUEST_DURATION = Histogram(
    'agent_request_duration_seconds',
    'Request duration in seconds',
    ['method', 'endpoint', 'tenant_id'],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

ERROR_COUNT = Counter(
    'agent_errors_total',
    'Total number of errors',
    ['error_type', 'tenant_id']
)

CACHE_HIT_RATE = Gauge(
    'agent_cache_hit_rate',
    'Cache hit rate percentage',
    ['cache_type']
)

API_COST = Counter(
    'agent_api_cost_usd',
    'API cost in USD',
    ['model', 'tenant_id']
)

ACTIVE_CONNECTIONS = Gauge(
    'agent_active_connections',
    'Number of active connections'
)

def monitor_request(func):
    """Request-monitoring decorator."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            # Count the successful request
            REQUEST_COUNT.labels(
                method='POST',
                endpoint=func.__name__,
                status='success',
                tenant_id=kwargs.get('tenant_id', 'unknown')
            ).inc()
            return result
        except Exception as e:
            # Count the failed request and the error type
            REQUEST_COUNT.labels(
                method='POST',
                endpoint=func.__name__,
                status='error',
                tenant_id=kwargs.get('tenant_id', 'unknown')
            ).inc()
            ERROR_COUNT.labels(
                error_type=type(e).__name__,
                tenant_id=kwargs.get('tenant_id', 'unknown')
            ).inc()
            raise
        finally:
            # Record the duration, success or failure
            duration = time.time() - start_time
            REQUEST_DURATION.labels(
                method='POST',
                endpoint=func.__name__,
                tenant_id=kwargs.get('tenant_id', 'unknown')
            ).observe(duration)
    return wrapper

# Usage example (rag_service and calculate_api_cost come from your application)
@monitor_request
def query_knowledge_base(tenant_id: str, query: str):
    """Query the knowledge base (monitored automatically)."""
    # Business logic
    result = rag_service.query(tenant_id, query)

    # Record the API cost
    cost = calculate_api_cost(result)
    API_COST.labels(
        model='gpt-3.5-turbo',
        tenant_id=tenant_id
    ).inc(cost)

    return result

# Start the Prometheus metrics endpoint
start_http_server(8000)
print("✅ Prometheus metrics server started on port 8000")

📝 Logs: Structured Logging

Structured Log Format

import logging
import json
from datetime import datetime

class StructuredFormatter(logging.Formatter):
    """Structured log formatter (JSON output)."""

    def format(self, record):
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }

        # Attach extra context fields when present
        if hasattr(record, 'tenant_id'):
            log_entry['tenant_id'] = record.tenant_id
        if hasattr(record, 'request_id'):
            log_entry['request_id'] = record.request_id

        if record.exc_info and record.exc_info[1]:
            log_entry['exception'] = {
                'type': type(record.exc_info[1]).__name__,
                'message': str(record.exc_info[1]),
                'traceback': self.formatException(record.exc_info)
            }

        return json.dumps(log_entry, ensure_ascii=False)

# Logger configuration
logger = logging.getLogger('agent')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(StructuredFormatter())
logger.addHandler(handler)

Logging Best Practices

import time
import uuid
from contextvars import ContextVar

# Request-ID context variable
request_id_var: ContextVar[str] = ContextVar('request_id', default='')

def generate_request_id() -> str:
    """Generate a unique request ID."""
    return str(uuid.uuid4())

class RequestLogger:
    """Per-request logger."""

    def __init__(self, tenant_id: str):
        self.tenant_id = tenant_id
        self.request_id = generate_request_id()
        request_id_var.set(self.request_id)

    def log_request_start(self, query: str):
        """Log the start of a request."""
        logger.info(
            "Request started",
            extra={
                'tenant_id': self.tenant_id,
                'request_id': self.request_id,
                'query_preview': query[:100]
            }
        )

    def log_cache_hit(self, cache_type: str):
        """Log a cache hit."""
        logger.info(
            "Cache hit",
            extra={
                'tenant_id': self.tenant_id,
                'request_id': self.request_id,
                'cache_type': cache_type
            }
        )

    def log_llm_call(self, model: str, tokens: int, cost: float):
        """Log an LLM call."""
        logger.info(
            "LLM call completed",
            extra={
                'tenant_id': self.tenant_id,
                'request_id': self.request_id,
                'model': model,
                'tokens': tokens,
                'cost_usd': cost
            }
        )

    def log_error(self, error: Exception, context: dict = None):
        """Log an error with its stack trace."""
        logger.error(
            f"Request failed: {str(error)}",
            extra={
                'tenant_id': self.tenant_id,
                'request_id': self.request_id,
                'context': context or {}
            },
            exc_info=True
        )

    def log_request_end(self, duration: float):
        """Log the end of a request."""
        logger.info(
            "Request completed",
            extra={
                'tenant_id': self.tenant_id,
                'request_id': self.request_id,
                'duration_seconds': duration
            }
        )

# Usage example (cache, call_llm, count_tokens, calculate_cost come from your application)
def handle_query(tenant_id: str, query: str):
    req_logger = RequestLogger(tenant_id)
    start_time = time.time()

    try:
        req_logger.log_request_start(query)

        # Check the cache first
        cached = cache.get(query)
        if cached:
            req_logger.log_cache_hit('memory')
            return cached

        # Call the LLM
        result = call_llm(query)
        tokens = count_tokens(result)
        cost = calculate_cost(tokens)
        req_logger.log_llm_call('gpt-3.5-turbo', tokens, cost)

        duration = time.time() - start_time
        req_logger.log_request_end(duration)
        return result

    except Exception as e:
        req_logger.log_error(e, {'query': query})
        raise

ELK Stack Integration

# elasticsearch_handler.py
from datetime import datetime
from elasticsearch import Elasticsearch
from logging.handlers import BufferingHandler

class ElasticsearchHandler(BufferingHandler):
    """Elasticsearch log handler (buffers records, flushes in batches)."""

    def __init__(self, es_host: str, index_prefix: str = 'agent-logs'):
        super().__init__(capacity=100)  # buffer up to 100 records
        self.es = Elasticsearch([es_host])
        self.index_prefix = index_prefix

    def flush(self):
        """Send buffered records to Elasticsearch.

        BufferingHandler.emit() appends to the buffer and calls flush()
        once capacity is reached, so flush() is the right hook to override.
        """
        self.acquire()
        try:
            # One index per day
            index_name = f"{self.index_prefix}-{datetime.utcnow().strftime('%Y.%m.%d')}"
            for record in self.buffer:
                log_entry = {
                    '@timestamp': datetime.utcnow().isoformat(),
                    'level': record.levelname,
                    'message': record.getMessage(),
                    'tenant_id': getattr(record, 'tenant_id', None),
                    'request_id': getattr(record, 'request_id', None),
                    'module': record.module,
                    'function': record.funcName,
                }
                self.es.index(index=index_name, document=log_entry)
            self.buffer.clear()
        except Exception as e:
            print(f"Failed to send logs to ES: {e}")
        finally:
            self.release()

# Configuration
es_handler = ElasticsearchHandler(es_host='http://localhost:9200')
logger.addHandler(es_handler)

🔍 Traces: Distributed Tracing

OpenTelemetry Integration

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# Configure the Jaeger exporter
jaeger_exporter = JaegerExporter(
    agent_host_name='localhost',
    agent_port=6831,
)

# Set up the TracerProvider
provider = TracerProvider()
processor = BatchSpanProcessor(jaeger_exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

# Auto-instrument the requests library
RequestsInstrumentor().instrument()

# Get a tracer
tracer = trace.get_tracer(__name__)

# Usage example (cache and call_llm come from your application)
def process_user_request(tenant_id: str, query: str):
    with tracer.start_as_current_span("process_request") as span:
        # Attach attributes
        span.set_attribute("tenant.id", tenant_id)
        span.set_attribute("query.length", len(query))

        # Child span: cache lookup
        with tracer.start_as_current_span("check_cache") as cache_span:
            cached = cache.get(query)
            cache_span.set_attribute("cache.hit", cached is not None)
            if cached:
                return cached

        # Child span: LLM call
        with tracer.start_as_current_span("call_llm") as llm_span:
            result = call_llm(query)
            llm_span.set_attribute("llm.model", "gpt-3.5-turbo")

        return result

Custom Span Decorator

def trace_operation(operation_name: str):
    """Tracing decorator."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with tracer.start_as_current_span(operation_name) as span:
                # Attach scalar parameters as span attributes
                for key, value in kwargs.items():
                    if isinstance(value, (str, int, float, bool)):
                        span.set_attribute(f"param.{key}", value)
                try:
                    result = func(*args, **kwargs)
                    span.set_status(trace.Status(trace.StatusCode.OK))
                    return result
                except Exception as e:
                    span.set_status(trace.Status(trace.StatusCode.ERROR))
                    span.record_exception(e)
                    raise
        return wrapper
    return decorator

# Usage
@trace_operation("rag_query")
def query_rag(tenant_id: str, query: str):
    # Traced automatically
    ...

📊 Grafana Dashboard

Dashboard JSON Configuration

{ "dashboard": { "title": "Agent Performance Monitor", "panels": [ { "title": "请求QPS", "type": "graph", "targets": [ { "expr": "rate(agent_requests_total[5m])", "legendFormat": "{{tenant_id}}" } ] }, { "title": "P95响应时间", "type": "graph", "targets": [ { "expr": "histogram_quantile(0.95, rate(agent_request_duration_seconds_bucket[5m]))", "legendFormat": "P95" } ] }, { "title": "错误率", "type": "graph", "targets": [ { "expr": "rate(agent_errors_total[5m]) / rate(agent_requests_total[5m]) * 100", "legendFormat": "Error Rate %" } ] }, { "title": "缓存命中率", "type": "gauge", "targets": [ { "expr": "agent_cache_hit_rate", "legendFormat": "{{cache_type}}" } ] }, { "title": "API费用(今日)", "type": "stat", "targets": [ { "expr": "sum(increase(agent_api_cost_usd[24h]))", "legendFormat": "Total Cost" } ] } ] } }

Key Views

1. Overview Panel

  • Real-time QPS
  • Average response time
  • Error rate
  • Number of active tenants

2. Tenant Details

  • Request volume per tenant
  • API spend per tenant
  • Cache hit rate per tenant

3. Performance Analysis

  • P50/P95/P99 response times
  • Top 10 slowest requests
  • Error type distribution

4. Cost Monitoring (a PromQL query sketch for these views follows the list)

  • Daily API spend trend
  • Cost share by model
  • Spend ranking by tenant
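These views map onto PromQL queries over the metrics defined in the Implementation section. As a minimal sketch for trying them out before wiring them into panels, assuming a Prometheus server at http://localhost:9090 (the helper and query names below are illustrative, not part of the project repo):

# Hypothetical helper for trying panel queries against Prometheus's HTTP API.
# Note: prometheus_client may expose the cost counter as
# agent_api_cost_usd_total (the client appends a _total suffix to counters);
# adjust names to whatever your /metrics endpoint actually shows.
import requests

PROM_URL = "http://localhost:9090/api/v1/query"

PANEL_QUERIES = {
    # Tenant details: request volume and spend per tenant
    "requests_per_tenant": 'sum by (tenant_id) (rate(agent_requests_total[5m]))',
    "cost_per_tenant": 'sum by (tenant_id) (increase(agent_api_cost_usd[24h]))',
    # Performance analysis: latency quantiles and slowest endpoints
    "p50": 'histogram_quantile(0.50, sum by (le) (rate(agent_request_duration_seconds_bucket[5m])))',
    "p99": 'histogram_quantile(0.99, sum by (le) (rate(agent_request_duration_seconds_bucket[5m])))',
    "slowest_endpoints_top10": 'topk(10, histogram_quantile(0.95, sum by (endpoint, le) (rate(agent_request_duration_seconds_bucket[5m]))))',
    # Cost monitoring: share by model
    "cost_by_model": 'sum by (model) (increase(agent_api_cost_usd[24h]))',
}

def run_query(expr: str):
    """Execute one instant query and return the result vector."""
    resp = requests.get(PROM_URL, params={"query": expr}, timeout=10)
    resp.raise_for_status()
    return resp.json()["data"]["result"]

if __name__ == "__main__":
    for name, expr in PANEL_QUERIES.items():
        print(name, run_query(expr))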

🚨 Alert Rules

Prometheus Alert Configuration

# alert_rules.yml
groups:
  - name: agent_alerts
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: rate(agent_errors_total[5m]) / rate(agent_requests_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate above 5%, current value: {{ $value | humanizePercentage }}"

      # Slow requests
      - alert: SlowRequests
        expr: histogram_quantile(0.95, rate(agent_request_duration_seconds_bucket[5m])) > 2
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Slow requests detected"
          description: "P95 latency above 2 seconds, current value: {{ $value }}s"

      # Low cache hit rate
      - alert: LowCacheHitRate
        expr: agent_cache_hit_rate < 60
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "Low cache hit rate"
          description: "Cache hit rate below 60%, current value: {{ $value }}%"

      # Abnormal API cost
      - alert: HighAPICost
        expr: increase(agent_api_cost_usd[1h]) > 10
        for: 0m
        labels:
          severity: critical
        annotations:
          summary: "Abnormal API cost"
          description: "API cost over $10 in the past hour, current value: ${{ $value }}"

      # Too many active connections
      - alert: HighActiveConnections
        expr: agent_active_connections > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Too many active connections"
          description: "Active connections above 1000, current value: {{ $value }}"

Alert Notification Channels

# notification.py
import requests
import smtplib
from email.mime.text import MIMEText
from datetime import datetime

class AlertNotifier:
    """Alert notifier."""

    def __init__(self):
        self.webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
        self.email_recipients = ["admin@company.com"]

    def send_slack_alert(self, alert_name: str, severity: str, description: str):
        """Send a Slack alert."""
        color_map = {
            'critical': '#ff0000',
            'warning': '#ffa500',
            'info': '#00ff00'
        }

        payload = {
            "attachments": [
                {
                    "color": color_map.get(severity, '#000000'),
                    "title": f"🚨 {alert_name}",
                    "text": description,
                    "fields": [
                        {
                            "title": "Severity",
                            "value": severity.upper(),
                            "short": True
                        },
                        {
                            "title": "Time",
                            "value": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                            "short": True
                        }
                    ]
                }
            ]
        }

        requests.post(self.webhook_url, json=payload)

    def send_email_alert(self, subject: str, body: str):
        """Send an email alert."""
        msg = MIMEText(body)
        msg['Subject'] = f"[ALERT] {subject}"
        msg['From'] = "alerts@company.com"
        msg['To'] = ", ".join(self.email_recipients)

        with smtplib.SMTP('smtp.company.com') as server:
            server.send_message(msg)

# Usage
notifier = AlertNotifier()
notifier.send_slack_alert(
    alert_name="HighErrorRate",
    severity="critical",
    description="Error rate above 5%, please investigate immediately!"
)

💻 Complete Project Code

A complete monitoring-system project is ready for you:

GitHub repository:

git clone https://github.com/Lee985-cmd/AI-30-Day-Challenge.git
cd AI-30-Day-Challenge/projects/agent-monitoring-dashboard

Project features:

  • ✅ Prometheus metrics collection
  • ✅ Grafana dashboard templates
  • ✅ ELK log aggregation
  • ✅ Jaeger distributed tracing
  • ✅ Alert rule configuration
  • ✅ Slack/email notifications
  • ✅ One-command deployment with Docker Compose

Quick start:

# Start all services with Docker Compose
docker-compose up -d

# Access the services
# Grafana:    http://localhost:3000 (admin/admin)
# Prometheus: http://localhost:9090
# Jaeger:     http://localhost:16686
# Kibana:     http://localhost:5601

# Run the example application
python example_app.py

🛠️ Common Problems and Solutions

Q1: How do I choose the right monitoring granularity?

A: Layer your monitoring

  • Application layer: QPS, latency, error rate (second-level)
  • Business layer: conversion rate, user behavior (minute-level)
  • Infrastructure: CPU, memory, disk (minute-level)
  • Cost layer: API spend, resource usage (hour-level)

(A sketch of what this layering can look like in code follows.)
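As a minimal sketch of the layering, under the assumption that per-request metrics are recorded inline by the decorator from the Implementation section while coarser layers are refreshed on timers. The gauges and compute_* helpers below are hypothetical placeholders:

# Layered granularity sketch: coarser layers refresh on timers instead of
# per request. The gauges and compute_* helpers are hypothetical.
import threading
from prometheus_client import Gauge

BUSINESS_CONVERSION = Gauge('business_conversion_rate', 'Conversion rate')
DAILY_API_SPEND = Gauge('cost_daily_api_spend_usd', 'API spend today in USD')

def compute_conversion_rate() -> float:
    # Hypothetical: replace with a query against your own data store
    return 0.42

def compute_daily_spend() -> float:
    # Hypothetical: replace with a query against your billing records
    return 12.5

def refresh_business_metrics():
    """Business layer: recompute once a minute."""
    BUSINESS_CONVERSION.set(compute_conversion_rate())
    threading.Timer(60, refresh_business_metrics).start()

def refresh_cost_metrics():
    """Cost layer: recompute once an hour."""
    DAILY_API_SPEND.set(compute_daily_spend())
    threading.Timer(3600, refresh_cost_metrics).start()

refresh_business_metrics()
refresh_cost_metrics()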

Q2: What if the log volume is too large?

A: Sample and tier

# Log only 10% of successful requests; always keep WARNING and above
if random.random() < 0.1 or level >= logging.WARNING:
    logger.info(...)

# Or tier the log level by tenant
if tenant_tier == 'vip':
    log_level = logging.DEBUG
else:
    log_level = logging.INFO

Q3: How do I reduce monitoring overhead?

A: Go asynchronous and batch

# Send metrics out-of-band (e.g. via the push gateway)
from prometheus_client import push_to_gateway

# Ship logs in batches
from logging.handlers import BufferingHandler

class BatchElasticsearchHandler(BufferingHandler):
    def __init__(self, capacity=1000):  # larger buffer
        super().__init__(capacity)

📈 Real-World Cases

Case 1: E-commerce Customer-Service Agent

Monitoring focus:

  • P95 response time < 1s
  • Error rate < 2%
  • Cache hit rate > 70%
  • Daily API spend < $100

Results:

  • Time to detect failures dropped from 30 minutes to 2 minutes
  • Time to localize performance problems dropped from 2 hours to 10 minutes
  • API cost cut by 40% (monitoring surfaced abnormal call patterns)

Case 2: Financial Research Agent

Monitoring focus:

  • Data accuracy (compared against benchmarks)
  • Compliance checks (sensitive-word detection; a sketch follows this list)
  • Audit logs (every operation leaves a trace)
  • High availability (99.9% SLA)
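As a minimal sketch of the compliance-plus-audit angle, reusing the structured logger from earlier. The word list and the audit_logger name are hypothetical; a real deployment would use the tenant's compliance lexicon:

# Hypothetical sensitive-word compliance check that writes an audit entry
# for every decision. The lexicon and logger name are illustrative only.
import logging

audit_logger = logging.getLogger('agent.audit')

SENSITIVE_WORDS = {"insider", "guaranteed returns"}  # hypothetical lexicon

def compliance_check(tenant_id: str, request_id: str, text: str) -> bool:
    """Return True if the text passes; always leave an audit trail."""
    hits = [w for w in SENSITIVE_WORDS if w in text.lower()]
    audit_logger.info(
        "Compliance check",
        extra={
            'tenant_id': tenant_id,
            'request_id': request_id,
            'sensitive_hits': hits,
            'passed': not hits,
        }
    )
    return not hits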

Results:

  • Met financial regulatory requirements
  • Achieved full audit traceability
  • System availability reached 99.95%

🎯 Summary

The core value of a monitoring system:

  1. Visibility - know what the system is doing
  2. Fast response - know the moment a problem appears
  3. Data-driven - base optimization decisions on data
  4. Cost control - monitor spend in real time
  5. Compliance and audit - a complete record of operations

Best practices:

  • Start simple and iterate
  • Watch business metrics, not just technical ones
  • Set sensible alert thresholds to avoid alert fatigue
  • Review dashboards regularly and keep improving them
  • Document every metric and alert rule

Next steps:

  • Deploy the full monitoring stack
  • Customize dashboards for your business needs
  • Set up alert notification channels
  • Establish an on-call rotation

Complete code and detailed tutorial: 👉 GitHub repository
