From 8d0c557bdbb3c96db02cc3f2417134249bb52160 Mon Sep 17 00:00:00 2001 From: likunlong Date: Mon, 18 Aug 2025 14:30:08 +0800 Subject: [PATCH 01/10] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E8=8E=B7=E5=8F=96=E9=AB=98=E4=BC=98=E5=85=88=E7=BA=A7?= =?UTF-8?q?=E6=A8=A1=E5=9E=8B=E5=92=8C=E6=9C=8D=E5=8A=A1=E7=9A=84=E9=80=BB?= =?UTF-8?q?=E8=BE=91=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/org/ruoyi/domain/ChatModel.java | 5 + .../java/org/ruoyi/domain/bo/ChatModelBo.java | 5 + .../org/ruoyi/service/IChatModelService.java | 6 + .../service/impl/ChatModelServiceImpl.java | 13 ++ .../service/chat/impl/SseServiceImpl.java | 220 +++++++++++++----- 5 files changed, 186 insertions(+), 63 deletions(-) diff --git a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/ChatModel.java b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/ChatModel.java index c4e25a1b..dd443d97 100644 --- a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/ChatModel.java +++ b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/ChatModel.java @@ -75,6 +75,11 @@ public class ChatModel extends BaseEntity { */ private String apiKey; + /** + * 优先级 + */ + private Integer priority; + /** * 备注 */ diff --git a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/bo/ChatModelBo.java b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/bo/ChatModelBo.java index f333ed0c..b828515b 100644 --- a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/bo/ChatModelBo.java +++ b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/bo/ChatModelBo.java @@ -74,6 +74,11 @@ public class ChatModelBo extends BaseEntity { @NotBlank(message = "请求地址不能为空", groups = { AddGroup.class, EditGroup.class }) private String apiHost; + /** + * 优先级 + */ + private Integer priority; + /** * 密钥 */ diff --git a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/IChatModelService.java b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/IChatModelService.java index 62dbf970..d93b527e 100644 --- a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/IChatModelService.java +++ b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/IChatModelService.java @@ -57,6 +57,12 @@ public interface IChatModelService { * 通过模型分类获取模型信息 */ ChatModelVo selectModelByCategory(String image); + + /** + * 通过模型分类获取优先级最高的模型信息 + */ + ChatModelVo selectModelByCategoryWithHighestPriority(String category); + /** * 获取ppt模型信息 */ diff --git a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/impl/ChatModelServiceImpl.java b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/impl/ChatModelServiceImpl.java index d0b5f5cd..b7acfaa9 100644 --- a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/impl/ChatModelServiceImpl.java +++ b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/impl/ChatModelServiceImpl.java @@ -136,6 +136,19 @@ public class ChatModelServiceImpl implements IChatModelService { public ChatModelVo selectModelByCategory(String category) { return baseMapper.selectVoOne(Wrappers.lambdaQuery().eq(ChatModel::getCategory, category)); } + + /** + * 通过模型分类获取优先级最高的模型信息 + */ + @Override + public ChatModelVo selectModelByCategoryWithHighestPriority(String category) { + return baseMapper.selectVoOne( + Wrappers.lambdaQuery() + .eq(ChatModel::getCategory, category) + .orderByDesc(ChatModel::getPriority) + .last("LIMIT 1") + ); + } @Override public ChatModel getPPT() { diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java index 269fcac0..e0d72866 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java @@ -113,8 +113,8 @@ public class SseServiceImpl implements ISseService { chatRequest.setSessionId(chatSessionBo.getId()); } } - // 根据模型分类调用不同的处理逻辑 - IChatService chatService = chatServiceFactory.getChatService(chatModelVo.getCategory()); + // 自动选择模型并获取对应的聊天服务 + IChatService chatService = autoSelectModelAndGetService(chatRequest); chatService.chat(chatRequest, sseEmitter); } catch (Exception e) { log.error(e.getMessage(),e); @@ -123,6 +123,45 @@ public class SseServiceImpl implements ISseService { return sseEmitter; } + /** + * 自动选择模型并获取对应的聊天服务 + */ + private IChatService autoSelectModelAndGetService(ChatRequest chatRequest) { + try { + // 处理特殊模型类型 + if ("gpt-image".equals(chatRequest.getModel())) { + chatModelVo = selectModelByCategory("image"); + return chatServiceFactory.getChatService(chatModelVo.getCategory()); + } + + // 根据模型名称获取模型分类,然后获取该分类下优先级最高的模型 + ChatModelVo tempModel = chatModelService.selectModelByName(chatRequest.getModel()); + if (tempModel == null) { + throw new IllegalStateException("未找到模型名称:" + chatRequest.getModel()); + } + + chatModelVo = selectModelByCategory(tempModel.getCategory()); + + // 直接返回对应的聊天服务 + return chatServiceFactory.getChatService(chatModelVo.getCategory()); + + } catch (Exception e) { + log.error("模型选择和服务获取失败: {}", e.getMessage(), e); + throw new IllegalStateException("模型选择和服务获取失败: " + e.getMessage()); + } + } + + /** + * 根据分类选择优先级最高的模型 + */ + private ChatModelVo selectModelByCategory(String category) { + ChatModelVo model = chatModelService.selectModelByCategoryWithHighestPriority(category); + if (model == null) { + throw new IllegalStateException("未找到" + category + "分类的模型配置"); + } + return model; + } + /** * 获取对话标题 * @@ -144,66 +183,20 @@ public class SseServiceImpl implements ISseService { * 构建消息列表 */ private void buildChatMessageList(ChatRequest chatRequest){ - String sysPrompt; - // 矫正模型名称 如果是gpt-image 则查询image类型模型 获取模型名称 - if(chatRequest.getModel().equals("gpt-image")) { - chatModelVo = chatModelService.selectModelByCategory("image"); - if (chatModelVo == null) { - log.error("未找到image类型的模型配置"); - throw new IllegalStateException("未找到image类型的模型配置"); - } - }else{ - chatModelVo = chatModelService.selectModelByName(chatRequest.getModel()); - } - // 获取对话消息列表 List messages = chatRequest.getMessages(); - // 查询向量库相关信息加入到上下文 - if(StringUtils.isNotEmpty(chatRequest.getKid())){ - List knMessages = new ArrayList<>(); - String content = messages.get(messages.size() - 1).getContent().toString(); - // 通过kid查询知识库信息 - KnowledgeInfoVo knowledgeInfoVo = knowledgeInfoService.queryById(Long.valueOf(chatRequest.getKid())); - // 查询向量模型配置信息 - ChatModelVo chatModel = chatModelService.selectModelByName(knowledgeInfoVo.getEmbeddingModelName()); - - QueryVectorBo queryVectorBo = new QueryVectorBo(); - queryVectorBo.setQuery(content); - queryVectorBo.setKid(chatRequest.getKid()); - queryVectorBo.setApiKey(chatModel.getApiKey()); - queryVectorBo.setBaseUrl(chatModel.getApiHost()); - queryVectorBo.setVectorModelName(knowledgeInfoVo.getVectorModelName()); - queryVectorBo.setEmbeddingModelName(knowledgeInfoVo.getEmbeddingModelName()); - queryVectorBo.setMaxResults(knowledgeInfoVo.getRetrieveLimit()); - List nearestList = vectorStoreService.getQueryVector(queryVectorBo); - for (String prompt : nearestList) { - Message userMessage = Message.builder().content(prompt).role(Message.Role.USER).build(); - knMessages.add(userMessage); - } - messages.addAll(knMessages); - // 设置知识库系统提示词 - sysPrompt = knowledgeInfoVo.getSystemPrompt(); - if(StringUtils.isEmpty(sysPrompt)){ - sysPrompt ="###角色设定\n" + - "你是一个智能知识助手,专注于利用上下文中的信息来提供准确和相关的回答。\n" + - "###指令\n" + - "当用户的问题与上下文知识匹配时,利用上下文信息进行回答。如果问题与上下文不匹配,运用自身的推理能力生成合适的回答。\n" + - "###限制\n" + - "确保回答清晰简洁,避免提供不必要的细节。始终保持语气友好" + - "当前时间:"+ DateUtils.getDate(); - } - }else { - sysPrompt = chatModelVo.getSystemPrompt(); - if(StringUtils.isEmpty(sysPrompt)){ - sysPrompt ="你是一个由RuoYI-AI开发的人工智能助手,名字叫熊猫助手。你擅长中英文对话,能够理解并处理各种问题,提供安全、有帮助、准确的回答。" + - "当前时间:"+ DateUtils.getDate()+ - "#注意:回复之前注意结合上下文和工具返回内容进行回复。"; - } - } - // 设置系统默认提示词 - Message sysMessage = Message.builder().content(sysPrompt).role(Message.Role.SYSTEM).build(); - messages.add(0,sysMessage); - + + // 处理知识库相关逻辑 + String sysPrompt = processKnowledgeBase(chatRequest, messages); + + // 设置系统提示词 + Message sysMessage = Message.builder() + .content(sysPrompt) + .role(Message.Role.SYSTEM) + .build(); + messages.add(0, sysMessage); + chatRequest.setSysPrompt(sysPrompt); + // 用户对话内容 String chatString = null; // 获取用户对话信息 @@ -212,12 +205,113 @@ public class SseServiceImpl implements ISseService { if (CollectionUtil.isNotEmpty(listContent)) { chatString = listContent.get(0).toString(); } - } else if (content instanceof String) { - chatString = (String) content; + } else { + chatString = content.toString(); } - // 设置对话信息 chatRequest.setPrompt(chatString); } + + /** + * 处理知识库相关逻辑 + */ + private String processKnowledgeBase(ChatRequest chatRequest, List messages) { + if (StringUtils.isEmpty(chatRequest.getKid())) { + return getDefaultSystemPrompt(); + } + + try { + // 查询知识库信息 + KnowledgeInfoVo knowledgeInfoVo = knowledgeInfoService.queryById(Long.valueOf(chatRequest.getKid())); + if (knowledgeInfoVo == null) { + log.warn("知识库信息不存在,kid: {}", chatRequest.getKid()); + return getDefaultSystemPrompt(); + } + + // 查询向量模型配置信息 + ChatModelVo chatModel = chatModelService.selectModelByName(knowledgeInfoVo.getEmbeddingModelName()); + if (chatModel == null) { + log.warn("向量模型配置不存在,模型名称: {}", knowledgeInfoVo.getEmbeddingModelName()); + return getDefaultSystemPrompt(); + } + + // 构建向量查询参数 + QueryVectorBo queryVectorBo = buildQueryVectorBo(chatRequest, knowledgeInfoVo, chatModel); + + // 获取向量查询结果 + List nearestList = vectorStoreService.getQueryVector(queryVectorBo); + + // 添加知识库消息到上下文 + addKnowledgeMessages(messages, nearestList); + + // 返回知识库系统提示词 + return getKnowledgeSystemPrompt(knowledgeInfoVo); + + } catch (Exception e) { + log.error("处理知识库信息失败: {}", e.getMessage(), e); + return getDefaultSystemPrompt(); + } + } + + /** + * 构建向量查询参数 + */ + private QueryVectorBo buildQueryVectorBo(ChatRequest chatRequest, KnowledgeInfoVo knowledgeInfoVo, ChatModelVo chatModel) { + String content = chatRequest.getMessages().get(chatRequest.getMessages().size() - 1).getContent().toString(); + + QueryVectorBo queryVectorBo = new QueryVectorBo(); + queryVectorBo.setQuery(content); + queryVectorBo.setKid(chatRequest.getKid()); + queryVectorBo.setApiKey(chatModel.getApiKey()); + queryVectorBo.setBaseUrl(chatModel.getApiHost()); + queryVectorBo.setVectorModelName(knowledgeInfoVo.getVectorModelName()); + queryVectorBo.setEmbeddingModelName(knowledgeInfoVo.getEmbeddingModelName()); + queryVectorBo.setMaxResults(knowledgeInfoVo.getRetrieveLimit()); + + return queryVectorBo; + } + + /** + * 添加知识库消息到上下文 + */ + private void addKnowledgeMessages(List messages, List nearestList) { + for (String prompt : nearestList) { + Message userMessage = Message.builder() + .content(prompt) + .role(Message.Role.USER) + .build(); + messages.add(userMessage); + } + } + + /** + * 获取知识库系统提示词 + */ + private String getKnowledgeSystemPrompt(KnowledgeInfoVo knowledgeInfoVo) { + String sysPrompt = knowledgeInfoVo.getSystemPrompt(); + if (StringUtils.isEmpty(sysPrompt)) { + sysPrompt = "###角色设定\n" + + "你是一个智能知识助手,专注于利用上下文中的信息来提供准确和相关的回答。\n" + + "###指令\n" + + "当用户的问题与上下文知识匹配时,利用上下文信息进行回答。如果问题与上下文不匹配,运用自身的推理能力生成合适的回答。\n" + + "###限制\n" + + "确保回答清晰简洁,避免提供不必要的细节。始终保持语气友好\n" + + "当前时间:" + DateUtils.getDate(); + } + return sysPrompt; + } + + /** + * 获取默认系统提示词 + */ + private String getDefaultSystemPrompt() { + String sysPrompt = chatModelVo != null ? chatModelVo.getSystemPrompt() : null; + if (StringUtils.isEmpty(sysPrompt)) { + sysPrompt = "你是一个由RuoYI-AI开发的人工智能助手,名字叫熊猫助手。你擅长中英文对话,能够理解并处理各种问题,提供安全、有帮助、准确的回答。" + + "当前时间:" + DateUtils.getDate() + + "#注意:回复之前注意结合上下文和工具返回内容进行回复。"; + } + return sysPrompt; + } /** From 8751bb5104d47e8035d7c1be72800f6b21921b64 Mon Sep 17 00:00:00 2001 From: likunlong Date: Mon, 18 Aug 2025 14:49:56 +0800 Subject: [PATCH 02/10] =?UTF-8?q?feat:=20=E6=95=B0=E6=8D=AE=E5=BA=93chat?= =?UTF-8?q?=5Fmodel=E6=B7=BB=E5=8A=A0=E4=BC=98=E5=85=88=E7=BA=A7=E5=AD=97?= =?UTF-8?q?=E6=AE=B5=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- script/sql/update/chat-model-priority.sql | 26 +++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 script/sql/update/chat-model-priority.sql diff --git a/script/sql/update/chat-model-priority.sql b/script/sql/update/chat-model-priority.sql new file mode 100644 index 00000000..b7249fa0 --- /dev/null +++ b/script/sql/update/chat-model-priority.sql @@ -0,0 +1,26 @@ +alter table chat_model + add priority int default 1 null comment '模型优先级(值越大优先级越高)'; + +UPDATE `ruoyi-ai`.chat_model t +SET t.priority = 3 +WHERE t.id = 1782792839548735492; + +UPDATE `ruoyi-ai`.chat_model t +SET t.priority = 6 +WHERE t.id = 1859570229117022212; + +UPDATE `ruoyi-ai`.chat_model t +SET t.priority = 5 +WHERE t.id = 1859570229117022211; + +UPDATE `ruoyi-ai`.chat_model t +SET t.priority = 4 +WHERE t.id = 1782792839548735493; + +UPDATE `ruoyi-ai`.chat_model t +SET t.priority = 2 +WHERE t.id = 1828324413241466881; + +UPDATE `ruoyi-ai`.chat_model t +SET t.priority = 2 +WHERE t.id = 1782792839548735491; \ No newline at end of file From 07cb351807a09c499d659594bae0ea8d07a2958f Mon Sep 17 00:00:00 2001 From: likunlong Date: Tue, 19 Aug 2025 10:32:17 +0800 Subject: [PATCH 03/10] =?UTF-8?q?feat:=20=E6=A0=B9=E6=8D=AE=E6=98=AF?= =?UTF-8?q?=E5=90=A6=E6=9C=89=E9=99=84=E4=BB=B6=E5=92=8C=E6=98=AF=E5=90=A6?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=EF=BC=8C=E8=87=AA=E5=8A=A8=E9=80=89=E6=8B=A9?= =?UTF-8?q?=E6=A8=A1=E5=9E=8B=E5=B9=B6=E4=B8=94=E8=8E=B7=E5=8F=96=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ruoyi/common/chat/request/ChatRequest.java | 10 ++++++++++ .../chat/service/chat/impl/SseServiceImpl.java | 15 ++++++--------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/ruoyi-common/ruoyi-common-chat/src/main/java/org/ruoyi/common/chat/request/ChatRequest.java b/ruoyi-common/ruoyi-common-chat/src/main/java/org/ruoyi/common/chat/request/ChatRequest.java index 20a021de..276f5dc8 100644 --- a/ruoyi-common/ruoyi-common-chat/src/main/java/org/ruoyi/common/chat/request/ChatRequest.java +++ b/ruoyi-common/ruoyi-common-chat/src/main/java/org/ruoyi/common/chat/request/ChatRequest.java @@ -67,4 +67,14 @@ public class ChatRequest { */ private Long uuid; + /** + * 是否有附件 + */ + private Boolean hasAttachment; + + /** + * 是否自动切换模型 + */ + private Boolean autoSelectModel; + } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java index e0d72866..6dfa302a 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java @@ -128,23 +128,20 @@ public class SseServiceImpl implements ISseService { */ private IChatService autoSelectModelAndGetService(ChatRequest chatRequest) { try { - // 处理特殊模型类型 - if ("gpt-image".equals(chatRequest.getModel())) { + if (Boolean.TRUE.equals(chatRequest.getHasAttachment())) { chatModelVo = selectModelByCategory("image"); - return chatServiceFactory.getChatService(chatModelVo.getCategory()); + } else if (Boolean.TRUE.equals(chatRequest.getAutoSelectModel())) { + chatModelVo = selectModelByCategory("chat"); + } else { + chatModelVo = chatModelService.selectModelByName(chatRequest.getModel()); } - // 根据模型名称获取模型分类,然后获取该分类下优先级最高的模型 - ChatModelVo tempModel = chatModelService.selectModelByName(chatRequest.getModel()); - if (tempModel == null) { + if (chatModelVo == null) { throw new IllegalStateException("未找到模型名称:" + chatRequest.getModel()); } - chatModelVo = selectModelByCategory(tempModel.getCategory()); - // 直接返回对应的聊天服务 return chatServiceFactory.getChatService(chatModelVo.getCategory()); - } catch (Exception e) { log.error("模型选择和服务获取失败: {}", e.getMessage(), e); throw new IllegalStateException("模型选择和服务获取失败: " + e.getMessage()); From 119483df86f82c82bd3150aa87b737365be87b3e Mon Sep 17 00:00:00 2001 From: likunlong Date: Tue, 19 Aug 2025 15:12:24 +0800 Subject: [PATCH 04/10] =?UTF-8?q?feat:=20=E8=87=AA=E5=8A=A8=E8=AE=BE?= =?UTF-8?q?=E7=BD=AE=E8=AF=B7=E6=B1=82=E5=8F=82=E6=95=B0=E4=B8=AD=E7=9A=84?= =?UTF-8?q?=E6=A8=A1=E5=9E=8B=E5=90=8D=E7=A7=B0=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java index 6dfa302a..49035da7 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java @@ -139,7 +139,8 @@ public class SseServiceImpl implements ISseService { if (chatModelVo == null) { throw new IllegalStateException("未找到模型名称:" + chatRequest.getModel()); } - + // 自动设置请求参数中的模型名称 + chatRequest.setModel(chatModelVo.getModelName()); // 直接返回对应的聊天服务 return chatServiceFactory.getChatService(chatModelVo.getCategory()); } catch (Exception e) { From 4434d8346ca80262971bb71bc4baa7944ca42187 Mon Sep 17 00:00:00 2001 From: likunlong Date: Tue, 19 Aug 2025 16:46:25 +0800 Subject: [PATCH 05/10] =?UTF-8?q?feat:=20=E9=97=AE=E7=AD=94=E6=97=B6?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E7=BB=9F=E4=B8=80=E9=87=8D=E8=AF=95=E5=92=8C?= =?UTF-8?q?=E9=99=8D=E7=BA=A7=E9=80=BB=E8=BE=91=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/ruoyi/domain/vo/ChatModelVo.java | 6 + .../org/ruoyi/service/IChatModelService.java | 5 + .../service/impl/ChatModelServiceImpl.java | 14 +++ .../chat/listener/SSEEventSourceListener.java | 17 ++- .../service/chat/impl/DeepSeekChatImpl.java | 5 + .../service/chat/impl/OpenAIServiceImpl.java | 9 +- .../service/chat/impl/SseServiceImpl.java | 32 ++++- .../ruoyi/chat/support/ChatRetryHelper.java | 115 ++++++++++++++++++ .../org/ruoyi/chat/support/RetryNotifier.java | 39 ++++++ .../java/org/ruoyi/chat/util/SSEUtil.java | 2 +- 10 files changed, 240 insertions(+), 4 deletions(-) create mode 100644 ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/ChatRetryHelper.java create mode 100644 ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java diff --git a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/vo/ChatModelVo.java b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/vo/ChatModelVo.java index 77afe2f4..256d8d4d 100644 --- a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/vo/ChatModelVo.java +++ b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/vo/ChatModelVo.java @@ -90,6 +90,12 @@ public class ChatModelVo implements Serializable { @ExcelProperty(value = "密钥") private String apiKey; + /** + * 优先级(值越大优先级越高) + */ + @ExcelProperty(value = "优先级") + private Integer priority; + /** * 备注 */ diff --git a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/IChatModelService.java b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/IChatModelService.java index d93b527e..9e90e46b 100644 --- a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/IChatModelService.java +++ b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/IChatModelService.java @@ -63,6 +63,11 @@ public interface IChatModelService { */ ChatModelVo selectModelByCategoryWithHighestPriority(String category); + /** + * 在同一分类下,查找优先级小于当前优先级的最高优先级模型(用于降级)。 + */ + ChatModelVo selectFallbackModelByCategoryAndLessPriority(String category, Integer currentPriority); + /** * 获取ppt模型信息 */ diff --git a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/impl/ChatModelServiceImpl.java b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/impl/ChatModelServiceImpl.java index b7acfaa9..069d75a6 100644 --- a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/impl/ChatModelServiceImpl.java +++ b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/impl/ChatModelServiceImpl.java @@ -150,6 +150,20 @@ public class ChatModelServiceImpl implements IChatModelService { ); } + /** + * 在同一分类下,查找优先级小于当前优先级的最高优先级模型(用于降级)。 + */ + @Override + public ChatModelVo selectFallbackModelByCategoryAndLessPriority(String category, Integer currentPriority) { + return baseMapper.selectVoOne( + Wrappers.lambdaQuery() + .eq(ChatModel::getCategory, category) + .lt(ChatModel::getPriority, currentPriority) + .orderByDesc(ChatModel::getPriority) + .last("LIMIT 1") + ); + } + @Override public ChatModel getPPT() { return baseMapper.selectOne(Wrappers.lambdaQuery().eq(ChatModel::getModelName, "ppt")); diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java index c91e28de..20adc22b 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java @@ -21,6 +21,8 @@ import org.ruoyi.common.core.utils.SpringUtils; import org.ruoyi.common.core.utils.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.ruoyi.chat.util.SSEUtil; +import org.ruoyi.chat.support.RetryNotifier; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.Objects; @@ -77,6 +79,8 @@ public class SSEEventSourceListener extends EventSourceListener { if ("[DONE]".equals(data)) { //成功响应 emitter.complete(); + // 清理失败回调 + RetryNotifier.clear(sessionId); // 扣除费用 ChatRequest chatRequest = new ChatRequest(); // 设置对话角色 @@ -113,20 +117,31 @@ public class SSEEventSourceListener extends EventSourceListener { @Override public void onClosed(EventSource eventSource) { log.info("OpenAI关闭sse连接..."); + // 清理失败回调 + RetryNotifier.clear(sessionId); } @SneakyThrows @Override public void onFailure(EventSource eventSource, Throwable t, Response response) { if (Objects.isNull(response)) { + // 透传错误到前端 + SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); + // 通知重试 + RetryNotifier.notifyFailure(sessionId); return; } ResponseBody body = response.body(); if (Objects.nonNull(body)) { - log.error("OpenAI sse连接异常data:{},异常:{}", body.string(), t); + String msg = body.string(); + log.error("OpenAI sse连接异常data:{},异常:{}", msg, t); + SSEUtil.sendErrorEvent(emitter, msg); } else { log.error("OpenAI sse连接异常data:{},异常:{}", response, t); + SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); } + // 通知重试 + RetryNotifier.notifyFailure(sessionId); eventSource.cancel(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java index 9e59fdd7..f0ddd77b 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java @@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import org.ruoyi.chat.support.RetryNotifier; /** * deepseek */ @@ -57,11 +58,15 @@ public class DeepSeekChatImpl implements IChatService { @Override public void onError(Throwable error) { System.err.println("错误: " + error.getMessage()); + // 通知上层失败,进入重试/降级 + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } }); } catch (Exception e) { log.error("deepseek请求失败:{}", e.getMessage()); + // 同步异常直接通知失败 + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java index 50693617..b46fa03d 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java @@ -22,6 +22,7 @@ import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.List; +import org.ruoyi.chat.support.RetryNotifier; /** @@ -65,7 +66,13 @@ public class OpenAIServiceImpl implements IChatService { .model(chatRequest.getModel()) .stream(true) .build(); - openAiStreamClient.streamChatCompletion(completion, listener); + try { + openAiStreamClient.streamChatCompletion(completion, listener); + } catch (Exception ex) { + // 同步异常也触发失败回调,按会话维度 + RetryNotifier.notifyFailure(chatRequest.getSessionId()); + throw ex; + } return emitter; } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java index 49035da7..40bd9929 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java @@ -9,6 +9,8 @@ import org.ruoyi.chat.factory.ChatServiceFactory; import org.ruoyi.chat.service.chat.IChatCostService; import org.ruoyi.chat.service.chat.IChatService; import org.ruoyi.chat.service.chat.ISseService; +import org.ruoyi.chat.support.ChatRetryHelper; +import org.ruoyi.chat.support.RetryNotifier; import org.ruoyi.chat.util.SSEUtil; import org.ruoyi.common.chat.entity.Tts.TextToSpeech; import org.ruoyi.common.chat.entity.chat.Message; @@ -115,7 +117,27 @@ public class SseServiceImpl implements ISseService { } // 自动选择模型并获取对应的聊天服务 IChatService chatService = autoSelectModelAndGetService(chatRequest); - chatService.chat(chatRequest, sseEmitter); + + // 统一重试与降级:封装启动逻辑,并通过ThreadLocal传递失败回调 + ChatModelVo currentModel = this.chatModelVo; + String currentCategory = currentModel.getCategory(); + ChatRetryHelper.executeWithRetry( + currentModel, + currentCategory, + chatModelService, + sseEmitter, + (modelForTry, onFailure) -> { + // 替换请求中的模型名称 + chatRequest.setModel(modelForTry.getModelName()); + // 将回调注册到ThreadLocal,供底层SSE失败时触发 + RetryNotifier.setFailureCallback(chatRequest.getSessionId(), onFailure); + try { + autoSelectServiceByCategoryAndInvoke(chatRequest, sseEmitter, modelForTry.getCategory()); + } finally { + // 不在此处清理,待下游结束/失败时清理 + } + } + ); } catch (Exception e) { log.error(e.getMessage(),e); SSEUtil.sendErrorEvent(sseEmitter,e.getMessage()); @@ -148,6 +170,14 @@ public class SseServiceImpl implements ISseService { throw new IllegalStateException("模型选择和服务获取失败: " + e.getMessage()); } } + + /** + * 根据给定分类获取服务并发起调用(避免在降级时重复选择模型) + */ + private void autoSelectServiceByCategoryAndInvoke(ChatRequest chatRequest, SseEmitter sseEmitter, String category) { + IChatService service = chatServiceFactory.getChatService(category); + service.chat(chatRequest, sseEmitter); + } /** * 根据分类选择优先级最高的模型 diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/ChatRetryHelper.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/ChatRetryHelper.java new file mode 100644 index 00000000..bf4a4a74 --- /dev/null +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/ChatRetryHelper.java @@ -0,0 +1,115 @@ +package org.ruoyi.chat.support; + +import lombok.extern.slf4j.Slf4j; +import org.ruoyi.chat.util.SSEUtil; +import org.ruoyi.domain.vo.ChatModelVo; +import org.ruoyi.service.IChatModelService; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +/** + * 统一的聊天重试与降级调度器。 + * + * 策略: + * - 当前模型最多重试 3 次;仍失败则降级到同分类内、优先级小于当前的最高优先级模型。 + * - 降级模型同样最多重试 3 次;仍失败则向前端返回失败信息并停止。 + * + * 注意:实现依赖调用方在底层异步失败时执行 onFailure.run() 通知本调度器。 + */ +@Slf4j +public class ChatRetryHelper { + + public interface AttemptStarter { + void start(ChatModelVo model, Runnable onFailure) throws Exception; + } + + public static void executeWithRetry( + ChatModelVo primaryModel, + String category, + IChatModelService chatModelService, + SseEmitter emitter, + AttemptStarter attemptStarter + ) { + Objects.requireNonNull(primaryModel, "primaryModel must not be null"); + Objects.requireNonNull(category, "category must not be null"); + Objects.requireNonNull(chatModelService, "chatModelService must not be null"); + Objects.requireNonNull(emitter, "emitter must not be null"); + Objects.requireNonNull(attemptStarter, "attemptStarter must not be null"); + + AtomicInteger mainAttempts = new AtomicInteger(0); + AtomicInteger fallbackAttempts = new AtomicInteger(0); + AtomicBoolean inFallback = new AtomicBoolean(false); + AtomicBoolean scheduling = new AtomicBoolean(false); + + class Scheduler { + volatile ChatModelVo current = primaryModel; + volatile ChatModelVo fallback = null; + + void startAttempt() { + try { + if (!inFallback.get()) { + if (mainAttempts.incrementAndGet() > 3) { + // 进入降级 + inFallback.set(true); + if (fallback == null) { + Integer curPriority = primaryModel.getPriority(); + if (curPriority == null) { + curPriority = Integer.MAX_VALUE; + } + fallback = chatModelService.selectFallbackModelByCategoryAndLessPriority(category, curPriority); + } + if (fallback == null) { + SSEUtil.sendErrorEvent(emitter, "当前模型重试3次均失败,且无可用降级模型"); + emitter.complete(); + return; + } + current = fallback; + mainAttempts.set(3); // 锁定 + fallbackAttempts.set(0); + } + } else { + if (fallbackAttempts.incrementAndGet() > 3) { + SSEUtil.sendErrorEvent(emitter, "降级模型重试3次仍失败"); + emitter.complete(); + return; + } + } + + Runnable onFailure = () -> { + // 去抖:避免同一次失败触发多次重试 + if (scheduling.compareAndSet(false, true)) { + try { + SSEUtil.sendErrorEvent(emitter, (inFallback.get() ? "降级模型" : "当前模型") + "调用失败,准备重试..."); + // 立即发起下一次尝试 + startAttempt(); + } finally { + scheduling.set(false); + } + } + }; + + attemptStarter.start(current, onFailure); + } catch (Exception ex) { + log.error("启动聊天尝试失败: {}", ex.getMessage(), ex); + SSEUtil.sendErrorEvent(emitter, "启动聊天尝试失败: " + ex.getMessage()); + // 直接按失败处理,继续重试/降级 + if (scheduling.compareAndSet(false, true)) { + try { + startAttempt(); + } finally { + scheduling.set(false); + } + } + } + } + } + + new Scheduler().startAttempt(); + } +} + + diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java new file mode 100644 index 00000000..77044081 --- /dev/null +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java @@ -0,0 +1,39 @@ +package org.ruoyi.chat.support; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 失败回调通知器:基于 sessionId 绑定回调,底层失败时按 sessionId 通知上层重试调度器。 + */ +public class RetryNotifier { + + private static final Map FAILURE_CALLBACKS = new ConcurrentHashMap<>(); + + public static void setFailureCallback(Long sessionId, Runnable callback) { + if (sessionId == null || callback == null) { + return; + } + FAILURE_CALLBACKS.put(sessionId, callback); + } + + public static void clear(Long sessionId) { + if (sessionId == null) { + return; + } + FAILURE_CALLBACKS.remove(sessionId); + } + + public static void notifyFailure(Long sessionId) { + if (sessionId == null) { + return; + } + Runnable cb = FAILURE_CALLBACKS.get(sessionId); + if (Objects.nonNull(cb)) { + cb.run(); + } + } +} + + diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/util/SSEUtil.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/util/SSEUtil.java index 9bfb6bf0..293e486e 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/util/SSEUtil.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/util/SSEUtil.java @@ -25,6 +25,6 @@ public class SSEUtil { } catch (IOException e) { log.error("SSE发送失败: {}", e.getMessage()); } - sseEmitter.complete(); + // 不立即关闭,由上层策略决定是否继续重试或降级 } } From 1638b9dd75bfc15a6d860c67c7a75748d75aac8d Mon Sep 17 00:00:00 2001 From: likunlong Date: Tue, 19 Aug 2025 16:51:51 +0800 Subject: [PATCH 06/10] =?UTF-8?q?feat:=20=E4=BF=AE=E6=94=B9=E7=9B=AE?= =?UTF-8?q?=E5=89=8D=E5=AE=9E=E7=8E=B0=E7=B1=BB=E4=BD=BF=E7=94=A8=E7=BB=9F?= =?UTF-8?q?=E4=B8=80=E9=87=8D=E8=AF=95=E9=99=8D=E7=BA=A7=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../FastGPTSSEEventSourceListener.java | 26 +++++++++++++++- .../service/chat/impl/CozeServiceImpl.java | 31 ++++++++++++------- .../service/chat/impl/DifyServiceImpl.java | 5 +++ .../service/chat/impl/FastGPTServiceImpl.java | 9 ++++-- .../service/chat/impl/OllamaServiceImpl.java | 4 +++ .../chat/impl/QianWenAiChatServiceImpl.java | 3 ++ .../chat/impl/ZhipuAiChatServiceImpl.java | 5 ++- 7 files changed, 67 insertions(+), 16 deletions(-) diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java index 0d58176d..dd8519f2 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java @@ -14,6 +14,8 @@ import org.springframework.stereotype.Component; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.Objects; +import org.ruoyi.chat.support.RetryNotifier; +import org.ruoyi.chat.util.SSEUtil; @Slf4j @Component @@ -21,12 +23,18 @@ import java.util.Objects; public class FastGPTSSEEventSourceListener extends EventSourceListener { private SseEmitter emitter; + private Long sessionId; @Autowired(required = false) public FastGPTSSEEventSourceListener(SseEmitter emitter) { this.emitter = emitter; } + public FastGPTSSEEventSourceListener(SseEmitter emitter, Long sessionId) { + this.emitter = emitter; + this.sessionId = sessionId; + } + @Override public void onOpen(EventSource eventSource, Response response) { log.info("FastGPT sse连接成功"); @@ -40,6 +48,9 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener { if ("flowResponses".equals(type)){ emitter.send(data); emitter.complete(); + if (sessionId != null) { + RetryNotifier.clear(sessionId); + } } else { emitter.send(data); } @@ -57,13 +68,26 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener { @SneakyThrows public void onFailure(EventSource eventSource, Throwable t, Response response) { if (Objects.isNull(response)) { + if (sessionId != null) { + SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); + RetryNotifier.notifyFailure(sessionId); + } return; } ResponseBody body = response.body(); if (Objects.nonNull(body)) { - log.error("FastGPT sse连接异常data:{},异常:{}", body.string(), t); + String msg = body.string(); + log.error("FastGPT sse连接异常data:{},异常:{}", msg, t); + if (sessionId != null) { + SSEUtil.sendErrorEvent(emitter, msg); + RetryNotifier.notifyFailure(sessionId); + } } else { log.error("FastGPT sse连接异常data:{},异常:{}", response, t); + if (sessionId != null) { + SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); + RetryNotifier.notifyFailure(sessionId); + } } eventSource.cancel(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java index 7cbb5927..54269b08 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java @@ -20,6 +20,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.ruoyi.chat.support.RetryNotifier; /** * 扣子聊天管理 @@ -53,19 +54,25 @@ public class CozeServiceImpl implements IChatService { Flowable resp = coze.chat().stream(req); ExecutorService executor = Executors.newFixedThreadPool(10); executor.submit(() -> { - resp.blockingForEach( - event -> { - if (ChatEventType.CONVERSATION_MESSAGE_DELTA.equals(event.getEvent())) { - emitter.send(event.getMessage().getContent()); - log.info("coze: {}", event.getMessage().getContent()); + try { + resp.blockingForEach( + event -> { + if (ChatEventType.CONVERSATION_MESSAGE_DELTA.equals(event.getEvent())) { + emitter.send(event.getMessage().getContent()); + log.info("coze: {}", event.getMessage().getContent()); + } + if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) { + emitter.complete(); + log.info("Token usage: {}", event.getChat().getUsage().getTokenCount()); + RetryNotifier.clear(chatRequest.getSessionId()); + } } - if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) { - emitter.complete(); - log.info("Token usage: {}", event.getChat().getUsage().getTokenCount()); - } - } - ); - coze.shutdownExecutor(); + ); + } catch (Exception ex) { + RetryNotifier.notifyFailure(chatRequest.getSessionId()); + } finally { + coze.shutdownExecutor(); + } }); diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java index ac3ebab7..748d89c0 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java @@ -27,6 +27,7 @@ import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.Objects; +import org.ruoyi.chat.support.RetryNotifier; /** * dify 聊天管理 @@ -112,20 +113,24 @@ public class DifyServiceImpl implements IChatService { chatRequestResponse.setSessionId(chatRequest.getSessionId()); chatRequestResponse.setPrompt(respMessage.toString()); chatCostService.deductToken(chatRequestResponse); + RetryNotifier.clear(chatRequest.getSessionId()); } @Override public void onError(ErrorEvent event) { System.err.println("错误: " + event.getMessage()); + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } @Override public void onException(Throwable throwable) { System.err.println("异常: " + throwable.getMessage()); + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } }); } catch (Exception e) { log.error("dify请求失败:{}", e.getMessage()); + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/FastGPTServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/FastGPTServiceImpl.java index 15acf6f2..b3f41431 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/FastGPTServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/FastGPTServiceImpl.java @@ -33,7 +33,7 @@ public class FastGPTServiceImpl implements IChatService { ChatModelVo chatModelVo = chatModelService.selectModelByName(chatRequest.getModel()); OpenAiStreamClient openAiStreamClient = ChatConfig.createOpenAiStreamClient(chatModelVo.getApiHost(), chatModelVo.getApiKey()); List messages = chatRequest.getMessages(); - FastGPTSSEEventSourceListener listener = new FastGPTSSEEventSourceListener(emitter); + FastGPTSSEEventSourceListener listener = new FastGPTSSEEventSourceListener(emitter, chatRequest.getSessionId()); FastGPTChatCompletion completion = FastGPTChatCompletion .builder() .messages(messages) @@ -41,7 +41,12 @@ public class FastGPTServiceImpl implements IChatService { .detail(true) .stream(true) .build(); - openAiStreamClient.streamChatCompletion(completion, listener); + try { + openAiStreamClient.streamChatCompletion(completion, listener); + } catch (Exception ex) { + org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); + throw ex; + } return emitter; } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java index 532b052e..669f1c2f 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import org.ruoyi.chat.support.RetryNotifier; /** @@ -66,12 +67,15 @@ public class OllamaServiceImpl implements IChatService { emitter.send(substr); } catch (IOException e) { SSEUtil.sendErrorEvent(emitter, e.getMessage()); + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } }; api.chat(requestModel, streamHandler); emitter.complete(); + RetryNotifier.clear(chatRequest.getSessionId()); } catch (Exception e) { SSEUtil.sendErrorEvent(emitter, e.getMessage()); + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } }); diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java index 850ebf6c..8462529c 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java @@ -51,15 +51,18 @@ public class QianWenAiChatServiceImpl implements IChatService { public void onCompleteResponse(ChatResponse completeResponse) { emitter.complete(); log.info("消息结束,完整消息ID: {}", completeResponse); + org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId()); } @Override public void onError(Throwable error) { error.printStackTrace(); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); } }); } catch (Exception e) { log.error("千问请求失败:{}", e.getMessage()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java index da44d6c0..1fd12406 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java @@ -51,14 +51,16 @@ public class ZhipuAiChatServiceImpl implements IChatService { @SneakyThrows @Override public void onError(Throwable error) { - // System.out.println(error.getMessage()); + // 透传错误并触发重试 emitter.send(error.getMessage()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); } @Override public void onCompleteResponse(ChatResponse response) { emitter.complete(); log.info("消息结束,完整消息ID: {}", response.aiMessage()); + org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId()); } }; @@ -71,6 +73,7 @@ public class ZhipuAiChatServiceImpl implements IChatService { model.chat(chatRequest.getPrompt(), handler); } catch (Exception e) { log.error("智谱清言请求失败:{}", e.getMessage()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); } return emitter; From c3ab13ae6799712c31f1c1e54ad27b112abd24d6 Mon Sep 17 00:00:00 2001 From: likunlong Date: Tue, 19 Aug 2025 17:39:20 +0800 Subject: [PATCH 07/10] =?UTF-8?q?feat:=20=E5=A4=84=E7=90=86=E5=9C=A8?= =?UTF-8?q?=E9=9D=9EWeb=E7=BA=BF=E7=A8=8B=E4=B8=AD=E8=8E=B7=E5=8F=96Reques?= =?UTF-8?q?t=E4=B8=ADtoken=E5=A4=B1=E8=B4=A5=E7=9A=84=E9=97=AE=E9=A2=98?= =?UTF-8?q?=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/ruoyi/common/chat/request/ChatRequest.java | 5 +++++ .../org/ruoyi/chat/service/chat/impl/ImageServiceImpl.java | 4 ++-- .../ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java | 2 +- .../org/ruoyi/chat/service/chat/impl/SseServiceImpl.java | 7 +++++++ 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/ruoyi-common/ruoyi-common-chat/src/main/java/org/ruoyi/common/chat/request/ChatRequest.java b/ruoyi-common/ruoyi-common-chat/src/main/java/org/ruoyi/common/chat/request/ChatRequest.java index 276f5dc8..71ebc4e5 100644 --- a/ruoyi-common/ruoyi-common-chat/src/main/java/org/ruoyi/common/chat/request/ChatRequest.java +++ b/ruoyi-common/ruoyi-common-chat/src/main/java/org/ruoyi/common/chat/request/ChatRequest.java @@ -77,4 +77,9 @@ public class ChatRequest { */ private Boolean autoSelectModel; + /** + * 会话令牌(为避免在非Web线程中获取Request,入口处注入) + */ + private String token; + } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ImageServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ImageServiceImpl.java index 709c58ef..1c4af69d 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ImageServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ImageServiceImpl.java @@ -128,8 +128,8 @@ public class ImageServiceImpl implements IChatService { OpenAiStreamClient openAiStreamClient = ChatConfig.createOpenAiStreamClient(chatModelVo.getApiHost(), chatModelVo.getApiKey()); List messages = chatRequest.getMessages(); - // 获取会话token - String token = StpUtil.getTokenValue(); + // 获取会话token(从入口透传,避免非Web线程取值报错) + String token = chatRequest.getToken(); // 创建 SSE 事件源监听器 SSEEventSourceListener listener = new SSEEventSourceListener(emitter, chatRequest.getUserId(), chatRequest.getSessionId(), token); diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java index b46fa03d..c81cc0ea 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java @@ -58,7 +58,7 @@ public class OpenAIServiceImpl implements IChatService { Message userMessage = Message.builder().content("工具返回信息:"+toolString).role(Message.Role.USER).build(); messages.add(userMessage); } - String token = StpUtil.getTokenValue(); + String token = chatRequest.getToken(); SSEEventSourceListener listener = new SSEEventSourceListener(emitter,chatRequest.getUserId(),chatRequest.getSessionId(), token); ChatCompletion completion = ChatCompletion .builder() diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java index 40bd9929..dfab7316 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java @@ -47,6 +47,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; +import cn.dev33.satoken.stp.StpUtil; /** * @author ageer @@ -77,6 +78,12 @@ public class SseServiceImpl implements ISseService { public SseEmitter sseChat(ChatRequest chatRequest, HttpServletRequest request) { SseEmitter sseEmitter = new SseEmitter(0L); try { + // 记录当前会话令牌,供异步线程使用 + try { + chatRequest.setToken(StpUtil.getTokenValue()); + } catch (Exception ignore) { + // 保底:无token场景下忽略 + } // 构建消息列表 buildChatMessageList(chatRequest); // 设置对话角色 From 498135b7fda0a7f1f5e3d51bb3907128dab9382b Mon Sep 17 00:00:00 2001 From: likunlong Date: Tue, 19 Aug 2025 17:53:27 +0800 Subject: [PATCH 08/10] =?UTF-8?q?feat:=20=E5=A4=B1=E8=B4=A5=E5=9B=9E?= =?UTF-8?q?=E8=B0=83=E5=99=A8=E4=B8=AD=E4=BD=BF=E7=94=A8emitter=E5=AF=B9?= =?UTF-8?q?=E8=B1=A1=E7=9A=84=E5=94=AF=E4=B8=80hash=E4=BD=9C=E4=B8=BAkey?= =?UTF-8?q?=EF=BC=8C=E4=B8=8D=E5=86=8D=E4=BD=BF=E7=94=A8session=EF=BC=8C?= =?UTF-8?q?=E4=B8=8D=E4=B8=8E=E4=B8=9A=E5=8A=A1=E8=BF=9B=E8=A1=8C=E7=BB=91?= =?UTF-8?q?=E5=AE=9A=EF=BC=8C=E5=90=8C=E6=97=B6=E4=B9=9F=E4=BF=9D=E8=AF=81?= =?UTF-8?q?=E8=B7=A8=E7=BA=BF=E7=A8=8B=E8=B0=83=E7=94=A8=E7=9A=84=E6=AD=A3?= =?UTF-8?q?=E7=A1=AE=E6=80=A7=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../FastGPTSSEEventSourceListener.java | 22 +++++-------- .../chat/listener/SSEEventSourceListener.java | 12 +++---- .../service/chat/impl/CozeServiceImpl.java | 4 +-- .../service/chat/impl/DeepSeekChatImpl.java | 6 ++-- .../service/chat/impl/DifyServiceImpl.java | 8 ++--- .../service/chat/impl/OllamaServiceImpl.java | 6 ++-- .../service/chat/impl/OpenAIServiceImpl.java | 4 +-- .../chat/impl/QianWenAiChatServiceImpl.java | 6 ++-- .../service/chat/impl/SseServiceImpl.java | 4 +-- .../chat/impl/ZhipuAiChatServiceImpl.java | 8 ++--- .../org/ruoyi/chat/support/RetryNotifier.java | 31 +++++++++++-------- 11 files changed, 54 insertions(+), 57 deletions(-) diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java index dd8519f2..3895c1a3 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java @@ -48,9 +48,7 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener { if ("flowResponses".equals(type)){ emitter.send(data); emitter.complete(); - if (sessionId != null) { - RetryNotifier.clear(sessionId); - } + RetryNotifier.clear(emitter); } else { emitter.send(data); } @@ -68,26 +66,20 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener { @SneakyThrows public void onFailure(EventSource eventSource, Throwable t, Response response) { if (Objects.isNull(response)) { - if (sessionId != null) { - SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); - RetryNotifier.notifyFailure(sessionId); - } + SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); + RetryNotifier.notifyFailure(emitter); return; } ResponseBody body = response.body(); if (Objects.nonNull(body)) { String msg = body.string(); log.error("FastGPT sse连接异常data:{},异常:{}", msg, t); - if (sessionId != null) { - SSEUtil.sendErrorEvent(emitter, msg); - RetryNotifier.notifyFailure(sessionId); - } + SSEUtil.sendErrorEvent(emitter, msg); + RetryNotifier.notifyFailure(emitter); } else { log.error("FastGPT sse连接异常data:{},异常:{}", response, t); - if (sessionId != null) { - SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); - RetryNotifier.notifyFailure(sessionId); - } + SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); + RetryNotifier.notifyFailure(emitter); } eventSource.cancel(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java index 20adc22b..ddcfcaaa 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java @@ -79,8 +79,8 @@ public class SSEEventSourceListener extends EventSourceListener { if ("[DONE]".equals(data)) { //成功响应 emitter.complete(); - // 清理失败回调 - RetryNotifier.clear(sessionId); + // 清理失败回调(以 emitter 为键) + RetryNotifier.clear(emitter); // 扣除费用 ChatRequest chatRequest = new ChatRequest(); // 设置对话角色 @@ -118,7 +118,7 @@ public class SSEEventSourceListener extends EventSourceListener { public void onClosed(EventSource eventSource) { log.info("OpenAI关闭sse连接..."); // 清理失败回调 - RetryNotifier.clear(sessionId); + RetryNotifier.clear(emitter); } @SneakyThrows @@ -127,8 +127,8 @@ public class SSEEventSourceListener extends EventSourceListener { if (Objects.isNull(response)) { // 透传错误到前端 SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); - // 通知重试 - RetryNotifier.notifyFailure(sessionId); + // 通知重试(以 emitter 为键) + RetryNotifier.notifyFailure(emitter); return; } ResponseBody body = response.body(); @@ -141,7 +141,7 @@ public class SSEEventSourceListener extends EventSourceListener { SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); } // 通知重试 - RetryNotifier.notifyFailure(sessionId); + RetryNotifier.notifyFailure(emitter); eventSource.cancel(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java index 54269b08..e9863b6c 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java @@ -64,12 +64,12 @@ public class CozeServiceImpl implements IChatService { if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) { emitter.complete(); log.info("Token usage: {}", event.getChat().getUsage().getTokenCount()); - RetryNotifier.clear(chatRequest.getSessionId()); + RetryNotifier.clear(emitter); } } ); } catch (Exception ex) { - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + RetryNotifier.notifyFailure(emitter); } finally { coze.shutdownExecutor(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java index f0ddd77b..0a6e6693 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java @@ -58,15 +58,15 @@ public class DeepSeekChatImpl implements IChatService { @Override public void onError(Throwable error) { System.err.println("错误: " + error.getMessage()); - // 通知上层失败,进入重试/降级 - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + // 通知上层失败,进入重试/降级(以 emitter 为键) + RetryNotifier.notifyFailure(emitter); } }); } catch (Exception e) { log.error("deepseek请求失败:{}", e.getMessage()); // 同步异常直接通知失败 - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + RetryNotifier.notifyFailure(emitter); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java index 748d89c0..9327e5a1 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java @@ -113,24 +113,24 @@ public class DifyServiceImpl implements IChatService { chatRequestResponse.setSessionId(chatRequest.getSessionId()); chatRequestResponse.setPrompt(respMessage.toString()); chatCostService.deductToken(chatRequestResponse); - RetryNotifier.clear(chatRequest.getSessionId()); + RetryNotifier.clear(emitter); } @Override public void onError(ErrorEvent event) { System.err.println("错误: " + event.getMessage()); - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + RetryNotifier.notifyFailure(emitter); } @Override public void onException(Throwable throwable) { System.err.println("异常: " + throwable.getMessage()); - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + RetryNotifier.notifyFailure(emitter); } }); } catch (Exception e) { log.error("dify请求失败:{}", e.getMessage()); - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + RetryNotifier.notifyFailure(emitter); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java index 669f1c2f..2401b83e 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java @@ -67,15 +67,15 @@ public class OllamaServiceImpl implements IChatService { emitter.send(substr); } catch (IOException e) { SSEUtil.sendErrorEvent(emitter, e.getMessage()); - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + RetryNotifier.notifyFailure(emitter); } }; api.chat(requestModel, streamHandler); emitter.complete(); - RetryNotifier.clear(chatRequest.getSessionId()); + RetryNotifier.clear(emitter); } catch (Exception e) { SSEUtil.sendErrorEvent(emitter, e.getMessage()); - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + RetryNotifier.notifyFailure(emitter); } }); diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java index c81cc0ea..cc264803 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java @@ -69,8 +69,8 @@ public class OpenAIServiceImpl implements IChatService { try { openAiStreamClient.streamChatCompletion(completion, listener); } catch (Exception ex) { - // 同步异常也触发失败回调,按会话维度 - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + // 同步异常也触发失败回调(以 emitter 为键) + RetryNotifier.notifyFailure(emitter); throw ex; } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java index 8462529c..3f5c00b0 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java @@ -51,18 +51,18 @@ public class QianWenAiChatServiceImpl implements IChatService { public void onCompleteResponse(ChatResponse completeResponse) { emitter.complete(); log.info("消息结束,完整消息ID: {}", completeResponse); - org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.clear(emitter); } @Override public void onError(Throwable error) { error.printStackTrace(); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); } }); } catch (Exception e) { log.error("千问请求失败:{}", e.getMessage()); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java index dfab7316..84602ade 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java @@ -136,8 +136,8 @@ public class SseServiceImpl implements ISseService { (modelForTry, onFailure) -> { // 替换请求中的模型名称 chatRequest.setModel(modelForTry.getModelName()); - // 将回调注册到ThreadLocal,供底层SSE失败时触发 - RetryNotifier.setFailureCallback(chatRequest.getSessionId(), onFailure); + // 以 emitter 实例为唯一键注册失败回调 + RetryNotifier.setFailureCallback(sseEmitter, onFailure); try { autoSelectServiceByCategoryAndInvoke(chatRequest, sseEmitter, modelForTry.getCategory()); } finally { diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java index 1fd12406..7405a77b 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java @@ -51,16 +51,16 @@ public class ZhipuAiChatServiceImpl implements IChatService { @SneakyThrows @Override public void onError(Throwable error) { - // 透传错误并触发重试 + // 透传错误并触发重试(以 emitter 为键) emitter.send(error.getMessage()); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); } @Override public void onCompleteResponse(ChatResponse response) { emitter.complete(); log.info("消息结束,完整消息ID: {}", response.aiMessage()); - org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.clear(emitter); } }; @@ -73,7 +73,7 @@ public class ZhipuAiChatServiceImpl implements IChatService { model.chat(chatRequest.getPrompt(), handler); } catch (Exception e) { log.error("智谱清言请求失败:{}", e.getMessage()); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java index 77044081..25f65c44 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java @@ -5,31 +5,36 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; /** - * 失败回调通知器:基于 sessionId 绑定回调,底层失败时按 sessionId 通知上层重试调度器。 + * 失败回调通知器:基于发射器实例(SseEmitter 等对象地址)绑定回调, + * 避免与业务标识绑定,且能跨线程正确关联。 */ public class RetryNotifier { - private static final Map FAILURE_CALLBACKS = new ConcurrentHashMap<>(); + private static final Map FAILURE_CALLBACKS = new ConcurrentHashMap<>(); - public static void setFailureCallback(Long sessionId, Runnable callback) { - if (sessionId == null || callback == null) { - return; - } - FAILURE_CALLBACKS.put(sessionId, callback); + private static int keyOf(Object obj) { + return System.identityHashCode(obj); } - public static void clear(Long sessionId) { - if (sessionId == null) { + public static void setFailureCallback(Object emitterLike, Runnable callback) { + if (emitterLike == null || callback == null) { return; } - FAILURE_CALLBACKS.remove(sessionId); + FAILURE_CALLBACKS.put(keyOf(emitterLike), callback); } - public static void notifyFailure(Long sessionId) { - if (sessionId == null) { + public static void clear(Object emitterLike) { + if (emitterLike == null) { return; } - Runnable cb = FAILURE_CALLBACKS.get(sessionId); + FAILURE_CALLBACKS.remove(keyOf(emitterLike)); + } + + public static void notifyFailure(Object emitterLike) { + if (emitterLike == null) { + return; + } + Runnable cb = FAILURE_CALLBACKS.get(keyOf(emitterLike)); if (Objects.nonNull(cb)) { cb.run(); } From 9fba91c35f9d7c43a94b8f0690a271d29a30d987 Mon Sep 17 00:00:00 2001 From: likunlong Date: Tue, 19 Aug 2025 18:00:20 +0800 Subject: [PATCH 09/10] =?UTF-8?q?feat:=20=E4=B8=8D=E9=80=89=E6=8B=A9?= =?UTF-8?q?=E6=A8=A1=E5=9E=8B=E8=87=AA=E5=8A=A8=E9=80=89=E6=8B=A9=E6=97=B6?= =?UTF-8?q?=E8=B5=B0=E5=8E=9F=E5=A7=8B=E9=BB=98=E8=AE=A4=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/chat/impl/SseServiceImpl.java | 43 +++++++++++-------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java index 84602ade..18cf3713 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java @@ -125,26 +125,31 @@ public class SseServiceImpl implements ISseService { // 自动选择模型并获取对应的聊天服务 IChatService chatService = autoSelectModelAndGetService(chatRequest); - // 统一重试与降级:封装启动逻辑,并通过ThreadLocal传递失败回调 - ChatModelVo currentModel = this.chatModelVo; - String currentCategory = currentModel.getCategory(); - ChatRetryHelper.executeWithRetry( - currentModel, - currentCategory, - chatModelService, - sseEmitter, - (modelForTry, onFailure) -> { - // 替换请求中的模型名称 - chatRequest.setModel(modelForTry.getModelName()); - // 以 emitter 实例为唯一键注册失败回调 - RetryNotifier.setFailureCallback(sseEmitter, onFailure); - try { - autoSelectServiceByCategoryAndInvoke(chatRequest, sseEmitter, modelForTry.getCategory()); - } finally { - // 不在此处清理,待下游结束/失败时清理 + // 仅当 autoSelectModel = true 时,才启用重试与降级 + if (Boolean.TRUE.equals(chatRequest.getAutoSelectModel())) { + ChatModelVo currentModel = this.chatModelVo; + String currentCategory = currentModel.getCategory(); + ChatRetryHelper.executeWithRetry( + currentModel, + currentCategory, + chatModelService, + sseEmitter, + (modelForTry, onFailure) -> { + // 替换请求中的模型名称 + chatRequest.setModel(modelForTry.getModelName()); + // 以 emitter 实例为唯一键注册失败回调 + RetryNotifier.setFailureCallback(sseEmitter, onFailure); + try { + autoSelectServiceByCategoryAndInvoke(chatRequest, sseEmitter, modelForTry.getCategory()); + } finally { + // 不在此处清理,待下游结束/失败时清理 + } } - } - ); + ); + } else { + // 不重试不降级,直接调用 + chatService.chat(chatRequest, sseEmitter); + } } catch (Exception e) { log.error(e.getMessage(),e); SSEUtil.sendErrorEvent(sseEmitter,e.getMessage()); From 842a39d6d2a0ea5680c24b56e4ebac681f3c3a63 Mon Sep 17 00:00:00 2001 From: likunlong Date: Tue, 19 Aug 2025 20:28:53 +0800 Subject: [PATCH 10/10] =?UTF-8?q?feat:=20=E5=85=BC=E5=AE=B9=E4=B8=8D?= =?UTF-8?q?=E9=80=89=E8=87=AA=E5=8A=A8=E6=A8=A1=E5=9E=8B=E6=97=B6=E7=9A=84?= =?UTF-8?q?=E5=8E=9F=E5=85=88=E9=80=BB=E8=BE=91=EF=BC=9B=E5=B0=81=E8=A3=85?= =?UTF-8?q?=E9=80=9A=E7=94=A8=E6=96=B9=E6=B3=95=EF=BC=8C=E7=AE=80=E5=8C=96?= =?UTF-8?q?=E5=88=9B=E5=BB=BA=E6=9C=89=E7=9B=91=E6=8E=A7=E7=9A=84SSE?= =?UTF-8?q?=EF=BC=8C=E7=AE=80=E5=8C=96=E6=B5=81=E5=BC=8F=E9=94=99=E8=AF=AF?= =?UTF-8?q?=E8=BE=93=E5=87=BA=E5=B9=B6=E9=80=9A=E7=9F=A5=E9=87=8D=E8=AF=95?= =?UTF-8?q?=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../chat/listener/SSEEventSourceListener.java | 21 ++++++--- .../service/chat/impl/CozeServiceImpl.java | 3 +- .../service/chat/impl/DeepSeekChatImpl.java | 8 ++-- .../service/chat/impl/DifyServiceImpl.java | 7 +-- .../service/chat/impl/ImageServiceImpl.java | 10 ++++- .../service/chat/impl/OllamaServiceImpl.java | 7 ++- .../service/chat/impl/OpenAIServiceImpl.java | 9 ++-- .../chat/impl/QianWenAiChatServiceImpl.java | 5 ++- .../chat/impl/ZhipuAiChatServiceImpl.java | 7 ++- .../ruoyi/chat/support/ChatServiceHelper.java | 45 +++++++++++++++++++ .../org/ruoyi/chat/support/RetryNotifier.java | 7 +++ 11 files changed, 97 insertions(+), 32 deletions(-) create mode 100644 ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/ChatServiceHelper.java diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java index ddcfcaaa..d10f8036 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java @@ -46,12 +46,15 @@ public class SSEEventSourceListener extends EventSourceListener { private String token; + private boolean retryEnabled; + @Autowired(required = false) - public SSEEventSourceListener(SseEmitter emitter,Long userId,Long sessionId, String token) { + public SSEEventSourceListener(SseEmitter emitter,Long userId,Long sessionId, String token, boolean retryEnabled) { this.emitter = emitter; this.userId = userId; this.sessionId = sessionId; this.token = token; + this.retryEnabled = retryEnabled; } @@ -127,8 +130,12 @@ public class SSEEventSourceListener extends EventSourceListener { if (Objects.isNull(response)) { // 透传错误到前端 SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); - // 通知重试(以 emitter 为键) - RetryNotifier.notifyFailure(emitter); + if (retryEnabled) { + // 通知重试(以 emitter 为键) + RetryNotifier.notifyFailure(emitter); + } else { + emitter.complete(); + } return; } ResponseBody body = response.body(); @@ -140,8 +147,12 @@ public class SSEEventSourceListener extends EventSourceListener { log.error("OpenAI sse连接异常data:{},异常:{}", response, t); SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); } - // 通知重试 - RetryNotifier.notifyFailure(emitter); + if (retryEnabled) { + // 通知重试 + RetryNotifier.notifyFailure(emitter); + } else { + emitter.complete(); + } eventSource.cancel(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java index e9863b6c..730c0181 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.ruoyi.chat.support.RetryNotifier; +import org.ruoyi.chat.support.ChatServiceHelper; /** * 扣子聊天管理 @@ -69,7 +70,7 @@ public class CozeServiceImpl implements IChatService { } ); } catch (Exception ex) { - RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, ex.getMessage()); } finally { coze.shutdownExecutor(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java index 0a6e6693..c2697c11 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java @@ -9,14 +9,13 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.ruoyi.chat.enums.ChatModeType; import org.ruoyi.chat.service.chat.IChatService; +import org.ruoyi.chat.support.ChatServiceHelper; import org.ruoyi.common.chat.request.ChatRequest; import org.ruoyi.domain.vo.ChatModelVo; import org.ruoyi.service.IChatModelService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; - -import org.ruoyi.chat.support.RetryNotifier; /** * deepseek */ @@ -58,15 +57,14 @@ public class DeepSeekChatImpl implements IChatService { @Override public void onError(Throwable error) { System.err.println("错误: " + error.getMessage()); - // 通知上层失败,进入重试/降级(以 emitter 为键) - RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, error.getMessage()); } }); } catch (Exception e) { log.error("deepseek请求失败:{}", e.getMessage()); // 同步异常直接通知失败 - RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, e.getMessage()); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java index 9327e5a1..51f9a960 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java @@ -25,6 +25,7 @@ import org.ruoyi.service.IChatSessionService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import org.ruoyi.chat.support.ChatServiceHelper; import java.util.Objects; import org.ruoyi.chat.support.RetryNotifier; @@ -119,18 +120,18 @@ public class DifyServiceImpl implements IChatService { @Override public void onError(ErrorEvent event) { System.err.println("错误: " + event.getMessage()); - RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, event.getMessage()); } @Override public void onException(Throwable throwable) { System.err.println("异常: " + throwable.getMessage()); - RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, throwable.getMessage()); } }); } catch (Exception e) { log.error("dify请求失败:{}", e.getMessage()); - RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, e.getMessage()); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ImageServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ImageServiceImpl.java index 1c4af69d..36aa9c54 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ImageServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ImageServiceImpl.java @@ -18,6 +18,7 @@ import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.*; +import org.ruoyi.chat.support.ChatServiceHelper; /** * 图片识别模型 @@ -131,7 +132,7 @@ public class ImageServiceImpl implements IChatService { // 获取会话token(从入口透传,避免非Web线程取值报错) String token = chatRequest.getToken(); // 创建 SSE 事件源监听器 - SSEEventSourceListener listener = new SSEEventSourceListener(emitter, chatRequest.getUserId(), chatRequest.getSessionId(), token); + SSEEventSourceListener listener = ChatServiceHelper.createOpenAiListener(emitter, chatRequest); // 构建聊天完成请求 ChatCompletion completion = ChatCompletion @@ -142,7 +143,12 @@ public class ImageServiceImpl implements IChatService { .build(); // 发起流式聊天完成请求 - openAiStreamClient.streamChatCompletion(completion, listener); + try { + openAiStreamClient.streamChatCompletion(completion, listener); + } catch (Exception ex) { + ChatServiceHelper.onStreamError(emitter, ex.getMessage()); + throw ex; + } return emitter; } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java index 2401b83e..7ce42215 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import org.ruoyi.chat.support.RetryNotifier; +import org.ruoyi.chat.support.ChatServiceHelper; /** @@ -66,16 +67,14 @@ public class OllamaServiceImpl implements IChatService { try { emitter.send(substr); } catch (IOException e) { - SSEUtil.sendErrorEvent(emitter, e.getMessage()); - RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, e.getMessage()); } }; api.chat(requestModel, streamHandler); emitter.complete(); RetryNotifier.clear(emitter); } catch (Exception e) { - SSEUtil.sendErrorEvent(emitter, e.getMessage()); - RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, e.getMessage()); } }); diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java index cc264803..6b8f72d6 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java @@ -1,12 +1,12 @@ package org.ruoyi.chat.service.chat.impl; -import cn.dev33.satoken.stp.StpUtil; import io.modelcontextprotocol.client.McpSyncClient; import lombok.extern.slf4j.Slf4j; import org.ruoyi.chat.config.ChatConfig; import org.ruoyi.chat.enums.ChatModeType; import org.ruoyi.chat.listener.SSEEventSourceListener; import org.ruoyi.chat.service.chat.IChatService; +import org.ruoyi.chat.support.ChatServiceHelper; import org.ruoyi.common.chat.entity.chat.ChatCompletion; import org.ruoyi.common.chat.entity.chat.Message; import org.ruoyi.common.chat.openai.OpenAiStreamClient; @@ -22,7 +22,6 @@ import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.List; -import org.ruoyi.chat.support.RetryNotifier; /** @@ -58,8 +57,7 @@ public class OpenAIServiceImpl implements IChatService { Message userMessage = Message.builder().content("工具返回信息:"+toolString).role(Message.Role.USER).build(); messages.add(userMessage); } - String token = chatRequest.getToken(); - SSEEventSourceListener listener = new SSEEventSourceListener(emitter,chatRequest.getUserId(),chatRequest.getSessionId(), token); + SSEEventSourceListener listener = ChatServiceHelper.createOpenAiListener(emitter, chatRequest); ChatCompletion completion = ChatCompletion .builder() .messages(messages) @@ -69,8 +67,7 @@ public class OpenAIServiceImpl implements IChatService { try { openAiStreamClient.streamChatCompletion(completion, listener); } catch (Exception ex) { - // 同步异常也触发失败回调(以 emitter 为键) - RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, ex.getMessage()); throw ex; } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java index 3f5c00b0..4128b84a 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java @@ -14,6 +14,7 @@ import org.ruoyi.service.IChatModelService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import org.ruoyi.chat.support.ChatServiceHelper; /** @@ -57,12 +58,12 @@ public class QianWenAiChatServiceImpl implements IChatService { @Override public void onError(Throwable error) { error.printStackTrace(); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, error.getMessage()); } }); } catch (Exception e) { log.error("千问请求失败:{}", e.getMessage()); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, e.getMessage()); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java index 7405a77b..50e545e0 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java @@ -15,6 +15,7 @@ import org.ruoyi.service.IChatModelService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import org.ruoyi.chat.support.ChatServiceHelper; @@ -51,9 +52,7 @@ public class ZhipuAiChatServiceImpl implements IChatService { @SneakyThrows @Override public void onError(Throwable error) { - // 透传错误并触发重试(以 emitter 为键) - emitter.send(error.getMessage()); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, error.getMessage()); } @Override @@ -73,7 +72,7 @@ public class ZhipuAiChatServiceImpl implements IChatService { model.chat(chatRequest.getPrompt(), handler); } catch (Exception e) { log.error("智谱清言请求失败:{}", e.getMessage()); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, e.getMessage()); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/ChatServiceHelper.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/ChatServiceHelper.java new file mode 100644 index 00000000..042dee9c --- /dev/null +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/ChatServiceHelper.java @@ -0,0 +1,45 @@ +package org.ruoyi.chat.support; + +import org.ruoyi.chat.listener.SSEEventSourceListener; +import org.ruoyi.common.chat.request.ChatRequest; +import org.ruoyi.chat.util.SSEUtil; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +/** + * 抽取各聊天实现类的通用逻辑: + * - 创建带开关的 SSE 监听器 + * - 统一的流错误处理(根据是否在重试场景决定通知或直接结束) + * - 统一的完成处理(清理回调并 complete) + */ +public class ChatServiceHelper { + + public static SSEEventSourceListener createOpenAiListener(SseEmitter emitter, ChatRequest chatRequest) { + boolean retryEnabled = Boolean.TRUE.equals(chatRequest.getAutoSelectModel()); + return new SSEEventSourceListener( + emitter, + chatRequest.getUserId(), + chatRequest.getSessionId(), + chatRequest.getToken(), + retryEnabled + ); + } + + public static void onStreamError(SseEmitter emitter, String errorMessage) { + SSEUtil.sendErrorEvent(emitter, errorMessage); + if (RetryNotifier.hasCallback(emitter)) { + RetryNotifier.notifyFailure(emitter); + } else { + emitter.complete(); + } + } + + public static void onStreamComplete(SseEmitter emitter) { + try { + emitter.complete(); + } finally { + RetryNotifier.clear(emitter); + } + } +} + + diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java index 25f65c44..c37f82cb 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java @@ -39,6 +39,13 @@ public class RetryNotifier { cb.run(); } } + + public static boolean hasCallback(Object emitterLike) { + if (emitterLike == null) { + return false; + } + return FAILURE_CALLBACKS.containsKey(keyOf(emitterLike)); + } }