在生产环境中,GPU 资源往往十分昂贵。当你部署多个轻量级模型(如分类器、Embedding 模型、检测头)时,如果每个模型独占一张 GPU,资源浪费会非常严重。CUDA Stream 提供了一种在同一张 GPU 上并发执行多个推理任务的机制,通过计算与数据传输的重叠(Overlap),可以显著提升 GPU 利用率。本文将从原理出发,手把手教你用 PyTorch 实现多模型并发推理调度。

CUDA Stream 基础:为什么默认行为是串行的?
PyTorch 中绝大多数操作都运行在默认的 CUDA Stream(Stream 0)上。同一个 Stream 内的 Kernel 严格按提交顺序执行,即使前一个 Kernel 很小、后面的 Kernel 可以独立运行,GPU 也只能排队。这就是为什么你用多线程分别调用不同模型的 forward() 时,总耗时几乎是各模型耗时之和——因为它们共享同一个 Stream。
import torch
# 默认行为:两个模型串行执行
def serial_inference(model_a, model_b, input_a, input_b):
out_a = model_a(input_a) # Stream 0
out_b = model_b(input_b) # 等 out_a 的 Kernel 执行完才开始
return out_a, out_b
要打破这种串行约束,需要将不同模型的计算放入不同的 CUDA Stream 中。CUDA 运行时会在硬件层面尝试同时调度来自不同 Stream 的 Kernel,前提是 GPU 上有空闲的 SM(Streaming Multiprocessor)资源。
创建并管理多个 CUDA Stream
PyTorch 通过 torch.cuda.Stream 创建非默认 Stream。关键点是使用 with torch.cuda.stream(stream_obj) 上下文管理器,将后续操作绑定到指定 Stream:
import torch
# 创建两个独立的 CUDA Stream
stream_a = torch.cuda.Stream()
stream_b = torch.cuda.Stream()
def concurrent_inference(model_a, model_b, input_a, input_b):
# 在不同 Stream 上异步提交推理任务
with torch.cuda.stream(stream_a):
out_a = model_a(input_a)
with torch.cuda.stream(stream_b):
out_b = model_b(input_b)
# 必须同步:等待所有 Stream 完成
torch.cuda.synchronize()
return out_a, out_b
注意事项:torch.cuda.synchronize() 会阻塞 CPU 直到所有 Stream 的所有 Kernel 执行完毕。如果你只需要等待某个特定 Stream,可以用 stream_a.synchronize()。但要安全地读取结果,确保至少目标 Stream 已同步。

进阶技巧:计算与 Host-to-Device 传输的重叠
在真实场景中,推理不仅包含计算,还涉及数据从 CPU 内存拷贝到 GPU 显存的过程(H2D 传输)。通过将 H2D 传输放在一个 Stream、计算放在另一个 Stream,可以实现传输与计算的流水线重叠:
import torch
stream_compute = torch.cuda.Stream()
stream_transfer = torch.cuda.Stream()
def pipelined_batch_inference(model, data_batches):
results = []
# 预加载第一个 batch
with torch.cuda.stream(stream_transfer):
current_input = data_batches[0].cuda(non_blocking=True)
for i in range(len(data_batches)):
# 等待当前 batch 的数据传输完成
stream_compute.wait_stream(stream_transfer)
# 在 compute stream 上执行推理
with torch.cuda.stream(stream_compute):
result = model(current_input)
results.append(result)
# 预加载下一个 batch(与当前计算重叠)
if i + 1 < len(data_batches):
with torch.cuda.stream(stream_transfer):
current_input = data_batches[i + 1].cuda(non_blocking=True)
torch.cuda.synchronize()
return results
这里 non_blocking=True 参数至关重要——它让 .cuda() 调用在传输完成后立即返回 CPU,而不是阻塞等待。配合 wait_stream 的显式依赖管理,GPU 可以在处理当前 batch 计算的同时,从 CPU 拷贝下一个 batch 的数据。
生产环境中的 Stream 调度器设计
当模型数量较多时(例如 5-10 个小模型),手动管理 Stream 会变得混乱。一个更工程化的做法是封装一个 Stream 调度器:
import torch
from collections import OrderedDict
class GPUStreamScheduler:
def __init__(self, num_streams=4):
self.streams = {
f"stream_{i}": torch.cuda.Stream()
for i in range(num_streams)
}
self.model_to_stream = {}
def assign_model(self, model_name: str, stream_key: str):
"""将模型绑定到指定 Stream"""
assert stream_key in self.streams
self.model_to_stream[model_name] = stream_key
def run_inference(self, models: dict, inputs: dict):
"""并发执行多个模型的推理"""
results = {}
for name, model in models.items():
stream = self.streams[self.model_to_stream[name]]
with torch.cuda.stream(stream):
results[name] = model(inputs[name])
torch.cuda.synchronize()
return results
# 使用示例
scheduler = GPUStreamScheduler(num_streams=2)
scheduler.assign_model("classifier", "stream_0")
scheduler.assign_model("embedding", "stream_1")
results = scheduler.run_inference(
models={"classifier": cls_model, "embedding": emb_model},
inputs={"classifier": img_tensor, "embedding": text_tensor}
)
实际部署时,你还需要考虑 Stream 数量不宜超过 16 个(硬件并发能力有限),以及不同模型的 Kernel 大小差异——如果某个模型的 Kernel 占满了所有 SM,其他 Stream 的 Kernel 依然会排队。
性能验证与常见陷阱
验证并发是否生效最直接的方法是使用 nsys(NVIDIA Nsight Systems)进行 Profiling。以下命令可以捕获 Stream 级别的 Kernel 时间线:
nsys profile --trace=cuda python your_inference_script.py
在生成的 .nsys-rep 文件中,如果两个 Stream 的 Kernel 时间线有重叠,说明并发调度生效。常见陷阱包括:
1. 隐式同步操作:torch.tensor.item()、.cpu()、.numpy() 等操作会触发隐式的 cudaStreamSynchronize,打断并发流水线。务必在所有 Stream 任务提交完毕后再做数据搬运。
2. 显存碎片化:多 Stream 并发推理时,PyTorch 的 Caching Allocator 可能会为每个 Stream 维护独立的缓存块,导致显存占用升高。可通过设置 PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True 改善。
3. GIL 限制:Python 的 GIL 会限制多线程模型并发调用 forward()。但上述方案中,我们是在同一个线程内顺序提交异步 Kernel,因此不受 GIL 影响。如果需要真正的多线程调度,考虑使用 torch.multiprocessing 或释放 GIL 的 C++ 扩展。
总结
通过 CUDA Stream 实现多模型并发推理,核心要点如下:
• 同一 Stream 内的 Kernel 严格串行,不同 Stream 间的 Kernel 可以并发执行(前提是 GPU SM 资源有空余)
• 使用 non_blocking=True 和 wait_stream 实现数据传输与计算的流水线重叠
• 生产环境中建议封装 Stream 调度器,统一管理模型与 Stream 的映射关系
• 用 nsys Profiling 验证并发效果,避免隐式同步操作破坏流水线
汤不热吧