如何通过加密签名与身份验证协议构建安全的多 Agent 协作系统
在现代 AI 基础设施中,多 Agent 系统(MAS)正成为解决复杂任务的主流架构。然而,当多个 Agent 在分布式环境中交互时,如何确保消息的来源真实且内容未被篡改?本文将从架构设计到代码实现,教你构建一个带安全防护的 Multi-Agent 系统。
1. 核心安全挑战
在无中心化的 Agent 协作中,主要面临以下威胁:
– 身份冒充:恶意 Agent 伪装成管理节点发送破坏性指令。
– 数据篡改:中间节点截获并修改协作任务的上下文或 Tool 调用参数。
– 重放攻击:攻击者截获合法的 Agent 消息并重复发送以触发多次操作。
2. 安全协议设计
我们将采用基于 RSA 的非对称加密签名机制。每个 Agent 在初始化时拥有一对公私钥。发送消息时,Agent 使用私钥签名;接收方或中心化消息总线(Message Bus)使用该 Agent 的公钥进行验证。
消息结构定义
使用 Pydantic 定义标准化的安全消息包格式:
from pydantic import BaseModel
from typing import Any, Dict
class SecureMessage(BaseModel):
sender_id: str
receiver_id: str
payload: Dict[str, Any]
signature: str
timestamp: float
3. 实操实现
3.1 环境准备
安装必要的依赖库:
pip install cryptography pydantic
3.2 核心安全模块代码
以下代码展示了如何生成 Agent 密钥对,并实现消息签名与校验逻辑:
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
import json
import time
class AgentSecurityProvider:
def __init__(self, agent_id: str):
self.agent_id = agent_id
# 生成 RSA 密钥对
self.private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
self.public_key = self.private_key.public_key()
def get_public_key_pem(self):
return self.public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode('utf-8')
def sign_payload(self, payload: dict) -> str:
# 序列化 payload 并确保 Key 排序一致
content = json.dumps(payload, sort_keys=True).encode('utf-8')
signature = self.private_key.sign(
content,
padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
hashes.SHA256()
)
return signature.hex()
@staticmethod
def verify(public_key_pem: str, payload: dict, signature_hex: str) -> bool:
public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
content = json.dumps(payload, sort_keys=True).encode('utf-8')
try:
public_key.verify(
bytes.fromhex(signature_hex),
content,
padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
hashes.SHA256()
)
return True
except Exception:
return False
3.3 构建安全交互总线
总线负责注册 Agent 公钥并在转发消息前强制执行签名校验:
class SecureAgentBus:
def __init__(self):
self.registry = {} # 存储 agent_id -> public_key_pem
def register_agent(self, agent_id, pub_key):
self.registry[agent_id] = pub_key
print(f"Agent {agent_id} registered successfully.")
def process_message(self, msg: SecureMessage):
# 1. 检查时效性 (防重放)
if time.time() - msg.timestamp > 30: # 30秒过期
raise TimeoutError("Message expired")
# 2. 校验发送者是否存在
if msg.sender_id not in self.registry:
raise ValueError("Unauthorized Agent")
# 3. 签名验证
is_valid = AgentSecurityProvider.verify(
self.registry[msg.sender_id],
msg.payload,
msg.signature
)
if not is_valid:
raise SecurityError("Signature mismatch! Tampering suspected.")
print(f"[Auth Success] Routing message from {msg.sender_id} to {msg.receiver_id}")
return True
4. 总结
构建多 Agent 系统时,通信协议的安全性直接决定了系统的稳定性。通过 Pydantic 定义的严格 Schema 和基于 RSA 的签名校验,我们可以有效防止中间人攻击和身份欺诈。在更复杂的 AI Infra 中,建议将此逻辑集成到消息队列(如 Redis 或 Kafka)的拦截器中,以实现透明的安全层。
汤不热吧