feat(milvus): 实现Milvus向量数据库集成

- 添加Milvus Java SDK依赖
- 实现MilvusVectorStoreStrategy核心功能
- 支持集合管理、数据存储、向量搜索和数据删除
- 添加Milvus实现指南文档
- 更新数据库连接配置
- 修改VectorStoreService接口添加异常声明
This commit is contained in:
Yzm
2025-09-29 18:36:48 +08:00
parent 39fe2cc48f
commit ef49429543
5 changed files with 460 additions and 213 deletions

View File

@@ -1,5 +1,6 @@
package org.ruoyi.service;
import com.google.protobuf.ServiceException;
import org.ruoyi.domain.bo.QueryVectorBo;
import org.ruoyi.domain.bo.StoreEmbeddingBo;
@@ -11,15 +12,15 @@ import java.util.List;
*/
public interface VectorStoreService {

    /**
     * Stores the embeddings described by the given request object.
     *
     * @param storeEmbeddingBo request carrying the data to embed and store
     * @throws ServiceException if the backing vector store reports that the write failed
     */
    void storeEmbeddings(StoreEmbeddingBo storeEmbeddingBo) throws ServiceException;

    /**
     * Runs a vector query.
     *
     * @param queryVectorBo query request
     * @return the matches as strings (the Milvus strategy returns the stored {@code text} of each hit)
     */
    List<String> getQueryVector(QueryVectorBo queryVectorBo);

    /**
     * Prepares the backing storage for a knowledge base (the Milvus strategy creates the
     * collection for {@code kid} when it does not exist yet).
     *
     * @param vectorModelName name of the vector model
     * @param kid             knowledge base id
     * @param modelName       name of the model
     */
    void createSchema(String vectorModelName, String kid, String modelName);

    /**
     * Removes everything stored under the given id (the Milvus strategy drops the whole
     * collection whose name ends with {@code id}).
     *
     * @param id        id of the storage unit to remove
     * @param modelName name of the model
     * @throws ServiceException if the removal fails
     */
    void removeById(String id, String modelName) throws ServiceException;

    /**
     * Removes the vectors that belong to one document.
     *
     * @param docId document id whose vectors are removed
     * @param kid   knowledge base id that selects the storage unit
     * @throws ServiceException if the removal fails
     */
    void removeByDocId(String docId, String kid) throws ServiceException;

    /**
     * Removes the vectors that carry the given {@code fid}.
     *
     * @param fid fid value whose vectors are removed
     * @param kid knowledge base id that selects the storage unit
     * @throws ServiceException if the removal fails
     */
    void removeByFid(String fid, String kid) throws ServiceException;
}

View File

@@ -1,8 +1,28 @@
package org.ruoyi.service.strategy.impl;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.protobuf.ServiceException;
import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.model.embedding.EmbeddingModel;
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.common.DataType;
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.collection.request.AddFieldReq;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import io.milvus.v2.service.collection.request.DescribeCollectionReq;
import io.milvus.v2.service.collection.request.DropCollectionReq;
import io.milvus.v2.service.collection.request.HasCollectionReq;
import io.milvus.v2.service.collection.response.DescribeCollectionResp;
import io.milvus.v2.service.vector.request.DeleteReq;
import io.milvus.v2.service.vector.request.InsertReq;
import io.milvus.v2.service.vector.request.SearchReq;
import io.milvus.v2.service.vector.request.data.BaseVector;
import io.milvus.v2.service.vector.request.data.FloatVec;
import io.milvus.v2.service.vector.response.DeleteResp;
import io.milvus.v2.service.vector.response.InsertResp;
import io.milvus.v2.service.vector.response.SearchResp;
import lombok.extern.slf4j.Slf4j;
import org.ruoyi.common.core.service.ConfigService;
import org.ruoyi.domain.bo.QueryVectorBo;
@@ -10,10 +30,7 @@ import org.ruoyi.domain.bo.StoreEmbeddingBo;
import org.ruoyi.service.strategy.AbstractVectorStoreStrategy;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
/**
* Milvus向量库策略实现
@@ -24,8 +41,7 @@ import java.util.Map;
@Component
public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy {
// Milvus客户端和相关配置
// private MilvusClient milvusClient;
private MilvusClientV2 client;
public MilvusVectorStoreStrategy(ConfigService configService) {
super(configService);
@@ -41,106 +57,89 @@ public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy {
log.info("Milvus创建schema: vectorModelName={}, kid={}, modelName={}", vectorModelName, kid, modelName);
// 1. 获取Milvus配置
String host = configService.getConfigValue("milvus", "host");
String port = configService.getConfigValue("milvus", "port");
String host = configService.getConfigValue("milvus", "url");
String collectionName = configService.getConfigValue("milvus", "collectionname") + kid;
// 2. 初始化Milvus客户端
// ConnectParam connectParam = ConnectParam.newBuilder()
// .withHost(host)
// .withPort(Integer.parseInt(port))
// .build();
// milvusClient = new MilvusClient(connectParam);
ConnectConfig config = ConnectConfig.builder()
.uri(host)
.build();
client = new MilvusClientV2(config);
// 2. 检查集合是否存在
HasCollectionReq hasCollectionReq = HasCollectionReq.builder()
.collectionName(collectionName)
.build();
// 3. 检查集合是否存在,如果不存在则创建
// HasCollectionParam hasCollectionParam = HasCollectionParam.newBuilder()
// .withCollectionName(collectionName)
// .build();
// R<Boolean> hasCollectionResponse = milvusClient.hasCollection(hasCollectionParam);
//
// if (!hasCollectionResponse.getData()) {
// // 创建集合
// List<FieldType> fieldsSchema = new ArrayList<>();
//
// // 主键字段
// fieldsSchema.add(FieldType.newBuilder()
// .withName("id")
// .withDataType(DataType.Int64)
// .withPrimaryKey(true)
// .withAutoID(true)
// .build());
//
// // 文本字段
// fieldsSchema.add(FieldType.newBuilder()
// .withName("text")
// .withDataType(DataType.VarChar)
// .withMaxLength(65535)
// .build());
//
// // fid字段
// fieldsSchema.add(FieldType.newBuilder()
// .withName("fid")
// .withDataType(DataType.VarChar)
// .withMaxLength(255)
// .build());
//
// // kid字段
// fieldsSchema.add(FieldType.newBuilder()
// .withName("kid")
// .withDataType(DataType.VarChar)
// .withMaxLength(255)
// .build());
//
// // docId字段
// fieldsSchema.add(FieldType.newBuilder()
// .withName("docId")
// .withDataType(DataType.VarChar)
// .withMaxLength(255)
// .build());
//
// // 向量字段
// fieldsSchema.add(FieldType.newBuilder()
// .withName("vector")
// .withDataType(DataType.FloatVector)
// .withDimension(1536) // 根据实际embedding维度调整
// .build());
//
// CreateCollectionParam createCollectionParam = CreateCollectionParam.newBuilder()
// .withCollectionName(collectionName)
// .withDescription("Knowledge base collection for " + kid)
// .withShardsNum(2)
// .withFieldTypes(fieldsSchema)
// .build();
//
// R<RpcStatus> createCollectionResponse = milvusClient.createCollection(createCollectionParam);
// if (createCollectionResponse.getStatus() == R.Status.Success.getCode()) {
// log.info("Milvus集合创建成功: {}", collectionName);
//
// // 创建索引
// IndexParam indexParam = IndexParam.newBuilder()
// .withCollectionName(collectionName)
// .withFieldName("vector")
// .withIndexType(IndexType.IVF_FLAT)
// .withMetricType(MetricType.L2)
// .withExtraParam("{\"nlist\":1024}")
// .build();
//
// R<RpcStatus> createIndexResponse = milvusClient.createIndex(indexParam);
// if (createIndexResponse.getStatus() == R.Status.Success.getCode()) {
// log.info("Milvus索引创建成功: {}", collectionName);
// } else {
// log.error("Milvus索引创建失败: {}", createIndexResponse.getMessage());
// }
// } else {
// log.error("Milvus集合创建失败: {}", createCollectionResponse.getMessage());
// }
// }
Boolean hasCollection = client.hasCollection(hasCollectionReq);
log.info("Milvus schema创建完成: {}", collectionName);
if (!hasCollection) {
// 3. 创建集合schema
CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
.build();
// 添加字段定义
schema.addField(AddFieldReq.builder()
.fieldName("id")
.dataType(DataType.Int64)
.isPrimaryKey(true)
.autoID(true)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("text")
.dataType(DataType.VarChar)
.maxLength(65535)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("fid")
.dataType(DataType.VarChar)
.maxLength(255)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("kid")
.dataType(DataType.VarChar)
.maxLength(255)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("docId")
.dataType(DataType.VarChar)
.maxLength(255)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("vector")
.dataType(DataType.FloatVector)
.dimension(1024) // 根据实际embedding维度调整
.build());
// 4. 创建索引参数
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(IndexParam.builder()
.fieldName("vector")
.indexType(IndexParam.IndexType.IVF_FLAT)
.metricType(IndexParam.MetricType.L2)
.extraParams(Map.of("nlist", 1024))
.build());
// 5. 创建集合
CreateCollectionReq createCollectionReq = CreateCollectionReq.builder()
.collectionName(collectionName)
.collectionSchema(schema)
.indexParams(indexParams)
.build();
client.createCollection(createCollectionReq);
log.info("Milvus集合创建成功: {}", collectionName);
} else {
log.info("Milvus集合已存在: {}", collectionName);
}
}
@Override
public void storeEmbeddings(StoreEmbeddingBo storeEmbeddingBo) {
public void storeEmbeddings(StoreEmbeddingBo storeEmbeddingBo) throws ServiceException {
createSchema(storeEmbeddingBo.getVectorModelName(), storeEmbeddingBo.getKid(), storeEmbeddingBo.getVectorModelName());
EmbeddingModel embeddingModel = getEmbeddingModel(storeEmbeddingBo.getEmbeddingModelName(),
@@ -155,48 +154,56 @@ public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy {
log.info("Milvus向量存储条数记录: " + chunkList.size());
long startTime = System.currentTimeMillis();
// List<InsertParam.Field> fields = new ArrayList<>();
// List<String> textList = new ArrayList<>();
// List<String> fidListData = new ArrayList<>();
// List<String> kidList = new ArrayList<>();
// List<String> docIdList = new ArrayList<>();
// List<List<Float>> vectorList = new ArrayList<>();
// 准备批量插入数据
List<String> textList = new ArrayList<>();
List<String> fidListData = new ArrayList<>();
List<String> kidList = new ArrayList<>();
List<String> docIdList = new ArrayList<>();
List<List<Float>> vectorList = new ArrayList<>();
for (int i = 0; i < chunkList.size(); i++) {
String text = chunkList.get(i);
String fid = fidList.get(i);
Embedding embedding = embeddingModel.embed(text).content();
// textList.add(text);
// fidListData.add(fid);
// kidList.add(kid);
// docIdList.add(docId);
//
// List<Float> vector = new ArrayList<>();
// for (float f : embedding.vector()) {
// vector.add(f);
// }
// vectorList.add(vector);
textList.add(text);
fidListData.add(fid);
kidList.add(kid);
docIdList.add(docId);
List<Float> vector = new ArrayList<>();
for (float f : embedding.vector()) {
vector.add(f);
}
vectorList.add(vector);
}
// fields.add(new InsertParam.Field("text", textList));
// fields.add(new InsertParam.Field("fid", fidListData));
// fields.add(new InsertParam.Field("kid", kidList));
// fields.add(new InsertParam.Field("docId", docIdList));
// fields.add(new InsertParam.Field("vector", vectorList));
//
// InsertParam insertParam = InsertParam.newBuilder()
// .withCollectionName(collectionName)
// .withFields(fields)
// .build();
//
// R<MutationResult> insertResponse = milvusClient.insert(insertParam);
// if (insertResponse.getStatus() == R.Status.Success.getCode()) {
// log.info("Milvus向量存储成功插入条数: {}", insertResponse.getData().getInsertCnt());
// } else {
// log.error("Milvus向量存储失败: {}", insertResponse.getMessage());
// throw new ServiceException("Milvus向量存储失败: " + insertResponse.getMessage());
// }
// 构建插入数据
List<JsonObject> data = new ArrayList<>();
Gson gson = new Gson();
for (int i = 0; i < textList.size(); i++) {
JsonObject row = new JsonObject();
row.addProperty("text", textList.get(i));
row.addProperty("fid", fidListData.get(i));
row.addProperty("kid", kidList.get(i));
row.addProperty("docId", docIdList.get(i));
row.add("vector", gson.toJsonTree(vectorList.get(i)));
data.add(row);
}
// 执行插入
InsertReq insertReq = InsertReq.builder()
.collectionName(collectionName)
.data(data)
.build();
InsertResp insertResp = client.insert(insertReq);
if (insertResp.getInsertCnt() > 0) {
log.info("Milvus向量存储成功插入条数: {}", insertResp.getInsertCnt());
} else {
log.error("Milvus向量存储失败");
throw new ServiceException("Milvus向量存储失败");
}
long endTime = System.currentTimeMillis();
log.info("Milvus向量存储完成消耗时间" + (endTime - startTime) / 1000 + "");
@@ -214,99 +221,95 @@ public class MilvusVectorStoreStrategy extends AbstractVectorStoreStrategy {
List<String> resultList = new ArrayList<>();
// List<String> searchOutputFields = List.of("text", "fid", "kid", "docId");
// List<List<Float>> searchVectors = new ArrayList<>();
// List<Float> queryVector = new ArrayList<>();
// for (float f : queryEmbedding.vector()) {
// queryVector.add(f);
// }
// searchVectors.add(queryVector);
//
// SearchParam searchParam = SearchParam.newBuilder()
// .withCollectionName(collectionName)
// .withMetricType(MetricType.L2)
// .withOutFields(searchOutputFields)
// .withTopK(queryVectorBo.getMaxResults())
// .withVectors(searchVectors)
// .withVectorFieldName("vector")
// .withParams("{\"nprobe\":10}")
// .build();
//
// R<SearchResults> searchResponse = milvusClient.search(searchParam);
// if (searchResponse.getStatus() == R.Status.Success.getCode()) {
// SearchResults searchResults = searchResponse.getData();
// List<SearchResults.QueryResult> queryResults = searchResults.getResults();
//
// for (SearchResults.QueryResult queryResult : queryResults) {
// List<SearchResults.QueryResult.Row> rows = queryResult.getRows();
// for (SearchResults.QueryResult.Row row : rows) {
// String text = (String) row.get("text");
// resultList.add(text);
// }
// }
// } else {
// log.error("Milvus查询失败: {}", searchResponse.getMessage());
// }
// 准备查询向量
List<BaseVector> searchVectors = new ArrayList<>();
float[] queryVectorArray = new float[queryEmbedding.vector().length];
for (int i = 0; i < queryEmbedding.vector().length; i++) {
queryVectorArray[i] = queryEmbedding.vector()[i];
}
searchVectors.add(new FloatVec(queryVectorArray));
// 构建搜索请求
SearchReq searchReq = SearchReq.builder()
.collectionName(collectionName)
.data(searchVectors)
.topK(queryVectorBo.getMaxResults())
.outputFields(Arrays.asList("text", "fid", "kid", "docId"))
.build();
SearchResp searchResp = client.search(searchReq);
if (searchResp != null && searchResp.getSearchResults() != null) {
List<List<SearchResp.SearchResult>> searchResults = searchResp.getSearchResults();
for (List<SearchResp.SearchResult> results : searchResults) {
for (SearchResp.SearchResult result : results) {
Map<String, Object> entity = result.getEntity();
String text = (String) entity.get("text");
if (text != null) {
resultList.add(text);
}
}
}
} else {
log.error("Milvus查询失败或无结果");
}
return resultList;
}
@Override
public void removeById(String id, String modelName) {
public void removeById(String id, String modelName) throws ServiceException {
String collectionName = configService.getConfigValue("milvus", "collectionname") + id;
// DropCollectionParam dropCollectionParam = DropCollectionParam.newBuilder()
// .withCollectionName(collectionName)
// .build();
//
// R<RpcStatus> dropResponse = milvusClient.dropCollection(dropCollectionParam);
// if (dropResponse.getStatus() == R.Status.Success.getCode()) {
// log.info("Milvus集合删除成功: {}", collectionName);
// } else {
// log.error("Milvus集合删除失败: {}", dropResponse.getMessage());
// throw new ServiceException("Milvus集合删除失败: " + dropResponse.getMessage());
// }
// 删除整个集合
DropCollectionReq dropCollectionReq = DropCollectionReq.builder()
.collectionName(collectionName)
.build();
log.info("Milvus删除集合: {}", collectionName);
try {
client.dropCollection(dropCollectionReq);
log.info("Milvus集合删除成功: {}", collectionName);
} catch (Exception e) {
log.error("Milvus集合删除失败: {}", e.getMessage());
throw new ServiceException("Milvus集合删除失败: " + e.getMessage());
}
}
@Override
public void removeByDocId(String docId, String kid) {
public void removeByDocId(String docId, String kid) throws ServiceException {
String collectionName = configService.getConfigValue("milvus", "collectionname") + kid;
// String expr = "docId == \"" + docId + "\"";
// DeleteParam deleteParam = DeleteParam.newBuilder()
// .withCollectionName(collectionName)
// .withExpr(expr)
// .build();
//
// R<MutationResult> deleteResponse = milvusClient.delete(deleteParam);
// if (deleteResponse.getStatus() == R.Status.Success.getCode()) {
// log.info("Milvus成功删除 docId={} 的所有向量数据,删除条数: {}", docId, deleteResponse.getData().getDeleteCnt());
// } else {
// log.error("Milvus删除失败: {}", deleteResponse.getMessage());
// }
String expr = "docId == \"" + docId + "\"";
DeleteReq deleteReq = DeleteReq.builder()
.collectionName(collectionName)
.filter(expr)
.build();
log.info("Milvus删除docId={}的数据", docId);
try {
DeleteResp deleteResp = client.delete(deleteReq);
log.info("Milvus成功删除 docId={} 的所有向量数据,删除条数: {}", docId, deleteResp.getDeleteCnt());
} catch (Exception e) {
log.error("Milvus删除失败: {}", e.getMessage());
throw new ServiceException(e.getMessage());
}
}
@Override
public void removeByFid(String fid, String kid) {
public void removeByFid(String fid, String kid) throws ServiceException {
String collectionName = configService.getConfigValue("milvus", "collectionname") + kid;
// String expr = "fid == \"" + fid + "\"";
// DeleteParam deleteParam = DeleteParam.newBuilder()
// .withCollectionName(collectionName)
// .withExpr(expr)
// .build();
//
// R<MutationResult> deleteResponse = milvusClient.delete(deleteParam);
// if (deleteResponse.getStatus() == R.Status.Success.getCode()) {
// log.info("Milvus成功删除 fid={} 的所有向量数据,删除条数: {}", fid, deleteResponse.getData().getDeleteCnt());
// } else {
// log.error("Milvus删除失败: {}", deleteResponse.getMessage());
// }
String expr = "fid == \"" + fid + "\"";
DeleteReq deleteReq = DeleteReq.builder()
.collectionName(collectionName)
.filter(expr)
.build();
log.info("Milvus删除fid={}的数据", fid);
try {
DeleteResp deleteResp = client.delete(deleteReq);
log.info("Milvus成功删除 fid={} 的所有向量数据,删除条数: {}", fid, deleteResp.getDeleteCnt());
} catch (Exception e) {
log.error("Milvus删除失败: {}", e.getMessage());
throw new ServiceException(e.getMessage());
}
}
}