AI应用的可观测性工程:用Tracing和Logging看清LLM黑盒
“我的RAG系统回答了一个错误答案但我不知道为什么。” “Agent跑了2分钟什么都没完成我不知道它在做什么。” “用了新版本Prompt感觉质量变了但我说不清楚哪里变了。”这些是AI工程师最常见的困境根本原因是缺乏可观测性Observability。本文系统介绍如何为LLM应用构建完整的可观测性体系让AI系统的行为从黑盒变白盒。## 可观测性的三大支柱借鉴传统软件可观测性的三大支柱LLM应用的可观测性同样需要-Metrics指标定量衡量系统健康的数值如响应时延、Token消耗、成功率-Logs日志记录系统发生的事件包括每次LLM调用的输入输出-Traces追踪记录一次请求的完整执行链路特别是在Agent场景中追踪多步推理在LLM应用中还需要额外关注-Prompt版本追踪哪个版本的Prompt被用于哪次请求-Token使用分析详细的Token消耗分布找出成本热点-质量评估LLM生成质量的自动化指标## LangSmithLangChain生态的可观测性标配如果你的应用基于LangChain/LangGraphLangSmith是最省力的选择pythonimport osfrom langchain_openai import ChatOpenAIfrom langchain.callbacks.tracers import LangChainTracer# 配置LangSmithos.environ[LANGCHAIN_TRACING_V2] trueos.environ[LANGCHAIN_API_KEY] your_langsmith_api_keyos.environ[LANGCHAIN_PROJECT] my-rag-project# 之后所有LangChain调用自动追踪llm ChatOpenAI(modelgpt-4o)response llm.invoke(你好世界)# 这次调用的输入、输出、Token消耗、延迟都会自动记录到LangSmithLangSmith的关键功能- 自动记录每次LLM调用输入、输出、Token、延迟- 完整的Agent执行追踪每个工具调用都有记录- Prompt版本管理Hub- 数据集管理和自动化评估## 自建可观测性OpenTelemetry方案不想依赖第三方服务用OpenTelemetry构建自主可控的可观测性pythonfrom opentelemetry import tracefrom opentelemetry.sdk.trace import TracerProviderfrom opentelemetry.sdk.trace.export import BatchSpanProcessorfrom opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporterfrom opentelemetry.sdk.resources import Resourceimport timeimport jsonfrom functools import wraps# 初始化Tracer连接到Jaeger或Grafana Tempo等resource Resource(attributes{service.name: llm-application})provider TracerProvider(resourceresource)exporter OTLPSpanExporter(endpointhttp://localhost:4317)provider.add_span_processor(BatchSpanProcessor(exporter))trace.set_tracer_provider(provider)tracer trace.get_tracer(llm-app-tracer)def trace_llm_call(func): 装饰器自动追踪LLM调用 wraps(func) def wrapper(*args, **kwargs): with tracer.start_as_current_span(fllm.{func.__name__}) as span: start_time time.time() # 记录输入 if kwargs.get(messages): span.set_attribute(llm.input.messages, json.dumps(kwargs[messages][:1], ensure_asciiFalse)) if kwargs.get(model): span.set_attribute(llm.model, kwargs[model]) try: result func(*args, **kwargs) # 记录输出 duration (time.time() - start_time) * 1000 span.set_attribute(llm.latency_ms, duration) if hasattr(result, usage): span.set_attribute(llm.tokens.prompt, result.usage.prompt_tokens) span.set_attribute(llm.tokens.completion, result.usage.completion_tokens) span.set_attribute(llm.tokens.total, result.usage.total_tokens) span.set_status(trace.StatusCode.OK) return result except Exception as e: span.set_status(trace.StatusCode.ERROR, str(e)) span.record_exception(e) raise return wrapper## 结构化日志LLM调用的标准格式pythonimport loggingimport jsonfrom datetime import datetimefrom openai import OpenAI# 配置结构化日志logging.basicConfig(levellogging.INFO)logger logging.getLogger(llm_app)class StructuredLLMLogger: 结构化LLM调用日志记录器 def __init__(self, client: OpenAI, app_name: str llm-app): self.client client self.app_name app_name def chat(self, messages: list[dict], model: str gpt-4o, trace_id: str None, **kwargs) - dict: 带完整日志记录的LLM调用 call_id trace_id or datetime.now().strftime(%Y%m%d_%H%M%S_%f) start_time time.time() # 记录请求 logger.info(json.dumps({ event: llm_request, call_id: call_id, app: self.app_name, model: model, message_count: len(messages), system_prompt_hash: hash(messages[0][content]) if messages[0][role] system else None, last_user_message: messages[-1][content][:200] if messages else , timestamp: datetime.now().isoformat(), }, ensure_asciiFalse)) try: response self.client.chat.completions.create( modelmodel, messagesmessages, **kwargs ) duration_ms (time.time() - start_time) * 1000 # 记录响应 logger.info(json.dumps({ event: llm_response, call_id: call_id, app: self.app_name, model: model, latency_ms: round(duration_ms, 2), prompt_tokens: response.usage.prompt_tokens, completion_tokens: response.usage.completion_tokens, total_tokens: response.usage.total_tokens, finish_reason: response.choices[0].finish_reason, response_preview: response.choices[0].message.content[:200], estimated_cost_usd: self._estimate_cost(model, response.usage), timestamp: datetime.now().isoformat(), }, ensure_asciiFalse)) return response except Exception as e: duration_ms (time.time() - start_time) * 1000 logger.error(json.dumps({ event: llm_error, call_id: call_id, model: model, latency_ms: round(duration_ms, 2), error_type: type(e).__name__, error_message: str(e), timestamp: datetime.now().isoformat(), }, ensure_asciiFalse)) raise def _estimate_cost(self, model: str, usage) - float: 估算API调用成本 pricing { gpt-4o: {input: 0.000005, output: 0.000015}, gpt-4o-mini: {input: 0.00000015, output: 0.0000006}, } model_price pricing.get(model, {input: 0.000005, output: 0.000015}) return (usage.prompt_tokens * model_price[input] usage.completion_tokens * model_price[output])## Agent执行追踪Agent场景的追踪更复杂需要记录整个推理链路pythonfrom dataclasses import dataclass, fieldfrom typing import Anyimport uuiddataclassclass AgentTraceSpan: span_id: str field(default_factorylambda: str(uuid.uuid4())[:8]) parent_id: str None name: str start_time: float field(default_factorytime.time) end_time: float None inputs: dict field(default_factorydict) outputs: dict field(default_factorydict) metadata: dict field(default_factorydict) error: str None children: list field(default_factorylist) def end(self, outputs: dict None, error: str None): self.end_time time.time() if outputs: self.outputs outputs if error: self.error error property def duration_ms(self) - float: if self.end_time: return (self.end_time - self.start_time) * 1000 return (time.time() - self.start_time) * 1000class AgentTracer: Agent执行追踪器 def __init__(self): self.traces [] self.current_span_stack [] def start_span(self, name: str, inputs: dict None, metadata: dict None) - AgentTraceSpan: parent_id self.current_span_stack[-1].span_id if self.current_span_stack else None span AgentTraceSpan( namename, parent_idparent_id, inputsinputs or {}, metadatametadata or {} ) if self.current_span_stack: self.current_span_stack[-1].children.append(span) else: self.traces.append(span) self.current_span_stack.append(span) return span def end_span(self, outputs: dict None, error: str None): if self.current_span_stack: span self.current_span_stack.pop() span.end(outputsoutputs, errorerror) return span def print_trace(self, span: AgentTraceSpan None, indent: int 0): 打印追踪树 if span is None: for trace in self.traces: self.print_trace(trace) return status ✓ if not span.error else ✗ print(f{ * indent}{status} [{span.duration_ms:.0f}ms] {span.name}) if span.error: print(f{ * (indent1)}ERROR: {span.error}) for child in span.children: self.print_trace(child, indent 1)# 使用示例tracer AgentTracer()async def traced_agent_run(task: str): root_span tracer.start_span(agent_run, inputs{task: task}) try: # 规划阶段 plan_span tracer.start_span(planning, inputs{task: task}) plan await generate_plan(task) tracer.end_span(outputs{plan: plan}) # 执行阶段 for i, step in enumerate(plan): step_span tracer.start_span(fexecute_step_{i}, inputs{step: step}) # 工具调用 tool_span tracer.start_span( ftool_{step[tool]}, inputs{args: step.get(args, {})} ) result await call_tool(step[tool], step.get(args, {})) tracer.end_span(outputs{result: str(result)[:500]}) tracer.end_span(outputs{status: completed}) tracer.end_span(outputs{status: success}) except Exception as e: tracer.end_span(errorstr(e)) raise # 打印追踪树 tracer.print_trace()## Prometheus指标监控将LLM调用指标暴露给Prometheus与现有监控基础设施集成pythonfrom prometheus_client import Counter, Histogram, Gauge, start_http_server# 定义指标llm_requests_total Counter( llm_requests_total, Total LLM API calls, [model, app, status])llm_latency_histogram Histogram( llm_latency_seconds, LLM API call latency, [model, app], buckets[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0])llm_tokens_counter Counter( llm_tokens_total, Total tokens consumed, [model, app, token_type])llm_cost_counter Counter( llm_cost_usd_total, Estimated USD cost of LLM calls, [model, app])# 指标收集中间件def record_llm_metrics(model: str, app: str, duration: float, usage, cost: float, status: str): llm_requests_total.labels(modelmodel, appapp, statusstatus).inc() llm_latency_histogram.labels(modelmodel, appapp).observe(duration) llm_tokens_counter.labels(modelmodel, appapp, token_typeprompt).inc(usage.prompt_tokens) llm_tokens_counter.labels(modelmodel, appapp, token_typecompletion).inc(usage.completion_tokens) llm_cost_counter.labels(modelmodel, appapp).inc(cost)# 启动Prometheus HTTP服务器暴露metrics端点start_http_server(8080) # curl http://localhost:8080/metrics## 可视化看板设计用Grafana构建LLM监控看板关键面板1.成本看板按模型/应用的每日/月度费用趋势2.性能看板P50/P95/P99延迟不同模型对比3.质量看板自动化质量评分趋势问题率4.Token分布看板Prompt vs Completion比例长尾请求分析## 小结构建LLM可观测性系统的最简路径1.第一步添加结构化日志记录每次LLM调用的关键信息2.第二步接入LangSmith如果用LangChain或OpenTelemetry3.第三步暴露Prometheus指标建立成本和性能告警4.第四步建立自动化质量评估定期跑评测集可观测性不是锦上添花而是生产级AI应用的地基。没有可观测性的AI系统出了问题只能靠猜。