From 766f6ad266a17f0a7435cc30b50fa2c9e658e75c Mon Sep 17 00:00:00 2001 From: Yzm Date: Fri, 17 Oct 2025 15:39:26 +0800 Subject: [PATCH] =?UTF-8?q?refactor(milvus):=20=E9=87=8D=E6=9E=84Milvus?= =?UTF-8?q?=E5=90=91=E9=87=8F=E5=AD=98=E5=82=A8=E7=AD=96=E7=95=A5=E4=BD=BF?= =?UTF-8?q?=E7=94=A8LangChain4j?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将原有的直接Milvus客户端调用重构为使用LangChain4j的MilvusEmbeddingStore 简化了集合创建、数据存储和查询的实现逻辑 更新了相关依赖 --- ruoyi-modules-api/ruoyi-knowledge-api/pom.xml | 6 + .../strategy/AbstractVectorStoreStrategy.java | 2 +- .../service/strategy/VectorStoreStrategy.java | 2 +- .../strategy/VectorStoreStrategyFactory.java | 2 +- .../impl/MilvusVectorStoreStrategy.java | 375 ++++++------------ 5 files changed, 132 insertions(+), 255 deletions(-) diff --git a/ruoyi-modules-api/ruoyi-knowledge-api/pom.xml b/ruoyi-modules-api/ruoyi-knowledge-api/pom.xml index 90933a7d..11360516 100644 --- a/ruoyi-modules-api/ruoyi-knowledge-api/pom.xml +++ b/ruoyi-modules-api/ruoyi-knowledge-api/pom.xml @@ -80,6 +80,12 @@ 2.6.4 + + + dev.langchain4j + langchain4j-milvus + + dev.langchain4j langchain4j-open-ai diff --git a/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/AbstractVectorStoreStrategy.java b/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/AbstractVectorStoreStrategy.java index 4f7616c0..07f0866f 100644 --- a/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/AbstractVectorStoreStrategy.java +++ b/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/AbstractVectorStoreStrategy.java @@ -14,7 +14,7 @@ import org.ruoyi.common.core.utils.StringUtils; * 向量库策略抽象基类 * 提供公共的方法实现,如embedding模型获取等 * - * @author ageer + * @author Yzm */ @Slf4j @RequiredArgsConstructor diff --git a/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/VectorStoreStrategy.java b/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/VectorStoreStrategy.java index bd93e6fa..b0c965d6 100644 --- a/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/VectorStoreStrategy.java +++ b/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/VectorStoreStrategy.java @@ -6,7 +6,7 @@ import org.ruoyi.service.VectorStoreService; * 向量库策略接口 * 继承VectorStoreService以避免重复定义相同的方法 * - * @author ageer + * @author Yzm */ public interface VectorStoreStrategy extends VectorStoreService { diff --git a/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/VectorStoreStrategyFactory.java b/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/VectorStoreStrategyFactory.java index fbd5b27b..579b989f 100644 --- a/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/VectorStoreStrategyFactory.java +++ b/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/VectorStoreStrategyFactory.java @@ -15,7 +15,7 @@ import java.util.Map; * 向量库策略工厂 * 根据配置动态选择向量库实现 * - * @author ageer + * @author Yzm */ @Slf4j @Component diff --git a/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/impl/MilvusVectorStoreStrategy.java b/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/impl/MilvusVectorStoreStrategy.java index f8e58733..07ee2ba6 100644 --- a/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/impl/MilvusVectorStoreStrategy.java +++ b/ruoyi-modules-api/ruoyi-knowledge-api/src/main/java/org/ruoyi/service/strategy/impl/MilvusVectorStoreStrategy.java @@ -1,20 +1,17 @@ package org.ruoyi.service.strategy.impl; -import org.ruoyi.common.core.exception.ServiceException; import dev.langchain4j.data.embedding.Embedding; +import dev.langchain4j.data.segment.TextSegment; +import dev.langchain4j.data.document.Metadata; import dev.langchain4j.model.embedding.EmbeddingModel; -import io.milvus.client.MilvusServiceClient; -import io.milvus.common.clientenum.ConsistencyLevelEnum; -import io.milvus.grpc.*; -import io.milvus.param.*; -import io.milvus.param.collection.*; -import io.milvus.param.dml.DeleteParam; -import io.milvus.param.dml.InsertParam; -import io.milvus.param.dml.SearchParam; -import io.milvus.param.index.CreateIndexParam; -import io.milvus.param.index.DescribeIndexParam; -import io.milvus.response.DescCollResponseWrapper; -import io.milvus.response.SearchResultsWrapper; +import dev.langchain4j.store.embedding.EmbeddingMatch; +import dev.langchain4j.store.embedding.EmbeddingSearchRequest; +import dev.langchain4j.store.embedding.EmbeddingStore; +import dev.langchain4j.store.embedding.filter.Filter; +import dev.langchain4j.store.embedding.filter.MetadataFilterBuilder; +import dev.langchain4j.store.embedding.milvus.MilvusEmbeddingStore; +import io.milvus.param.IndexType; +import io.milvus.param.MetricType; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.ruoyi.common.core.config.VectorStoreProperties; @@ -23,19 +20,19 @@ import org.ruoyi.domain.bo.StoreEmbeddingBo; import org.ruoyi.service.strategy.AbstractVectorStoreStrategy; import org.springframework.stereotype.Component; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.IntStream; /** * Milvus向量库策略实现 * - * @author ageer + * @author Yzm */ @Slf4j @Component public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy { - private MilvusServiceClient milvusClient; - public MilvusVectorStoreStrategy(VectorStoreProperties vectorStoreProperties) { super(vectorStoreProperties); } @@ -49,290 +46,164 @@ public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy { public void createSchema(String vectorModelName, String kid) { String url = vectorStoreProperties.getMilvus().getUrl(); String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + kid; - - // 创建Milvus客户端连接 - ConnectParam connectParam = ConnectParam.newBuilder() - .withUri(url) + // 使用 LangChain4j 的 MilvusEmbeddingStore 来确保集合存在(按需创建) + MilvusEmbeddingStore store = MilvusEmbeddingStore.builder() + .uri(url) + .collectionName(collectionName) + .dimension(2048) + .indexType(IndexType.IVF_FLAT) + .metricType(MetricType.L2) + .autoFlushOnInsert(true) + .idFieldName("id") + .textFieldName("text") + .metadataFieldName("metadata") + .vectorFieldName("vector") .build(); - milvusClient = new MilvusServiceClient(connectParam); - - // 检查集合是否存在 - HasCollectionParam hasCollectionParam = HasCollectionParam.newBuilder() - .withCollectionName(collectionName) - .build(); - - R hasCollectionResponse = milvusClient.hasCollection(hasCollectionParam); - if (hasCollectionResponse.getStatus() != R.Status.Success.getCode()) { - log.error("检查集合是否存在失败: {}", hasCollectionResponse.getMessage()); - return; - } - - if (!hasCollectionResponse.getData()) { - // 创建字段 - List fields = new ArrayList<>(); - - // ID字段 (主键) - fields.add(FieldType.newBuilder() - .withName("id") - .withDataType(DataType.Int64) - .withPrimaryKey(true) - .withAutoID(true) - .build()); - - // 文本字段 - fields.add(FieldType.newBuilder() - .withName("text") - .withDataType(DataType.VarChar) - .withMaxLength(65535) - .build()); - - // fid字段 - fields.add(FieldType.newBuilder() - .withName("fid") - .withDataType(DataType.VarChar) - .withMaxLength(255) - .build()); - - // kid字段 - fields.add(FieldType.newBuilder() - .withName("kid") - .withDataType(DataType.VarChar) - .withMaxLength(255) - .build()); - - // docId字段 - fields.add(FieldType.newBuilder() - .withName("docId") - .withDataType(DataType.VarChar) - .withMaxLength(255) - .build()); - - // 向量字段 - fields.add(FieldType.newBuilder() - .withName("vector") - .withDataType(DataType.FloatVector) - .withDimension(2048) // 根据实际embedding维度调整 - .build()); - - // 创建集合 - CreateCollectionParam createCollectionParam = CreateCollectionParam.newBuilder() - .withCollectionName(collectionName) - .withDescription("Knowledge base collection for " + kid) - .withShardsNum(2) - .withFieldTypes(fields) - .build(); - - R createCollectionResponse = milvusClient.createCollection(createCollectionParam); - if (createCollectionResponse.getStatus() != R.Status.Success.getCode()) { - log.error("创建集合失败: {}", createCollectionResponse.getMessage()); - return; - } - - // 创建索引 - CreateIndexParam createIndexParam = CreateIndexParam.newBuilder() - .withCollectionName(collectionName) - .withFieldName("vector") - .withIndexType(IndexType.IVF_FLAT) - .withMetricType(MetricType.L2) - .withExtraParam("{\"nlist\":1024}") - .build(); - - R createIndexResponse = milvusClient.createIndex(createIndexParam); - if (createIndexResponse.getStatus() != R.Status.Success.getCode()) { - log.error("创建索引失败: {}", createIndexResponse.getMessage()); - } else { - log.info("Milvus集合和索引创建成功: {}", collectionName); - } - } else { - log.info("Milvus集合已存在: {}", collectionName); - } + log.info("Milvus集合初始化完成: {}", collectionName); } @Override public void storeEmbeddings(StoreEmbeddingBo storeEmbeddingBo) { createSchema(storeEmbeddingBo.getVectorModelName(), storeEmbeddingBo.getKid()); - + EmbeddingModel embeddingModel = getEmbeddingModel(storeEmbeddingBo.getEmbeddingModelName(), storeEmbeddingBo.getApiKey(), storeEmbeddingBo.getBaseUrl()); - + List chunkList = storeEmbeddingBo.getChunkList(); List fidList = storeEmbeddingBo.getFids(); String kid = storeEmbeddingBo.getKid(); String docId = storeEmbeddingBo.getDocId(); String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + kid; - - log.info("Milvus向量存储条数记录: " + chunkList.size()); + + log.info("Milvus向量存储条数记录: {}", chunkList.size()); long startTime = System.currentTimeMillis(); - - // 准备批量插入数据 - List fields = new ArrayList<>(); - List textList = new ArrayList<>(); - List fidListData = new ArrayList<>(); - List kidList = new ArrayList<>(); - List docIdList = new ArrayList<>(); - List> vectorList = new ArrayList<>(); - - for (int i = 0; i < chunkList.size(); i++) { + + EmbeddingStore embeddingStore = MilvusEmbeddingStore.builder() + .uri(vectorStoreProperties.getMilvus().getUrl()) + .collectionName(collectionName) + .dimension(2048) + .indexType(IndexType.IVF_FLAT) + .metricType(MetricType.L2) + .autoFlushOnInsert(false) + .idFieldName("id") + .textFieldName("text") + .metadataFieldName("metadata") + .vectorFieldName("vector") + .build(); + + IntStream.range(0, chunkList.size()).forEach(i -> { String text = chunkList.get(i); String fid = fidList.get(i); Embedding embedding = embeddingModel.embed(text).content(); - - textList.add(text); - fidListData.add(fid); - kidList.add(kid); - docIdList.add(docId); - - List vector = new ArrayList<>(); - for (float f : embedding.vector()) { - vector.add(f); - } - vectorList.add(vector); - } - - // 构建字段数据 - fields.add(new InsertParam.Field("text", textList)); - fields.add(new InsertParam.Field("fid", fidListData)); - fields.add(new InsertParam.Field("kid", kidList)); - fields.add(new InsertParam.Field("docId", docIdList)); - fields.add(new InsertParam.Field("vector", vectorList)); - - // 执行插入 - InsertParam insertParam = InsertParam.newBuilder() - .withCollectionName(collectionName) - .withFields(fields) - .build(); - - R insertResponse = milvusClient.insert(insertParam); - if (insertResponse.getStatus() != R.Status.Success.getCode()) { - log.error("Milvus向量存储失败: {}", insertResponse.getMessage()); - throw new ServiceException("Milvus向量存储失败"); - } else { - log.info("Milvus向量存储成功,插入条数: {}", insertResponse.getData().getInsertCnt()); - } - + Metadata metadata = new Metadata() + .put("fid", fid) + .put("kid", kid) + .put("docId", docId); + TextSegment segment = TextSegment.from(text, metadata); + embeddingStore.add(embedding, segment); + }); + long endTime = System.currentTimeMillis(); - log.info("Milvus向量存储完成消耗时间:" + (endTime - startTime) / 1000 + "秒"); + log.info("Milvus向量存储完成消耗时间:{}秒", (endTime - startTime) / 1000); } @Override public List getQueryVector(QueryVectorBo queryVectorBo) { createSchema(queryVectorBo.getVectorModelName(), queryVectorBo.getKid()); - + EmbeddingModel embeddingModel = getEmbeddingModel(queryVectorBo.getEmbeddingModelName(), queryVectorBo.getApiKey(), queryVectorBo.getBaseUrl()); - + Embedding queryEmbedding = embeddingModel.embed(queryVectorBo.getQuery()).content(); String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + queryVectorBo.getKid(); - + + EmbeddingStore embeddingStore = MilvusEmbeddingStore.builder() + .uri(vectorStoreProperties.getMilvus().getUrl()) + .collectionName(collectionName) + .dimension(2048) + .indexType(IndexType.IVF_FLAT) + .metricType(MetricType.L2) + .autoFlushOnInsert(true) + .idFieldName("id") + .textFieldName("text") + .metadataFieldName("metadata") + .vectorFieldName("vector") + .build(); + List resultList = new ArrayList<>(); - - // 加载集合到内存 - LoadCollectionParam loadCollectionParam = LoadCollectionParam.newBuilder() - .withCollectionName(collectionName) + EmbeddingSearchRequest request = EmbeddingSearchRequest.builder() + .queryEmbedding(queryEmbedding) + .maxResults(queryVectorBo.getMaxResults()) .build(); - milvusClient.loadCollection(loadCollectionParam); - - // 准备查询向量 - List> searchVectors = new ArrayList<>(); - List queryVector = new ArrayList<>(); - for (float f : queryEmbedding.vector()) { - queryVector.add(f); - } - searchVectors.add(queryVector); - - // 构建搜索参数 - SearchParam searchParam = SearchParam.newBuilder() - .withCollectionName(collectionName) - // 匹配方法 - .withMetricType(MetricType.L2) - .withOutFields(Arrays.asList("text", "fid", "kid", "docId")) - .withTopK(queryVectorBo.getMaxResults()) - .withVectors(searchVectors) - .withVectorFieldName("vector") - .withParams("{\"nprobe\":10}") - .build(); - - R searchResponse = milvusClient.search(searchParam); - if (searchResponse.getStatus() != R.Status.Success.getCode()) { - log.error("Milvus查询失败: {}", searchResponse.getMessage()); - return resultList; - } - - SearchResultsWrapper wrapper = new SearchResultsWrapper(searchResponse.getData().getResults()); - - // 遍历搜索结果 - for (int i = 0; i < wrapper.getIDScore(0).size(); i++) { - SearchResultsWrapper.IDScore idScore = wrapper.getIDScore(0).get(i); - - // 获取text字段数据 - List textFieldData = wrapper.getFieldData("text", 0); - if (textFieldData != null && i < textFieldData.size()) { - Object textObj = textFieldData.get(i); - if (textObj != null) { - resultList.add(textObj.toString()); - log.debug("找到相似文本,ID: {}, 距离: {}, 内容: {}", - idScore.getLongID(), idScore.getScore(), textObj.toString()); - } + List> matches = embeddingStore.search(request).matches(); + for (EmbeddingMatch match : matches) { + TextSegment segment = match.embedded(); + if (segment != null) { + resultList.add(segment.text()); } } - return resultList; } @Override @SneakyThrows public void removeById(String id, String modelName) { + String url = vectorStoreProperties.getMilvus().getUrl(); String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + id; - - // 删除整个集合 - DropCollectionParam dropCollectionParam = DropCollectionParam.newBuilder() - .withCollectionName(collectionName) + MilvusEmbeddingStore store = MilvusEmbeddingStore.builder() + .uri(url) + .collectionName(collectionName) + .dimension(2048) + .indexType(IndexType.IVF_FLAT) + .metricType(MetricType.L2) + .autoFlushOnInsert(true) + .idFieldName("id") + .textFieldName("text") + .metadataFieldName("metadata") + .vectorFieldName("vector") .build(); - - R dropResponse = milvusClient.dropCollection(dropCollectionParam); - if (dropResponse.getStatus() != R.Status.Success.getCode()) { - log.error("Milvus集合删除失败: {}", dropResponse.getMessage()); - throw new ServiceException("Milvus集合删除失败"); - } else { - log.info("Milvus集合删除成功: {}", collectionName); - } + // 修正:MilvusEmbeddingStore 的 dropCollection 需要传入集合名 + store.dropCollection(collectionName); + log.info("Milvus集合删除成功: {}", collectionName); } @Override public void removeByDocId(String docId, String kid) { String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + kid; - - String expr = "docId == \"" + docId + "\""; - DeleteParam deleteParam = DeleteParam.newBuilder() - .withCollectionName(collectionName) - .withExpr(expr) + EmbeddingStore embeddingStore = MilvusEmbeddingStore.builder() + .uri(vectorStoreProperties.getMilvus().getUrl()) + .collectionName(collectionName) + .dimension(2048) + .indexType(IndexType.IVF_FLAT) + .metricType(MetricType.L2) + .autoFlushOnInsert(false) + .idFieldName("id") + .textFieldName("text") + .metadataFieldName("metadata") + .vectorFieldName("vector") .build(); - - R deleteResponse = milvusClient.delete(deleteParam); - if (deleteResponse.getStatus() != R.Status.Success.getCode()) { - log.error("Milvus删除失败: {}", deleteResponse.getMessage()); - throw new ServiceException("Milvus删除失败"); - } else { - log.info("Milvus成功删除 docId={} 的所有向量数据,删除条数: {}", docId, deleteResponse.getData().getDeleteCnt()); - } + Filter filter = MetadataFilterBuilder.metadataKey("docId").isEqualTo(docId); + embeddingStore.removeAll(filter); + log.info("Milvus成功删除 docId={} 的所有向量数据", docId); } @Override public void removeByFid(String fid, String kid) { String collectionName = vectorStoreProperties.getMilvus().getCollectionname() + kid; - - String expr = "fid == \"" + fid + "\""; - DeleteParam deleteParam = DeleteParam.newBuilder() - .withCollectionName(collectionName) - .withExpr(expr) + EmbeddingStore embeddingStore = MilvusEmbeddingStore.builder() + .uri(vectorStoreProperties.getMilvus().getUrl()) + .collectionName(collectionName) + .dimension(2048) + .indexType(IndexType.IVF_FLAT) + .metricType(MetricType.L2) + .autoFlushOnInsert(false) + .idFieldName("id") + .textFieldName("text") + .metadataFieldName("metadata") + .vectorFieldName("vector") .build(); - - R deleteResponse = milvusClient.delete(deleteParam); - if (deleteResponse.getStatus() != R.Status.Success.getCode()) { - log.error("Milvus删除失败: {}", deleteResponse.getMessage()); - throw new ServiceException("Milvus删除失败"); - } else { - log.info("Milvus成功删除 fid={} 的所有向量数据,删除条数: {}", fid, deleteResponse.getData().getDeleteCnt()); - } + Filter filter = MetadataFilterBuilder.metadataKey("fid").isEqualTo(fid); + embeddingStore.removeAll(filter); + log.info("Milvus成功删除 fid={} 的所有向量数据", fid); } -} \ No newline at end of file +}