From aa11c1f233d73e1a4b4e4d0c76b45f5d4ddb5c78 Mon Sep 17 00:00:00 2001 From: likunlong Date: Tue, 19 Aug 2025 16:46:25 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=97=AE=E7=AD=94=E6=97=B6=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E7=BB=9F=E4=B8=80=E9=87=8D=E8=AF=95=E5=92=8C=E9=99=8D?= =?UTF-8?q?=E7=BA=A7=E9=80=BB=E8=BE=91=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/ruoyi/domain/vo/ChatModelVo.java | 6 + .../org/ruoyi/service/IChatModelService.java | 5 + .../service/impl/ChatModelServiceImpl.java | 14 +++ .../chat/listener/SSEEventSourceListener.java | 17 ++- .../service/chat/impl/DeepSeekChatImpl.java | 5 + .../service/chat/impl/OpenAIServiceImpl.java | 9 +- .../service/chat/impl/SseServiceImpl.java | 32 ++++- .../ruoyi/chat/support/ChatRetryHelper.java | 115 ++++++++++++++++++ .../org/ruoyi/chat/support/RetryNotifier.java | 39 ++++++ .../java/org/ruoyi/chat/util/SSEUtil.java | 2 +- 10 files changed, 240 insertions(+), 4 deletions(-) create mode 100644 ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/ChatRetryHelper.java create mode 100644 ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java diff --git a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/vo/ChatModelVo.java b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/vo/ChatModelVo.java index 8472929d..257c0ec3 100644 --- a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/vo/ChatModelVo.java +++ b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/domain/vo/ChatModelVo.java @@ -90,6 +90,12 @@ public class ChatModelVo implements Serializable { @ExcelProperty(value = "密钥") private String apiKey; + /** + * 优先级(值越大优先级越高) + */ + @ExcelProperty(value = "优先级") + private Integer priority; + /** * 备注 */ diff --git a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/IChatModelService.java b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/IChatModelService.java index d93b527e..9e90e46b 100644 --- a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/IChatModelService.java +++ b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/IChatModelService.java @@ -63,6 +63,11 @@ public interface IChatModelService { */ ChatModelVo selectModelByCategoryWithHighestPriority(String category); + /** + * 在同一分类下,查找优先级小于当前优先级的最高优先级模型(用于降级)。 + */ + ChatModelVo selectFallbackModelByCategoryAndLessPriority(String category, Integer currentPriority); + /** * 获取ppt模型信息 */ diff --git a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/impl/ChatModelServiceImpl.java b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/impl/ChatModelServiceImpl.java index b7acfaa9..069d75a6 100644 --- a/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/impl/ChatModelServiceImpl.java +++ b/ruoyi-modules-api/ruoyi-chat-api/src/main/java/org/ruoyi/service/impl/ChatModelServiceImpl.java @@ -150,6 +150,20 @@ public class ChatModelServiceImpl implements IChatModelService { ); } + /** + * 在同一分类下,查找优先级小于当前优先级的最高优先级模型(用于降级)。 + */ + @Override + public ChatModelVo selectFallbackModelByCategoryAndLessPriority(String category, Integer currentPriority) { + return baseMapper.selectVoOne( + Wrappers.lambdaQuery() + .eq(ChatModel::getCategory, category) + .lt(ChatModel::getPriority, currentPriority) + .orderByDesc(ChatModel::getPriority) + .last("LIMIT 1") + ); + } + @Override public ChatModel getPPT() { return baseMapper.selectOne(Wrappers.lambdaQuery().eq(ChatModel::getModelName, "ppt")); diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java index b8c8167e..b6087f07 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java @@ -21,6 +21,8 @@ import org.ruoyi.common.core.utils.SpringUtils; import org.ruoyi.common.core.utils.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.ruoyi.chat.util.SSEUtil; +import org.ruoyi.chat.support.RetryNotifier; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.Objects; @@ -77,6 +79,8 @@ public class SSEEventSourceListener extends EventSourceListener { if ("[DONE]".equals(data)) { //成功响应 emitter.complete(); + // 清理失败回调 + RetryNotifier.clear(sessionId); // 扣除费用 ChatRequest chatRequest = new ChatRequest(); // 设置对话角色 @@ -115,20 +119,31 @@ public class SSEEventSourceListener extends EventSourceListener { @Override public void onClosed(EventSource eventSource) { log.info("OpenAI关闭sse连接..."); + // 清理失败回调 + RetryNotifier.clear(sessionId); } @SneakyThrows @Override public void onFailure(EventSource eventSource, Throwable t, Response response) { if (Objects.isNull(response)) { + // 透传错误到前端 + SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); + // 通知重试 + RetryNotifier.notifyFailure(sessionId); return; } ResponseBody body = response.body(); if (Objects.nonNull(body)) { - log.error("OpenAI sse连接异常data:{},异常:{}", body.string(), t); + String msg = body.string(); + log.error("OpenAI sse连接异常data:{},异常:{}", msg, t); + SSEUtil.sendErrorEvent(emitter, msg); } else { log.error("OpenAI sse连接异常data:{},异常:{}", response, t); + SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); } + // 通知重试 + RetryNotifier.notifyFailure(sessionId); eventSource.cancel(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java index 9e59fdd7..f0ddd77b 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java @@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import org.ruoyi.chat.support.RetryNotifier; /** * deepseek */ @@ -57,11 +58,15 @@ public class DeepSeekChatImpl implements IChatService { @Override public void onError(Throwable error) { System.err.println("错误: " + error.getMessage()); + // 通知上层失败,进入重试/降级 + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } }); } catch (Exception e) { log.error("deepseek请求失败:{}", e.getMessage()); + // 同步异常直接通知失败 + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java index 50693617..b46fa03d 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java @@ -22,6 +22,7 @@ import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.List; +import org.ruoyi.chat.support.RetryNotifier; /** @@ -65,7 +66,13 @@ public class OpenAIServiceImpl implements IChatService { .model(chatRequest.getModel()) .stream(true) .build(); - openAiStreamClient.streamChatCompletion(completion, listener); + try { + openAiStreamClient.streamChatCompletion(completion, listener); + } catch (Exception ex) { + // 同步异常也触发失败回调,按会话维度 + RetryNotifier.notifyFailure(chatRequest.getSessionId()); + throw ex; + } return emitter; } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java index 3a948bea..01be0cec 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java @@ -9,6 +9,8 @@ import org.ruoyi.chat.factory.ChatServiceFactory; import org.ruoyi.chat.service.chat.IChatCostService; import org.ruoyi.chat.service.chat.IChatService; import org.ruoyi.chat.service.chat.ISseService; +import org.ruoyi.chat.support.ChatRetryHelper; +import org.ruoyi.chat.support.RetryNotifier; import org.ruoyi.chat.util.SSEUtil; import org.ruoyi.common.chat.entity.Tts.TextToSpeech; import org.ruoyi.common.chat.entity.chat.Message; @@ -116,7 +118,27 @@ public class SseServiceImpl implements ISseService { } // 自动选择模型并获取对应的聊天服务 IChatService chatService = autoSelectModelAndGetService(chatRequest); - chatService.chat(chatRequest, sseEmitter); + + // 统一重试与降级:封装启动逻辑,并通过ThreadLocal传递失败回调 + ChatModelVo currentModel = this.chatModelVo; + String currentCategory = currentModel.getCategory(); + ChatRetryHelper.executeWithRetry( + currentModel, + currentCategory, + chatModelService, + sseEmitter, + (modelForTry, onFailure) -> { + // 替换请求中的模型名称 + chatRequest.setModel(modelForTry.getModelName()); + // 将回调注册到ThreadLocal,供底层SSE失败时触发 + RetryNotifier.setFailureCallback(chatRequest.getSessionId(), onFailure); + try { + autoSelectServiceByCategoryAndInvoke(chatRequest, sseEmitter, modelForTry.getCategory()); + } finally { + // 不在此处清理,待下游结束/失败时清理 + } + } + ); } catch (Exception e) { log.error(e.getMessage(),e); SSEUtil.sendErrorEvent(sseEmitter,e.getMessage()); @@ -149,6 +171,14 @@ public class SseServiceImpl implements ISseService { throw new IllegalStateException("模型选择和服务获取失败: " + e.getMessage()); } } + + /** + * 根据给定分类获取服务并发起调用(避免在降级时重复选择模型) + */ + private void autoSelectServiceByCategoryAndInvoke(ChatRequest chatRequest, SseEmitter sseEmitter, String category) { + IChatService service = chatServiceFactory.getChatService(category); + service.chat(chatRequest, sseEmitter); + } /** * 根据分类选择优先级最高的模型 diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/ChatRetryHelper.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/ChatRetryHelper.java new file mode 100644 index 00000000..bf4a4a74 --- /dev/null +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/ChatRetryHelper.java @@ -0,0 +1,115 @@ +package org.ruoyi.chat.support; + +import lombok.extern.slf4j.Slf4j; +import org.ruoyi.chat.util.SSEUtil; +import org.ruoyi.domain.vo.ChatModelVo; +import org.ruoyi.service.IChatModelService; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +/** + * 统一的聊天重试与降级调度器。 + * + * 策略: + * - 当前模型最多重试 3 次;仍失败则降级到同分类内、优先级小于当前的最高优先级模型。 + * - 降级模型同样最多重试 3 次;仍失败则向前端返回失败信息并停止。 + * + * 注意:实现依赖调用方在底层异步失败时执行 onFailure.run() 通知本调度器。 + */ +@Slf4j +public class ChatRetryHelper { + + public interface AttemptStarter { + void start(ChatModelVo model, Runnable onFailure) throws Exception; + } + + public static void executeWithRetry( + ChatModelVo primaryModel, + String category, + IChatModelService chatModelService, + SseEmitter emitter, + AttemptStarter attemptStarter + ) { + Objects.requireNonNull(primaryModel, "primaryModel must not be null"); + Objects.requireNonNull(category, "category must not be null"); + Objects.requireNonNull(chatModelService, "chatModelService must not be null"); + Objects.requireNonNull(emitter, "emitter must not be null"); + Objects.requireNonNull(attemptStarter, "attemptStarter must not be null"); + + AtomicInteger mainAttempts = new AtomicInteger(0); + AtomicInteger fallbackAttempts = new AtomicInteger(0); + AtomicBoolean inFallback = new AtomicBoolean(false); + AtomicBoolean scheduling = new AtomicBoolean(false); + + class Scheduler { + volatile ChatModelVo current = primaryModel; + volatile ChatModelVo fallback = null; + + void startAttempt() { + try { + if (!inFallback.get()) { + if (mainAttempts.incrementAndGet() > 3) { + // 进入降级 + inFallback.set(true); + if (fallback == null) { + Integer curPriority = primaryModel.getPriority(); + if (curPriority == null) { + curPriority = Integer.MAX_VALUE; + } + fallback = chatModelService.selectFallbackModelByCategoryAndLessPriority(category, curPriority); + } + if (fallback == null) { + SSEUtil.sendErrorEvent(emitter, "当前模型重试3次均失败,且无可用降级模型"); + emitter.complete(); + return; + } + current = fallback; + mainAttempts.set(3); // 锁定 + fallbackAttempts.set(0); + } + } else { + if (fallbackAttempts.incrementAndGet() > 3) { + SSEUtil.sendErrorEvent(emitter, "降级模型重试3次仍失败"); + emitter.complete(); + return; + } + } + + Runnable onFailure = () -> { + // 去抖:避免同一次失败触发多次重试 + if (scheduling.compareAndSet(false, true)) { + try { + SSEUtil.sendErrorEvent(emitter, (inFallback.get() ? "降级模型" : "当前模型") + "调用失败,准备重试..."); + // 立即发起下一次尝试 + startAttempt(); + } finally { + scheduling.set(false); + } + } + }; + + attemptStarter.start(current, onFailure); + } catch (Exception ex) { + log.error("启动聊天尝试失败: {}", ex.getMessage(), ex); + SSEUtil.sendErrorEvent(emitter, "启动聊天尝试失败: " + ex.getMessage()); + // 直接按失败处理,继续重试/降级 + if (scheduling.compareAndSet(false, true)) { + try { + startAttempt(); + } finally { + scheduling.set(false); + } + } + } + } + } + + new Scheduler().startAttempt(); + } +} + + diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java new file mode 100644 index 00000000..77044081 --- /dev/null +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java @@ -0,0 +1,39 @@ +package org.ruoyi.chat.support; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 失败回调通知器:基于 sessionId 绑定回调,底层失败时按 sessionId 通知上层重试调度器。 + */ +public class RetryNotifier { + + private static final Map FAILURE_CALLBACKS = new ConcurrentHashMap<>(); + + public static void setFailureCallback(Long sessionId, Runnable callback) { + if (sessionId == null || callback == null) { + return; + } + FAILURE_CALLBACKS.put(sessionId, callback); + } + + public static void clear(Long sessionId) { + if (sessionId == null) { + return; + } + FAILURE_CALLBACKS.remove(sessionId); + } + + public static void notifyFailure(Long sessionId) { + if (sessionId == null) { + return; + } + Runnable cb = FAILURE_CALLBACKS.get(sessionId); + if (Objects.nonNull(cb)) { + cb.run(); + } + } +} + + diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/util/SSEUtil.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/util/SSEUtil.java index 9bfb6bf0..293e486e 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/util/SSEUtil.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/util/SSEUtil.java @@ -25,6 +25,6 @@ public class SSEUtil { } catch (IOException e) { log.error("SSE发送失败: {}", e.getMessage()); } - sseEmitter.complete(); + // 不立即关闭,由上层策略决定是否继续重试或降级 } }