在现代AI模型部署架构中,一个完整的推理链路往往涉及多个微服务,例如数据预处理服务、特征工程服务、以及最终的模型推理服务。传统上,服务间通信依赖于RESTful API (基于HTTP 1.1),但这种方式在大数据量、高频率的推理请求场景下,会引入不必要的延迟和序列化/反序列化开销。
gRPC 是一个高性能、开源的通用RPC框架,它基于HTTP/2协议,并使用Protocol Buffers (Protobuf) 作为接口定义语言和数据序列化格式。gRPC在AI基础设施中具有天然优势:
- 更低的延迟: 基于HTTP/2,支持多路复用,减少了连接建立的开销。
- 紧凑的数据格式: Protobuf是二进制格式,比JSON或XML更小、更快。
- 强类型接口: Protobuf定义了严格的服务契约,便于维护和跨语言集成。
本文将通过一个实操示例,展示如何定义Protobuf结构,并实现一个高性能的Python gRPC推理服务。
Contents
1. 定义Protocol Buffers (Protobuf)
模型推理的关键在于高效传输张量(Tensor)数据。我们使用 bytes 类型来传输原始的NumPy数组,并辅以必要的元数据(如维度和数据类型)。
创建 inference.proto 文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 syntax = "proto3";
package inference;
// 定义张量数据结构,用于请求和响应中的输入/输出
message TensorData {
// 张量的原始数据,通常是序列化的NumPy数组字节
bytes data = 1;
// 张量的维度 (e.g., [1, 224, 224, 3])
repeated int32 shape = 2;
// 数据类型 (e.g., float32, int8)
string dtype = 3;
}
// 定义推理请求
message InferenceRequest {
string model_name = 1;
repeated TensorData inputs = 2;
}
// 定义推理响应
message InferenceResponse {
string status = 1;
repeated TensorData outputs = 2;
int64 latency_ms = 3;
}
// 定义推理服务
service InferenceService {
// 核心的Unary RPC
rpc Predict (InferenceRequest) returns (InferenceResponse);
}
2. 生成 gRPC Python 存根 (Stubs)
安装 gRPC 工具:
1
2 pip install grpcio grpcio-tools numpy
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. inference.proto
这将生成 inference_pb2.py (数据结构) 和 inference_pb2_grpc.py (服务接口)。
3. 实现 gRPC 服务端
服务端需要负责接收字节数据,将其反序列化为NumPy数组,执行推理,然后将结果重新序列化。
创建 inference_server.py:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 import grpc
import time
import numpy as np
from concurrent import futures
import inference_pb2
import inference_pb2_grpc
# 辅助函数:将 NumPy 数组转换为 Protobuf TensorData
def numpy_to_tensordata(np_array):
return inference_pb2.TensorData(
data=np_array.tobytes(),
shape=np_array.shape,
dtype=str(np_array.dtype)
)
# 辅助函数:将 Protobuf TensorData 转换为 NumPy 数组
def tensordata_to_numpy(tensor_data):
# 注意:需要保证dtype正确,否则数据可能损坏
dtype = np.dtype(tensor_data.dtype)
return np.frombuffer(tensor_data.data, dtype=dtype).reshape(tensor_data.shape)
class InferenceServicer(inference_pb2_grpc.InferenceServiceServicer):
def Predict(self, request, context):
start_time = time.time()
print(f"[Server] Received request for model: {request.model_name}")
# 1. 反序列化输入数据
input_tensors = []
for input_data in request.inputs:
np_arr = tensordata_to_numpy(input_data)
input_tensors.append(np_arr)
print(f" -> Input shape: {np_arr.shape}, dtype: {np_arr.dtype}")
# 2. 模拟模型推理 (实际生产中这里是调用TF/PyTorch/ONNX Runtime)
# 假设我们进行一个简单的加法操作,并返回结果
output_np = input_tensors[0] * 2 + 10
# 3. 序列化输出数据
output_tensordata = numpy_to_tensordata(output_np)
# 4. 构建响应
end_time = time.time()
latency_ms = int((end_time - start_time) * 1000)
response = inference_pb2.InferenceResponse(
status="OK",
outputs=[output_tensordata],
latency_ms=latency_ms
)
return response
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
inference_pb2_grpc.add_InferenceServiceServicer_to_server(
InferenceServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("gRPC Server listening on port 50051...")
server.wait_for_termination()
if __name__ == '__main__':
serve()
4. 实现 gRPC 客户端
客户端通常是上游的数据处理服务。它负责准备数据并以 Protobuf 格式发送请求。
创建 inference_client.py:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 import grpc
import numpy as np
import time
import inference_pb2
import inference_pb2_grpc
# 辅助函数:将 NumPy 数组转换为 Protobuf TensorData (与服务端一致)
def numpy_to_tensordata(np_array):
return inference_pb2.TensorData(
data=np_array.tobytes(),
shape=np_array.shape,
dtype=str(np_array.dtype)
)
# 辅助函数:将 Protobuf TensorData 转换为 NumPy 数组 (与服务端一致)
def tensordata_to_numpy(tensor_data):
dtype = np.dtype(tensor_data.dtype)
return np.frombuffer(tensor_data.data, dtype=dtype).reshape(tensor_data.shape)
def run_inference():
# 1. 建立通道
with grpc.insecure_channel('localhost:50051') as channel:
stub = inference_pb2_grpc.InferenceServiceStub(channel)
# 2. 准备数据:一个模拟的图像输入 [1, 224, 224, 3], float32
dummy_input = np.random.rand(1, 224, 224, 3).astype(np.float32)
input_tensor_data = numpy_to_tensordata(dummy_input)
# 3. 构建请求
request = inference_pb2.InferenceRequest(
model_name="image_classifier_v1",
inputs=[input_tensor_data]
)
# 4. 发送请求并计时
print("[Client] Sending large tensor request...")
start_total = time.time()
try:
response = stub.Predict(request)
# 5. 处理响应
end_total = time.time()
output_np = tensordata_to_numpy(response.outputs[0])
print(f"\n[Client] Response Status: {response.status}")
print(f"[Client] Server Latency: {response.latency_ms} ms")
print(f"[Client] Total Client-Server-Roundtrip Time: {(end_total - start_total) * 1000:.2f} ms")
print(f"[Client] Output Shape: {output_np.shape}")
except grpc.RpcError as e:
print(f"[Client] RPC Failed: {e.details()}")
if __name__ == '__main__':
run_inference()
5. 性能优势总结
通过使用 gRPC 和 Protobuf,我们实现了高效的二进制数据传输。在传输一个典型的 1MB 图像张量数据时,gRPC的二进制序列化和HTTP/2的头部压缩能够将通信开销降到最低,相比于使用JSON或Multipart/Form-Data的RESTful请求,通常可以实现 30%到50% 的延迟降低,尤其是在服务内部(Internal Service Mesh)高频调用的场景中,性能提升最为显著。
汤不热吧