diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java index d61e8bb4..ceeb9e7b 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java @@ -46,12 +46,15 @@ public class SSEEventSourceListener extends EventSourceListener { private String token; + private boolean retryEnabled; + @Autowired(required = false) - public SSEEventSourceListener(SseEmitter emitter,Long userId,Long sessionId, String token) { + public SSEEventSourceListener(SseEmitter emitter,Long userId,Long sessionId, String token, boolean retryEnabled) { this.emitter = emitter; this.userId = userId; this.sessionId = sessionId; this.token = token; + this.retryEnabled = retryEnabled; } @@ -129,8 +132,12 @@ public class SSEEventSourceListener extends EventSourceListener { if (Objects.isNull(response)) { // 透传错误到前端 SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); - // 通知重试(以 emitter 为键) - RetryNotifier.notifyFailure(emitter); + if (retryEnabled) { + // 通知重试(以 emitter 为键) + RetryNotifier.notifyFailure(emitter); + } else { + emitter.complete(); + } return; } ResponseBody body = response.body(); @@ -142,8 +149,12 @@ public class SSEEventSourceListener extends EventSourceListener { log.error("OpenAI sse连接异常data:{},异常:{}", response, t); SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); } - // 通知重试 - RetryNotifier.notifyFailure(emitter); + if (retryEnabled) { + // 通知重试 + RetryNotifier.notifyFailure(emitter); + } else { + emitter.complete(); + } eventSource.cancel(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java index e9863b6c..730c0181 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.ruoyi.chat.support.RetryNotifier; +import org.ruoyi.chat.support.ChatServiceHelper; /** * 扣子聊天管理 @@ -69,7 +70,7 @@ public class CozeServiceImpl implements IChatService { } ); } catch (Exception ex) { - RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, ex.getMessage()); } finally { coze.shutdownExecutor(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java index 0a6e6693..c2697c11 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java @@ -9,14 +9,13 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.ruoyi.chat.enums.ChatModeType; import org.ruoyi.chat.service.chat.IChatService; +import org.ruoyi.chat.support.ChatServiceHelper; import org.ruoyi.common.chat.request.ChatRequest; import org.ruoyi.domain.vo.ChatModelVo; import org.ruoyi.service.IChatModelService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; - -import org.ruoyi.chat.support.RetryNotifier; /** * deepseek */ @@ -58,15 +57,14 @@ public class DeepSeekChatImpl implements IChatService { @Override public void onError(Throwable error) { System.err.println("错误: " + error.getMessage()); - // 通知上层失败,进入重试/降级(以 emitter 为键) - RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, error.getMessage()); } }); } catch (Exception e) { log.error("deepseek请求失败:{}", e.getMessage()); // 同步异常直接通知失败 - RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, e.getMessage()); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ImageServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ImageServiceImpl.java index 1c4af69d..36aa9c54 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ImageServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ImageServiceImpl.java @@ -18,6 +18,7 @@ import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.*; +import org.ruoyi.chat.support.ChatServiceHelper; /** * 图片识别模型 @@ -131,7 +132,7 @@ public class ImageServiceImpl implements IChatService { // 获取会话token(从入口透传,避免非Web线程取值报错) String token = chatRequest.getToken(); // 创建 SSE 事件源监听器 - SSEEventSourceListener listener = new SSEEventSourceListener(emitter, chatRequest.getUserId(), chatRequest.getSessionId(), token); + SSEEventSourceListener listener = ChatServiceHelper.createOpenAiListener(emitter, chatRequest); // 构建聊天完成请求 ChatCompletion completion = ChatCompletion @@ -142,7 +143,12 @@ public class ImageServiceImpl implements IChatService { .build(); // 发起流式聊天完成请求 - openAiStreamClient.streamChatCompletion(completion, listener); + try { + openAiStreamClient.streamChatCompletion(completion, listener); + } catch (Exception ex) { + ChatServiceHelper.onStreamError(emitter, ex.getMessage()); + throw ex; + } return emitter; } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java index 2401b83e..7ce42215 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import org.ruoyi.chat.support.RetryNotifier; +import org.ruoyi.chat.support.ChatServiceHelper; /** @@ -66,16 +67,14 @@ public class OllamaServiceImpl implements IChatService { try { emitter.send(substr); } catch (IOException e) { - SSEUtil.sendErrorEvent(emitter, e.getMessage()); - RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, e.getMessage()); } }; api.chat(requestModel, streamHandler); emitter.complete(); RetryNotifier.clear(emitter); } catch (Exception e) { - SSEUtil.sendErrorEvent(emitter, e.getMessage()); - RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, e.getMessage()); } }); diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java index cc264803..6b8f72d6 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java @@ -1,12 +1,12 @@ package org.ruoyi.chat.service.chat.impl; -import cn.dev33.satoken.stp.StpUtil; import io.modelcontextprotocol.client.McpSyncClient; import lombok.extern.slf4j.Slf4j; import org.ruoyi.chat.config.ChatConfig; import org.ruoyi.chat.enums.ChatModeType; import org.ruoyi.chat.listener.SSEEventSourceListener; import org.ruoyi.chat.service.chat.IChatService; +import org.ruoyi.chat.support.ChatServiceHelper; import org.ruoyi.common.chat.entity.chat.ChatCompletion; import org.ruoyi.common.chat.entity.chat.Message; import org.ruoyi.common.chat.openai.OpenAiStreamClient; @@ -22,7 +22,6 @@ import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.List; -import org.ruoyi.chat.support.RetryNotifier; /** @@ -58,8 +57,7 @@ public class OpenAIServiceImpl implements IChatService { Message userMessage = Message.builder().content("工具返回信息:"+toolString).role(Message.Role.USER).build(); messages.add(userMessage); } - String token = chatRequest.getToken(); - SSEEventSourceListener listener = new SSEEventSourceListener(emitter,chatRequest.getUserId(),chatRequest.getSessionId(), token); + SSEEventSourceListener listener = ChatServiceHelper.createOpenAiListener(emitter, chatRequest); ChatCompletion completion = ChatCompletion .builder() .messages(messages) @@ -69,8 +67,7 @@ public class OpenAIServiceImpl implements IChatService { try { openAiStreamClient.streamChatCompletion(completion, listener); } catch (Exception ex) { - // 同步异常也触发失败回调(以 emitter 为键) - RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, ex.getMessage()); throw ex; } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java index 3f5c00b0..4128b84a 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java @@ -14,6 +14,7 @@ import org.ruoyi.service.IChatModelService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import org.ruoyi.chat.support.ChatServiceHelper; /** @@ -57,12 +58,12 @@ public class QianWenAiChatServiceImpl implements IChatService { @Override public void onError(Throwable error) { error.printStackTrace(); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, error.getMessage()); } }); } catch (Exception e) { log.error("千问请求失败:{}", e.getMessage()); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, e.getMessage()); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java index 7405a77b..50e545e0 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java @@ -15,6 +15,7 @@ import org.ruoyi.service.IChatModelService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import org.ruoyi.chat.support.ChatServiceHelper; @@ -51,9 +52,7 @@ public class ZhipuAiChatServiceImpl implements IChatService { @SneakyThrows @Override public void onError(Throwable error) { - // 透传错误并触发重试(以 emitter 为键) - emitter.send(error.getMessage()); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, error.getMessage()); } @Override @@ -73,7 +72,7 @@ public class ZhipuAiChatServiceImpl implements IChatService { model.chat(chatRequest.getPrompt(), handler); } catch (Exception e) { log.error("智谱清言请求失败:{}", e.getMessage()); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); + ChatServiceHelper.onStreamError(emitter, e.getMessage()); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/ChatServiceHelper.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/ChatServiceHelper.java new file mode 100644 index 00000000..042dee9c --- /dev/null +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/ChatServiceHelper.java @@ -0,0 +1,45 @@ +package org.ruoyi.chat.support; + +import org.ruoyi.chat.listener.SSEEventSourceListener; +import org.ruoyi.common.chat.request.ChatRequest; +import org.ruoyi.chat.util.SSEUtil; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +/** + * 抽取各聊天实现类的通用逻辑: + * - 创建带开关的 SSE 监听器 + * - 统一的流错误处理(根据是否在重试场景决定通知或直接结束) + * - 统一的完成处理(清理回调并 complete) + */ +public class ChatServiceHelper { + + public static SSEEventSourceListener createOpenAiListener(SseEmitter emitter, ChatRequest chatRequest) { + boolean retryEnabled = Boolean.TRUE.equals(chatRequest.getAutoSelectModel()); + return new SSEEventSourceListener( + emitter, + chatRequest.getUserId(), + chatRequest.getSessionId(), + chatRequest.getToken(), + retryEnabled + ); + } + + public static void onStreamError(SseEmitter emitter, String errorMessage) { + SSEUtil.sendErrorEvent(emitter, errorMessage); + if (RetryNotifier.hasCallback(emitter)) { + RetryNotifier.notifyFailure(emitter); + } else { + emitter.complete(); + } + } + + public static void onStreamComplete(SseEmitter emitter) { + try { + emitter.complete(); + } finally { + RetryNotifier.clear(emitter); + } + } +} + + diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java index 25f65c44..c37f82cb 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java @@ -39,6 +39,13 @@ public class RetryNotifier { cb.run(); } } + + public static boolean hasCallback(Object emitterLike) { + if (emitterLike == null) { + return false; + } + return FAILURE_CALLBACKS.containsKey(keyOf(emitterLike)); + } }