mirror of
https://gitcode.com/ageerle/ruoyi-ai.git
synced 2026-04-11 10:37:20 +00:00
refactor(chat): 重构聊天服务架构,引入Handler模式
主要变更: 1. 移除ruoyi-ai-copilot模块 2. 重构docker配置目录结构,统一迁移至docs/docker/ 3. 聊天服务引入Handler模式: - 新增ChatHandler接口及多种实现 - DefaultChatHandler: 默认聊天处理 - AgentChatHandler: Agent模式处理 - WorkflowChatHandler: 工作流处理 - ResumeChatHandler: 恢复会话处理 - ChatContextBuilder: 上下文构建器 4. 简化AbstractStreamingChatService和ChatServiceFacade代码 5. 优化各Provider实现,统一代码风格 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,159 @@
|
||||
package org.ruoyi.service.chat.handler;
|
||||
|
||||
import dev.langchain4j.agentic.AgenticServices;
|
||||
import dev.langchain4j.community.model.dashscope.QwenChatModel;
|
||||
import dev.langchain4j.service.tool.ToolProvider;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.ruoyi.agent.McpAgent;
|
||||
import org.ruoyi.common.chat.domain.bo.chat.ChatMessageBo;
|
||||
import org.ruoyi.common.chat.domain.vo.chat.ChatModelVo;
|
||||
import org.ruoyi.common.chat.entity.chat.ChatContext;
|
||||
import org.ruoyi.common.chat.enums.RoleType;
|
||||
import org.ruoyi.common.chat.service.chatMessage.IChatMessageService;
|
||||
import org.ruoyi.common.sse.utils.SseMessageUtils;
|
||||
import org.ruoyi.mcp.service.core.ToolProviderFactory;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Agent 深度思考处理器
|
||||
* <p>
|
||||
* 处理 enableThinking=true 的场景,使用 Agent 进行深度思考和工具调用
|
||||
*
|
||||
* @author ageerle@163.com
|
||||
* @date 2025/12/13
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@Order(3)
|
||||
@RequiredArgsConstructor
|
||||
public class AgentChatHandler implements ChatHandler {
|
||||
|
||||
private final ToolProviderFactory toolProviderFactory;
|
||||
private final IChatMessageService chatMessageService;
|
||||
|
||||
@Override
|
||||
public boolean supports(ChatContext context) {
|
||||
Boolean enableThinking = context.getChatRequest().getEnableThinking();
|
||||
return enableThinking != null && enableThinking;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SseEmitter handle(ChatContext context) {
|
||||
log.info("处理 Agent 深度思考,用户: {}", context.getUserId());
|
||||
|
||||
Long userId = context.getUserId();
|
||||
String tokenValue = context.getTokenValue();
|
||||
ChatModelVo chatModelVo = context.getChatModelVo();
|
||||
|
||||
try {
|
||||
// 1. 保存用户消息
|
||||
String content = extractUserContent(context);
|
||||
saveChatMessage(context.getChatRequest(), userId, content,
|
||||
RoleType.USER.getName(), chatModelVo);
|
||||
|
||||
// 2. 执行 Agent 任务
|
||||
String result = doAgent(content, chatModelVo);
|
||||
|
||||
// 3. 发送结果并保存
|
||||
SseMessageUtils.sendMessage(userId, result);
|
||||
SseMessageUtils.completeConnection(userId, tokenValue);
|
||||
saveChatMessage(context.getChatRequest(), userId, result,
|
||||
RoleType.ASSISTANT.getName(), chatModelVo);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Agent 执行失败: {}", e.getMessage(), e);
|
||||
SseMessageUtils.sendMessage(userId, "Agent 执行失败:" + e.getMessage());
|
||||
SseMessageUtils.completeConnection(userId, tokenValue);
|
||||
}
|
||||
|
||||
return context.getEmitter();
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行 Agent 任务
|
||||
*/
|
||||
private String doAgent(String userMessage, ChatModelVo chatModelVo) {
|
||||
log.info("执行 Agent 任务,消息: {}", userMessage);
|
||||
|
||||
try {
|
||||
// 1. 加载 LLM 模型
|
||||
QwenChatModel qwenChatModel = QwenChatModel.builder()
|
||||
.apiKey(chatModelVo.getApiKey())
|
||||
.modelName(chatModelVo.getModelName())
|
||||
.build();
|
||||
|
||||
// 2. 获取内置工具
|
||||
List<Object> builtinTools = toolProviderFactory.getAllBuiltinToolObjects();
|
||||
List<Object> allTools = new ArrayList<>(builtinTools);
|
||||
log.debug("加载 {} 个内置工具", builtinTools.size());
|
||||
|
||||
// 3. 获取 MCP 工具提供者
|
||||
ToolProvider mcpToolProvider = toolProviderFactory.getAllEnabledMcpToolsProvider();
|
||||
|
||||
// 4. 创建 MCP Agent
|
||||
var agentBuilder = AgenticServices.agentBuilder(McpAgent.class)
|
||||
.chatModel(qwenChatModel);
|
||||
|
||||
if (!allTools.isEmpty()) {
|
||||
agentBuilder.tools(allTools.toArray(new Object[0]));
|
||||
}
|
||||
if (mcpToolProvider != null) {
|
||||
agentBuilder.toolProvider(mcpToolProvider);
|
||||
}
|
||||
|
||||
McpAgent mcpAgent = agentBuilder.build();
|
||||
|
||||
// 5. 调用 Agent
|
||||
String result = mcpAgent.callMcpTool(userMessage);
|
||||
log.info("Agent 执行完成,结果长度: {}", result.length());
|
||||
return result;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Agent 模式执行失败: {}", e.getMessage(), e);
|
||||
return "Agent 执行失败: " + e.getMessage();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 提取用户消息内容
|
||||
*/
|
||||
private String extractUserContent(ChatContext context) {
|
||||
var messages = context.getChatRequest().getMessages();
|
||||
if (messages != null && !messages.isEmpty()) {
|
||||
return messages.get(0).getContent();
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存聊天消息
|
||||
*/
|
||||
private void saveChatMessage(org.ruoyi.common.chat.domain.dto.request.ChatRequest chatRequest,
|
||||
Long userId, String content, String role, ChatModelVo chatModelVo) {
|
||||
try {
|
||||
if (chatRequest == null || userId == null) {
|
||||
log.warn("缺少必要的聊天上下文信息,无法保存消息");
|
||||
return;
|
||||
}
|
||||
|
||||
ChatMessageBo messageBO = new ChatMessageBo();
|
||||
messageBO.setUserId(userId);
|
||||
messageBO.setSessionId(chatRequest.getSessionId());
|
||||
messageBO.setContent(content);
|
||||
messageBO.setRole(role);
|
||||
messageBO.setModelName(chatRequest.getModel());
|
||||
messageBO.setBillingType(chatModelVo.getModelType());
|
||||
messageBO.setRemark(null);
|
||||
|
||||
chatMessageService.insertByBo(messageBO);
|
||||
} catch (Exception e) {
|
||||
log.error("保存聊天消息时出错: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,136 @@
|
||||
package org.ruoyi.service.chat.handler;
|
||||
|
||||
import cn.dev33.satoken.stp.StpUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.ruoyi.common.chat.domain.dto.ChatMessageDTO;
|
||||
import org.ruoyi.common.chat.domain.dto.request.ChatRequest;
|
||||
import org.ruoyi.common.chat.domain.vo.chat.ChatModelVo;
|
||||
import org.ruoyi.common.chat.entity.chat.ChatContext;
|
||||
import org.ruoyi.common.chat.factory.ChatServiceFactory;
|
||||
import org.ruoyi.common.chat.service.chat.IChatModelService;
|
||||
import org.ruoyi.common.chat.service.chat.IChatService;
|
||||
import org.ruoyi.common.satoken.utils.LoginHelper;
|
||||
import org.ruoyi.common.sse.core.SseEmitterManager;
|
||||
import org.ruoyi.domain.bo.vector.QueryVectorBo;
|
||||
import org.ruoyi.domain.vo.knowledge.KnowledgeInfoVo;
|
||||
import org.ruoyi.service.knowledge.IKnowledgeInfoService;
|
||||
import org.ruoyi.service.vector.VectorStoreService;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 对话上下文构建器
|
||||
* <p>
|
||||
* 负责构建完整的对话上下文,包括:
|
||||
* 1. 模型配置查询
|
||||
* 2. 知识库检索增强
|
||||
* 3. SSE连接创建
|
||||
* 4. 用户信息注入
|
||||
*
|
||||
* @author ageerle@163.com
|
||||
* @date 2025/12/13
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class ChatContextBuilder {
|
||||
|
||||
private final IChatModelService chatModelService;
|
||||
private final IKnowledgeInfoService knowledgeInfoService;
|
||||
private final VectorStoreService vectorStoreService;
|
||||
private final SseEmitterManager sseEmitterManager;
|
||||
private final ChatServiceFactory chatServiceFactory;
|
||||
|
||||
/**
|
||||
* 构建对话上下文
|
||||
*
|
||||
* @param chatRequest 对话请求
|
||||
* @return 完整的对话上下文
|
||||
*/
|
||||
public ChatContext build(ChatRequest chatRequest) {
|
||||
// 1. 查询模型配置
|
||||
ChatModelVo chatModelVo = chatModelService.selectModelByName(chatRequest.getModel());
|
||||
if (chatModelVo == null) {
|
||||
throw new IllegalArgumentException("模型不存在: " + chatRequest.getModel());
|
||||
}
|
||||
|
||||
// 2. 构建上下文消息(知识库增强)
|
||||
List<ChatMessageDTO> contextMessages = buildContextMessages(chatRequest);
|
||||
chatRequest.setMessages(contextMessages);
|
||||
|
||||
// 3. 获取用户信息
|
||||
Long userId = LoginHelper.getUserId();
|
||||
String tokenValue = StpUtil.getTokenValue();
|
||||
|
||||
// 4. 创建SSE连接
|
||||
SseEmitter emitter = sseEmitterManager.connect(userId, tokenValue);
|
||||
|
||||
// 5. 获取服务提供商
|
||||
String category = chatModelVo.getProviderCode();
|
||||
IChatService chatService = chatServiceFactory.getOriginalService(category);
|
||||
log.info("路由到服务提供商: {}, 模型: {}", category, chatRequest.getModel());
|
||||
|
||||
// 6. 构建上下文对象
|
||||
return ChatContext.builder()
|
||||
.chatModelVo(chatModelVo)
|
||||
.chatRequest(chatRequest)
|
||||
.emitter(emitter)
|
||||
.userId(userId)
|
||||
.tokenValue(tokenValue)
|
||||
.chatService(chatService)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建上下文消息列表(知识库增强)
|
||||
*/
|
||||
private List<ChatMessageDTO> buildContextMessages(ChatRequest chatRequest) {
|
||||
List<ChatMessageDTO> messages = chatRequest.getMessages();
|
||||
|
||||
// 从向量库查询相关历史消息
|
||||
if (chatRequest.getKnowledgeId() != null) {
|
||||
KnowledgeInfoVo knowledgeInfoVo = knowledgeInfoService.queryById(Long.valueOf(chatRequest.getKnowledgeId()));
|
||||
if (knowledgeInfoVo == null) {
|
||||
log.warn("知识库信息不存在,kid: {}", chatRequest.getKnowledgeId());
|
||||
return messages;
|
||||
}
|
||||
|
||||
// 查询向量模型配置
|
||||
ChatModelVo chatModel = chatModelService.selectModelByName(knowledgeInfoVo.getEmbeddingModel());
|
||||
if (chatModel == null) {
|
||||
log.warn("向量模型配置不存在,模型名称: {}", knowledgeInfoVo.getEmbeddingModel());
|
||||
return messages;
|
||||
}
|
||||
|
||||
// 构建向量查询参数并检索
|
||||
QueryVectorBo queryVectorBo = buildQueryVectorBo(chatRequest, knowledgeInfoVo, chatModel);
|
||||
List<String> nearestList = vectorStoreService.getQueryVector(queryVectorBo);
|
||||
|
||||
// 知识库内容作为系统上下文添加
|
||||
for (String prompt : nearestList) {
|
||||
messages.add(ChatMessageDTO.system(prompt));
|
||||
}
|
||||
}
|
||||
|
||||
return messages;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建向量查询参数
|
||||
*/
|
||||
private QueryVectorBo buildQueryVectorBo(ChatRequest chatRequest, KnowledgeInfoVo knowledgeInfoVo,
|
||||
ChatModelVo chatModel) {
|
||||
QueryVectorBo queryVectorBo = new QueryVectorBo();
|
||||
queryVectorBo.setQuery(chatRequest.getMessages().get(0).getContent());
|
||||
queryVectorBo.setKid(chatRequest.getKnowledgeId());
|
||||
queryVectorBo.setApiKey(chatModel.getApiKey());
|
||||
queryVectorBo.setBaseUrl(chatModel.getApiHost());
|
||||
queryVectorBo.setVectorModelName(knowledgeInfoVo.getVectorModel());
|
||||
queryVectorBo.setEmbeddingModelName(knowledgeInfoVo.getEmbeddingModel());
|
||||
queryVectorBo.setMaxResults(knowledgeInfoVo.getRetrieveLimit());
|
||||
return queryVectorBo;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
package org.ruoyi.service.chat.handler;
|
||||
|
||||
import org.ruoyi.common.chat.entity.chat.ChatContext;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
/**
|
||||
* 对话处理器接口
|
||||
* <p>
|
||||
* 使用策略模式,每种对话场景独立实现
|
||||
* 通过 Order 注解控制优先级
|
||||
*
|
||||
* @author ageerle@163.com
|
||||
* @date 2025/12/13
|
||||
*/
|
||||
public interface ChatHandler {
|
||||
|
||||
/**
|
||||
* 是否支持处理该请求
|
||||
*
|
||||
* @param context 对话上下文
|
||||
* @return true-支持处理,false-不支持
|
||||
*/
|
||||
boolean supports(ChatContext context);
|
||||
|
||||
/**
|
||||
* 处理对话
|
||||
*
|
||||
* @param context 对话上下文
|
||||
* @return SSE发射器
|
||||
*/
|
||||
SseEmitter handle(ChatContext context);
|
||||
|
||||
/**
|
||||
* 优先级(越小越优先)
|
||||
* 默认 100,数字越小优先级越高
|
||||
*
|
||||
* @return 优先级数值
|
||||
*/
|
||||
default int getOrder() {
|
||||
return 100;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,247 @@
|
||||
package org.ruoyi.service.chat.handler;
|
||||
|
||||
import dev.langchain4j.data.message.ChatMessage;
|
||||
import dev.langchain4j.data.message.UserMessage;
|
||||
import dev.langchain4j.memory.chat.MessageWindowChatMemory;
|
||||
import dev.langchain4j.model.chat.StreamingChatModel;
|
||||
import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.ruoyi.common.chat.entity.chat.ChatContext;
|
||||
import org.ruoyi.common.chat.enums.RoleType;
|
||||
import org.ruoyi.common.core.utils.StringUtils;
|
||||
import org.ruoyi.common.sse.utils.SseMessageUtils;
|
||||
import org.ruoyi.service.chat.impl.AbstractStreamingChatService;
|
||||
import org.ruoyi.service.chat.impl.memory.PersistentChatMemoryStore;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 默认对话处理器
|
||||
* <p>
|
||||
* 处理普通对话场景,包含:
|
||||
* 1. 历史记忆管理
|
||||
* 2. 消息保存
|
||||
* 3. 流式对话响应
|
||||
*
|
||||
* @author ageerle@163.com
|
||||
* @date 2025/12/13
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@Order(100)
|
||||
public class DefaultChatHandler implements ChatHandler {
|
||||
|
||||
private final Map<String, AbstractStreamingChatService> chatServiceMap;
|
||||
|
||||
/**
|
||||
* 默认保留的消息窗口大小
|
||||
*/
|
||||
private static final int DEFAULT_MAX_MESSAGES = 20;
|
||||
|
||||
/**
|
||||
* 是否启用长期记忆
|
||||
*/
|
||||
private static final boolean ENABLE_PERSISTENT_MEMORY = true;
|
||||
|
||||
/**
|
||||
* 内存实例缓存
|
||||
*/
|
||||
private static final Map<Object, MessageWindowChatMemory> MEMORY_CACHE = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 构造函数,注入所有聊天服务实现
|
||||
*/
|
||||
public DefaultChatHandler(List<AbstractStreamingChatService> chatServices) {
|
||||
this.chatServiceMap = chatServices.stream()
|
||||
.collect(Collectors.toMap(
|
||||
AbstractStreamingChatService::getProviderName,
|
||||
Function.identity()
|
||||
));
|
||||
log.info("已加载 {} 个聊天服务: {}", chatServiceMap.size(), chatServiceMap.keySet());
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据 providerCode 获取对应的聊天服务
|
||||
*/
|
||||
private AbstractStreamingChatService getChatService(String providerCode) {
|
||||
if (StringUtils.isBlank(providerCode)) {
|
||||
// 默认使用千问服务
|
||||
return chatServiceMap.get("qianwen");
|
||||
}
|
||||
AbstractStreamingChatService service = chatServiceMap.get(providerCode.toLowerCase());
|
||||
if (service == null) {
|
||||
log.warn("未找到提供商 {} 对应的服务,使用默认千问服务", providerCode);
|
||||
return chatServiceMap.get("qianwen");
|
||||
}
|
||||
return service;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supports(ChatContext context) {
|
||||
// 默认处理器,始终支持
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SseEmitter handle(ChatContext context) {
|
||||
log.info("处理默认对话,用户: {}, 会话: {}",
|
||||
context.getUserId(), context.getChatRequest().getSessionId());
|
||||
|
||||
Long userId = context.getUserId();
|
||||
String tokenValue = context.getTokenValue();
|
||||
|
||||
// 根据 providerCode 获取对应的聊天服务
|
||||
String providerCode = context.getChatModelVo().getProviderCode();
|
||||
AbstractStreamingChatService chatService = getChatService(providerCode);
|
||||
log.info("使用服务提供商: {}", chatService.getProviderName());
|
||||
|
||||
try {
|
||||
// 1. 提取用户消息内容
|
||||
String content = extractUserContent(context);
|
||||
|
||||
// 2. 保存用户消息
|
||||
chatService.saveChatMessage(context.getChatRequest(), userId, content,
|
||||
RoleType.USER.getName(), context.getChatModelVo());
|
||||
|
||||
// 3. 构建包含历史记忆的消息列表
|
||||
List<ChatMessage> messagesWithMemory = buildMessagesWithMemory(context.getChatRequest());
|
||||
|
||||
// 4. 创建响应处理器
|
||||
StreamingChatResponseHandler handler = createResponseHandler(
|
||||
context.getChatRequest(), userId, tokenValue, context.getChatModelVo(), chatService);
|
||||
|
||||
// 5. 构建流式模型并执行对话
|
||||
StreamingChatModel streamingModel = chatService.buildStreamingChatModel(
|
||||
context.getChatModelVo(), context.getChatRequest());
|
||||
streamingModel.chat(messagesWithMemory, handler);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("对话处理失败: {}", e.getMessage(), e);
|
||||
SseMessageUtils.sendMessage(userId, "对话出错:" + e.getMessage());
|
||||
SseMessageUtils.completeConnection(userId, tokenValue);
|
||||
}
|
||||
|
||||
return context.getEmitter();
|
||||
}
|
||||
|
||||
/**
|
||||
* 提取用户消息内容
|
||||
*/
|
||||
private String extractUserContent(ChatContext context) {
|
||||
return Optional.ofNullable(context.getChatRequest().getMessages())
|
||||
.filter(messages -> !messages.isEmpty())
|
||||
.map(messages -> messages.get(0).getContent())
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.orElseGet(() -> Optional.ofNullable(context.getChatRequest().getChatMessages())
|
||||
.orElse(List.of()).stream()
|
||||
.filter(message -> message instanceof UserMessage)
|
||||
.map(message -> ((UserMessage) message).singleText())
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.findFirst()
|
||||
.orElse(""));
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建包含历史消息的消息列表
|
||||
*/
|
||||
private List<ChatMessage> buildMessagesWithMemory(org.ruoyi.common.chat.domain.dto.request.ChatRequest chatRequest) {
|
||||
List<ChatMessage> messages = new ArrayList<>();
|
||||
|
||||
// 添加工作流对话消息
|
||||
List<ChatMessage> chatMessages = chatRequest.getChatMessages();
|
||||
if (!CollectionUtils.isEmpty(chatMessages)) {
|
||||
messages.addAll(chatMessages);
|
||||
}
|
||||
|
||||
// 添加历史记忆
|
||||
if (ENABLE_PERSISTENT_MEMORY && chatRequest.getSessionId() != null) {
|
||||
MessageWindowChatMemory memory = createChatMemory(chatRequest.getSessionId());
|
||||
if (memory != null) {
|
||||
List<ChatMessage> historicalMessages = memory.messages();
|
||||
if (historicalMessages != null && !historicalMessages.isEmpty()) {
|
||||
messages.addAll(historicalMessages);
|
||||
log.debug("已加载 {} 条历史消息用于会话 {}", historicalMessages.size(), chatRequest.getSessionId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return messages;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建或获取聊天内存实例
|
||||
*/
|
||||
private MessageWindowChatMemory createChatMemory(Object memoryId) {
|
||||
return MEMORY_CACHE.computeIfAbsent(memoryId, key -> {
|
||||
try {
|
||||
PersistentChatMemoryStore store = new PersistentChatMemoryStore();
|
||||
return MessageWindowChatMemory.builder()
|
||||
.id(memoryId)
|
||||
.maxMessages(DEFAULT_MAX_MESSAGES)
|
||||
.chatMemoryStore(store)
|
||||
.build();
|
||||
} catch (Exception e) {
|
||||
log.warn("创建聊天内存失败: {}", e.getMessage());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建响应处理器
|
||||
*/
|
||||
private StreamingChatResponseHandler createResponseHandler(
|
||||
org.ruoyi.common.chat.domain.dto.request.ChatRequest chatRequest,
|
||||
Long userId,
|
||||
String tokenValue,
|
||||
org.ruoyi.common.chat.domain.vo.chat.ChatModelVo chatModelVo,
|
||||
AbstractStreamingChatService chatService) {
|
||||
|
||||
return new StreamingChatResponseHandler() {
|
||||
private final StringBuilder messageBuffer = new StringBuilder();
|
||||
|
||||
@Override
|
||||
public void onPartialResponse(String partialResponse) {
|
||||
messageBuffer.append(partialResponse);
|
||||
SseMessageUtils.sendMessage(userId, partialResponse);
|
||||
log.debug("收到消息片段: {}", partialResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleteResponse(dev.langchain4j.model.chat.response.ChatResponse completeResponse) {
|
||||
try {
|
||||
String fullMessage = messageBuffer.toString();
|
||||
if (!fullMessage.isEmpty()) {
|
||||
chatService.saveChatMessage(chatRequest, userId, fullMessage,
|
||||
RoleType.ASSISTANT.getName(), chatModelVo);
|
||||
}
|
||||
SseMessageUtils.completeConnection(userId, tokenValue);
|
||||
log.info("消息结束,已保存到数据库");
|
||||
} catch (Exception e) {
|
||||
log.error("完成响应时出错: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
log.error("流式响应错误: {}", error.getMessage(), error);
|
||||
try {
|
||||
SseMessageUtils.sendMessage(userId, "模型调用失败: " + error.getMessage());
|
||||
SseMessageUtils.completeConnection(userId, tokenValue);
|
||||
} catch (Exception e) {
|
||||
log.error("发送错误消息失败: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package org.ruoyi.service.chat.handler;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.ruoyi.common.chat.domain.dto.request.ReSumeRunner;
|
||||
import org.ruoyi.common.chat.entity.chat.ChatContext;
|
||||
import org.ruoyi.common.chat.service.workFlow.IWorkFlowStarterService;
|
||||
import org.ruoyi.common.core.utils.ObjectUtils;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
/**
|
||||
* 人机交互恢复处理器
|
||||
* <p>
|
||||
* 处理 isResume=true 的场景,恢复工作流的人机交互
|
||||
*
|
||||
* @author ageerle@163.com
|
||||
* @date 2025/12/13
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@Order(1)
|
||||
@RequiredArgsConstructor
|
||||
public class ResumeChatHandler implements ChatHandler {
|
||||
|
||||
private final IWorkFlowStarterService workFlowStarterService;
|
||||
|
||||
@Override
|
||||
public boolean supports(ChatContext context) {
|
||||
Boolean isResume = context.getChatRequest().getIsResume();
|
||||
return isResume != null && isResume;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SseEmitter handle(ChatContext context) {
|
||||
log.info("处理人机交互恢复,用户: {}", context.getUserId());
|
||||
|
||||
ReSumeRunner reSumeRunner = context.getChatRequest().getReSumeRunner();
|
||||
if (ObjectUtils.isEmpty(reSumeRunner)) {
|
||||
log.warn("人机交互恢复参数为空");
|
||||
return context.getEmitter();
|
||||
}
|
||||
|
||||
workFlowStarterService.resumeFlow(
|
||||
reSumeRunner.getRuntimeUuid(),
|
||||
reSumeRunner.getFeedbackContent(),
|
||||
context.getEmitter()
|
||||
);
|
||||
|
||||
return context.getEmitter();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
package org.ruoyi.service.chat.handler;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.ruoyi.common.chat.base.ThreadContext;
|
||||
import org.ruoyi.common.chat.domain.dto.request.WorkFlowRunner;
|
||||
import org.ruoyi.common.chat.entity.chat.ChatContext;
|
||||
import org.ruoyi.common.chat.service.workFlow.IWorkFlowStarterService;
|
||||
import org.ruoyi.common.core.utils.ObjectUtils;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
/**
|
||||
* 工作流对话处理器
|
||||
* <p>
|
||||
* 处理 enableWorkFlow=true 的场景,启动工作流对话
|
||||
*
|
||||
* @author ageerle@163.com
|
||||
* @date 2025/12/13
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@Order(2)
|
||||
@RequiredArgsConstructor
|
||||
public class WorkflowChatHandler implements ChatHandler {
|
||||
|
||||
private final IWorkFlowStarterService workFlowStarterService;
|
||||
|
||||
@Override
|
||||
public boolean supports(ChatContext context) {
|
||||
Boolean enableWorkFlow = context.getChatRequest().getEnableWorkFlow();
|
||||
return enableWorkFlow != null && enableWorkFlow;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SseEmitter handle(ChatContext context) {
|
||||
log.info("处理工作流对话,用户: {}, 会话: {}",
|
||||
context.getUserId(), context.getChatRequest().getSessionId());
|
||||
|
||||
WorkFlowRunner runner = context.getChatRequest().getWorkFlowRunner();
|
||||
if (ObjectUtils.isEmpty(runner)) {
|
||||
log.warn("工作流参数为空");
|
||||
return context.getEmitter();
|
||||
}
|
||||
|
||||
return workFlowStarterService.streaming(
|
||||
ThreadContext.getCurrentUser(),
|
||||
runner.getUuid(),
|
||||
runner.getInputs(),
|
||||
context.getChatRequest().getSessionId()
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1,52 +1,37 @@
|
||||
package org.ruoyi.service.chat.impl;
|
||||
|
||||
import dev.langchain4j.agentic.AgenticServices;
|
||||
import dev.langchain4j.community.model.dashscope.QwenChatModel;
|
||||
import dev.langchain4j.data.message.ChatMessage;
|
||||
import dev.langchain4j.data.message.UserMessage;
|
||||
import dev.langchain4j.memory.chat.MessageWindowChatMemory;
|
||||
import dev.langchain4j.model.chat.StreamingChatModel;
|
||||
import dev.langchain4j.model.chat.response.ChatResponse;
|
||||
import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;
|
||||
import dev.langchain4j.service.tool.ToolProvider;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.ruoyi.agent.McpAgent;
|
||||
import org.ruoyi.common.chat.base.ThreadContext;
|
||||
import org.ruoyi.common.chat.domain.dto.request.ChatRequest;
|
||||
import org.ruoyi.common.chat.domain.dto.request.ReSumeRunner;
|
||||
import org.ruoyi.common.chat.domain.dto.request.WorkFlowRunner;
|
||||
import org.ruoyi.common.chat.domain.vo.chat.ChatModelVo;
|
||||
import org.ruoyi.common.chat.entity.chat.ChatContext;
|
||||
import org.ruoyi.common.chat.enums.RoleType;
|
||||
import org.ruoyi.common.chat.service.chat.IChatService;
|
||||
import org.ruoyi.common.chat.service.chatMessage.AbstractChatMessageService;
|
||||
import org.ruoyi.common.chat.service.workFlow.IWorkFlowStarterService;
|
||||
import org.ruoyi.common.core.utils.ObjectUtils;
|
||||
import org.ruoyi.common.core.utils.SpringUtils;
|
||||
import org.ruoyi.common.core.utils.StringUtils;
|
||||
import org.ruoyi.common.sse.utils.SseMessageUtils;
|
||||
import org.ruoyi.mcp.service.core.ToolProviderFactory;
|
||||
import org.ruoyi.service.chat.impl.memory.PersistentChatMemoryStore;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* 流式聊天服务抽象基类 - 支持上下文和长期记忆
|
||||
* 使用模板方法模式,抽取公共逻辑
|
||||
* 流式聊天服务抽象基类
|
||||
* <p>
|
||||
* 提供核心的流式对话能力:
|
||||
* 1. 构建流式聊天模型
|
||||
* 2. 创建响应处理器
|
||||
* 3. 消息持久化
|
||||
* <p>
|
||||
* 设计原则:
|
||||
* 1. 抽象层只依赖业务模型,不依赖具体SDK
|
||||
* 2. 子类负责将业务模型转换为厂商SDK格式
|
||||
* 3. 提供钩子方法,子类可灵活覆盖
|
||||
* 4. 支持长期记忆 - 自动维护会话的消息历史
|
||||
* - 抽象层只依赖业务模型,不依赖具体SDK
|
||||
* - 子类负责将业务模型转换为厂商SDK格式
|
||||
*
|
||||
* @author ageerle@163.com
|
||||
* @date 2025/12/13
|
||||
@@ -55,162 +40,77 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
@Validated
|
||||
public abstract class AbstractStreamingChatService extends AbstractChatMessageService implements IChatService {
|
||||
|
||||
/**
|
||||
* 默认保留的消息窗口大小(用于长期记忆)
|
||||
*/
|
||||
private static final int DEFAULT_MAX_MESSAGES = 20;
|
||||
|
||||
/**
|
||||
* 是否启用长期记忆功能
|
||||
*/
|
||||
private static final boolean enablePersistentMemory = true;
|
||||
|
||||
/**
|
||||
* 内存实例缓存,避免同一会话重复创建
|
||||
* Key: sessionId, Value: MessageWindowChatMemory实例
|
||||
*/
|
||||
private static final Map<Object, MessageWindowChatMemory> memoryCache = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 获取工作流启用Bean对象
|
||||
*/
|
||||
private static final IWorkFlowStarterService starterService = SpringUtils.getBean(IWorkFlowStarterService.class);
|
||||
|
||||
/**
|
||||
* 定义聊天流程骨架
|
||||
* 注意:此方法已被 Handler 模式取代,保留是为了兼容旧调用
|
||||
*/
|
||||
@Override
|
||||
@Deprecated
|
||||
public SseEmitter chat(ChatContext chatContext) {
|
||||
// 获取模型管理视图对象
|
||||
ChatModelVo chatModelVo = chatContext.getChatModelVo();
|
||||
// 获取对话请求对象
|
||||
ChatRequest chatRequest = chatContext.getChatRequest();
|
||||
// 获取SSe连接对象
|
||||
SseEmitter emitter = chatContext.getEmitter();
|
||||
// 获取用户ID
|
||||
Long userId = chatContext.getUserId();
|
||||
// 获取Token
|
||||
String tokenValue = chatContext.getTokenValue();
|
||||
// 获取响应处理器
|
||||
StreamingChatResponseHandler handler = chatContext.getHandler();
|
||||
SseEmitter emitter = chatContext.getEmitter();
|
||||
|
||||
try {
|
||||
String content = Optional.ofNullable(chatRequest.getMessages()).filter(messages -> !messages.isEmpty())
|
||||
// 对话逻辑:从 messages 筛选第一个元素
|
||||
.map(messages -> messages.get(0).getContent())
|
||||
.filter(StringUtils::isNotBlank)
|
||||
// 工作流逻辑:从 chatMessages 筛选 UserMessage 的文本
|
||||
.orElseGet(() -> Optional.ofNullable(chatRequest.getChatMessages()).orElse(List.of()).stream()
|
||||
.filter(message -> message instanceof UserMessage um)
|
||||
.map(message -> ((UserMessage) message).singleText())
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.findFirst()
|
||||
.orElse(""));
|
||||
// 提取用户消息内容
|
||||
String content = extractUserContent(chatRequest);
|
||||
|
||||
// 保存用户消息
|
||||
saveChatMessage(chatRequest, userId, content, RoleType.USER.getName(), chatModelVo);
|
||||
|
||||
// 判断用户是否重新输入
|
||||
boolean isResume = chatRequest.getIsResume() != null && chatRequest.getIsResume();
|
||||
if (isResume){
|
||||
ReSumeRunner reSumeRunner = chatRequest.getReSumeRunner();
|
||||
if (ObjectUtils.isNotEmpty(reSumeRunner)){
|
||||
starterService.resumeFlow(reSumeRunner.getRuntimeUuid(), reSumeRunner.getFeedbackContent(), emitter);
|
||||
return emitter;
|
||||
}
|
||||
}
|
||||
// 构建消息列表(由 Handler 负责构建,这里简单处理)
|
||||
List<ChatMessage> messages = convertToChatMessages(chatRequest);
|
||||
|
||||
// 判断用户是否开启工作流
|
||||
boolean enableWorkFlow = chatRequest.getEnableWorkFlow() != null && chatRequest.getEnableWorkFlow();
|
||||
if (enableWorkFlow) {
|
||||
WorkFlowRunner runner = chatRequest.getWorkFlowRunner();
|
||||
if (ObjectUtils.isNotEmpty(runner)){
|
||||
return starterService.streaming(ThreadContext.getCurrentUser(), runner.getUuid(), runner.getInputs(), chatRequest.getSessionId());
|
||||
}
|
||||
}
|
||||
// 创建响应处理器
|
||||
StreamingChatResponseHandler handler = createResponseHandler(
|
||||
chatRequest, userId, tokenValue, chatModelVo);
|
||||
|
||||
// 调用具体实现的聊天方法
|
||||
doChat(chatModelVo, chatRequest, messages, handler);
|
||||
|
||||
// 使用长期记忆增强的消息列表
|
||||
List<ChatMessage> messagesWithMemory = buildMessagesWithMemory(chatRequest);
|
||||
if (chatRequest.getEnableThinking()) {
|
||||
String msg = doAgent(content, chatModelVo);
|
||||
SseMessageUtils.sendMessage(userId, msg);
|
||||
SseMessageUtils.completeConnection(userId, tokenValue);
|
||||
// 保存助手回复消息
|
||||
saveChatMessage(chatRequest, userId, msg, RoleType.ASSISTANT.getName(), chatModelVo);
|
||||
} else {
|
||||
// 创建包含内存管理的响应处理器
|
||||
handler = ObjectUtils.isEmpty(handler) ? createResponseHandler(chatRequest, userId, tokenValue, chatModelVo) : handler;
|
||||
// 调用具体实现的聊天方法
|
||||
doChat(chatModelVo, chatRequest, messagesWithMemory, handler);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
SseMessageUtils.sendMessage(userId, "对话出错:" + e.getMessage());
|
||||
SseMessageUtils.completeConnection(userId, tokenValue);
|
||||
log.error("{}请求失败:{}", getProviderName(), e.getMessage(), e);
|
||||
}
|
||||
|
||||
return emitter;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建包含历史消息和当前请求的完整消息列表(长期记忆)
|
||||
* 返回: 历史消息 + 当前请求消息
|
||||
* 确保即使第一次对话也有消息上下文
|
||||
*
|
||||
* @param chatRequest 聊天请求
|
||||
* @return 包含历史消息和当前请求消息的完整消息列表
|
||||
* 提取用户消息内容
|
||||
*/
|
||||
protected List<ChatMessage> buildMessagesWithMemory(ChatRequest chatRequest) {
|
||||
List<ChatMessage> messages = new ArrayList<>();
|
||||
// 工作流对话消息
|
||||
List<ChatMessage> chatMessages = chatRequest.getChatMessages();
|
||||
if (!CollectionUtils.isEmpty(chatMessages)){
|
||||
messages.addAll(chatMessages);
|
||||
}
|
||||
// 开启长期记忆
|
||||
if (enablePersistentMemory && chatRequest.getSessionId() != null) {
|
||||
MessageWindowChatMemory memory = createChatMemory(chatRequest.getSessionId());
|
||||
if (memory != null) {
|
||||
List<ChatMessage> historicalMessages = memory.messages();
|
||||
if (historicalMessages != null && !historicalMessages.isEmpty()) {
|
||||
messages.addAll(historicalMessages);
|
||||
log.debug("已加载 {} 条历史消息用于会话 {}", historicalMessages.size(), chatRequest.getSessionId());
|
||||
}
|
||||
}
|
||||
return messages;
|
||||
}
|
||||
return messages;
|
||||
private String extractUserContent(ChatRequest chatRequest) {
|
||||
return Optional.ofNullable(chatRequest.getMessages())
|
||||
.filter(messages -> !messages.isEmpty())
|
||||
.map(messages -> messages.get(0).getContent())
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.orElseGet(() -> Optional.ofNullable(chatRequest.getChatMessages())
|
||||
.orElse(List.of()).stream()
|
||||
.filter(message -> message instanceof UserMessage)
|
||||
.map(message -> ((UserMessage) message).singleText())
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.findFirst()
|
||||
.orElse(""));
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建或获取聊天内存实例(缓存机制)
|
||||
* 同一个会话ID会返回同一个内存实例,避免重复创建和消息丢失
|
||||
*
|
||||
* @param memoryId 内存ID(会话ID)
|
||||
* @return MessageWindowChatMemory实例
|
||||
* 转换消息格式
|
||||
*/
|
||||
private MessageWindowChatMemory createChatMemory(Object memoryId) {
|
||||
// 先从缓存中获取
|
||||
return memoryCache.computeIfAbsent(memoryId, key -> {
|
||||
try {
|
||||
PersistentChatMemoryStore store = new PersistentChatMemoryStore();
|
||||
return MessageWindowChatMemory.builder()
|
||||
.id(memoryId)
|
||||
.maxMessages(DEFAULT_MAX_MESSAGES)
|
||||
.chatMemoryStore(store)
|
||||
.build();
|
||||
} catch (Exception e) {
|
||||
log.warn("创建聊天内存失败: {}", e.getMessage());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
private List<ChatMessage> convertToChatMessages(ChatRequest chatRequest) {
|
||||
List<ChatMessage> chatMessages = chatRequest.getChatMessages();
|
||||
return chatMessages != null ? chatMessages : List.of();
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行聊天(钩子方法 - 子类必须实现)
|
||||
* 注意:messages 已包含完整的历史上下文和当前消息
|
||||
*
|
||||
* @param chatModelVo 模型配置
|
||||
* @param chatRequest 聊天请求
|
||||
* @param handler 响应处理器
|
||||
* @param chatModelVo 模型配置
|
||||
* @param chatRequest 聊天请求
|
||||
* @param messagesWithMemory 消息列表
|
||||
* @param handler 响应处理器
|
||||
*/
|
||||
protected abstract void doChat(ChatModelVo chatModelVo, ChatRequest chatRequest,
|
||||
List<ChatMessage> messagesWithMemory, StreamingChatResponseHandler handler);
|
||||
@@ -218,11 +118,11 @@ public abstract class AbstractStreamingChatService extends AbstractChatMessageSe
|
||||
/**
|
||||
* 创建标准的响应处理器
|
||||
*
|
||||
* @param chatRequest 聊天请求,包含sessionId等上下文信息
|
||||
* @param chatRequest 聊天请求
|
||||
* @param userId 用户ID
|
||||
* @param tokenValue 会话令牌
|
||||
* @param chatModelVo 模型配置
|
||||
* @return 标准的流式响应处理器
|
||||
* @return 流式响应处理器
|
||||
*/
|
||||
protected StreamingChatResponseHandler createResponseHandler(ChatRequest chatRequest, Long userId,
|
||||
String tokenValue, ChatModelVo chatModelVo) {
|
||||
@@ -232,10 +132,7 @@ public abstract class AbstractStreamingChatService extends AbstractChatMessageSe
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public void onPartialResponse(String partialResponse) {
|
||||
// 将消息片段追加到缓冲区
|
||||
messageBuffer.append(partialResponse);
|
||||
|
||||
// 实时发送消息片段到客户端
|
||||
SseMessageUtils.sendMessage(userId, partialResponse);
|
||||
log.debug("收到{}消息片段: {}", getProviderName(), partialResponse);
|
||||
}
|
||||
@@ -243,17 +140,11 @@ public abstract class AbstractStreamingChatService extends AbstractChatMessageSe
|
||||
@Override
|
||||
public void onCompleteResponse(ChatResponse completeResponse) {
|
||||
try {
|
||||
// 消息流完成,保存消息到数据库和内存
|
||||
String fullMessage = messageBuffer.toString();
|
||||
|
||||
if (fullMessage.isEmpty()) {
|
||||
log.warn("{}接收到空消息", getProviderName());
|
||||
} else {
|
||||
// 保存助手回复消息
|
||||
saveChatMessage(chatRequest, userId, fullMessage, RoleType.ASSISTANT.getName(), chatModelVo);
|
||||
if (!fullMessage.isEmpty()) {
|
||||
saveChatMessage(chatRequest, userId, fullMessage,
|
||||
RoleType.ASSISTANT.getName(), chatModelVo);
|
||||
}
|
||||
|
||||
// 关闭SSE连接
|
||||
SseMessageUtils.completeConnection(userId, tokenValue);
|
||||
log.info("{}消息结束,已保存到数据库", getProviderName());
|
||||
} catch (Exception e) {
|
||||
@@ -264,20 +155,12 @@ public abstract class AbstractStreamingChatService extends AbstractChatMessageSe
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
log.error("{}流式响应错误: {}", getProviderName(), error.getMessage(), error);
|
||||
|
||||
// 发送错误消息到前端
|
||||
try {
|
||||
String errorMessage = String.format("模型调用失败: %s", error.getMessage());
|
||||
SseMessageUtils.sendMessage(userId, errorMessage);
|
||||
} catch (Exception e) {
|
||||
log.error("发送错误消息失败: {}", e.getMessage(), e);
|
||||
}
|
||||
|
||||
// 关闭SSE连接,避免前端一直等待
|
||||
try {
|
||||
SseMessageUtils.completeConnection(userId, tokenValue);
|
||||
} catch (Exception e) {
|
||||
log.error("关闭SSE连接失败: {}", e.getMessage(), e);
|
||||
log.error("发送错误消息失败: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -288,75 +171,12 @@ public abstract class AbstractStreamingChatService extends AbstractChatMessageSe
|
||||
*/
|
||||
public abstract String getProviderName();
|
||||
|
||||
protected String doAgent(String userMessage, ChatModelVo chatModelVo) {
|
||||
log.info("执行Agent任务,消息: {}", userMessage);
|
||||
// 加载所有可用的 Agent,让 Supervisor 根据任务类型自动选择
|
||||
return doAgentWithAllAgents(userMessage, chatModelVo);
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用单一 Agent 处理所有任务
|
||||
* 不使用 Supervisor 模式,而是使用 MCP Agent 来处理所有任务
|
||||
* 创建流式聊天模型(子类必须实现)
|
||||
*
|
||||
* @param userMessage 用户消息
|
||||
* @param chatModelVo 聊天模型配置
|
||||
* @return Agent 响应结果
|
||||
* @param chatModelVo 模型配置
|
||||
* @param chatRequest 聊天请求
|
||||
* @return 流式聊天模型实例
|
||||
*/
|
||||
protected String doAgentWithAllAgents(String userMessage, ChatModelVo chatModelVo) {
|
||||
|
||||
try {
|
||||
// 1. 加载 LLM 模型
|
||||
QwenChatModel qwenChatModel = QwenChatModel.builder()
|
||||
.apiKey(chatModelVo.getApiKey())
|
||||
.modelName(chatModelVo.getModelName())
|
||||
.build();
|
||||
|
||||
// 2. 获取统一工具提供工厂
|
||||
ToolProviderFactory toolProviderFactory = SpringUtils.getBean(ToolProviderFactory.class);
|
||||
|
||||
// 3. 获取所有可用的工具
|
||||
|
||||
// 3.1 添加 BUILTIN 工具对象(包括 SQL 工具)
|
||||
List<Object> builtinTools = toolProviderFactory.getAllBuiltinToolObjects();
|
||||
|
||||
List<Object> allTools = new ArrayList<>(builtinTools);
|
||||
|
||||
log.debug("Loaded {} builtin tools (including SQL tools)", builtinTools.size());
|
||||
|
||||
log.debug("Total tools: {}", allTools.size());
|
||||
|
||||
// 4. 获取 MCP 工具提供者
|
||||
ToolProvider mcpToolProvider = toolProviderFactory.getAllEnabledMcpToolsProvider();
|
||||
|
||||
// 5. 创建 MCP Agent(包含所有工具)
|
||||
var agentBuilder = AgenticServices.agentBuilder(McpAgent.class).chatModel(qwenChatModel);
|
||||
|
||||
// 添加所有工具
|
||||
if (!allTools.isEmpty()) {
|
||||
agentBuilder.tools(allTools.toArray(new Object[0]));
|
||||
}
|
||||
|
||||
// 添加 MCP 工具
|
||||
if (mcpToolProvider != null) {
|
||||
agentBuilder.toolProvider(mcpToolProvider);
|
||||
}
|
||||
|
||||
McpAgent mcpAgent = agentBuilder.build();
|
||||
|
||||
// 6. 调用大模型LLM
|
||||
String result = mcpAgent.callMcpTool(userMessage);
|
||||
log.info("Agent 执行完成,结果长度: {}", result.length());
|
||||
return result;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Agent 模式执行失败: {}", e.getMessage(), e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建流式聊天模型
|
||||
* 子类必须实现此方法,返回对应厂商的模型实例
|
||||
*/
|
||||
protected abstract StreamingChatModel buildStreamingChatModel(ChatModelVo chatModelVo, ChatRequest chatRequest);
|
||||
public abstract StreamingChatModel buildStreamingChatModel(ChatModelVo chatModelVo, ChatRequest chatRequest);
|
||||
}
|
||||
|
||||
@@ -1,29 +1,23 @@
|
||||
package org.ruoyi.service.chat.impl;
|
||||
|
||||
import cn.dev33.satoken.stp.StpUtil;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.ruoyi.common.chat.service.chat.IChatModelService;
|
||||
import org.ruoyi.common.chat.service.chat.IChatService;
|
||||
import org.ruoyi.common.chat.domain.dto.ChatMessageDTO;
|
||||
import org.ruoyi.common.chat.domain.dto.request.ChatRequest;
|
||||
import org.ruoyi.common.chat.entity.chat.ChatContext;
|
||||
import org.ruoyi.common.chat.domain.vo.chat.ChatModelVo;
|
||||
import org.ruoyi.common.chat.factory.ChatServiceFactory;
|
||||
import org.ruoyi.common.satoken.utils.LoginHelper;
|
||||
import org.ruoyi.common.sse.core.SseEmitterManager;
|
||||
import org.ruoyi.domain.bo.vector.QueryVectorBo;
|
||||
import org.ruoyi.domain.vo.knowledge.KnowledgeInfoVo;
|
||||
import org.ruoyi.service.knowledge.IKnowledgeInfoService;
|
||||
import org.ruoyi.service.vector.VectorStoreService;
|
||||
import org.ruoyi.service.chat.handler.ChatContextBuilder;
|
||||
import org.ruoyi.service.chat.handler.ChatHandler;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 聊天服务业务实现
|
||||
* 聊天服务门面层
|
||||
* <p>
|
||||
* 作为统一入口,负责:
|
||||
* 1. 构建对话上下文
|
||||
* 2. 路由到对应的处理器
|
||||
*
|
||||
* @author ageerle@163.com
|
||||
* @date 2025/12/13
|
||||
@@ -33,111 +27,25 @@ import java.util.List;
|
||||
@RequiredArgsConstructor
|
||||
public class ChatServiceFacade {
|
||||
|
||||
private final IChatModelService chatModelService;
|
||||
|
||||
private final ChatServiceFactory chatServiceFactory;
|
||||
|
||||
private final IKnowledgeInfoService knowledgeInfoService;
|
||||
|
||||
private final VectorStoreService vectorStoreService;
|
||||
|
||||
private final SseEmitterManager sseEmitterManager;
|
||||
private final ChatContextBuilder contextBuilder;
|
||||
private final List<ChatHandler> handlers;
|
||||
|
||||
/**
|
||||
* 统一聊天入口 - SSE流式响应
|
||||
*
|
||||
* @param chatRequest 聊天请求
|
||||
* @param request HTTP 请求对象
|
||||
* @param request HTTP请求对象
|
||||
* @return SseEmitter
|
||||
*/
|
||||
public SseEmitter sseChat(ChatRequest chatRequest, HttpServletRequest request) {
|
||||
// 1. 根据模型名称查询完整配置
|
||||
ChatModelVo chatModelVo = chatModelService.selectModelByName(chatRequest.getModel());
|
||||
if (chatModelVo == null) {
|
||||
throw new IllegalArgumentException("模型不存在: " + chatRequest.getModel());
|
||||
}
|
||||
// 1. 构建对话上下文
|
||||
ChatContext context = contextBuilder.build(chatRequest);
|
||||
|
||||
// 2. 构建上下文消息列表
|
||||
List<ChatMessageDTO> contextMessages = buildContextMessages(chatRequest);
|
||||
chatRequest.setMessages(contextMessages);
|
||||
|
||||
// 3. 路由服务提供商
|
||||
String category = chatModelVo.getProviderCode();
|
||||
log.info("路由到服务提供商: {}, 模型: {}", category, chatRequest.getModel());
|
||||
IChatService chatService = chatServiceFactory.getOriginalService(category);
|
||||
|
||||
// 4. 具体的服务实现
|
||||
Long userId = LoginHelper.getUserId();
|
||||
String tokenValue = StpUtil.getTokenValue();
|
||||
SseEmitter emitter = sseEmitterManager.connect(userId, tokenValue);
|
||||
|
||||
// 5. 创建对话上下文对象
|
||||
ChatContext chatContext = ChatContext.builder()
|
||||
.chatModelVo(chatModelVo)
|
||||
.chatRequest(chatRequest)
|
||||
.emitter(emitter)
|
||||
.userId(userId)
|
||||
.tokenValue(tokenValue)
|
||||
.build();
|
||||
return chatService.chat(chatContext);
|
||||
// 2. 路由到对应的处理器
|
||||
return handlers.stream()
|
||||
.filter(handler -> handler.supports(context))
|
||||
.findFirst()
|
||||
.orElseThrow(() -> new IllegalStateException("无可用对话处理器"))
|
||||
.handle(context);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建上下文消息列表
|
||||
*
|
||||
* @param chatRequest 聊天请求
|
||||
* @return 上下文消息列表
|
||||
*/
|
||||
private List<ChatMessageDTO> buildContextMessages(ChatRequest chatRequest) {
|
||||
|
||||
List<ChatMessageDTO> messages = chatRequest.getMessages();
|
||||
|
||||
// 从向量库查询相关历史消息
|
||||
if (chatRequest.getKnowledgeId() != null) {
|
||||
// 查询知识库信息
|
||||
KnowledgeInfoVo knowledgeInfoVo = knowledgeInfoService.queryById(Long.valueOf(chatRequest.getKnowledgeId()));
|
||||
if (knowledgeInfoVo == null) {
|
||||
log.warn("知识库信息不存在,kid: {}", chatRequest.getKnowledgeId());
|
||||
return messages;
|
||||
}
|
||||
|
||||
// 查询向量模型配置信息
|
||||
ChatModelVo chatModel = chatModelService.selectModelByName(knowledgeInfoVo.getEmbeddingModel());
|
||||
if (chatModel == null) {
|
||||
log.warn("向量模型配置不存在,模型名称: {}", knowledgeInfoVo.getEmbeddingModel());
|
||||
return messages;
|
||||
}
|
||||
|
||||
// 构建向量查询参数
|
||||
QueryVectorBo queryVectorBo = buildQueryVectorBo(chatRequest, knowledgeInfoVo, chatModel);
|
||||
|
||||
// 获取向量查询结果
|
||||
List<String> nearestList = vectorStoreService.getQueryVector(queryVectorBo);
|
||||
for (String prompt : nearestList) {
|
||||
// 知识库内容作为系统上下文添加
|
||||
messages.add(ChatMessageDTO.system(prompt));
|
||||
}
|
||||
}
|
||||
|
||||
return messages;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建向量查询参数
|
||||
*/
|
||||
private QueryVectorBo buildQueryVectorBo(ChatRequest chatRequest, KnowledgeInfoVo knowledgeInfoVo,
|
||||
ChatModelVo chatModel) {
|
||||
QueryVectorBo queryVectorBo = new QueryVectorBo();
|
||||
queryVectorBo.setQuery(chatRequest.getMessages().get(0).getContent());
|
||||
queryVectorBo.setKid(chatRequest.getKnowledgeId());
|
||||
queryVectorBo.setApiKey(chatModel.getApiKey());
|
||||
queryVectorBo.setBaseUrl(chatModel.getApiHost());
|
||||
queryVectorBo.setVectorModelName(knowledgeInfoVo.getVectorModel());
|
||||
queryVectorBo.setEmbeddingModelName(knowledgeInfoVo.getEmbeddingModel());
|
||||
queryVectorBo.setMaxResults(knowledgeInfoVo.getRetrieveLimit());
|
||||
return queryVectorBo;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ import java.util.List;
|
||||
public class OllamaServiceImpl extends AbstractStreamingChatService {
|
||||
|
||||
@Override
|
||||
protected StreamingChatModel buildStreamingChatModel(ChatModelVo chatModelVo, ChatRequest chatRequest) {
|
||||
public StreamingChatModel buildStreamingChatModel(ChatModelVo chatModelVo, ChatRequest chatRequest) {
|
||||
return OllamaStreamingChatModel.builder()
|
||||
.baseUrl(chatModelVo.getApiHost())
|
||||
.modelName(chatModelVo.getModelName())
|
||||
@@ -33,7 +33,7 @@ public class OllamaServiceImpl extends AbstractStreamingChatService {
|
||||
|
||||
|
||||
@Override
|
||||
protected void doChat(ChatModelVo chatModelVo, ChatRequest chatRequest, List<ChatMessage> messagesWithMemory,StreamingChatResponseHandler handler) {
|
||||
public void doChat(ChatModelVo chatModelVo, ChatRequest chatRequest, List<ChatMessage> messagesWithMemory,StreamingChatResponseHandler handler) {
|
||||
StreamingChatModel streamingChatModel = buildStreamingChatModel(chatModelVo, chatRequest);
|
||||
streamingChatModel.chat(messagesWithMemory, handler);
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ import java.util.List;
|
||||
public class OpenAIServiceImpl extends AbstractStreamingChatService {
|
||||
|
||||
@Override
|
||||
protected StreamingChatModel buildStreamingChatModel(ChatModelVo chatModelVo, ChatRequest chatRequest) {
|
||||
public StreamingChatModel buildStreamingChatModel(ChatModelVo chatModelVo, ChatRequest chatRequest) {
|
||||
return OpenAiStreamingChatModel.builder()
|
||||
.baseUrl(chatModelVo.getApiHost())
|
||||
.apiKey(chatModelVo.getApiKey())
|
||||
@@ -36,7 +36,7 @@ public class OpenAIServiceImpl extends AbstractStreamingChatService {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doChat(ChatModelVo chatModelVo, ChatRequest chatRequest, List<ChatMessage> messagesWithMemory, StreamingChatResponseHandler handler) {
|
||||
public void doChat(ChatModelVo chatModelVo, ChatRequest chatRequest, List<ChatMessage> messagesWithMemory, StreamingChatResponseHandler handler) {
|
||||
StreamingChatModel streamingChatModel = buildStreamingChatModel(chatModelVo, chatRequest);
|
||||
streamingChatModel.chat(messagesWithMemory, handler);
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ public class QianWenChatServiceImpl extends AbstractStreamingChatService {
|
||||
private static final String UPLOAD_FILE_API_PREFIX = "fileid";
|
||||
|
||||
@Override
|
||||
protected StreamingChatModel buildStreamingChatModel(ChatModelVo chatModelVo,ChatRequest chatRequest) {
|
||||
public StreamingChatModel buildStreamingChatModel(ChatModelVo chatModelVo,ChatRequest chatRequest) {
|
||||
return QwenStreamingChatModel.builder()
|
||||
.apiKey(chatModelVo.getApiKey())
|
||||
.modelName(chatModelVo.getModelName())
|
||||
@@ -39,7 +39,7 @@ public class QianWenChatServiceImpl extends AbstractStreamingChatService {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doChat(ChatModelVo chatModelVo,ChatRequest chatRequest,List<ChatMessage> messagesWithMemory,
|
||||
public void doChat(ChatModelVo chatModelVo,ChatRequest chatRequest,List<ChatMessage> messagesWithMemory,
|
||||
StreamingChatResponseHandler handler) {
|
||||
StreamingChatModel streamingChatModel = buildStreamingChatModel(chatModelVo,chatRequest);
|
||||
// 判断是否存在需要使用阿里千问的文档解析功能
|
||||
|
||||
@@ -23,13 +23,13 @@ import java.util.List;
|
||||
@Slf4j
|
||||
public class ZhiPuChatServiceImpl extends AbstractStreamingChatService {
|
||||
@Override
|
||||
protected void doChat(ChatModelVo chatModelVo, ChatRequest chatRequest, List<ChatMessage> messagesWithMemory, StreamingChatResponseHandler handler) {
|
||||
public void doChat(ChatModelVo chatModelVo, ChatRequest chatRequest, List<ChatMessage> messagesWithMemory, StreamingChatResponseHandler handler) {
|
||||
StreamingChatModel streamingChatModel = buildStreamingChatModel(chatModelVo,chatRequest);
|
||||
streamingChatModel.chat(messagesWithMemory, handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected StreamingChatModel buildStreamingChatModel(ChatModelVo chatModelVo, ChatRequest chatRequest) {
|
||||
public StreamingChatModel buildStreamingChatModel(ChatModelVo chatModelVo, ChatRequest chatRequest) {
|
||||
return ZhipuAiStreamingChatModel.builder()
|
||||
.apiKey(chatModelVo.getApiKey())
|
||||
.model(chatModelVo.getModelName())
|
||||
|
||||
Reference in New Issue
Block a user