在智能制造和工业物联网(IIoT)环境中,传感器数据的完整性至关重要。恶意行为者,无论是内部还是外部,都可能通过注入伪造数据来操纵生产决策、引发设备故障或破坏产品质量。由于传统网络安全手段难以深入到数据有效载荷的语义层面,我们需要一种专注于数据本身的AI基础设施解决方案。本文将介绍如何使用轻量级的自编码器(Autoencoder, AE)模型,在边缘网关层面实时检测这种恶意数据注入。
1. 为什么选择自编码器?
自编码器是一种非监督学习模型,它被训练用来学习“正常”数据的低维表示(编码)并尝试从该表示中精确地重建原始输入(解码)。其核心思想是:
- 训练时,只使用干净的、正常的传感器数据。
- 遇到恶意注入的、结构异常的数据时,自编码器无法对其进行准确重建。
- 高重建误差(Reconstruction Error)即被视为异常或潜在的恶意注入。
这种方法的优势是无需标记恶意样本,只需要大量正常运行数据即可。
2. 训练轻量级自编码器(PyTorch)
我们以一个包含10个特征(例如:温度、压力、振动频率等)的传感器数据集为例,构建一个简单的线性自编码器。
环境准备
pip install torch numpy scikit-learn onnx onnxruntime
PyTorch 模型与训练代码
import torch
import torch.nn as nn
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
# 假设传感器输入维度为10
INPUT_DIM = 10
ENCODING_DIM = 3
# 1. 生成模拟正常数据 (1000个样本)
np.random.seed(42)
X_normal = np.random.rand(1000, INPUT_DIM) * 10
scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X_normal)
X_tensor = torch.tensor(X_scaled, dtype=torch.float32)
# 2. 定义自编码器架构
class SimpleAutoencoder(nn.Module):
def __init__(self, input_dim, encoding_dim):
super(SimpleAutoencoder, self).__init__()
# 编码器
self.encoder = nn.Sequential(
nn.Linear(input_dim, 7),
nn.ReLU(True),
nn.Linear(7, encoding_dim)
)
# 解码器
self.decoder = nn.Sequential(
nn.Linear(encoding_dim, 7),
nn.ReLU(True),
nn.Linear(7, input_dim),
nn.Sigmoid() # 确保输出在0-1范围内,与输入缩放一致
)
def forward(self, x):
x = self.encoder(x)
x = self.decoder(x)
return x
# 3. 训练模型
model = SimpleAutoencoder(INPUT_DIM, ENCODING_DIM)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
num_epochs = 50
for epoch in range(num_epochs):
outputs = model(X_tensor)
loss = criterion(outputs, X_tensor)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (epoch + 1) % 10 == 0:
print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.6f}')
3. 部署优化:导出到ONNX
为了在资源受限的边缘设备上实现低延迟推理,我们将训练好的PyTorch模型转换为行业标准的ONNX(Open Neural Network Exchange)格式。
import onnx
import torch.onnx
# 导出模型
dummy_input = torch.randn(1, INPUT_DIM, dtype=torch.float32)
torch.onnx.export(
model,
dummy_input,
"autoencoder.onnx",
input_names=['input'],
output_names=['output'],
dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}}
)
print("模型已成功导出到 autoencoder.onnx")
4. 边缘实时检测实现(ONNX Runtime)
在边缘网关或PLC(可编程逻辑控制器)的侧边计算单元上,我们使用ONNX Runtime来加载和执行模型,并设置异常阈值。
关键步骤:首先,我们需要计算正常数据下的平均重建误差和最大容忍误差(例如,取正常数据重建误差的第99百分位数)作为阈值 $T$。
import onnxruntime as ort
import numpy as np
# 假设阈值T已通过对正常数据的分析确定
# (例如,正常数据在训练后的重建误差的99%分位点)
ANOMALY_THRESHOLD = 0.005 # 实际值需根据训练结果调整
# 1. 加载ONNX模型
sess = ort.InferenceSession("autoencoder.onnx")
input_name = sess.get_inputs()[0].name
output_name = sess.get_outputs()[0].name
# 2. 定义检测函数
def detect_injection(raw_sensor_data, scaler_obj):
# 预处理:使用训练时相同的MinMaxScaler
scaled_data = scaler_obj.transform(raw_sensor_data.reshape(1, -1)).astype(np.float32)
# 推理
ort_inputs = {input_name: scaled_data}
ort_outputs = sess.run([output_name], ort_inputs)
reconstructed_data = ort_outputs[0]
# 计算重建误差 (MSE)
mse = np.mean(np.power(scaled_data - reconstructed_data, 2), axis=1)[0]
print(f"Reconstruction Error: {mse:.6f}")
if mse > ANOMALY_THRESHOLD:
return True, "ALERT: Potential Data Injection Detected!"
else:
return False, "Normal Data Stream."
# 3. 模拟数据流测试
# A. 正常数据测试 (接近训练数据)
normal_input = np.random.rand(INPUT_DIM) * 10
is_anomaly, message = detect_injection(normal_input, scaler)
print(f"Test Normal: {message}")
# B. 恶意注入测试 (数据值被篡改到极值,远超正常范围)
# 例如,将温度设置为-1000或压力设置为10000
injected_input = np.copy(normal_input)
injected_input[5] = 10000.0 # 恶意注入一个异常的极值
is_anomaly, message = detect_injection(injected_input, scaler)
print(f"Test Injection: {message}")
预期输出结果分析:
- Test Normal的重建误差应该很低,低于 $T$。
- Test Injection的重建误差会显著高于 $T$,因为自编码器从未见过这种统计分布的数据,导致无法准确还原,从而触发警报。
总结
通过将轻量级的自编码器模型部署为ONNX格式,并在工业数据采集点前置一个AI检测层,我们能够实时、高效地从语义层面验证传感器数据的完整性。这提供了一个强大的、基于AI的基础设施组件,用于增强智能制造系统的安全性和决策可靠性,有效阻止传感器数据注入攻击对生产流程造成损害。
汤不热吧