refactor(milvus): 重构Milvus向量存储策略使用LangChain4j

将原有的直接调用Milvus客户端(MilvusServiceClient)的实现重构为使用LangChain4j的MilvusEmbeddingStore
简化了集合创建、数据存储和查询的实现逻辑(fid、kid、docId不再作为独立字段,改为通过Metadata写入metadata字段)
新增了langchain4j-milvus依赖
This commit is contained in:
Yzm
2025-10-17 15:39:26 +08:00
parent c85deba6a6
commit 766f6ad266
5 changed files with 132 additions and 255 deletions

View File

@@ -80,6 +80,12 @@
<version>2.6.4</version> <version>2.6.4</version>
</dependency> </dependency>
<!-- LangChain4j Milvus Embedding Store -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-milvus</artifactId>
</dependency>
<dependency> <dependency>
<groupId>dev.langchain4j</groupId> <groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-open-ai</artifactId> <artifactId>langchain4j-open-ai</artifactId>

View File

@@ -14,7 +14,7 @@ import org.ruoyi.common.core.utils.StringUtils;
* 向量库策略抽象基类 * 向量库策略抽象基类
* 提供公共的方法实现如embedding模型获取等 * 提供公共的方法实现如embedding模型获取等
* *
* @author ageer * @author Yzm
*/ */
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor

View File

@@ -6,7 +6,7 @@ import org.ruoyi.service.VectorStoreService;
* 向量库策略接口 * 向量库策略接口
* 继承VectorStoreService以避免重复定义相同的方法 * 继承VectorStoreService以避免重复定义相同的方法
* *
* @author ageer * @author Yzm
*/ */
public interface VectorStoreStrategy extends VectorStoreService { public interface VectorStoreStrategy extends VectorStoreService {

View File

@@ -15,7 +15,7 @@ import java.util.Map;
* 向量库策略工厂 * 向量库策略工厂
* 根据配置动态选择向量库实现 * 根据配置动态选择向量库实现
* *
* @author ageer * @author Yzm
*/ */
@Slf4j @Slf4j
@Component @Component

View File

@@ -1,20 +1,17 @@
package org.ruoyi.service.strategy.impl; package org.ruoyi.service.strategy.impl;
import org.ruoyi.common.core.exception.ServiceException;
import dev.langchain4j.data.embedding.Embedding; import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.data.document.Metadata;
import dev.langchain4j.model.embedding.EmbeddingModel; import dev.langchain4j.model.embedding.EmbeddingModel;
import io.milvus.client.MilvusServiceClient; import dev.langchain4j.store.embedding.EmbeddingMatch;
import io.milvus.common.clientenum.ConsistencyLevelEnum; import dev.langchain4j.store.embedding.EmbeddingSearchRequest;
import io.milvus.grpc.*; import dev.langchain4j.store.embedding.EmbeddingStore;
import io.milvus.param.*; import dev.langchain4j.store.embedding.filter.Filter;
import io.milvus.param.collection.*; import dev.langchain4j.store.embedding.filter.MetadataFilterBuilder;
import io.milvus.param.dml.DeleteParam; import dev.langchain4j.store.embedding.milvus.MilvusEmbeddingStore;
import io.milvus.param.dml.InsertParam; import io.milvus.param.IndexType;
import io.milvus.param.dml.SearchParam; import io.milvus.param.MetricType;
import io.milvus.param.index.CreateIndexParam;
import io.milvus.param.index.DescribeIndexParam;
import io.milvus.response.DescCollResponseWrapper;
import io.milvus.response.SearchResultsWrapper;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.ruoyi.common.core.config.VectorStoreProperties; import org.ruoyi.common.core.config.VectorStoreProperties;
@@ -23,19 +20,19 @@ import org.ruoyi.domain.bo.StoreEmbeddingBo;
import org.ruoyi.service.strategy.AbstractVectorStoreStrategy; import org.ruoyi.service.strategy.AbstractVectorStoreStrategy;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.*; import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
/** /**
* Milvus向量库策略实现 * Milvus向量库策略实现
* *
* @author ageer * @author Yzm
*/ */
@Slf4j @Slf4j
@Component @Component
public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy { public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy {
private MilvusServiceClient milvusClient;
public MilvusVectorStoreStrategy(VectorStoreProperties vectorStoreProperties) { public MilvusVectorStoreStrategy(VectorStoreProperties vectorStoreProperties) {
super(vectorStoreProperties); super(vectorStoreProperties);
} }
@@ -49,290 +46,164 @@ public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy {
public void createSchema(String vectorModelName, String kid) { public void createSchema(String vectorModelName, String kid) {
String url = vectorStoreProperties.getMilvus().getUrl(); String url = vectorStoreProperties.getMilvus().getUrl();
String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + kid; String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + kid;
// 使用 LangChain4j 的 MilvusEmbeddingStore 来确保集合存在(按需创建)
// 创建Milvus客户端连接 MilvusEmbeddingStore store = MilvusEmbeddingStore.builder()
ConnectParam connectParam = ConnectParam.newBuilder() .uri(url)
.withUri(url) .collectionName(collectionName)
.dimension(2048)
.indexType(IndexType.IVF_FLAT)
.metricType(MetricType.L2)
.autoFlushOnInsert(true)
.idFieldName("id")
.textFieldName("text")
.metadataFieldName("metadata")
.vectorFieldName("vector")
.build(); .build();
milvusClient = new MilvusServiceClient(connectParam); log.info("Milvus集合初始化完成: {}", collectionName);
// 检查集合是否存在
HasCollectionParam hasCollectionParam = HasCollectionParam.newBuilder()
.withCollectionName(collectionName)
.build();
R<Boolean> hasCollectionResponse = milvusClient.hasCollection(hasCollectionParam);
if (hasCollectionResponse.getStatus() != R.Status.Success.getCode()) {
log.error("检查集合是否存在失败: {}", hasCollectionResponse.getMessage());
return;
}
if (!hasCollectionResponse.getData()) {
// 创建字段
List<FieldType> fields = new ArrayList<>();
// ID字段 (主键)
fields.add(FieldType.newBuilder()
.withName("id")
.withDataType(DataType.Int64)
.withPrimaryKey(true)
.withAutoID(true)
.build());
// 文本字段
fields.add(FieldType.newBuilder()
.withName("text")
.withDataType(DataType.VarChar)
.withMaxLength(65535)
.build());
// fid字段
fields.add(FieldType.newBuilder()
.withName("fid")
.withDataType(DataType.VarChar)
.withMaxLength(255)
.build());
// kid字段
fields.add(FieldType.newBuilder()
.withName("kid")
.withDataType(DataType.VarChar)
.withMaxLength(255)
.build());
// docId字段
fields.add(FieldType.newBuilder()
.withName("docId")
.withDataType(DataType.VarChar)
.withMaxLength(255)
.build());
// 向量字段
fields.add(FieldType.newBuilder()
.withName("vector")
.withDataType(DataType.FloatVector)
.withDimension(2048) // 根据实际embedding维度调整
.build());
// 创建集合
CreateCollectionParam createCollectionParam = CreateCollectionParam.newBuilder()
.withCollectionName(collectionName)
.withDescription("Knowledge base collection for " + kid)
.withShardsNum(2)
.withFieldTypes(fields)
.build();
R<RpcStatus> createCollectionResponse = milvusClient.createCollection(createCollectionParam);
if (createCollectionResponse.getStatus() != R.Status.Success.getCode()) {
log.error("创建集合失败: {}", createCollectionResponse.getMessage());
return;
}
// 创建索引
CreateIndexParam createIndexParam = CreateIndexParam.newBuilder()
.withCollectionName(collectionName)
.withFieldName("vector")
.withIndexType(IndexType.IVF_FLAT)
.withMetricType(MetricType.L2)
.withExtraParam("{\"nlist\":1024}")
.build();
R<RpcStatus> createIndexResponse = milvusClient.createIndex(createIndexParam);
if (createIndexResponse.getStatus() != R.Status.Success.getCode()) {
log.error("创建索引失败: {}", createIndexResponse.getMessage());
} else {
log.info("Milvus集合和索引创建成功: {}", collectionName);
}
} else {
log.info("Milvus集合已存在: {}", collectionName);
}
} }
@Override @Override
public void storeEmbeddings(StoreEmbeddingBo storeEmbeddingBo) { public void storeEmbeddings(StoreEmbeddingBo storeEmbeddingBo) {
createSchema(storeEmbeddingBo.getVectorModelName(), storeEmbeddingBo.getKid()); createSchema(storeEmbeddingBo.getVectorModelName(), storeEmbeddingBo.getKid());
EmbeddingModel embeddingModel = getEmbeddingModel(storeEmbeddingBo.getEmbeddingModelName(), EmbeddingModel embeddingModel = getEmbeddingModel(storeEmbeddingBo.getEmbeddingModelName(),
storeEmbeddingBo.getApiKey(), storeEmbeddingBo.getBaseUrl()); storeEmbeddingBo.getApiKey(), storeEmbeddingBo.getBaseUrl());
List<String> chunkList = storeEmbeddingBo.getChunkList(); List<String> chunkList = storeEmbeddingBo.getChunkList();
List<String> fidList = storeEmbeddingBo.getFids(); List<String> fidList = storeEmbeddingBo.getFids();
String kid = storeEmbeddingBo.getKid(); String kid = storeEmbeddingBo.getKid();
String docId = storeEmbeddingBo.getDocId(); String docId = storeEmbeddingBo.getDocId();
String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + kid; String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + kid;
log.info("Milvus向量存储条数记录: " + chunkList.size()); log.info("Milvus向量存储条数记录: {}", chunkList.size());
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
// 准备批量插入数据 EmbeddingStore<TextSegment> embeddingStore = MilvusEmbeddingStore.builder()
List<InsertParam.Field> fields = new ArrayList<>(); .uri(vectorStoreProperties.getMilvus().getUrl())
List<String> textList = new ArrayList<>(); .collectionName(collectionName)
List<String> fidListData = new ArrayList<>(); .dimension(2048)
List<String> kidList = new ArrayList<>(); .indexType(IndexType.IVF_FLAT)
List<String> docIdList = new ArrayList<>(); .metricType(MetricType.L2)
List<List<Float>> vectorList = new ArrayList<>(); .autoFlushOnInsert(false)
.idFieldName("id")
for (int i = 0; i < chunkList.size(); i++) { .textFieldName("text")
.metadataFieldName("metadata")
.vectorFieldName("vector")
.build();
IntStream.range(0, chunkList.size()).forEach(i -> {
String text = chunkList.get(i); String text = chunkList.get(i);
String fid = fidList.get(i); String fid = fidList.get(i);
Embedding embedding = embeddingModel.embed(text).content(); Embedding embedding = embeddingModel.embed(text).content();
Metadata metadata = new Metadata()
textList.add(text); .put("fid", fid)
fidListData.add(fid); .put("kid", kid)
kidList.add(kid); .put("docId", docId);
docIdList.add(docId); TextSegment segment = TextSegment.from(text, metadata);
embeddingStore.add(embedding, segment);
List<Float> vector = new ArrayList<>(); });
for (float f : embedding.vector()) {
vector.add(f);
}
vectorList.add(vector);
}
// 构建字段数据
fields.add(new InsertParam.Field("text", textList));
fields.add(new InsertParam.Field("fid", fidListData));
fields.add(new InsertParam.Field("kid", kidList));
fields.add(new InsertParam.Field("docId", docIdList));
fields.add(new InsertParam.Field("vector", vectorList));
// 执行插入
InsertParam insertParam = InsertParam.newBuilder()
.withCollectionName(collectionName)
.withFields(fields)
.build();
R<MutationResult> insertResponse = milvusClient.insert(insertParam);
if (insertResponse.getStatus() != R.Status.Success.getCode()) {
log.error("Milvus向量存储失败: {}", insertResponse.getMessage());
throw new ServiceException("Milvus向量存储失败");
} else {
log.info("Milvus向量存储成功插入条数: {}", insertResponse.getData().getInsertCnt());
}
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
log.info("Milvus向量存储完成消耗时间" + (endTime - startTime) / 1000 + ""); log.info("Milvus向量存储完成消耗时间{}秒", (endTime - startTime) / 1000);
} }
@Override @Override
public List<String> getQueryVector(QueryVectorBo queryVectorBo) { public List<String> getQueryVector(QueryVectorBo queryVectorBo) {
createSchema(queryVectorBo.getVectorModelName(), queryVectorBo.getKid()); createSchema(queryVectorBo.getVectorModelName(), queryVectorBo.getKid());
EmbeddingModel embeddingModel = getEmbeddingModel(queryVectorBo.getEmbeddingModelName(), EmbeddingModel embeddingModel = getEmbeddingModel(queryVectorBo.getEmbeddingModelName(),
queryVectorBo.getApiKey(), queryVectorBo.getBaseUrl()); queryVectorBo.getApiKey(), queryVectorBo.getBaseUrl());
Embedding queryEmbedding = embeddingModel.embed(queryVectorBo.getQuery()).content(); Embedding queryEmbedding = embeddingModel.embed(queryVectorBo.getQuery()).content();
String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + queryVectorBo.getKid(); String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + queryVectorBo.getKid();
EmbeddingStore<TextSegment> embeddingStore = MilvusEmbeddingStore.builder()
.uri(vectorStoreProperties.getMilvus().getUrl())
.collectionName(collectionName)
.dimension(2048)
.indexType(IndexType.IVF_FLAT)
.metricType(MetricType.L2)
.autoFlushOnInsert(true)
.idFieldName("id")
.textFieldName("text")
.metadataFieldName("metadata")
.vectorFieldName("vector")
.build();
List<String> resultList = new ArrayList<>(); List<String> resultList = new ArrayList<>();
EmbeddingSearchRequest request = EmbeddingSearchRequest.builder()
// 加载集合到内存 .queryEmbedding(queryEmbedding)
LoadCollectionParam loadCollectionParam = LoadCollectionParam.newBuilder() .maxResults(queryVectorBo.getMaxResults())
.withCollectionName(collectionName)
.build(); .build();
milvusClient.loadCollection(loadCollectionParam); List<EmbeddingMatch<TextSegment>> matches = embeddingStore.search(request).matches();
for (EmbeddingMatch<TextSegment> match : matches) {
// 准备查询向量 TextSegment segment = match.embedded();
List<List<Float>> searchVectors = new ArrayList<>(); if (segment != null) {
List<Float> queryVector = new ArrayList<>(); resultList.add(segment.text());
for (float f : queryEmbedding.vector()) {
queryVector.add(f);
}
searchVectors.add(queryVector);
// 构建搜索参数
SearchParam searchParam = SearchParam.newBuilder()
.withCollectionName(collectionName)
// 匹配方法
.withMetricType(MetricType.L2)
.withOutFields(Arrays.asList("text", "fid", "kid", "docId"))
.withTopK(queryVectorBo.getMaxResults())
.withVectors(searchVectors)
.withVectorFieldName("vector")
.withParams("{\"nprobe\":10}")
.build();
R<SearchResults> searchResponse = milvusClient.search(searchParam);
if (searchResponse.getStatus() != R.Status.Success.getCode()) {
log.error("Milvus查询失败: {}", searchResponse.getMessage());
return resultList;
}
SearchResultsWrapper wrapper = new SearchResultsWrapper(searchResponse.getData().getResults());
// 遍历搜索结果
for (int i = 0; i < wrapper.getIDScore(0).size(); i++) {
SearchResultsWrapper.IDScore idScore = wrapper.getIDScore(0).get(i);
// 获取text字段数据
List<?> textFieldData = wrapper.getFieldData("text", 0);
if (textFieldData != null && i < textFieldData.size()) {
Object textObj = textFieldData.get(i);
if (textObj != null) {
resultList.add(textObj.toString());
log.debug("找到相似文本ID: {}, 距离: {}, 内容: {}",
idScore.getLongID(), idScore.getScore(), textObj.toString());
}
} }
} }
return resultList; return resultList;
} }
@Override @Override
@SneakyThrows @SneakyThrows
public void removeById(String id, String modelName) { public void removeById(String id, String modelName) {
String url = vectorStoreProperties.getMilvus().getUrl();
String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + id; String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + id;
MilvusEmbeddingStore store = MilvusEmbeddingStore.builder()
// 删除整个集合 .uri(url)
DropCollectionParam dropCollectionParam = DropCollectionParam.newBuilder() .collectionName(collectionName)
.withCollectionName(collectionName) .dimension(2048)
.indexType(IndexType.IVF_FLAT)
.metricType(MetricType.L2)
.autoFlushOnInsert(true)
.idFieldName("id")
.textFieldName("text")
.metadataFieldName("metadata")
.vectorFieldName("vector")
.build(); .build();
// 修正MilvusEmbeddingStore 的 dropCollection 需要传入集合名
R<RpcStatus> dropResponse = milvusClient.dropCollection(dropCollectionParam); store.dropCollection(collectionName);
if (dropResponse.getStatus() != R.Status.Success.getCode()) { log.info("Milvus集合删除成功: {}", collectionName);
log.error("Milvus集合删除失败: {}", dropResponse.getMessage());
throw new ServiceException("Milvus集合删除失败");
} else {
log.info("Milvus集合删除成功: {}", collectionName);
}
} }
@Override @Override
public void removeByDocId(String docId, String kid) { public void removeByDocId(String docId, String kid) {
String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + kid; String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + kid;
EmbeddingStore<TextSegment> embeddingStore = MilvusEmbeddingStore.builder()
String expr = "docId == \"" + docId + "\""; .uri(vectorStoreProperties.getMilvus().getUrl())
DeleteParam deleteParam = DeleteParam.newBuilder() .collectionName(collectionName)
.withCollectionName(collectionName) .dimension(2048)
.withExpr(expr) .indexType(IndexType.IVF_FLAT)
.metricType(MetricType.L2)
.autoFlushOnInsert(false)
.idFieldName("id")
.textFieldName("text")
.metadataFieldName("metadata")
.vectorFieldName("vector")
.build(); .build();
Filter filter = MetadataFilterBuilder.metadataKey("docId").isEqualTo(docId);
R<MutationResult> deleteResponse = milvusClient.delete(deleteParam); embeddingStore.removeAll(filter);
if (deleteResponse.getStatus() != R.Status.Success.getCode()) { log.info("Milvus成功删除 docId={} 的所有向量数据", docId);
log.error("Milvus删除失败: {}", deleteResponse.getMessage());
throw new ServiceException("Milvus删除失败");
} else {
log.info("Milvus成功删除 docId={} 的所有向量数据,删除条数: {}", docId, deleteResponse.getData().getDeleteCnt());
}
} }
@Override @Override
public void removeByFid(String fid, String kid) { public void removeByFid(String fid, String kid) {
String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + kid; String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + kid;
EmbeddingStore<TextSegment> embeddingStore = MilvusEmbeddingStore.builder()
String expr = "fid == \"" + fid + "\""; .uri(vectorStoreProperties.getMilvus().getUrl())
DeleteParam deleteParam = DeleteParam.newBuilder() .collectionName(collectionName)
.withCollectionName(collectionName) .dimension(2048)
.withExpr(expr) .indexType(IndexType.IVF_FLAT)
.metricType(MetricType.L2)
.autoFlushOnInsert(false)
.idFieldName("id")
.textFieldName("text")
.metadataFieldName("metadata")
.vectorFieldName("vector")
.build(); .build();
Filter filter = MetadataFilterBuilder.metadataKey("fid").isEqualTo(fid);
R<MutationResult> deleteResponse = milvusClient.delete(deleteParam); embeddingStore.removeAll(filter);
if (deleteResponse.getStatus() != R.Status.Success.getCode()) { log.info("Milvus成功删除 fid={} 的所有向量数据", fid);
log.error("Milvus删除失败: {}", deleteResponse.getMessage());
throw new ServiceException("Milvus删除失败");
} else {
log.info("Milvus成功删除 fid={} 的所有向量数据,删除条数: {}", fid, deleteResponse.getData().getDeleteCnt());
}
} }
} }