feat(knowledge): 优化知识库文件状态枚举为"未解析,解析中,解析成功,解析失败",支持异步线程池解析文档

This commit is contained in:
RobustH
2026-04-13 00:15:01 +08:00
parent 28ad29d6ed
commit 0fa25032a3
10 changed files with 190 additions and 117 deletions

View File

@@ -10,6 +10,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.VirtualThreadTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.*;
/**
@@ -22,6 +23,12 @@ import java.util.concurrent.*;
@EnableConfigurationProperties(ThreadPoolProperties.class)
public class ThreadPoolConfig {
private final ThreadPoolProperties properties;
public ThreadPoolConfig(ThreadPoolProperties properties) {
this.properties = properties;
}
/**
* 核心线程数 = cpu 核心数 + 1
*/
@@ -54,6 +61,22 @@ public class ThreadPoolConfig {
return scheduledThreadPoolExecutor;
}
/**
* 知识库解析专用异步线程池
*/
@Bean(name = "knowledgeParseExecutor")
public ThreadPoolTaskExecutor knowledgeParseExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(core);
executor.setMaxPoolSize(core * 2);
executor.setQueueCapacity(properties.getQueueCapacity());
executor.setKeepAliveSeconds(properties.getKeepAliveSeconds());
executor.setThreadNamePrefix("knowledge-parse-pool-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
/**
* 销毁事件
* 停止线程池

View File

@@ -110,6 +110,17 @@ public class KnowledgeAttachController extends BaseController {
@PostMapping(value = "/upload")
public R<String> upload(KnowledgeInfoUploadBo bo){
knowledgeAttachService.upload(bo);
return R.ok("上传知识库附件成功!");
return R.ok("上传成功!");
}
/**
* 手动解析附件内容
*
* @param id 附件ID
*/
@PostMapping("/parse/{id}")
public R<Void> parse(@PathVariable Long id) {
knowledgeAttachService.parse(id);
return R.ok();
}
}

View File

@@ -16,6 +16,11 @@ public class KnowledgeInfoUploadBo {
private MultipartFile file;
/**
* 是否自动解析 (true: 立即解析, false: 仅上传)
*/
private Boolean autoParse;
/**
* 生效时间, 为空则立即生效
*/

View File

@@ -57,5 +57,10 @@ public class KnowledgeAttach extends BaseEntity {
*/
private String remark;
/**
* 解析状态: 0待解析, 1解析中, 2已解析, 3解析失败
*/
private Integer status;
}

View File

@@ -75,6 +75,12 @@ public class KnowledgeAttachVo implements Serializable {
@ExcelProperty(value = "上传时间")
private Date createTime;
/**
* 解析状态: 0待解析, 1解析中, 2已解析, 3解析失败
*/
@ExcelProperty(value = "解析状态")
private Integer status;
/**
* 分块数(统计字段,非数据库列)
*/

View File

@@ -0,0 +1,38 @@
package org.ruoyi.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 知识库附件解析状态枚举
*
* @author RobustH
*/
@Getter
@AllArgsConstructor
public enum KnowledgeAttachStatus {
/**
* 待解析
*/
WAITING(0, "待解析"),
/**
* 解析中
*/
PARSING(1, "解析中"),
/**
* 已解析
*/
COMPLETED(2, "已解析"),
/**
* 解析失败
*/
FAILED(3, "解析失败");
private final Integer code;
private final String info;
}

View File

@@ -1,5 +1,6 @@
package org.ruoyi.mapper.knowledge;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.ruoyi.domain.entity.knowledge.KnowledgeAttach;
@@ -12,6 +13,7 @@ import org.ruoyi.common.mybatis.core.mapper.BaseMapperPlus;
* @author ageerle
* @date 2025-12-17
*/
@Mapper
public interface KnowledgeAttachMapper extends BaseMapperPlus<KnowledgeAttach, KnowledgeAttachVo> {
/**

View File

@@ -1,5 +1,6 @@
package org.ruoyi.mapper.knowledge;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.ruoyi.domain.entity.knowledge.KnowledgeFragment;
@@ -15,6 +16,7 @@ import java.util.List;
* @author ageerle
* @date 2025-12-17
*/
@Mapper
public interface KnowledgeFragmentMapper extends BaseMapperPlus<KnowledgeFragment, KnowledgeFragmentVo> {
/**

View File

@@ -72,4 +72,11 @@ public interface IKnowledgeAttachService {
* 上传附件
*/
void upload(KnowledgeInfoUploadBo bo);
/**
* 解析附件知识片段
*
* @param id 附件ID
*/
void parse(Long id);
}

View File

@@ -2,19 +2,21 @@ package org.ruoyi.service.knowledge.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.RandomUtil;
import org.ruoyi.common.chat.service.chat.IChatModelService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.ruoyi.common.chat.domain.vo.chat.ChatModelVo;
import org.ruoyi.common.chat.service.chat.IChatModelService;
import org.ruoyi.enums.KnowledgeAttachStatus;
import org.ruoyi.common.core.domain.dto.OssDTO;
import org.ruoyi.common.core.service.OssService;
import org.ruoyi.common.core.utils.MapstructUtils;
import org.ruoyi.common.core.utils.SpringUtils;
import org.ruoyi.common.core.utils.StringUtils;
import org.ruoyi.common.mybatis.core.page.TableDataInfo;
import org.ruoyi.common.mybatis.core.page.PageQuery;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.ruoyi.common.mybatis.core.page.TableDataInfo;
import org.ruoyi.domain.bo.knowledge.KnowledgeAttachBo;
import org.ruoyi.domain.bo.knowledge.KnowledgeInfoUploadBo;
import org.ruoyi.domain.bo.vector.StoreEmbeddingBo;
@@ -30,11 +32,15 @@ import org.ruoyi.service.knowledge.IKnowledgeAttachService;
import org.ruoyi.service.knowledge.IKnowledgeInfoService;
import org.ruoyi.service.knowledge.ResourceLoader;
import org.ruoyi.service.vector.VectorStoreService;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.*;
import java.util.stream.Collectors;
/**
* 知识库附件Service业务层处理
@@ -48,52 +54,26 @@ import java.util.*;
public class KnowledgeAttachServiceImpl implements IKnowledgeAttachService {
private final KnowledgeAttachMapper baseMapper;
private final IKnowledgeInfoService knowledgeInfoService;
private final KnowledgeFragmentMapper knowledgeFragmentMapper;
private final IChatModelService chatModelService;
private final ResourceLoaderFactory resourceLoaderFactory;
private final VectorStoreService vectorStoreService;
private final OssService ossService;
/**
* 查询知识库附件
*
* @param id 主键
* @return 知识库附件
*/
@Override
public KnowledgeAttachVo queryById(Long id) {
return baseMapper.selectVoById(id);
}
/**
* 分页查询知识库附件列表
*
* @param bo 查询条件
* @param pageQuery 分页参数
* @return 知识库附件分页列表
*/
@Override
public TableDataInfo<KnowledgeAttachVo> queryPageList(KnowledgeAttachBo bo, PageQuery pageQuery) {
LambdaQueryWrapper<KnowledgeAttach> lqw = buildQueryWrapper(bo);
Page<KnowledgeAttachVo> result = baseMapper.selectVoPage(pageQuery.build(), lqw);
// 批量填充分块数
List<KnowledgeAttachVo> records = result.getRecords();
fillFragmentCount(records);
fillFragmentCount(result.getRecords());
return TableDataInfo.build(result);
}
/**
* 查询符合条件的知识库附件列表
*
* @param bo 查询条件
* @return 知识库附件列表
*/
@Override
public List<KnowledgeAttachVo> queryList(KnowledgeAttachBo bo) {
LambdaQueryWrapper<KnowledgeAttach> lqw = buildQueryWrapper(bo);
@@ -102,32 +82,23 @@ public class KnowledgeAttachServiceImpl implements IKnowledgeAttachService {
return list;
}
/**
* 批量填充每个附件记录的分块数fragmentCount
*/
private void fillFragmentCount(List<KnowledgeAttachVo> records) {
if (records == null || records.isEmpty()) return;
List<String> docIds = records.stream()
.map(KnowledgeAttachVo::getDocId)
.filter(docId -> docId != null && !docId.isEmpty())
.filter(StringUtils::isNotBlank)
.distinct()
.collect(java.util.stream.Collectors.toList());
.collect(Collectors.toList());
if (docIds.isEmpty()) return;
List<DocFragmentCountVo> countList =
knowledgeFragmentMapper.selectFragmentCountByDocIds(docIds);
Map<String, Integer> countMap = new java.util.HashMap<>();
for (DocFragmentCountVo item : countList) {
if (item.getDocId() != null) {
countMap.put(item.getDocId(), item.getFragmentCount());
}
}
List<DocFragmentCountVo> countList = knowledgeFragmentMapper.selectFragmentCountByDocIds(docIds);
Map<String, Integer> countMap = countList.stream()
.collect(Collectors.toMap(DocFragmentCountVo::getDocId, DocFragmentCountVo::getFragmentCount, (k1, k2) -> k1));
for (KnowledgeAttachVo vo : records) {
vo.setFragmentCount(countMap.getOrDefault(vo.getDocId(), 0));
}
}
private LambdaQueryWrapper<KnowledgeAttach> buildQueryWrapper(KnowledgeAttachBo bo) {
Map<String, Object> params = bo.getParams();
LambdaQueryWrapper<KnowledgeAttach> lqw = Wrappers.lambdaQuery();
lqw.orderByAsc(KnowledgeAttach::getId);
lqw.eq(bo.getKnowledgeId() != null, KnowledgeAttach::getKnowledgeId, bo.getKnowledgeId());
@@ -137,16 +108,9 @@ public class KnowledgeAttachServiceImpl implements IKnowledgeAttachService {
return lqw;
}
/**
* 新增知识库附件
*
* @param bo 知识库附件
* @return 是否新增成功
*/
@Override
public Boolean insertByBo(KnowledgeAttachBo bo) {
KnowledgeAttach add = MapstructUtils.convert(bo, KnowledgeAttach.class);
validEntityBeforeSave(add);
boolean flag = baseMapper.insert(add) > 0;
if (flag) {
bo.setId(add.getId());
@@ -154,66 +118,72 @@ public class KnowledgeAttachServiceImpl implements IKnowledgeAttachService {
return flag;
}
/**
* 修改知识库附件
*
* @param bo 知识库附件
* @return 是否修改成功
*/
@Override
public Boolean updateByBo(KnowledgeAttachBo bo) {
KnowledgeAttach update = MapstructUtils.convert(bo, KnowledgeAttach.class);
validEntityBeforeSave(update);
return baseMapper.updateById(update) > 0;
}
/**
* 保存前的数据校验
*/
private void validEntityBeforeSave(KnowledgeAttach entity){
//TODO 做一些数据校验,如唯一约束
}
/**
* 校验并批量删除知识库附件信息
*
* @param ids 待删除的主键集合
* @param isValid 是否进行有效性校验
* @return 是否删除成功
*/
@Override
public Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid) {
if(isValid){
//TODO 做一些业务上的校验,判断是否需要校验
}
return baseMapper.deleteByIds(ids) > 0;
}
@Override
public void upload(KnowledgeInfoUploadBo bo) {
MultipartFile file = bo.getFile();
// 保存文件信息
OssDTO ossDTO = ossService.uploadFile(file);
Long knowledgeId = bo.getKnowledgeId();
List<String> chunkList = new ArrayList<>();
KnowledgeAttach knowledgeAttach = new KnowledgeAttach();
knowledgeAttach.setKnowledgeId(bo.getKnowledgeId());
String docId = RandomUtil.randomString(10);
knowledgeAttach.setOssId(ossDTO.getOssId());
knowledgeAttach.setDocId(docId);
knowledgeAttach.setDocId(RandomUtil.randomString(10));
knowledgeAttach.setName(ossDTO.getOriginalName());
knowledgeAttach.setType(ossDTO.getFileSuffix());
String content = "";
ResourceLoader resourceLoader = resourceLoaderFactory.getLoaderByFileType(knowledgeAttach.getType());
// 文档分段入库
List<String> fids = new ArrayList<>();
knowledgeAttach.setStatus(KnowledgeAttachStatus.WAITING.getCode()); // 待解析
baseMapper.insert(knowledgeAttach);
if (Boolean.TRUE.equals(bo.getAutoParse())) {
// 通过 SpringUtils 获取代理对象,确保 @Async 生效
SpringUtils.getBean(IKnowledgeAttachService.class).parse(knowledgeAttach.getId());
}
}
@Async("knowledgeParseExecutor")
@Override
public void parse(Long id) {
KnowledgeAttach attach = baseMapper.selectById(id);
if (attach == null || (!KnowledgeAttachStatus.WAITING.getCode().equals(attach.getStatus()) && !KnowledgeAttachStatus.FAILED.getCode().equals(attach.getStatus()))) {
return;
}
try {
content = resourceLoader.getContent(file.getInputStream());
chunkList = resourceLoader.getChunkList(content, String.valueOf(knowledgeId));
attach.setStatus(KnowledgeAttachStatus.PARSING.getCode()); // 解析中
baseMapper.updateById(attach);
log.info("开始解析知识库文档... id: {}, docId: {}", id, attach.getDocId());
Long knowledgeId = attach.getKnowledgeId();
String docId = attach.getDocId();
// 获取文件信息并下载
List<OssDTO> ossDTOs = ossService.selectByIds(String.valueOf(attach.getOssId()));
if (ossDTOs == null || ossDTOs.isEmpty()) {
throw new RuntimeException("未找到对应的 OSS 文件信息");
}
OssDTO ossDTO = ossDTOs.get(0);
String content;
ResourceLoader resourceLoader = resourceLoaderFactory.getLoaderByFileType(attach.getType());
try (InputStream inputStream = new URL(ossDTO.getUrl()).openStream()) {
content = resourceLoader.getContent(inputStream);
}
List<String> chunkList = resourceLoader.getChunkList(content, String.valueOf(knowledgeId));
List<String> fids = new ArrayList<>();
List<KnowledgeFragment> knowledgeFragmentList = new ArrayList<>();
if (CollUtil.isNotEmpty(chunkList)) {
for (int i = 0; i < chunkList.size(); i++) {
// 生成知识片段ID
String fid = RandomUtil.randomString(10);
fids.add(fid);
KnowledgeFragment knowledgeFragment = new KnowledgeFragment();
@@ -223,17 +193,12 @@ public class KnowledgeAttachServiceImpl implements IKnowledgeAttachService {
knowledgeFragment.setCreateTime(new Date());
knowledgeFragmentList.add(knowledgeFragment);
}
}
knowledgeFragmentMapper.delete(Wrappers.<KnowledgeFragment>lambdaQuery().eq(KnowledgeFragment::getDocId, docId));
knowledgeFragmentMapper.insertBatch(knowledgeFragmentList);
} catch (IOException e) {
log.error("保存知识库信息失败!{}", e.getMessage());
log.info("文档切片并入库完成,共计 {} 个片段。id: {}", chunkList.size(), id);
}
baseMapper.insert(knowledgeAttach);
// 查询知识库信息
KnowledgeInfoVo knowledgeInfoVo = knowledgeInfoService.queryById(knowledgeId);
// 查询向量模信息
ChatModelVo chatModelVo = chatModelService.selectModelByName(knowledgeInfoVo.getEmbeddingModel());
StoreEmbeddingBo storeEmbeddingBo = new StoreEmbeddingBo();
@@ -246,6 +211,15 @@ public class KnowledgeAttachServiceImpl implements IKnowledgeAttachService {
storeEmbeddingBo.setApiKey(chatModelVo.getApiKey());
storeEmbeddingBo.setBaseUrl(chatModelVo.getApiHost());
vectorStoreService.storeEmbeddings(storeEmbeddingBo);
}
attach.setStatus(KnowledgeAttachStatus.COMPLETED.getCode()); // 已完成
baseMapper.updateById(attach);
log.info("知识库文档解析、向量化并入库成功id: {}", id);
} catch (Exception e) {
log.error("解析文档失败id: {}, error: {}", id, e.getMessage(), e);
attach.setStatus(KnowledgeAttachStatus.FAILED.getCode()); // 失败
attach.setRemark(StringUtils.substring(e.getMessage(), 0, 255)); // 保存错误原因,截取防止溢出
baseMapper.updateById(attach);
}
}
}