新增知识库分支工作流节点

This commit is contained in:
stageluo
2025-11-25 09:26:39 +08:00
parent 9df246321e
commit c22c5eac7f
3 changed files with 423 additions and 15 deletions

View File

@@ -31,9 +31,6 @@ import java.util.stream.IntStream;
@Component
public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy {
private final Integer DIMENSION = 2048;
public MilvusVectorStoreStrategy(VectorStoreProperties vectorStoreProperties, EmbeddingModelFactory embeddingModelFactory) {
super(vectorStoreProperties, embeddingModelFactory);
}
@@ -41,13 +38,16 @@ public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy {
// 缓存不同集合与 autoFlush 配置的 Milvus 连接
private final Map<String, EmbeddingStore<TextSegment>> storeCache = new ConcurrentHashMap<>();
private EmbeddingStore<TextSegment> getMilvusStore(String collectionName, boolean autoFlushOnInsert) {
String key = collectionName + "|" + autoFlushOnInsert;
/**
* 获取 Milvus Store支持动态维度
*/
private EmbeddingStore<TextSegment> getMilvusStore(String collectionName, int dimension, boolean autoFlushOnInsert) {
String key = collectionName + "|" + dimension + "|" + autoFlushOnInsert;
return storeCache.computeIfAbsent(key, k ->
MilvusEmbeddingStore.builder()
.uri(vectorStoreProperties.getMilvus().getUrl())
.collectionName(collectionName)
.dimension(DIMENSION)
.dimension(dimension)
.indexType(IndexType.IVF_FLAT)
.metricType(MetricType.L2)
.autoFlushOnInsert(autoFlushOnInsert)
@@ -58,18 +58,37 @@ public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy {
.build()
);
}
/**
* 获取 embedding 模型的实际维度
*/
private int getModelDimension(String modelName) {
try {
EmbeddingModel model = getEmbeddingModel(modelName, null);
// 使用一个测试文本获取向量维度
Embedding testEmbedding = model.embed("test").content();
int dimension = testEmbedding.dimension();
log.info("Detected embedding model dimension: {} for model: {}", dimension, modelName);
return dimension;
} catch (Exception e) {
log.warn("Failed to detect model dimension for: {}, using default 1024", modelName, e);
return 1024; // 默认使用 1024 (bge-m3 的维度)
}
}
@Override
public void createSchema(String kid, String modelName) {
String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + kid;
int dimension = getModelDimension(modelName);
// 使用缓存获取连接以确保只初始化一次
EmbeddingStore<TextSegment> store = getMilvusStore(collectionName, true);
log.info("Milvus集合初始化完成: {}", collectionName);
EmbeddingStore<TextSegment> store = getMilvusStore(collectionName, dimension, true);
log.info("Milvus集合初始化完成: {}, dimension: {}", collectionName, dimension);
}
@Override
public void storeEmbeddings(StoreEmbeddingBo storeEmbeddingBo) {
EmbeddingModel embeddingModel = getEmbeddingModel(storeEmbeddingBo.getEmbeddingModelName(), DIMENSION);
int dimension = getModelDimension(storeEmbeddingBo.getEmbeddingModelName());
EmbeddingModel embeddingModel = getEmbeddingModel(storeEmbeddingBo.getEmbeddingModelName(), dimension);
List<String> chunkList = storeEmbeddingBo.getChunkList();
List<String> fidList = storeEmbeddingBo.getFids();
@@ -81,7 +100,7 @@ public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy {
long startTime = System.currentTimeMillis();
// 复用连接,写入场景使用 autoFlush=false 以提升批量插入性能
EmbeddingStore<TextSegment> embeddingStore = getMilvusStore(collectionName, false);
EmbeddingStore<TextSegment> embeddingStore = getMilvusStore(collectionName, dimension, false);
IntStream.range(0, chunkList.size()).forEach(i -> {
String text = chunkList.get(i);
@@ -101,13 +120,14 @@ public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy {
@Override
public List<String> getQueryVector(QueryVectorBo queryVectorBo) {
EmbeddingModel embeddingModel = getEmbeddingModel(queryVectorBo.getEmbeddingModelName(), DIMENSION);
int dimension = getModelDimension(queryVectorBo.getEmbeddingModelName());
EmbeddingModel embeddingModel = getEmbeddingModel(queryVectorBo.getEmbeddingModelName(), dimension);
Embedding queryEmbedding = embeddingModel.embed(queryVectorBo.getQuery()).content();
String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + queryVectorBo.getKid();
// 查询复用连接autoFlush 对查询无影响,此处保持 true
EmbeddingStore<TextSegment> embeddingStore = getMilvusStore(collectionName, true);
EmbeddingStore<TextSegment> embeddingStore = getMilvusStore(collectionName, dimension, true);
List<String> resultList = new ArrayList<>();
EmbeddingSearchRequest request = EmbeddingSearchRequest.builder()
@@ -128,14 +148,16 @@ public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy {
@SneakyThrows
public void removeById(String id, String modelName) {
// 注意:此处原逻辑使用 collectionname + id保持现状
EmbeddingStore<TextSegment> embeddingStore = getMilvusStore(vectorStoreProperties.getMilvus().getCollectionname() + id, false);
int dimension = getModelDimension(modelName);
EmbeddingStore<TextSegment> embeddingStore = getMilvusStore(vectorStoreProperties.getMilvus().getCollectionname() + id, dimension, false);
embeddingStore.remove(id);
}
@Override
public void removeByDocId(String docId, String kid) {
String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + kid;
EmbeddingStore<TextSegment> embeddingStore = getMilvusStore(collectionName, false);
// 使用默认维度,因为删除操作不需要精确的维度信息
EmbeddingStore<TextSegment> embeddingStore = getMilvusStore(collectionName, 1024, false);
Filter filter = MetadataFilterBuilder.metadataKey("docId").isEqualTo(docId);
embeddingStore.removeAll(filter);
log.info("Milvus成功删除 docId={} 的所有向量数据", docId);
@@ -144,7 +166,8 @@ public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy {
@Override
public void removeByFid(String fid, String kid) {
String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + kid;
EmbeddingStore<TextSegment> embeddingStore = getMilvusStore(collectionName, false);
// 使用默认维度,因为删除操作不需要精确的维度信息
EmbeddingStore<TextSegment> embeddingStore = getMilvusStore(collectionName, 1024, false);
Filter filter = MetadataFilterBuilder.metadataKey("fid").isEqualTo(fid);
embeddingStore.removeAll(filter);
log.info("Milvus成功删除 fid={} 的所有向量数据", fid);