欢迎光临
我们一直在努力

实战详解:如何利用 CUDA Stream 实现多模型推理的并发调度以最大化 GPU 利用率

在生产环境中,GPU 资源往往十分昂贵。当你部署多个轻量级模型(如分类器、Embedding 模型、检测头)时,如果每个模型独占一张 GPU,资源浪费会非常严重。CUDA Stream 提供了一种在同一张 GPU 上并发执行多个推理任务的机制,通过计算与数据传输的重叠(Overlap),可以显著提升 GPU 利用率。本文将从原理出发,手把手教你用 PyTorch 实现多模型并发推理调度。

GPU硬件

CUDA Stream 基础:为什么默认行为是串行的?

PyTorch 中绝大多数操作都运行在默认的 CUDA Stream(Stream 0)上。同一个 Stream 内的 Kernel 严格按提交顺序执行,即使前一个 Kernel 很小、后面的 Kernel 可以独立运行,GPU 也只能排队。这就是为什么你用多线程分别调用不同模型的 forward() 时,总耗时几乎是各模型耗时之和——因为它们共享同一个 Stream。

import torch

# 默认行为:两个模型串行执行
def serial_inference(model_a, model_b, input_a, input_b):
    out_a = model_a(input_a)  # Stream 0
    out_b = model_b(input_b)  # 等 out_a 的 Kernel 执行完才开始
    return out_a, out_b

要打破这种串行约束,需要将不同模型的计算放入不同的 CUDA Stream 中。CUDA 运行时会在硬件层面尝试同时调度来自不同 Stream 的 Kernel,前提是 GPU 上有空闲的 SM(Streaming Multiprocessor)资源。

创建并管理多个 CUDA Stream

PyTorch 通过 torch.cuda.Stream 创建非默认 Stream。关键点是使用 with torch.cuda.stream(stream_obj) 上下文管理器,将后续操作绑定到指定 Stream:

import torch

# 创建两个独立的 CUDA Stream
stream_a = torch.cuda.Stream()
stream_b = torch.cuda.Stream()

def concurrent_inference(model_a, model_b, input_a, input_b):
    # 在不同 Stream 上异步提交推理任务
    with torch.cuda.stream(stream_a):
        out_a = model_a(input_a)

    with torch.cuda.stream(stream_b):
        out_b = model_b(input_b)

    # 必须同步:等待所有 Stream 完成
    torch.cuda.synchronize()
    return out_a, out_b

注意事项torch.cuda.synchronize() 会阻塞 CPU 直到所有 Stream 的所有 Kernel 执行完毕。如果你只需要等待某个特定 Stream,可以用 stream_a.synchronize()。但要安全地读取结果,确保至少目标 Stream 已同步。

技术架构图

进阶技巧:计算与 Host-to-Device 传输的重叠

在真实场景中,推理不仅包含计算,还涉及数据从 CPU 内存拷贝到 GPU 显存的过程(H2D 传输)。通过将 H2D 传输放在一个 Stream、计算放在另一个 Stream,可以实现传输与计算的流水线重叠:

import torch

stream_compute = torch.cuda.Stream()
stream_transfer = torch.cuda.Stream()

def pipelined_batch_inference(model, data_batches):
    results = []

    # 预加载第一个 batch
    with torch.cuda.stream(stream_transfer):
        current_input = data_batches[0].cuda(non_blocking=True)

    for i in range(len(data_batches)):
        # 等待当前 batch 的数据传输完成
        stream_compute.wait_stream(stream_transfer)

        # 在 compute stream 上执行推理
        with torch.cuda.stream(stream_compute):
            result = model(current_input)
            results.append(result)

        # 预加载下一个 batch(与当前计算重叠)
        if i + 1 < len(data_batches):
            with torch.cuda.stream(stream_transfer):
                current_input = data_batches[i + 1].cuda(non_blocking=True)

    torch.cuda.synchronize()
    return results

这里 non_blocking=True 参数至关重要——它让 .cuda() 调用在传输完成后立即返回 CPU,而不是阻塞等待。配合 wait_stream 的显式依赖管理,GPU 可以在处理当前 batch 计算的同时,从 CPU 拷贝下一个 batch 的数据。

生产环境中的 Stream 调度器设计

当模型数量较多时(例如 5-10 个小模型),手动管理 Stream 会变得混乱。一个更工程化的做法是封装一个 Stream 调度器:

import torch
from collections import OrderedDict

class GPUStreamScheduler:
    def __init__(self, num_streams=4):
        self.streams = {
            f"stream_{i}": torch.cuda.Stream()
            for i in range(num_streams)
        }
        self.model_to_stream = {}

    def assign_model(self, model_name: str, stream_key: str):
        """将模型绑定到指定 Stream"""
        assert stream_key in self.streams
        self.model_to_stream[model_name] = stream_key

    def run_inference(self, models: dict, inputs: dict):
        """并发执行多个模型的推理"""
        results = {}
        for name, model in models.items():
            stream = self.streams[self.model_to_stream[name]]
            with torch.cuda.stream(stream):
                results[name] = model(inputs[name])

        torch.cuda.synchronize()
        return results

# 使用示例
scheduler = GPUStreamScheduler(num_streams=2)
scheduler.assign_model("classifier", "stream_0")
scheduler.assign_model("embedding", "stream_1")

results = scheduler.run_inference(
    models={"classifier": cls_model, "embedding": emb_model},
    inputs={"classifier": img_tensor, "embedding": text_tensor}
)

实际部署时,你还需要考虑 Stream 数量不宜超过 16 个(硬件并发能力有限),以及不同模型的 Kernel 大小差异——如果某个模型的 Kernel 占满了所有 SM,其他 Stream 的 Kernel 依然会排队。

性能验证与常见陷阱

验证并发是否生效最直接的方法是使用 nsys(NVIDIA Nsight Systems)进行 Profiling。以下命令可以捕获 Stream 级别的 Kernel 时间线:

nsys profile --trace=cuda python your_inference_script.py

在生成的 .nsys-rep 文件中,如果两个 Stream 的 Kernel 时间线有重叠,说明并发调度生效。常见陷阱包括:

1. 隐式同步操作torch.tensor.item().cpu().numpy() 等操作会触发隐式的 cudaStreamSynchronize,打断并发流水线。务必在所有 Stream 任务提交完毕后再做数据搬运。

2. 显存碎片化:多 Stream 并发推理时,PyTorch 的 Caching Allocator 可能会为每个 Stream 维护独立的缓存块,导致显存占用升高。可通过设置 PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True 改善。

3. GIL 限制:Python 的 GIL 会限制多线程模型并发调用 forward()。但上述方案中,我们是在同一个线程内顺序提交异步 Kernel,因此不受 GIL 影响。如果需要真正的多线程调度,考虑使用 torch.multiprocessing 或释放 GIL 的 C++ 扩展。

总结

通过 CUDA Stream 实现多模型并发推理,核心要点如下:

• 同一 Stream 内的 Kernel 严格串行,不同 Stream 间的 Kernel 可以并发执行(前提是 GPU SM 资源有空余)

• 使用 non_blocking=Truewait_stream 实现数据传输与计算的流水线重叠

• 生产环境中建议封装 Stream 调度器,统一管理模型与 Stream 的映射关系

• 用 nsys Profiling 验证并发效果,避免隐式同步操作破坏流水线

【本站文章皆为原创,未经允许不得转载】:汤不热吧 » 实战详解:如何利用 CUDA Stream 实现多模型推理的并发调度以最大化 GPU 利用率
分享到: 更多 (0)