作为AI基础设施的资深架构师,我们在构建高并发、低延迟的推理服务时,经常采用Spring WebFlux或原生Reactor来提升吞吐量。理解Reactive Stream的生命周期钩子(Hooks)对于精确控制副作用、高效日志记录和资源清理至关重要。
WebFlux中的Mono和Flux提供了丰富的doOn***操作符,用于在特定信号发生时插入副作用(Side Effects)。本文将深入解析其中最常用的四个钩子:doOnNext、doOnError、doOnTerminate和doFinally**的区别和执行时机。
Contents
1. Reactive Stream 生命周期基础
Reactive Stream的生命周期遵循以下路径:
- Subscription (订阅): 数据流开始。
- OnNext (数据): 零个、一个或多个数据项发出。
- Termination (终止): 以两种方式之一结束:
- OnComplete (完成): 成功终止。
- OnError (错误): 异常终止。
这些**doOn***钩子正是基于这些核心信号触发的。
2. 四大核心钩子的区别与时机
| 钩子名称 | 触发时机 | 是否处理终止信号? | 主要用途 | 关键区别 |
|---|---|---|---|---|
| doOnNext | 每次成功发出元素后。 | 否 | 数据转换、记录元素内容。 | 仅在有数据发出时多次执行。 |
| doOnError | 接收到 onError 信号时。 | 是(错误终止) | 错误日志记录、异常指标上报。 | 仅在发生错误时执行。 |
| doOnTerminate | 接收到 onComplete 或 onError 信号时。 | 是(成功/错误) | 记录流的终止状态。 | 不处理 onCancel 信号。 |
| doFinally | 流终止后(无论成功、失败或被取消)。 | 是(成功/错误/取消) | 资源释放、清理数据库连接、事务回滚。 | 一定会执行,是资源清理的首选。 |
2.1. doOnNext (数据到达)
这是最简单且最常用于日志记录的钩子。它在数据项成功发出并通过操作符链时执行。注意:如果流是空的(即Mono.empty()),它不会执行。
2.2. doOnError (错误处理)
当上游发出 onError 信号时立即触发。这个钩子主要用于记录错误信息或触发额外的错误处理逻辑,但它不会消费或改变错误本身,错误信号仍会向下游传播。
2.3. doOnTerminate (流终止)
doOnTerminate 是在流完成(onComplete)或失败(onError)时执行的。它提供了一个单一的、在流生命周期末尾执行副作用的机会。然而,它的一个重要局限是它不会在流被取消(onCancel)时执行。
2.4. doFinally (最终清理)
doFinally 是所有钩子中最强大的,因为它保证在任何类型的终止信号(onComplete、onError 或 onCancel)之后执行。它接收一个 SignalType 枚举作为参数,允许你判断流是如何终止的。
重点: 对于资源的最终释放(例如,关闭文件句柄或数据库连接),永远应该使用 doFinally,而不是 doOnTerminate。
3. 实战示例:执行顺序演示
我们使用Java Reactor代码来直观地展示在成功和失败场景下的执行顺序。
3.1. 环境依赖
1
2
3
4
5 <!-- 引入 Reactor Core -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
3.2. 代码演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
public class ReactorHooksDemo {
private static void log(String message) {
System.out.println("[" + Thread.currentThread().getName() + "] " + message);
}
private static Flux<Integer> createTestFlux() {
return Flux.just(101, 202)
.doOnNext(n -> log("Hook: 1. doOnNext - 处理数据: " + n))
.doOnError(e -> log("Hook: 2. doOnError - 收到错误: " + e.getMessage()))
.doOnTerminate(() -> log("Hook: 3. doOnTerminate - 流结束(成功或失败)"))
.doFinally(signal -> log("Hook: 4. doFinally - 最终信号: " + signal));
}
public static void runSuccessScenario() {
log("\n--- 场景一: 成功完成 (onComplete) ---");
createTestFlux()
.subscribe(
n -> log("Subscriber: 接收数据: " + n),
err -> log("Subscriber: 收到错误: " + err.getMessage()),
() -> log("Subscriber: 完成!")
);
}
public static void runFailureScenario() {
log("\n--- 场景二: 发生错误 (onError) ---");
Flux.error(new RuntimeException("Simulated Inference Timeout"))
.doOnNext(n -> log("Hook: 1. doOnNext - (不执行)"))
.doOnError(e -> log("Hook: 2. doOnError - 收到错误: " + e.getMessage()))
.doOnTerminate(() -> log("Hook: 3. doOnTerminate - 流结束(成功或失败)"))
.doFinally(signal -> log("Hook: 4. doFinally - 最终信号: " + signal))
.subscribe(
n -> log("Subscriber: 接收数据: " + n),
err -> log("Subscriber: 收到错误: " + err.getMessage()),
() -> log("Subscriber: 完成!")
);
}
public static void runCancellationScenario() {
log("\n--- 场景三: 取消 (onCancel) ---");
Flux.just(1, 2, 3)
.delayElements(java.time.Duration.ofMillis(100))
.doOnCancel(() -> log("Hook: 0. doOnCancel - 触发取消"))
.doOnTerminate(() -> log("Hook: 3. doOnTerminate - (被跳过)"))
.doFinally(signal -> log("Hook: 4. doFinally - 最终信号: " + signal))
.subscribe(new org.reactivestreams.Subscriber<Integer>() {
// 订阅者实现,立即请求并取消
org.reactivestreams.Subscription s;
public void onSubscribe(org.reactivestreams.Subscription s) {
this.s = s;
s.request(1);
s.cancel();
}
public void onNext(Integer t) { log("Subscriber: Received: " + t); }
public void onError(Throwable t) { log("Subscriber: Error"); }
public void onComplete() { log("Subscriber: Complete"); }
});
// 实际运行需要等待异步操作完成,这里省略Thread.sleep以保持示例简洁性
}
public static void main(String[] args) {
runSuccessScenario();
runFailureScenario();
runCancellationScenario();
}
}
3.3. 执行输出分析(部分)
场景一:成功完成
1
2
3
4
5
6
7 [main] Hook: 1. doOnNext - 处理数据: 101
[main] Subscriber: 接收数据: 101
[main] Hook: 1. doOnNext - 处理数据: 202
[main] Subscriber: 接收数据: 202
[main] Subscriber: 完成!
[main] Hook: 3. doOnTerminate - 流结束(成功或失败) <-- 成功后触发
[main] Hook: 4. doFinally - 最终信号: ON_COMPLETE <-- 保证执行
场景二:发生错误
1
2
3
4 [main] Hook: 2. doOnError - 收到错误: Simulated Inference Timeout <-- 错误发生时立即触发
[main] Subscriber: 收到错误: Simulated Inference Timeout
[main] Hook: 3. doOnTerminate - 流结束(成功或失败) <-- 错误后触发
[main] Hook: 4. doFinally - 最终信号: ON_ERROR <-- 保证执行
场景三:取消
1
2
3 [main] Hook: 0. doOnCancel - 触发取消
[main] Hook: 3. doOnTerminate - (被跳过) <-- doOnTerminate未执行
[main] Hook: 4. doFinally - 最终信号: CANCEL <-- doFinally因取消信号执行
4. 总结:最佳实践选择
- 记录数据流: 使用 doOnNext。
- 上报指标或处理错误日志: 使用 doOnError。
- 最终的、无条件的资源清理(AI模型资源释放、连接关闭): 必须使用 doFinally,并检查 SignalType.CANCEL 以确保在客户端提前断开连接时也能释放资源。
- 简单的流程终止日志: 如果确定不会发生取消场景,或不关心取消,可使用 doOnTerminate。
汤不热吧