在构建大规模向量检索系统时,实时数据流入是一个常见挑战。Faiss(Facebook AI Similarity Search)以其高性能著称,但其核心索引结构(如IndexIVF、IndexHNSW)通常是为静态数据集设计的。对一个数十亿级别的索引进行全量重刷以纳入少量新数据,其时间和资源开销是巨大的,这直接影响了系统的实时性和可用性。
本文将深入探讨如何采用“双层索引”(Dual-Index)策略,有效地处理实时写入,实现无需全量重刷索引的增量训练和查询。
1. Faiss增量更新的挑战
传统的Faiss索引,尤其是基于倒排文件(IVF)的结构,需要先进行一次耗时的train()操作来确定聚类中心(centroids)。如果数据分布发生显著变化,或者新数据量太大,持续使用add()方法可能会导致:
- 搜索质量下降: 聚类中心不再准确反映新的数据分布。
- 性能衰退: 倒排列表不平衡,查询效率降低。
为了避免频繁的全局重训练,我们需要一个缓存机制来处理“热数据”。
2. 解决方案:双层索引架构
双层索引策略将数据分为两部分:
- 主索引(Main Index): 存储绝大部分历史数据,采用高压缩、高效率的索引结构(如IndexIVF_PQ或IndexHNSW),定期(如每周或每天)离线重建,以保持最优性能。
- 增量索引(Delta Index): 存储最近实时写入的数据。由于数据量小,可以使用简单、易于更新的索引结构(如IndexFlatL2),或者是一个小规模、可以快速重建的IndexIVF。
用户查询时,同时在两个索引上进行搜索,然后合并结果。
3. 核心实现步骤与代码示例
我们将使用Python和faiss库来演示这个过程。假设我们的向量维度是128。
步骤 1: 环境准备
import faiss
import numpy as np
import time
D = 128 # 维度
NB = 100000 # 主索引数据量
NQ = 10 # 查询量
# 模拟生成数据
np.random.seed(1234)
xb = np.random.random((NB, D)).astype('float32')
xb[:, 0] += np.arange(NB) / 1000.
# 模拟生成实时增量数据
NI_delta = 5000
xb_delta = np.random.random((NI_delta, D)).astype('float32')
xb_delta[:, 0] += np.arange(NI_delta) / 500. + 100. # 保证与主索引数据有区分
# 模拟查询向量
xq = np.random.random((NQ, D)).astype('float32')
步骤 2: 构建主索引 (Main Index)
主索引采用高性能的IVF结构,需要先训练。
# 1. 训练主索引
N_centroids = 100
quantizer = faiss.IndexFlatL2(D)
main_index = faiss.IndexIVFFlat(quantizer, D, N_centroids, faiss.METRIC_L2)
main_index.train(xb)
# 2. 添加数据到主索引
main_index.add(xb)
print(f"主索引包含向量数: {main_index.ntotal}")
步骤 3: 构建增量索引 (Delta Index) 和实时写入
增量索引使用最简单的IndexFlatL2,因为它便于快速写入和查询,且数据量小,延迟可控。
# 构建增量索引
delta_index = faiss.IndexFlatL2(D)
# 模拟实时写入
delta_index.add(xb_delta)
print(f"增量索引包含向量数: {delta_index.ntotal}")
步骤 4: 实时查询与结果合并
查询时,我们需要分别查询两个索引,然后合并距离(D)和索引(I)。
K = 5 # 返回Top K
# 查询主索引
D_main, I_main = main_index.search(xq, K)
# 查询增量索引
D_delta, I_delta = delta_index.search(xq, K)
# 合并结果(使用numpy进行合并和排序)
D_merged = np.hstack((D_main, D_delta))
I_merged = np.hstack((I_main, I_delta))
# 按照距离D进行排序,找到全局Top K
final_D = np.zeros((NQ, K), dtype='float32')
final_I = np.zeros((NQ, K), dtype='int64')
for i in range(NQ):
# 获取当前查询点的所有结果
distances = D_merged[i, :]
indices = I_merged[i, :]
# 找到最小距离对应的排序索引
sorted_indices = np.argsort(distances)
# 取Top K
final_D[i, :] = distances[sorted_indices[:K]]
final_I[i, :] = indices[sorted_indices[:K]]
print("\n--- 最终查询结果(合并Top K)---")
print("距离 (D):\n", final_D)
步骤 5: 索引合并与持久化(增量训练的实现)
当增量索引达到一定阈值(如数据量或时间间隔)时,需要将其内容合并到主索引中。这才是真正的“增量训练”——即将增量数据添加到主索引中,并通过离线方式(如夜间批处理)定期重建主索引,以保持搜索质量。
重点: 如果主索引是IndexIVFFlat,你可以直接add()增量数据。
# 离线合并操作示例
print("\n--- 执行索引合并操作 ---")
# 1. 将增量数据加入主索引
main_index.add(xb_delta)
# 2. 清空增量索引 (等待下一批实时数据)
delta_index.reset()
print(f"合并后主索引向量数: {main_index.ntotal}")
print(f"增量索引向量数: {delta_index.ntotal}")
# 3. 持久化主索引 (例如使用faiss.write_index)
# faiss.write_index(main_index, "main_index.faiss")
通过这种双层结构,我们在白天可以保证实时写入和查询的低延迟,同时将昂贵的索引重建操作推迟到资源空闲的批处理时段,从而解决了Faiss在实时场景中增量更新的难题。
汤不热吧