欢迎光临
我们一直在努力

2026年大模型推理成本骤降的技术密码:从架构优化到生态重构

andy阅读(14)

引言:推理成本正在经历一场”静默革命”

2024年初,调用GPT-4 API处理100万token的成本约为30美元。到了2026年中,这个数字已经跌到了不足3美元——降幅超过90%。这并不是某个单一技术突破的结果,而是从模型架构、推理引擎、硬件适配到部署策略等全链路优化的综合效应。对于正在构建AI应用的技术团队来说,理解这场”静默革命”背后的技术驱动力,远比关注某个具体模型的能力提升更有价值。

本文将从技术架构层面,拆解大模型推理成本骤降的四大核心引擎,并分析这些变化对后端架构、团队分工和产品设计带来的深远影响。

AI推理芯片架构概念图

引擎一:KV Cache 优化的范式突破

大模型推理的开销大头从来不在计算,而在内存。Transformer解码过程中,每个生成的token都需要与之前所有token的Key和Value做注意力计算——这意味着KV Cache的大小与序列长度成线性增长,对于128K上下文窗口,KV Cache可能占据数十GB的显存。

Multi-Query Attention 与 GQA 的普及

2024年主流的MHA(Multi-Head Attention)模型中,每个注意力头都有独立的K和V投影矩阵,导致KV Cache随着头数线性增长。2025年以后,GQA(Grouped Query Attention)和MQA(Multi-Query Attention)几乎成为所有新模型的标配。以Llama 3系列为例,从70B模型的8个KV头(GQA分组)到最新架构中更激进的压缩策略,KV Cache节省了60%-75%的显存占用。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 简化的GQA实现示意
class GroupedQueryAttention(nn.Module):
    def __init__(self, dim, n_heads, n_kv_heads):
        super().__init__()
        self.n_heads = n_heads
        self.n_kv_heads = n_kv_heads
        self.n_rep = n_heads // n_kv_heads  # 每个KV头服务的查询头数
       
        self.wq = nn.Linear(dim, dim)
        self.wk = nn.Linear(dim, self.n_kv_heads * head_dim)
        self.wv = nn.Linear(dim, self.n_kv_heads * head_dim)
   
    def forward(self, x, kv_cache=None):
        # 生成查询在所有头上,但键值只在少数头上
        q = self.wq(x).view(batch, seq, self.n_heads, head_dim)
        k = self.wk(x).view(batch, seq, self.n_kv_heads, head_dim)
        v = self.wv(x).view(batch, seq, self.n_kv_heads, head_dim)
       
        # 通过重复扩展KV头匹配Q头数
        k = k.repeat_interleave(self.n_rep, dim=2)
        v = v.repeat_interleave(self.n_rep, dim=2)
        # 后续注意力计算...

推测性解码(Speculative Decoding)

传统自回归解码每次只能生成一个token,GPU利用率往往不到20%。推测性解码通过引入一个轻量级的”草稿模型”(draft model)每次生成K个候选token,然后由目标模型并行验证。由于验证阶段可以合并为一次前向传播,实际吞吐量提升了2-3倍,而计算量只增加了微小开销。

2026年,投机解码已经从学术论文走向了生产级部署。Google的Medusa、DeepMind的SPEED、以及开源社区实现的Lookahead Decoding等方案,在vLLM、TensorRT-LLM等推理框架中都有原生支持。配置代码越来越简洁:


1
2
3
4
5
6
7
8
9
10
11
12
13
# vLLM 中启用推测性解码(2026年版本)
from vllm import LLM, SamplingParams

llm = LLM(
    model="meta-llama/Llama-4-70B",
    speculative_model="meta-llama/Llama-4-70B-draft",
    num_speculative_tokens=5,  # 每次推测5个token
    speculative_max_model_len=8192,
    use_v2_block_manager=True,
)

params = SamplingParams(temperature=0.7, max_tokens=1024)
output = llm.generate("解释量子计算的基本原理", params)

引擎二:量化技术的代际跨越

量化是降低推理成本最直接的手段。从2024年的INT8/FP8到2026年广泛部署的FP4甚至混合精度量化,精度损失的控制技术取得了实质性突破。

从INT8到FP8再到FP4的路程

2024年,FP8训练和推理刚刚进入生产环境,主要依赖NVIDIA H100的Transformer Engine硬件支持。2025年,随着Blackwell架构的推出,FP4计算单元被直接集成到Tensor Core中,使得FP4推理的速度比FP16快了近4倍。

精度格式 显存节省(vs FP16) 推理速度提升 主流支持硬件 精度损失(典型任务)
FP16 基准 基准 所有GPU
INT8 50% 1.5-2x 几乎所有GPU <1%
FP8 50% 2-2.5x H100+, MI300X+ <0.5%
FP4 75% 3-4x B200+, 下一代MI 1-3%
NF4 (QLoRA) 87% 2-3x 任意GPU(软件模拟) 2-5%

激活量化与感知量化训练(QAT)

值得注意的另一个趋势是激活量化的成熟。早期的量化方案只量化权重(weight-only quantization),但激活值(activation)的分布远比权重复杂——存在明显的离群值(outliers)。2025-2026年,SmoothQuant和QuIP#等技术的改进版本,通过在通道维度上对激活值做平滑化处理,使得W8A8(8-bit权重、8-bit激活)和W4A8成为生产环境中的常见配置,进一步缩减了推理流水线的瓶颈。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 使用AutoGPTQ进行W4A16量化的典型流程
from auto_gptq import AutoGPTQForCausalLM, BaseQuantizeConfig
from transformers import AutoTokenizer

quantize_config = BaseQuantizeConfig(
    bits=4,                          # 4-bit权重
    group_size=128,                  # 分组大小
    desc_act=True,                   # 激活排序(提升精度)
    damp_percent=0.01,
    sym=True                         # 对称量化
)

model = AutoGPTQForCausalLM.from_pretrained(
    "Qwen/Qwen3-72B",
    quantize_config,
    device_map="auto"
)
model.quantize(calibration_dataset)
model.save_quantized("/models/qwen3-72b-4bit")

引擎三:推理引擎与调度系统的成熟

如果说量化是”硬”优化,那么推理引擎和调度系统就是”软”优化——它们决定了硬件利用率能接近理论上限的多少。

PagedAttention 与 vLLM 的持续进化

vLLM在2024年提出的PagedAttention解决了KV Cache的内存碎片问题,将内存利用率从40-50%提升到了95%以上。2026年,vLLM已经成为事实上的推理标准,其核心创新已经被所有主流推理框架复制。在此基础上,社区进一步引入了Prefix Caching(前缀缓存)、Chunked Prefill(分块预填充)和Automatic Prefix Detection(自动前缀检测)等机制。

一个实际案例:在某电商客服场景中,由于用户查询的前缀(”你好,我有一个关于订单的问题…”)高度重复,启用Prefix Caching后,首token延迟(TTFT)从800ms降到了150ms,整体吞吐量提升了3倍。

分离式推理架构(Disaggregated Serving)

2026年最显著的生产级变化是分离式推理架构的普及。传统架构中,prefill阶段(处理用户输入)和decode阶段(生成输出)共享同一批GPU资源。但这两个阶段的计算特征完全不同:

  • Prefill阶段:计算密集型,GPU利用率高,需要大量并行计算
  • Decode阶段:内存密集型,GPU利用率低,受限于内存带宽

分离式架构将这两个阶段分配到不同的GPU集群上,各自使用最优化的资源配置。例如,prefill节点使用计算密集型的H100,decode节点使用内存带宽优化的B200。通过负载均衡器动态路由请求,整体集群效率提升了40-60%。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 分离式推理的配置示例(基于SGLang)
# prefill_server.py
from sglang import Engine

engine = Engine(
    model_path="/models/llama-4-70b",
    tp_size=8,
    dp_size=1,
    node_role="prefill",  # 只处理prefill
    max_prefill_tokens=16384,
    schedule_policy="lpm",  # 最短处理时间优先
)

# decode_server.py
from sglang import Engine

engine = Engine(
    model_path="/models/llama-4-70b",
    tp_size=4,
    dp_size=2,
    node_role="decode",  # 只处理decode
    enable_prefix_caching=True,
    radix_cache_size=16_000_000_000,  # 16GB前缀缓存
)

连续批处理(Continuous Batching)的极致优化

连续批处理允许推理引擎在任意时刻将新请求插入到正在执行的批次中,而不是等待当前批次完成。2026年的实现已经进化到”微批处理”(micro-batching)级别——每个iteration都会重新评估批次组合,根据各请求的当前生成长度动态调整批次大小和组成。这使得GPU利用率从传统批处理的50-60%提升到了85-95%。

引擎四:硬件生态的多元竞争

推理成本的下降离不开硬件层面的竞争和多元化。2024-2026年,大模型推理芯片市场从NVIDIA一家独大走向了多极格局。

NVIDIA Blackwell 的推理专用优化

B200/B100引入了第二代Transformer Engine、FP4 Tensor Core、以及NVLink 5.0(带宽1.8TB/s)。对于推理场景,最大的改进在于FP4推理的硬件原生支持,以及NVLink Switch让72个GPU组成一个统一的推理节点,极大简化了大模型部署的分布式通信开销。

AMD MI350 的追赶

AMD Instinct MI350系列在2025年底推出,凭借CDNA 4架构和FP8/FP6推理的出色表现,在价格/性能比上开始具有竞争力。配合ROCm 6.x的成熟,越来越多的开源推理框架(vLLM、SGLang、llama.cpp)在AMD平台上实现了与NVIDIA平台90%以上的性能。对于成本敏感的中型团队,MI350正在成为H100的有力替代品。

推理专用芯片的崛起

2026年最值得关注的趋势是推理专用芯片的商业化落地。Groq的LPU(Language Processing Unit)继续在低延迟场景(<10ms TTFT)保持优势;Cerebras的Wafer-Scale Engine通过巨大的片上内存(40GB SRAM)完全消除了KV Cache的内存瓶颈;而中国厂商如寒武纪、华为昇腾也在特定场景中实现了性价比突破。

这种硬件层面的多元化竞争,直接推动了推理成本的持续下降——根据业界估算,2026年每token的推理成本相比2024年下降了约85%,而2027年预计还将再下降50-60%。

成本下降带来的生态重构

推理成本下降90%不仅仅是一个数字变化,它在重塑整个AI应用的技术栈和商业模式。

从”每查询计费”到”常驻推理”

当推理成本足够低时,AI模型的调用模式发生了根本变化。过去,应用只在用户明确触发时才调用模型(如”帮我总结这封邮件”)。现在,越来越多的系统让AI模型”常驻”——在后台持续分析信息流、预测用户需求、主动提供建议。例如,新一代IDE的代码补全已经从”按需触发”变为”实时流式预测”,背后是推理成本降低使得完全扫描整个文件上下文进行补全成为可能。

长上下文推理的普及

128K乃至1M token的上下文窗口在2024年还是旗舰模型的专属卖点,到2026年已经成为中端模型的标配。这得益于KV Cache优化和分离式架构的成熟——处理百万token上下文的首token延迟已经从分钟级降到了秒级。由此催生了”整库分析”类应用:将整个代码库、整个对话历史、甚至整个数据库表结构一次性注入上下文,让模型拥有全局视角。

Agent系统的成本门槛消失

2024年,构建一个需要多轮工具调用的Agent系统的成本令人望而却步——每次调用都消耗大量token,一个复杂的任务链可能花费数美元。2026年,同样的任务链成本不到0.1美元。这使得Agent从”实验性产品”变成了”默认架构”。越来越多的SaaS产品开始默认嵌入自主Agent,而不是简单的聊天界面。

部署建议:如何利用当前的低成本推理

对于正在构建AI应用的团队,以下几条实践建议可以帮助你充分利用当前的推理成本红利:

  1. 采用分层的推理栈:不要把所有请求都发给同一个模型。使用一个轻量级的”路由器”模型(0.5B-3B参数)做意图识别和分类,只在必要时路由到70B+的大模型。总体成本可以再降低50-70%。
  2. 善用推理缓存:对于重复性查询(FAQ、代码审查模板、常见问题分类),利用语义缓存(Semantic Cache)直接返回缓存结果。Semantic Cache在2026年已经非常成熟,Milvus、Qdrant等向量数据库都有现成的实现。
  3. 本地推理+云端混合:对于延迟敏感的场景(代码补全、实时翻译),在客户端部署4-8B参数的本地模型处理大部分请求,只在需要高质量输出时回退到云端大模型。llama.cpp和MLC-LLM在移动端和桌面端的推理已经足够快。
  4. 关注推理框架的社区活跃度:vLLM、SGLang、llama.cpp三个项目是目前社区最活跃、更新最快的推理框架。选择其中一个作为主力,保持与上游版本的同步,可以持续获得性能改进。

结语

大模型推理成本在两年内下降了90%,这不仅是技术进步的标志,更是AI应用大规模普及的推手。作为一个技术人,最值得做的不是焦虑于”错过了什么”,而是理解这些技术变化的底层逻辑,将其应用到自己的系统设计中。当推理成本接近零时,限制AI应用的天花板就不再是算力,而是我们对问题场景的理解深度和系统设计的能力。

2026年往后,AI的竞争将从”谁的模型更强”转向”谁的系统设计更好”——这才是真正的工程时代来临。

AI Agent 落地之路:生产环境中 Agent 架构设计的八大教训

andy阅读(37)

引言:从Demo到生产的鸿沟

2025到2026年,AI Agent从一个实验室概念迅速演变为企业级基础设施的核心组件。当无数技术团队兴奋地跑通了第一个”自动写邮件”的Demo后,等待他们的却是生产环境中的一连串”惊喜”:Token消耗失控、Agent陷入死循环、工具调用出错后无法恢复、多步推理的累积错误让结果完全不可用。本文基于多个生产级Agent系统的实际搭建经验,总结了八个关键教训,希望能帮助正在或即将构建Agent系统的团队少走弯路。

如果你只是跑过几个LangChain的示例或者用过Cursor/Cline之类的编码Agent,那么恭喜你,你只看到了Agent技术的冰山一角。真实的Agent生产部署,是一场关于可靠性、可观测性和成本控制的艰难平衡。

数据中心服务器架构
生产环境的Agent架构需要像数据中心一样严谨的设计

教训一:别让Agent”裸奔”——结构化输出是第一道防线

许多团队在开发Agent时犯的第一个错误,是让LLM自由输出文本,然后靠正则或提示词来解析。这在Demo中可行,但在生产中完全不可靠。LLM的输出格式漂移(format drift)是一个非常真实的问题——同一个模型在不同温度参数下、不同上下文长度下,输出JSON的结构可能出现微妙的差异。

强制结构化输出的方案对比

方案 可靠性 灵活性 延迟开销 推荐场景
JSON Mode(API原生) 简单工具调用
Function Calling 中高 标准Agent工作流
Outlines / JSONFormer 最高 高(需要logit-level约束) 自托管模型
Guidance(Microsoft) 复杂多步生成
提示词+后处理校验 仅限原型阶段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 推荐的结构化工具调用模式(TypeScript示例)
interface ToolCall {
  name: string;
  arguments: Record&lt;string, unknown&gt;;
  id: string;
}

function validateToolCall(raw: unknown): ToolCall {
  const parsed = typeof raw === 'string' ? JSON.parse(raw) : raw;
  if (!parsed || typeof parsed.name !== 'string') {
    throw new MalformedToolCallError('缺少 tool name', raw);
  }
  if (!parsed.id) {
    parsed.id = crypto.randomUUID();
  }
  return parsed as ToolCall;
}

我们的经验是:永远不要信任LLM的输出格式。即使使用Function Calling API,也必须加一层Schema验证(推荐Zod或Pydantic)。在API层面,优先使用支持JSON Mode或Structured Output的提供商(OpenAI的Structured Outputs、Anthropic的Tool Use、DeepSeek的JSON Mode),这在源头就大幅降低了格式错误的概率。

教训二:状态管理——Agent需要”工作记忆”

大多数开源Agent框架把对话历史当成唯一的状态。这在3-5轮交互中没问题,但当Agent需要执行20步、30步甚至上百步的复杂任务时,把全部历史塞进上下文的做法会导致灾难性的Token消耗和注意力稀释。

我们推荐的实践是分层状态管理:

  • 短期记忆(Short-term Memory):最近5-10轮交互,保留完整上下文用于推理
  • 工作记忆(Working Memory):当前任务的中间结果、变量、文件状态,使用结构化存储(JSON/Markdown)
  • 长期记忆(Long-term Memory):已完成任务的关键输出、环境配置、用户偏好,通过向量检索按需加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 分层记忆管理的简化实现(Python)
from dataclasses import dataclass, field
from typing import Any, Optional

@dataclass
class WorkingMemory:
    """Agent的当前工作记忆"""
    task_id: str
    original_goal: str
    completed_steps: list[str] = field(default_factory=list)
    pending_steps: list[str] = field(default_factory=list)
    intermediate_results: dict[str, Any] = field(default_factory=dict)
    current_file_state: Optional[str] = None

    def summarize(self) -> str:
        """生成紧凑的工作记忆摘要,用于注入系统提示"""
        lines = [
            f"任务: {self.original_goal[:80]}...",
            f"已完成 {len(self.completed_steps)} 步, 剩余 {len(self.pending_steps)} 步",
        ]
        for key, val in self.intermediate_results.items():
            lines.append(f"  {key}: {str(val)[:100]}")
        return "\n".join(lines)

关键点:每次LLM调用前,将工作记忆的摘要(而不是完整内容)注入系统提示。完整的中间结果在需要时才通过检索获取。这让上下文窗口从”全部历史”压缩为”当前聚焦区域”,大幅降低了Token消耗(实测减少40-60%)。

教训三:错误恢复机制比任务规划更重要

这是所有生产级Agent系统中最容易被低估的部分。大多数框架都有精美的”任务规划”(planning)逻辑,但当工具调用失败时——API超时、Shell命令返回非零退出码、文件不存在——它们只会简单地把错误堆栈抛回给LLM,期望它”自己想办法”。这在简单场景下能工作,但在复杂任务中经常导致死循环:Agent不断重试同样的操作,每次得到同样的错误。

推荐的多级错误恢复策略

我们设计了三级恢复机制:

  1. 第一级:自动重试(Retry) — 对于网络超时、临时不可用等错误,自动重试1-3次,使用指数退避
  2. 第二级:备选路径(Alternative) — 如果自动重试失败,尝试不同的实现方式(例如,如果apt-get install失败,尝试pip install;如果curl失败,尝试wget)
  3. 第三级:降级与报告(Degrade) — 如果所有备选路径都失败,记录详细错误信息,跳过该步骤,继续执行剩余任务,最后在结果中报告

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 错误恢复框架的核心逻辑
class AgentStepExecutor:
    def __init__(self, max_retries=3, retry_delay=1.0):
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    async def execute_with_recovery(self, step: Step) -> StepResult:
        # 第一级:自动重试
        for attempt in range(1, self.max_retries + 1):
            try:
                return await self._execute_tool(step)
            except TransientError as e:
                logger.warning(f"尝试 {attempt}/{self.max_retries} 失败: {e}")
                await asyncio.sleep(self.retry_delay * (2 ** (attempt - 1)))

        # 第二级:备选路径
        for alt_method in step.alternatives:
            try:
                return await self._execute_tool(alt_method)
            except (TransientError, PermanentError):
                continue

        # 第三级:降级
        return StepResult(
            status=Status.SKIPPED,
            error=f"所有 {len(step.alternatives) + 1} 种方法均失败",
            partial_output=step.partial_work  # 如果部分完成
        )

生产环境中,我们的Agent在高复杂度任务(30步以上)中,约15%的步骤会遇到某种形式的失败。有了三级恢复机制,85%的失败在第一级就解决了,10%在第二级解决,只有不到5%的步骤最终被跳过。相比之下,没有恢复机制的Agent在超过15步的任务中失败率超过40%。

教训四:Token 预算管理——Agent的”经济命脉”

在一个生产Agent项目中,我们遇到过最尴尬的情况:一个Agent跑了45分钟,耗尽了OpenAI API的月度配额,最终输出的却是一条”抱歉,我没有完成所有步骤”的消息。Token成本在Agent场景下与传统Chat API完全不同——Agent可能在一个任务中调用API数十次甚至上百次,每次调用都包含完整的系统提示、对话历史和工具响应。

有效的Token预算管理需要三个维度:

  • 单步预算:每次LLM调用限制最大输出Token(推荐1024-2048),防止Agent”长篇大论”
  • 任务预算:整个任务的总Token上限(输入+输出),超过时触发压缩或终止
  • 上下文压缩触发:当上下文Token数超过阈值(如总窗口的50%)时,自动触发摘要压缩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Token 预算管理配置示例(YAML)
agent:
  token_budget:
    # 单次LLM调用的最大输出Token
    per_call_max_output: 2048

    # 整个会话的最大总Token消耗(输入+输出)
    session_max_total: 500000

    # 当上下文Token超过窗口的X%时触发压缩
    compression_threshold: 0.50
    compression_target: 0.20  # 压缩到窗口的20%

    # 当总消耗超过X%时发出警告
    warning_threshold: 0.80
    # 超过X%时强制终止
    hard_limit: 0.95

在我们的实践中,合理的Token预算管理将Agent任务的成本波动从”10倍差异”降低到了”2倍以内”。更重要的是,它避免了”预算跑冒”导致的意外账单。一个不加限制的Agent可能在一个任务中消耗价值50美元的Token——而我们通过预算管理将这个数字稳定在了3-8美元之间。

教训五:工具设计决定Agent能力的上限

Agent的能力边界直接由它可用的工具集决定。好的工具设计能让Agent高效完成任务;差的工具设计会让Agent在简单任务上绕圈子。我们总结了几条工具设计原则:

工具设计黄金法则

  1. 单一职责:每个工具只做一件事。不要设计一个”执行命令”的万能工具——把它拆分为”读取文件”、”写入文件”、”安装包”、”运行测试”等具体工具。LLM在理解具体工具时比理解泛化工具要准确得多。
  2. 清晰的Schema:参数名要自解释,description要包含示例和边界条件。避免让LLM猜测参数含义。
  3. 有意义的返回值:工具返回的不只是”成功/失败”,还应该包含结构化数据摘要,让LLM不需要再次调用就能理解结果。
  4. 幂等性优先:同一个工具用相同参数调用多次应该产生相同的结果。这在重试场景下至关重要。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 良好工具设计示例(OpenAI Function Calling Schema)
{
  "name": "read_file",
  "description": "读取文件指定部分内容。支持偏移量和行数限制。如果文件不存在返回错误。",
  "parameters": {
    "type": "object",
    "properties": {
      "path": {
        "type": "string",
        "description": "文件路径(绝对路径或相对项目根目录的路径)"
      },
      "offset": {
        "type": "integer",
        "description": "起始行号(从1开始,默认为1)",
        "default": 1
      },
      "limit": {
        "type": "integer",
        "description": "最多读取行数(最大500,默认100)",
        "default": 100,
        "maximum": 500
      }
    },
    "required": ["path"]
  }
}

特别要注意的是工具返回值的格式。我们发现,当工具返回结构化JSON(包含状态码、摘要、数据三部分)时,LLM的后续决策准确率比返回纯文本高约22%。结构化返回让LLM能够快速定位关键信息,而不是在文本中”大海捞针”。

教训六:可观测性是生产Agent的”眼睛”

Agent的行为本质上是非确定性的——即使给定相同的输入,两次运行的结果也可能不同。这让传统的”日志+指标”监控模式捉襟见肘。我们需要专门为Agent设计可观测性方案。

必不可少的可观测性维度

维度 具体指标 收集方式
决策轨迹 每次LLM调用的完整请求/响应、选中的工具及参数、模型思考过程 持久化到数据库,支持回放
成本指标 每次调用的Token数(输入/输出)、累计成本、每步成本 实时统计,告警
性能指标 每步延迟、LLM响应时间、工具执行时间、总任务时长 Prometheus + Grafana
成功率 工具调用成功率、任务完成率、错误类型分布、重试次数 聚合统计
质量评估 结果准确性(人工/自动评估)、用户反馈评分 离线评估流水线

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Agent 决策轨迹记录示例
{
  "session_id": "sess_abc123",
  "turn": 7,
  "timestamp": "2026-06-28T14:32:15.123Z",
  "model": "claude-sonnet-4-20260514",
  "input_tokens": 28500,
  "output_tokens": 423,
  "cost": 0.034,
  "decision": {
    "reasoning": "需要先检查项目结构来确定配置文件位置...",
    "chosen_tool": "search_files",
    "parameters": {
      "pattern": "*.config.*",
      "path": "/home/user/project"
    }
  },
  "tool_result": {
    "status": "success",
    "summary": "找到 3 个配置文件",
    "execution_time_ms": 234
  },
  "latency_ms": 3420
}

我们使用了一个关键设计:每一步的决策轨迹都可以回放。当Agent给出错误结果时,我们可以像调试代码一样”单步”查看每个决策过程,定位问题根源。这在传统监控中是无法做到的,但却是Agent开发中最强大的调试工具。

教训七:安全与护栏——不止是提示词注入

当Agent拥有执行Shell命令、读写文件、调用外部API的能力时,安全问题就从”提示词注入”升级到了”远程代码执行”级别。很多团队在开发Agent时完全忽略了这一点,直到生产事故发生。

生产级Agent的安全架构

  • 最小权限原则:Agent运行在一个受限的容器或沙箱中,只能访问预定义的目录和资源。不要用root权限运行Agent。
  • 操作白名单:定义Agent可以执行的操作范围,不在列表中的操作需要人工审批。例如,”可以读取/tmp目录,但不能删除文件”。
  • 命令审计:所有通过Agent执行的Shell命令都记录日志并发送到安全审计系统。
  • 敏感信息过滤:工具输出中的API Key、Token、密码等敏感信息自动脱敏后再返回给LLM。
  • 速率限制:限制Agent的单位时间操作次数,防止意外的高频调用导致外部服务被限流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Agent 安全策略配置
security:
  # 操作许可级别
  execution_mode: sandbox  # strict | sandbox | permissive

  # 禁止执行的命令模式
  blocked_commands:
    - "rm -rf /*"
    - "dd if=/dev/zero"
    - "mkfs.*"
    - ":(){ :|:& };:"  # fork炸弹

  # 敏感信息脱敏模式
  redact_secrets: true
  redact_patterns:
    - "sk-[a-zA-Z0-9]{20,}"     # OpenAI Key
    - "AKIA[0-9A-Z]{16}"         # AWS Access Key
    - "ghp_[a-zA-Z0-9]{36}"     # GitHub Token

  # 操作限流
  rate_limits:
    max_commands_per_minute: 10
    max_api_calls_per_minute: 30

特别要提一点:不要在系统提示中明文写入API Key。这个错误比看起来更常见——有人在开发时为了方便在提示词里写了测试Key,结果Agent在某个工具调用中把它输出到了日志里。密钥应该始终通过环境变量注入,并且工具输出应该自动过滤。

教训八:人机协作——不要把Agent当成完全自主的系统

这是最重要的教训。目前没有任何一个纯LLM驱动的Agent系统能够在复杂任务上实现100%的自主性。试图构建”全自动”Agent的团队最终都会在某个点上碰壁——要么是Agent做出了错误的关键决策,要么是在需要领域知识时产生了幻觉。

我们推荐的模式是“人在回路中”(Human-in-the-Loop),但要有具体的设计:

  • 审批点(Checkpoints):在关键决策点(如”删除数据库”、”修改生产配置”、”支付操作”)设置人工审批。Agent执行到这里自动暂停,等待人工确认。
  • 结果审核(Review):Agent完成任务后输出结果摘要,人工审核后再执行最终操作(如”生成代码变更→人工审查→合并”)。
  • 反馈学习(Feedback):人工对Agent的输出进行评分和纠偏,这些反馈作为后续调优的训练数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Human-in-the-Loop 审批流程示例
async def execute_with_approval(step: CriticalStep) -> StepResult:
    # 1. Agent 先准备执行计划
    plan = await agent.prepare_execution_plan(step)

    # 2. 向用户展示计划并请求审批
    approval = await request_approval(
        user_id=step.owner,
        message=f"Agent 请求执行高危操作:\n{plan.summary()}",
        timeout_minutes=30
    )

    if approval.status == "approved":
        return await agent.execute(step)
    elif approval.status == "modified":
        return await agent.execute(step, override_params=approval.modifications)
    else:  # rejected
        return StepResult(status=Status.REJECTED, reason=approval.reason)

数据显示,在引入人工审批点后,Agent完成的关键任务中准确率从76%提升到了94%,而总耗时仅增加了8-15%(因为大部分步骤仍然是自动执行的,只有关键步骤需要等待人工确认)。人机协作不是对Agent能力的否定,而是对Agent可靠性的必要补充。

总结:Agent架构的成熟度模型

最后,用一个成熟度模型来总结生产级Agent架构的演进路径:

阶段 特征 典型问题 适用场景
L1: Demo 单轮调用,无状态,无错误处理 格式不稳定,无法恢复 原型验证
L2: 基础 多步推理,基础重试 Token失控,成本不可控 内部工具
L3: 可靠 分层记忆,三级恢复,Token预算 可观测性不足 企业内应用
L4: 成熟 全链路可观测,安全护栏,人机协作 需要持续的反馈循环 面向客户的系统
L5: 自适应 从反馈中自动学习,模型与服务动态适配 高度复杂,维护成本高 前沿探索

大多数团队目前处于L2到L3之间。不要试图一步跳到L5——生产系统的可靠性是由每一层的扎实工程积累起来的。在加入更高级功能之前,先把基础的错误恢复、Token管理和可观测性做好。

AI Agent无疑是2026年最令人兴奋的技术方向之一。但兴奋之余,我们需要用工程化的思维来对待它——不是把它当作魔法,而是当作一种需要精心设计和维护的分布式系统。希望这八个教训能为你的Agent架构之路提供一些实用的指导。

封面图:Unsplash – Data Center

MCP协议深度解析:AI Agent工具调用的标准化革命

andy阅读(47)

引言:当AI Agent遇见标准化协议

2025年底,Anthropic发布了Model Context Protocol(MCP)协议规范,这个看似简单的开放协议在短短半年内迅速成为AI Agent领域最炙手可热的基础设施标准。从OpenAI的Function Calling到各家大模型的工具调用接口,业界长期缺乏一个统一的、与模型无关的工具集成标准。MCP的出现,试图填补这一空白。

截至2026年中,MCP已经在超过200个开源项目中获得支持,主流的LLM框架(LangChain、LlamaIndex、Semantic Kernel)、IDE(VS Code、JetBrains)和开发工具都纷纷接入。更重要的是,它正在改变我们对AI Agent系统架构的思考方式。

本文将从技术实现角度,深入分析MCP协议的核心设计、实际部署方案,以及它对整个AI开发生态带来的深远影响。

MCP协议的核心架构设计

MCP采用客户端-服务器(Client-Server)架构,这与传统的AI工具调用模式有本质不同。在传统的Function Calling模式中,工具定义是嵌入在模型请求中的JSON Schema,每个调用的工具逻辑由大模型调用方直接执行。而MCP将这一过程拆分成了三个独立的角色:

  • MCP Host:运行AI Agent的宿主环境(如Claude Desktop、VS Code插件、自定义Agent框架)
  • MCP Client:与MCP Server建立一对一连接的客户端,负责协议通信
  • MCP Server:提供具体工具能力的轻量级服务,每个Server暴露一组相关的工具和资源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// MCP协议的基本通信流程
┌─────────────────┐      ┌─────────────────┐      ┌─────────────────┐
│   AI Agent      │      │   MCP Client    │      │   MCP Server    │
│  (Host/LLM)     │◄────►│  (协议层)       │◄────►│  (工具提供方)   │
└─────────────────┘      └─────────────────┘      └─────────────────┘
       │                                                    │
       │  List Tools Request                                 │  Tool Registry
       │────────────────────────────────────────────────►   │
       │                                                    │
       │  Tools List                                        │
       │◄────────────────────────────────────────────────── │
       │                                                    │
       │  Call Tool: search_codebase                        │
       │────────────────────────────────────────────────►   │
       │                                                    │
       │  Tool Result                                       │
       │◄────────────────────────────────────────────────── │
</pre>

这种解耦设计带来了几个关键优势:首先,工具的实现与模型调用完全分离,同一个MCP Server可以被任意支持MCP的AI Agent复用;其次,Server可以运行在独立的进程中,实现安全的沙箱隔离;最后,工具的生命周期管理变得标准化——启动、发现、调用、关闭都有明确的协议规范。

协议核心能力:Tools、Resources与Prompts

MCP定义了三种核心原语(Primitives),它们共同构成了AI Agent与外部世界交互的基础:

Tools(工具)

Tools是MCP中最核心的抽象。每个Tool类似于一个带JSON Schema参数定义的远程函数,AI模型可以通过MCP Client调用它。与Function Calling不同的是,MCP的Tool定义是动态发现的——Server可以在运行时注册新的Tool,不需要修改Host端的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "search_codebase",
    "arguments": {
      "query": "database connection pool",
      "file_pattern": "*.py",
      "max_results": 10
    }
  }
}
</pre>

MCP使用JSON-RPC 2.0作为通信协议,这与诸多现有的工具和框架兼容。每次工具调用的请求和响应都有标准的格式,方便调试和日志记录。

Resources(资源)

Resources是MCP对"读取"操作的抽象。与Tools的"执行"语义不同,Resources更类似于文件系统的读操作——Client向Server请求某个URI对应的内容。这种设计使得AI Agent能够以统一的方式访问各种数据源:本地文件、数据库记录、API响应、甚至是实时监控数据。

一个Resource的URI示例格式:

1
file:///logs/app.log

1
postgres://orders/recent

。Server通过实现Resource模板来提供动态内容的获取能力。

Prompts(提示模板)

Prompts是MCP中经常被忽视但同样重要的能力。它允许Server预定义一些可复用的提示模板(Prompt Templates),AI Agent可以在特定场景下"调用"这些模板来生成高质量的交互上下文。这对于需要领域专业知识交互的场景特别有用——比如数据库管理工具可以提供"分析慢查询"的提示模板,其中包含SQL分析相关的指令。

生产级MCP Server开发实战

理解了协议设计后,让我们通过一个实际的例子来了解如何在Python中开发一个生产级别的MCP Server。下面是一个文件系统分析工具的MCP Server实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# filesystem_mcp_server.py
import os
import json
import hashlib
from pathlib import Path
from typing import Any
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.server.stdio
import mcp.types as types

# 创建MCP Server实例
server = Server("filesystem-analyzer")

@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """注册两个工具:分析目录和查找大文件"""
    return [
        types.Tool(
            name="analyze_directory",
            description="分析指定目录的文件类型分布、大小统计和目录深度",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "要分析的目录路径"
                    },
                    "max_depth": {
                        "type": "integer",
                        "description": "最大递归深度",
                        "default": 3
                    }
                },
                "required": ["path"]
            }
        ),
        types.Tool(
            name="find_large_files",
            description="查找目录中超过指定大小的文件",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {"type": "string"},
                    "min_size_mb": {
                        "type": "integer",
                        "description": "最小文件大小(MB)",
                        "default": 100
                    },
                    "limit": {
                        "type": "integer",
                        "default": 20
                    }
                },
                "required": ["path"]
            }
        )
    ]

@server.call_tool()
async def handle_call_tool(
    name: str, arguments: dict | None
) -> list[types.TextContent]:
    args = arguments or {}
   
    if name == "analyze_directory":
        path = args["path"]
        max_depth = args.get("max_depth", 3)
       
        if not os.path.isdir(path):
            return [types.TextContent(
                type="text",
                text=json.dumps({"error": f"路径不存在或不是目录: {path}"})
            )]
       
        result = analyze_directory_structure(path, max_depth)
        return [types.TextContent(
            type="text",
            text=json.dumps(result, indent=2, ensure_ascii=False)
        )]
   
    elif name == "find_large_files":
        path = args["path"]
        min_size = args.get("min_size_mb", 100) * 1024 * 1024
        limit = args.get("limit", 20)
       
        large_files = []
        for root, dirs, files in os.walk(path):
            for file in files:
                try:
                    fpath = os.path.join(root, file)
                    size = os.path.getsize(fpath)
                    if size >= min_size:
                        large_files.append({
                            "path": fpath,
                            "size_mb": round(size / (1024*1024), 2)
                        })
                except (OSError, PermissionError):
                    continue
       
        large_files.sort(key=lambda x: x["size_mb"], reverse=True)
        return [types.TextContent(
            type="text",
            text=json.dumps(large_files[:limit], indent=2, ensure_ascii=False)
        )]
   
    raise ValueError(f"未知工具: {name}")
</pre>

要让这个Server运行起来,只需几行代码的启动入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async def main():
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="filesystem-analyzer",
                server_version="1.0.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={}
                )
            )
        )

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
</pre>

这个例子展示了MCP Server开发的几个关键模式:通过装饰器注册工具、使用异步I/O处理请求、返回结构化的JSON结果。在实际生产环境中,你还需要添加日志记录、错误处理、性能监控和认证机制。

MCP vs. Function Calling:关键差异分析

维度 MCP协议 传统Function Calling
架构模式 客户端-服务器解耦 紧耦合的函数定义
工具发现 运行时动态发现 编译期或启动时静态定义
安全隔离 进程级沙箱隔离 同一进程内调用
协议标准化 JSON-RPC 2.0标准 厂商自定义格式
跨模型兼容 支持所有主流LLM 绑定特定模型提供商
资源管理 支持Resources/Prompts 仅支持Tools
生命周期 标准化的启动/关闭流程 无统一规范

从上表可以看出,MCP的核心优势在于标准化和解耦。传统Function Calling模式下,每个AI应用都需要自行实现工具加载、参数校验、结果处理和错误重试。而MCP将这些基础设施标准化了,开发者只需关注业务逻辑的实现。

MCP在企业落地中的实践考量

尽管MCP的优势明显,但在实际企业部署中仍有几个需要认真考虑的方面:

安全性

MCP Server运行在独立的进程中,这意味着传统的进程间安全策略——如seccomp、AppArmor、Linux Capabilities——都可以直接应用。更重要的是,你可以在不同的安全级别上运行不同的MCP Server:敏感数据库操作在高安全级别的Server中运行,而公共API调用则在低安全级别的Server中运行。这种"最小权限原则"在传统的单一进程中很难实现。

性能与延迟

每次MCP工具调用都涉及进程间通信(IPC)开销。在我们的基准测试中,本地的stdio传输方式延迟约为0.5-2ms,而通过网络传输的SSE(Server-Sent Events)方式延迟在5-20ms之间。对于大多数AI Agent场景,这个延迟是可以接受的,因为LLM本身的推理时间通常在秒级。但对于高频调用的场景(如代码补全),建议使用stdio传输并在同一主机上部署。

运维与监控

MCP标准协议使得运维工具可以统一管理所有MCP Server。开源的mcp-inspector工具可以连接到任意MCP Server,查看其注册的工具列表、测试工具调用、监控资源使用情况。在Kubernetes环境中,MCP Server可以作为Sidecar容器部署在AI Agent Pod中,利用K8s的原生健康检查、资源限制和日志收集能力。

MCP生态的未来展望

站在2026年的时间节点回望,MCP协议的出现标志着AI Agent从"各自为战"走向"标准化协作"的关键一步。以下几个趋势值得密切关注:

  • MCP Gateway:类似API Gateway的MCP路由网关正在兴起,它可以在多个MCP Server之间做负载均衡、认证授权和流量管理
  • MCP Registry:公共的MCP Server注册中心,开发者可以像使用npm/pip一样安装和共享MCP Server
  • 跨语言生态:除了Python和TypeScript的官方SDK,社区正在开发Rust、Go、Java等语言的MCP实现
  • 与A2A协议互补:Google推出的Agent-to-Agent(A2A)协议负责Agent之间的协作,MCP负责Agent与工具的连接,二者形成了互补的生态

可以预见,未来一年内MCP将像HTTP之于Web一样,成为AI Agent基础设施中不可或缺的一层。对于正在建设AI Agent系统的团队来说,现在就是拥抱MCP的最佳时机——越早接入,积累的标准化工具资产就越有价值。

结语

MCP协议不只是一个技术规范,它代表了一种思维方式的变化——从"AI模型调用函数"到"AI Agent与工具通过标准协议协作"。这种转变使得AI系统从单体智能走向了分布式智能生态。

对于开发者而言,理解并掌握MCP协议,就像十年前理解RESTful API一样,正在成为AI时代的一项基础技能。无论你是AI应用开发者、DevOps工程师还是后端架构师,MCP都值得你投入时间去学习和实践。

AI Agent与MCP协议概念图

从工具调用到自主决策:2026年AI Agent技术栈的三个关键转变

andy阅读(61)

AI Agent技术演进

2026年上半年已经接近尾声,AI Agent从年初的”概念验证”阶段,正在快速进入”生产落地”阶段。回顾这半年的技术发展,有三个关键转变值得我们关注:工具调用范式的标准化、记忆系统的工程化、以及多Agent协作的实用化。这些变化不仅仅是技术层面的迭代,更代表了AI应用架构设计思路的根本转变。

本文将从一线开发实践的角度,分析这三个转变背后的技术逻辑,以及它们对开发者意味着什么。

一、工具调用:从”手搓JSON Schema”到MCP协议标准化

2025年,几乎每个AI Agent框架都有自己的工具定义格式。LangChain用Tool对象,AutoGen用function_map,CrewAI用装饰器,Hermes用registry。开发者想要复用一个已有的工具适配器,往往需要写一层胶水代码来做格式转换。

Anthropic提出的MCP(Model Context Protocol)协议正在改变这一局面。到2026年中,主流Agent框架几乎都支持了MCP Server作为工具来源:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 典型的MCP Server定义(Python SDK)<br />
from mcp.server import Server<br />
from mcp.types import Tool, TextContent

server = Server("database-tools")

@server.tool()<br />
async def query_database(sql: str) -&gt; str:<br />
    """执行SQL查询并返回结果"""<br />
    result = await db.execute(sql)<br />
    return TextContent(type="text", text=str(result))

@server.tool()<br />
async def list_tables() -&gt; str:<br />
    """列出所有数据表"""<br />
    tables = await db.list_tables()<br />
    return TextContent(type="text", text=", ".join(tables))

标准化带来的最大好处不是”写一次到处用”,而是工具生态的可组合性。你可以在GitHub上找到一个现成的MCP Server,直接配置到你的Agent里,无需修改任何代码。这就像npm对Node.js的意义——标准化的包管理让生态爆发成为可能。

代码开发

二、记忆系统:从”把全部历史塞进Context”到分层记忆架构

早期的Agent记忆方案非常粗暴——把所有对话历史拼接成一个长prompt发给模型。当context window从8K扩展到128K甚至1M时,这种方案”勉强能用”,但成本和延迟都不可接受。

2026年的记忆架构普遍采用了分层设计:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class HierarchicalMemory:<br />
    def <strong>init</strong>(self):<br />
        # 工作记忆:当前对话上下文(最近N轮)<br />
        self.working_memory = ConversationBuffer(max_turns=10)<br />
        # 短期记忆:本次会话的关键事实摘要<br />
        self.short_term = SummaryBuffer(threshold=0.7)<br />
        # 长期记忆:跨会话持久化的用户画像和知识<br />
        self.long_term = VectorStore(namespace="user_knowledge")<br />
        # 情景记忆:特定任务的执行记录<br />
        self.episodic = SQLiteStore(table="episodes")
<div class="codehilite"><pre><span></span><code>async<span class="w"> </span>def<span class="w"> </span>retrieve(self,<span class="w"> </span>query:<span class="w"> </span>str,<span class="w"> </span>k:<span class="w"> </span>int<span class="w"> </span>=<span class="w"> </span>5)<span class="w"> </span>-&gt;<span class="w"> </span>list:
<span class="w">    </span>&quot;&quot;&quot;根据查询从各层记忆中召回相关信息&quot;&quot;&quot;
<span class="w">    </span>results<span class="w"> </span>=<span class="w"> </span>[]
<span class="w">    </span>#<span class="w"> </span>1.<span class="w"> </span>先查工作记忆(最快)
<span class="w">    </span>results.extend(self.working_memory.search(query))
<span class="w">    </span>#<span class="w"> </span>2.<span class="w"> </span>查长期向量记忆
<span class="w">    </span>results.extend(await<span class="w"> </span>self.long_term.similarity_search(query,<span class="w"> </span>k=k))
<span class="w">    </span>#<span class="w"> </span>3.<span class="w"> </span>查情景记忆
<span class="w">    </span>results.extend(await<span class="w"> </span>self.episodic.search(query))
<span class="w">    </span>return<span class="w"> </span>self.rank_and_deduplicate(results)<span class="nt">&lt;/code&gt;&lt;/pre&gt;&lt;p&gt;</span>这种分层架构的核心思想是:不同时间尺度的信息有不同的检索模式。工作记忆用滑动窗口,短期记忆用摘要压缩,长期记忆用向量检索,情景记忆用结构化查询。各层各司其职,而不是把所有信息都扔进一个巨大的embedding空间。<span class="nt">&lt;/p&gt;&lt;h2&gt;</span>三、多Agent协作:从&quot;编排剧本&quot;到&quot;自治团队&quot;<span class="nt">&lt;/h2&gt;&lt;p&gt;</span>2025年的多Agent系统大多是&quot;编排式&quot;的——由一个orchestrator按照预定义的workflow依次调用各个agent。本质上是把一个复杂的prompt拆成了多个小prompt,通过代码逻辑串联。<span class="nt">&lt;/p&gt;&lt;p&gt;</span>2026年出现了更接近&quot;自治团队&quot;的模式:<span class="nt">&lt;/p&gt;&lt;pre&gt;&lt;code</span><span class="w"> </span><span class="na">class=</span><span class="s">&quot;language-python&quot;</span><span class="nt">&gt;</span>#<span class="w"> </span>Kanban式多Agent协作模式

任务以看板形式管理,Agent自主认领和执行

Orchestrator将大任务拆解到看板

kanban.create_task(
title=”实现用户认证模块”,
description=”包含JWT登录、OAuth2集成、权限中间件”,
assignee=”backend-agent”,
dependencies=[“database-schema-task”]
)

Agent自主认领、执行、汇报

Worker Agent通过工具查看自己的任务队列

tasks = kanban.list_my_tasks(status=”ready”)
for task in tasks:
kanban.claim(task.id)
result = execute_task(task)
kanban.complete(task.id, summary=result)

关键区别在于:编排模式下,orchestrator需要知道每个agent的能力和调用顺序;自治模式下,agent自己判断何时执行、如何执行。这大幅降低了多Agent系统的开发和维护成本。

技术架构

四、对开发者的实际建议

基于以上观察,对正在构建AI Agent应用的开发者有几点建议:

1. 优先采用MCP协议做工具层。即使你现在只有一个Agent,用MCP封装工具也能为未来的扩展打好基础。迁移成本在早期最低。

2. 不要跳过分层记忆直接上RAG。很多团队一上来就搭向量数据库,结果发现检索质量很差。先做好对话摘要和关键事实提取,效果往往比复杂的RAG pipeline更好。

3. 多Agent不是银弹。如果你的任务可以由单个Agent完成,就不要引入多Agent。多Agent的真正价值在于:任务需要不同能力域的专家、或者需要并行处理。

总结

2026年AI Agent技术栈正在从”能用”走向”好用”。MCP协议让工具生态可组合,分层记忆让Agent有更合理的认知架构,自治式多Agent协作让复杂任务编排更灵活。作为开发者,我们正处在一个技术范式快速迭代的窗口期——选择正确的架构比选择正确的模型更重要。

技术选型的核心原则始终不变:简单性优先,渐进式复杂化。先用最简单的方案验证需求,再根据实际瓶颈逐步引入更复杂的架构。AI Agent领域尤其如此——这个领域变化太快,过度设计的成本远高于迭代重构的成本。

如何解决在WSL系统中tail -f windows文件内容不能实时刷新的问题

andy阅读(4139)

如何解决在WSL系统中tail -f windows文件内容不能实时刷新的问题

日常在windows10系统下开发,想借助wsl2里的ubuntu中的命令,方便日常工作, 比如使用tail -f、grep等命令定位查看日志。

我们都知道windows里的文件系统会在wsl中的ubuntu以/mnt/c, /mnt/d 这种方式挂载,
从而让我们能 tail -f /mnt/{windows系统的日志文件路径} 来实时查看日志,然而实际情况则是,日志可以看但不会像正常的tail -f那样实时滚动,如何解决呢,很简单!直接在命令上加个

1
---disable-inotify

的参数即可,如下


1
tail -f ---disable-inotify /mnt/d/tomcat/xx.log

注意是三个杠
为了方便,也可以加个别名


1
alias tail="tail ---disable-inotify"

CentOS安装TensorRT指南

andy阅读(12119)

首先介绍下tensorRT,tensorRT类似于tensorflow serving,都是一种用于将训练好的深度学习模型用于实时inference的工具,区别在于tensorflow serving是以一种server的方式提供出来的也就是grpc服务,而tensorRT目前是以一种library的方式提供的,当然我们可以自行包装成一个server,其实NVIDIA官方也已经有基于tensorRT的Inference Server了这个是后话。

相比于tensorflow serving,tensorRT对gpu的支持更加的好,inference更加快,毕竟是NVIDIA自家的;并且tensorRT支持多种深度学习框架,比如tensorflow、caffe、paddle paddle,pytorch等,tensorflow serving的话虽说也可以通过扩展的方式来支持,但是比较麻烦而且非官方。

caffe模型的话可以直接导入,其他深度学习框架的模型需要通过tensorRT的parse工具转换成UFF格式,或者是ONNX标准。tensorRT甚至支持以编程的方式直接添加layer,并设置参数。

好了上面是简单介绍下tensorRT,下面讲下如何在centos上安装。tensorRT的官方是比较支持Ubuntu的,提供的文档也基本基于Ubuntu。本文是基于Centos7.5, Tesla M40的。

1.首先你需要安装好NVIDIA驱动、cuda、cudnn,我的机器已经安装了 不多说,网上的教程也多的是。 这里要注意NVIDIA的driver要375以上版本的,cuda我这里用的是9.0,cudnn我用的是7.1。

2.安装好上面步骤之后,到这里https://developer.nvidia.com/nvidia-tensorrt-download 下载tensorRT 4.0的安装包,注意要选对符合你cuda和cudnn的对应版本的,另外这里列出的都是Ubuntu的,没关系,我们不选tar file installed packages。

3.下载完后,解压,接着 vi ~/.bashrc,添加如下内容


1
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/andy/TensorRT/lib:/usr/local/cuda-9.0/lib64

其中/home/andy/TensorRT替换成你自己实际的目录,cuda-9.0也是。


1
source ~/.bashrc

使之生效。

4.然后就是安装对应的python库,包括tensorRT,uff和graphsurgeon。
方法是直接用pip install对应的whl包,几个包分别在tensorRT目录下的python、uff、graphsurgeon目录下,进到这三个目录下,执行 pip install xxx.whl即可。

搞定后,进到python环境下,执行下import tensorrt,正常的话可以成功导入。

5.最后可以编译一下tensorRT提供的一些sample。进到tensorRT的sample目录下,执行make CUDA_INSTALL_DIR=/usr/local/cuda,完成后到tensorRT的bin目录下,可以看到已经生成了可执行的sample,执行./sample_mnist 就可以输出一副字符组成的数字图片,下面跟着mnist的预测结果。 ok整个安装搞定了。

过程中可能会遇到的错误
1.报错


1
libnvparsers.so: undefined reference to `std::invalid_argument,undefined reference to `std::__throw_out_of_range_fmt(char const*, ...)

解决方法:gcc的版本问题,可以升级到gcc5.3,然后在环境变量中替换老的gcc,并且把新的gcc的glibc替换原有的旧的

关于SBT你需要知道的那些事

andy阅读(6197)

关于SBT你需要知道的那些事

之前在做一个有关spark的项目的时候,需要使用sbt构建scala代码,以前主要用maven,gradle也用一点,但是sbt实在陌生,摸索了一阵,把自己在使用sbt中最需要了解的东西记下来,以备同样不熟悉的同学做个参考。

SBT常识

sbt是个类比于maven、gradle的构建工具,主要用于scala项目 当然也可以用于别的。一个标准的sbt项目具有如下结构


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
build.sbt                   &lt;- sbt构建定义文件,后缀名必须是.sbt,前边可以是任意名字,不要求必须是build
project/                    &lt;- project目录下的所有.scala与.sbt文件都会被自动载入
  build.properties          &lt;- 指定sbt的版本,如sbt.version=1.0.0,sbt启动器会自动安装本地没有的版本
  Dependencies.scala        &lt;- 依赖配置,如果依赖较简单,可以省略该文件,直接写在build.sbt中
  plugins.sbt               &lt;- 插件定义
src/
  main/
    resources/
    scala/                  &lt;- scala源文件
    java/
  test/
    resources/
    scala/                  &lt;- scala测试源文件
    java/
target/
bin/

最核心的当然是build.sbt这个文件,类比于maven中pom.xml 这个文件可以用scala语言编写,也可以包含一些特定的sbt的语法的东西,所以有时候看上去比较奇怪。
总结一下最基本的配置方式是key := value的方式

如何解决SBT在国内太慢的问题

我使用sbt的第一个问题就是太慢了,sbt里虽然可以使用maven的仓库 但是好像并没有继承maven的settings.xml配置的mirror,所以这里我们要对sbt做一下镜像的配置,比较合适的一个可以是这样


1
2
3
4
5
6
7
[repositories]
local
aliyun-nexus: http://maven.aliyun.com/nexus/content/groups/public/
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sonatype-oss-releases
maven-central
sonatype-oss-snapshots

SBT如何添加依赖

一个最基本的build.sbt文件


1
2
3
4
name := "sss1"
version := "0.1"
scalaVersion := "2.11"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.1"

就像前面说的,最基本的就是key := value
然后添加依赖就是libraryDependencies += “groupid” % “artifactId” % “version”
其中第一个百分号可以换成双百分号”%%”,表示在artifactId上补上当前的scala版本,比如上面的那个最终的依赖坐标就是 groupId:org.apache.spark, artifactId:spark-core_2.11, version:2.1.1

SBT如何添加本地依赖

这个很方便,SBT默认会把项目根目录下的lib目录作为“非托管依赖”,所以只要将你的jar包放到这个目录就可以了,当然如果的jar不想放在这个目录(我也不知道为什么,老子就是不想放在这个目录)这种情况你可以在build.sbt中配置


1
unmanagedBase := baseDirectory.value / "custom_lib"

SBT如何打印依赖树

方法可能很多,但是最简单,效果最好的方法就是利用一个插件
编辑 ~/.sbt/0.13/plugins/plugins.sbt 加上一行


1
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2")

然后在项目根目录进入 sbt console
然后执行命令

1
tasks -V

会列出当前项目可用的sbt命令,你会发现多了一些dependency开头的,就是插件提供给我们的,我比较喜欢的一个命令是

1
dependencyBrowseGraphHtml

可以生成一个依赖树的HTML页面 比较直观。如下图
sbt查看html依赖树

由于本人也是刚开始使用sbt 遇到的问题还比较少 所以此文会继续保持更新

java通过gRPC整合tensorflow serving——gRPC java入门例子

andy阅读(8426)

java通过gRPC整合tensorflow serving——gRPC java入门例子

项目中以前需要把算法同事们train好的tensorflow model包装成服务提供给其他部门应用,一开始我们使用python直接调用tensorflow,然后java和python之间的交互 通过jni或者rabbitmq的方式,这种方式的问题就是调用tensorflow的效率较低,不得不频繁的load model的checkpoint文件,推断执行慢,后来随着tensorflow官方推出tensorflow serving,我们也尝试了这种方式,发现确实可以很大程度上提升推断的性能,从一开始调用tensorflow每一次推断差不多要几秒钟(导致我们的服务一开始基本都是只能提供异步批量处理的方式),后来的tfserving的方式 耗时降到了百毫秒的级别,使得同步的调用成为现实。

这几篇文章就简单讲一下在实践中,我们是如何通过gRPC整合tensorflow serving调用深度学习模型的。

打算分为以下几个部分

  1. 首先必须要知道一些gRPC的基本知识,能用java实现一个客户端和服务器端的小demo 就是本篇gRPC java入门例子
  2. 然后是 tensorflow model导出到tensorflow serving的要点讲解
  3. 接着是一个完整的java通过gRPC调用tensorflow serving的例子。

下面开始本篇,首先关于gRPC的介绍就不多说了,网上随便一搜都是,什么高性能,以支持移动和HTTP2.0为主的开源RPC协议什么的。。。

我们直接开始java的小demo,我基于Intellij的iDE讲解,eclipse大体也差不多。

1. 首先第一步,新建一个基于gradle构建的java项目,如图
java整合tensorflow serving

java整合tensorflow serving

然后点下一步,填写maven坐标什么的,按照自己实际情况填写就好了,最后点finish,一个gradle项目就建好了。

2.接着加入相关的依赖和插件

因为我们需要gRPC的支持所以需要加入grpc几个相关的依赖,还有一个protobuf的gradle插件,用于将proto的协议文件生成java源码。

编辑build.gradle 如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
group 'com.xyz'
version '1.0-SNAPSHOT'

repositories {
mavenCentral()
}

buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.3'
}
}

apply plugin: 'java'

apply plugin: 'com.google.protobuf'

sourceCompatibility = 1.8

protobuf {
generatedFilesBaseDir = "$projectDir/src/"
sourceSets {
main {
proto {
// 除了默认的'src/main/proto'目录新增proto文件的方法
srcDir 'src/main/protobuf'
include '**/*.protodevel'
}
}
}
protoc {
// The artifact spec for the Protobuf Compiler
artifact = 'com.google.protobuf:protoc:3.0.0'
}
plugins {
grpc {
artifact = 'io.grpc:protoc-gen-grpc-java:1.0.0-pre2'
}
}
generateProtoTasks {
all()*.plugins {
grpc {}
}
}
}

dependencies {
compile 'com.google.protobuf:protobuf-java:3.0.0'
compile 'io.grpc:grpc-stub:1.0.0-pre2'
compile 'io.grpc:grpc-protobuf:1.0.0-pre2'
compile "io.grpc:grpc-netty:1.0.0-pre2"
if (JavaVersion.current().isJava9Compatible()) {
compile 'javax.annotation:javax.annotation-api:1.3.1'
}
testCompile 'junit:junit:4.12'
}
3.接着 就是编写我们的服务协议定义的proto文件

在src/main/目录下新建一个proto目录,默认gradle-protobuf插件会自动处理这个目录下的proto文件,如果你想自己定义其他目录,就要像上面那个配置文件中那样,添加proto–srcDir的配置单元了,这里我起一个名字叫test_grpc.proto 内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Copyright 2015, gRPC Authors
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples";
option java_outer_classname = "TestGRPCProto";

package examples;

service TestService {
rpc ListPeople (Query) returns (People) {//无办法不传参数 只能定义一个空消息 传进去 ,我想这可能是也是一种防御性编程吧 毕竟后面可能会用到参数

}

}

message Query {

}

message People {
string name = 1;
int32 age = 2;
}

就是简单的定义了一个叫TestService的服务,然后只有一个方法叫ListPeople,方法接收一个空的参数,但是gRPC不允许这种完全空的,只能通过自己定义个空消息实现,ListPeople返回一个People的对象,都是写死的,做个演示。

保存后,执行gradle build,或者其他触发编译的命令,protoc就会将刚才的proto文件生成对应的java代码,拷贝这些代码到我们的src/main/java目录,下面就可以开始编写服务代码了。

4.然后我们编写服务端

新建一个叫TestServer.java的源文件,主要的步骤就是首先写一个服务类继承刚才gRPC自动生成的那堆代码中的服务stub类,就是XXXImplBase类,其中的XXX就是你proto中定义的服务名,覆写里面的服务方法后,然后再写一个启动server的代码就ok了,具体如下图,作为demo可以直接拷贝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package io.grpc.examples;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;

import java.io.IOException;

public class TestServer {

private Server server;

private void start() throws IOException {
/* The port on which the server should run */
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new TestServiceImpl())
.build()
.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("*** shutting down gRPC server since JVM is shutting down");
TestServer.this.stop();
System.err.println("*** server shut down");
}
});
}

private void stop() {
if (server != null) {
server.shutdown();
}
}

/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/

private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}

/**
* Main launches the server from the command line.
*/

public static void main(String[] args) throws IOException, InterruptedException {
final TestServer server = new TestServer();
server.start();
server.blockUntilShutdown();
}

static class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
@Override
public void listPeople(Query request, StreamObserver responseObserver) {
People people = People.newBuilder().setAge(11).setName("xiaozhang").build();
responseObserver.onNext(people);
responseObserver.onCompleted();
}
}
}
5.最后就是client端的调用代码了

新建一个类TestClient.java 拷贝以下代码, 其中调用gRPC的服务client分为两种,一种是blocking的同步调用,一种是non-blocking的异步调用,代码的注释中有这块,可以参考。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package io.grpc.examples;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

public class TestClient {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1",50051).usePlaintext(true).build();
TestServiceGrpc.TestServiceBlockingStub blockingStub = TestServiceGrpc.newBlockingStub(channel);
TestServiceGrpc.TestServiceStub ayncStub = TestServiceGrpc.newStub(channel);

//同步调用
System.out.println(blockingStub.listPeople(Query.newBuilder().build()));
//异步调用
ayncStub.listPeople(Query.newBuilder().build(), new StreamObserver(){
@Override
public void onNext(People value) {
System.out.println(value);
}

@Override
public void onError(Throwable t) {

}

@Override
public void onCompleted() {
System.out.println("finished");
}
});
System.out.println("异步调用不阻塞,所以这句先打印");
Thread.sleep(10);
}
}

最后启动server,再启动服务端就可以看到简单的效果了,时间仓促,有问题敬请留言。有了对java使用grpc的简单印象后,后面我们继续探讨整合tensorflow serving的方式。

深度学习资源整理,不断更新中

andy阅读(6083)

深度学习资源整理,不断更新中

基础知识:
线性代数
  1. 麻省理工公开课线性代数,  网易版http://open.163.com/special/opencourse/daishu.html
  2. 线性代数应用课程:Coding the Matrix: Linear Algebra through Computer Science Applications

    链接: https://pan.baidu.com/s/1o7AXQrC 密码: 84yi

  3. B站搬运工 线性代数的本质-合集  内容:01 – 向量究竟是什么  02 – 线性组合、张成的空间与基  03 – 矩阵与线性变换、04 – 矩阵乘法与线性变换复合、附注1 – 三维空间中的线性变换、05 – 行列式、06 – 逆矩阵、列空间与零空间、附注2 – 非方阵、07 – 点积与对偶性、08第一部分 – 叉积的标准介绍、08第二部分 – 以线性变换的眼光看叉积、09 – 基变换、10 – 特征向量与特征值、11 – 抽象向量空间   地址https://www.bilibili.com/video/av6731067/
微积分
  1. 俄亥俄州立大学微积分基础 coursera https://www.coursera.org/learn/calculus1
  2. 数列与级数  讲课老师说话非常有意思  确切的说表情非常用力  https://www.bilibili.com/video/av1354315
概率论

台大概率课程 链接: http://pan.baidu.com/s/1mis8w8C 密码: gqun

 

python编程基础
  1. Learn to Program: The Fundamentals

    链接: http://pan.baidu.com/s/1eSlZbR8 密码: fwr6

神经网络
  1. Andrew Ng (吴恩达)深度学习专项课程 coursera链接 网易云课堂字幕版链接
  2. 深度学习三巨头之一Geoffrey Hinton 大神的 面向机器学习的神经网络(Neural Networks for Machine Learning)
    https://www.coursera.org/learn/neural-networks?siteID=PYQagbz7Hd0-MeOm9SQ6gC2NixSjmgOkBw&utm_content=2&utm_medium=partners&utm_source=linkshare&utm_campaign=PYQagbz7Hd0
    这门比吴恩达的要难一些 但是没有作业 只有小测
  3. Udacity优达学城的深度学习免费课程 来自google 主要基于tensorflow的应用 https://cn.udacity.com/course/deep-learning–ud730

中文的

  1. 台大李宏毅老师深度学习课程
    课程主页:http://speech.ee.ntu.edu.tw/~tlkagk/courses_MLDS17.html
    课程视频Playlist: https://www.youtube.com/playlist?list=PLJV_el3uVTsPMxPbjeX7PicgWbY7F8wW9
    B站搬运深度学习课程视频: https://www.bilibili.com/video/av9770302/
  2. 台大美女老师陈缊侬老师深度学习应用课程:Applied Deep Learning / Machine Learning and Having It Deep and Structured

16年课程主页,有Slides等相关资料:https://www.csie.ntu.edu.tw/~yvchen/f105-adl/index.html
17年课程主页,资料正在陆续放出:https://www.csie.ntu.edu.tw/~yvchen/f106-adl/
Youtube视频,目前没有playlist,可以关注其官方号放出的视频:https://www.youtube.com/channel/UCyB2RBqKbxDPGCs1PokeUiA/videos

特别推荐

莫烦Python及机器学习教程  这个特别适合入门,内容深度虽然很浅,但是比较通俗易懂,也算包罗万象,而且小哥哥的声音非常甜美哈哈  传送门http://morvanzhou.github.io/