作为AI基础设施的核心组件,NVIDIA Triton Inference Server(TIS)是解决高并发、低延迟模型部署挑战的利器。要充分发挥现代GPU的性能潜力,仅仅部署模型是不够的,我们必须精确控制模型的并发度(Concurrency)和动态批处理(Dynamic Batching)。
本文将深入探讨如何在Triton中配置这两个关键特性,并提供一个可运行的示例。
Contents
一、 理解并发与动态批处理
1. 并发(Concurrency):Instance Groups
并发性通过Triton的实例组(Instance Groups)机制实现。它允许我们在单个GPU上同时加载模型的多个独立实例。这对于计算密集型模型(如大型LLM或CV模型)至关重要,因为它可以确保当一个请求正在等待I/O操作时,GPU能够立即处理另一个实例的计算任务,从而消除等待和提升利用率。
2. 动态批处理(Dynamic Batching)
动态批处理解决了客户端请求批次大小不一致或请求频率不稳定的问题。当客户端发送单个(Batch Size = 1)请求时,Triton会等待极短的时间(由配置决定),将多个小请求聚合(Batching)成一个大请求再提交给模型推理,这样可以显著减少核函数启动开销,提升整体吞吐量。
二、 模型仓库结构准备
假设我们有一个名为 model_a 的ONNX模型,我们需要按照Triton的标准结构组织文件:
******bash
model_repository/
├── model_a/
│ ├── 1/ # 模型版本号
│ │ └── model.onnx
│ └── config.pbtxt # 核心配置文件
三、 配置核心文件:config.pbtxt
下面的配置示例展示了如何同时开启多实例并发和动态批处理功能。
1. 配置 Instance Groups (并发)
我们设置 count: 2,这意味着Triton将在编号为0的GPU上创建 model_a 的两个并发实例。
2. 配置 Dynamic Batching
我们设置 max_batch_size: 16,允许的最大批处理大小为16。preferred_batch_size 告诉Triton优先尝试聚合到批次大小为4或8。max_queue_delay_microseconds (10000微秒 = 10毫秒) 是Triton等待请求聚合的最大时间。如果在这个时间内未达到首选批次大小,请求也将被立即处理。
******protobuf
name: “model_a”
platform: “onnxruntime_onnx”
max_batch_size: 16
配置 1:Instance Groups (多实例并发)
instance_group [
{
# 在 GPU 0 上运行 2 个并发实例
count: 2
kind: KIND_GPU
gpus: [0]
}
]
配置 2:Dynamic Batching (动态批处理)
dynamic_batching {
# 优先聚合的批次大小
preferred_batch_size: [4, 8]
# 最大等待时间:10毫秒 (用于聚合请求)
max_queue_delay_microseconds: 10000
}
假设模型接收一个 1024 维的浮点向量
input [
{
name: “input_data”
data_type: TYPE_FP32
dims: [ 1024 ]
}
]
output [
{
name: “output_data”
data_type: TYPE_FP32
dims: [ 1024 ]
}
]
四、 部署Triton Server
使用Docker启动Triton服务器,并挂载模型仓库:
******bash
docker run –gpus all -it –rm \
-p 8000:8000 -p 8001:8001 -p 8002:8002 \
-v /path/to/model_repository:/models \
nvcr.io/nvidia/tritonserver:24.05-py3 \
tritonserver –model-repository=/models
五、 客户端实操:验证动态批处理
我们需要一个Python客户端来发送多个并发的小请求,以验证Triton是否成功地将它们聚合为更大的批次。
首先,安装必要的库:
******bash
pip install nvidia-pyindex tritonclient[grpc]
接下来,使用Python的 asyncio 或多线程发送多个 Batch Size = 1 的请求。
******python
import tritonclient.grpc as grpcclient
import numpy as np
import concurrent.futures
import time
MODEL_NAME = “model_a”
TRITON_URL = “localhost:8001”
INPUT_SHAPE = (1024,)
NUM_REQUESTS = 10 # 发送10个 Batch=1 的请求
def generate_input():
# 生成 Batch Size = 1 的输入数据
data = np.random.rand(1, *INPUT_SHAPE).astype(np.float32)
return data
def send_request(client, request_id):
inputs = []
input_data = generate_input()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 infer_input = grpcclient.InferInput("input_data", input_data.shape, "FP32")
infer_input.set_data_from_numpy(input_data)
inputs.append(infer_input)
start_time = time.time()
# 发送异步请求
result = client.async_infer(
model_name=MODEL_NAME,
inputs=inputs,
client_timeout=5.0
)
# 等待结果
result.get_result()
end_time = time.time()
latency_ms = (end_time - start_time) * 1000
print(f"Request {request_id}: Latency {latency_ms:.2f} ms")
return latency_ms
if name == ‘main‘:
print(f”Connecting to Triton Server at {TRITON_URL}…”)
try:
# 使用连接池发送请求
client = grpcclient.InferenceServerClient(url=TRITON_URL)
1
2
3
4
5
6
7
8
9
10
11
12
13 with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_REQUESTS) as executor:
futures = []
for i in range(NUM_REQUESTS):
# 并发提交请求
futures.append(executor.submit(send_request, client, i))
concurrent.futures.wait(futures)
# 观察 Triton Server 的日志:
# 如果动态批处理生效,Triton日志中会显示多个请求被聚合为一个大的批次执行。
except Exception as e:
print(f"Error connecting to Triton or during inference: {e}")
关键观察点:
当运行上述客户端代码并观察Triton的控制台日志时,你会发现多个客户端请求(Batch Size=1)在进入模型之前,被Triton聚合(例如,聚合为 Batch Size=4 或 8)并一次性执行。这证明了动态批处理机制正在有效地工作,从而提高了整体吞吐量和GPU的计算效率。
通过合理设置 instance_group 和 dynamic_batching 参数,您可以针对特定的模型和流量模式进行优化,确保AI服务的部署能够充分利用底层硬件资源。
汤不热吧