在现代推荐系统和实时决策系统中,特征工程必须从传统的批处理模式转向低延迟的实时服务。特征提取的延迟是影响整个模型服务(Serving)链路的关键瓶颈。本文将深入探讨如何利用 Ray 强大的分布式计算能力和Actor模型,构建一个能够实现毫秒级响应的实时特征提取(Feature Extraction)和特征查询(Feature Lookup)管道。
Contents
挑战与 Ray 的优势
传统的特征提取通常涉及复杂的聚合和对外部数据库(如 Redis、Cassandra)的查询。当面临每秒数千次的并发请求时,串行处理或使用线程池会很快达到瓶颈,难以维持低于 100ms 的延迟。
Ray 提供了两个关键优势来解决这个问题:
- 分布式 Actor 模型: Ray Actor 是有状态的工作进程。我们可以将耗时的特征存储查询封装在 Actor 中,使其能够维护连接池和缓存,避免重复初始化。
- 异步并行性: Ray Task 允许我们并行执行特征计算和外部查询,从而隐藏 I/O 延迟。
实践:构建一个低延迟的特征服务
我们将创建一个简化的实时特征服务,包括一个模拟的特征存储(Stateful Actor)和一个特征计算任务(Stateless Task)。
1. 环境准备
确保你安装了 Ray:
1 npm install ray
2. 定义特征存储(Ray Actor)
特征存储(Feature Store)负责查询历史聚合特征,它需要维护状态(如数据库连接或内部缓存)。我们将其定义为一个 Ray Actor。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 import ray
import time
import random
from typing import Dict, Any
# 初始化 Ray (如果尚未初始化)
try:
ray.init(ignore_reinit_error=True)
except RuntimeError:
pass
@ray.remote
class FeatureStore:
"""负责模拟外部特征数据库的Actor,维护连接和缓存。"""
def __init__(self):
print("FeatureStore Actor 初始化中...")
# 模拟初始化数据库连接池
self.cache = {
101: {"user_age": 35, "last_7d_clicks": 12.5},
102: {"user_age": 22, "last_7d_clicks": 5.1}
}
def get_historical_features(self, user_id: int) -> Dict[str, Any]:
"""模拟一次对历史特征的低延迟查询(例如从 Redis)。"""
# 模拟 5ms 的网络延迟和查询时间
time.sleep(0.005)
if user_id in self.cache:
return self.cache[user_id]
else:
# 默认值,确保服务健壮性
return {"user_age": 30, "last_7d_clicks": 0.0}
def update_live_features(self, user_id: int, live_data: float):
"""模拟特征回写。"""
# 实际生产中,这里会触发异步更新流
pass
3. 定义实时特征提取任务(Ray Task)
特征提取逻辑通常是计算密集型,但由于是针对单个请求,我们将其定义为 Ray 异步任务。它负责结合原始请求数据和历史数据,生成最终的特征向量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 @ray.remote
def extract_combined_features(raw_request_data: Dict[str, Any], historical_features: Dict[str, Any]) -> Dict[str, float]:
"""结合实时数据和历史数据,进行复杂的特征计算。"""
user_id = raw_request_data['user_id']
current_value = raw_request_data['current_value']
# 模拟特征工程计算耗时 (10ms - 20ms)
compute_time = random.uniform(0.010, 0.020)
time.sleep(compute_time)
# 核心特征逻辑
feature_vector = {
'f_normalized_value': current_value / 100.0,
'f_recency_score': 1.0 / (historical_features['last_7d_clicks'] + 1.0),
'f_age_value_ratio': historical_features['user_age'] * current_value
}
return feature_vector
4. 实时并发请求编排
Ray 的核心价值在于编排异步调用。对于一个传入的实时请求,我们必须并行启动特征查询和特征计算,以达到最低延迟。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 def serve_request_async(feature_store_actor, user_id, current_value):
raw_data = {'user_id': user_id, 'current_value': current_value}
# 步骤 1: 异步获取历史特征 (I/O 绑定)
# .remote 调用立即返回一个 ObjectRef
historical_ref = feature_store_actor.get_historical_features.remote(user_id)
# 步骤 2: 异步等待历史数据,并启动特征提取计算 (CPU/计算绑定)
# 这里我们模拟一个依赖链:先得到历史特征,再计算最终特征
final_feature_ref = extract_combined_features.remote(raw_data, ray.get(historical_ref))
return final_feature_ref
# --- 模拟并发请求 ---
fs = FeatureStore.remote()
# 模拟 10 个实时请求
requests = [
(101, random.randint(50, 200)),
(102, random.randint(50, 200)),
(101, random.randint(50, 200)),
(103, random.randint(50, 200)), # 新用户
(102, random.randint(50, 200))
]
start_time = time.time()
# 并行发起所有请求
result_refs = []
for user_id, value in requests:
ref = serve_request_async(fs, user_id, value)
result_refs.append(ref)
# 等待所有结果
final_features = ray.get(result_refs)
end_time = time.time()
latency_ms = (end_time - start_time) * 1000 / len(requests)
print("\n--- 结果摘要 ---")
print(f"总共处理了 {len(requests)} 个请求")
print(f"平均请求延迟: {latency_ms:.2f} ms")
print("第一个特征向量示例:")
print(final_features[0])
# 关闭 Ray
ray.shutdown()
结论
通过使用 Ray Actor 封装有状态的特征查询服务,并利用 Ray Task 实现了特征计算的并行化,我们成功地将 I/O 延迟和计算延迟重叠(Overlap)。在我们的模拟中,尽管单个特征查询需要 5ms,特征计算需要 10-20ms,但由于 Ray 的高效调度,我们可以实现远低于串行执行的平均延迟,轻松满足实时系统对秒级甚至毫秒级响应的要求。这种架构是构建高性能 MLOps 实时推理管道的关键。
汤不热吧