From 359cee28d5071afad3e18a1b36d33f7d9f47d529 Mon Sep 17 00:00:00 2001 From: likunlong Date: Tue, 19 Aug 2025 16:51:51 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BF=AE=E6=94=B9=E7=9B=AE=E5=89=8D?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E7=B1=BB=E4=BD=BF=E7=94=A8=E7=BB=9F=E4=B8=80?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E9=99=8D=E7=BA=A7=E9=80=BB=E8=BE=91=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../FastGPTSSEEventSourceListener.java | 26 +++++++++++++++- .../service/chat/impl/CozeServiceImpl.java | 31 ++++++++++++------- .../service/chat/impl/FastGPTServiceImpl.java | 9 ++++-- .../service/chat/impl/OllamaServiceImpl.java | 4 +++ .../chat/impl/QianWenAiChatServiceImpl.java | 3 ++ .../chat/impl/ZhipuAiChatServiceImpl.java | 5 ++- 6 files changed, 62 insertions(+), 16 deletions(-) diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java index 0d58176d..dd8519f2 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java @@ -14,6 +14,8 @@ import org.springframework.stereotype.Component; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.Objects; +import org.ruoyi.chat.support.RetryNotifier; +import org.ruoyi.chat.util.SSEUtil; @Slf4j @Component @@ -21,12 +23,18 @@ import java.util.Objects; public class FastGPTSSEEventSourceListener extends EventSourceListener { private SseEmitter emitter; + private Long sessionId; @Autowired(required = false) public FastGPTSSEEventSourceListener(SseEmitter emitter) { this.emitter = emitter; } + public FastGPTSSEEventSourceListener(SseEmitter emitter, Long sessionId) { + this.emitter = emitter; + this.sessionId = sessionId; + } + @Override public void onOpen(EventSource eventSource, Response response) { log.info("FastGPT sse连接成功"); @@ -40,6 +48,9 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener { if ("flowResponses".equals(type)){ emitter.send(data); emitter.complete(); + if (sessionId != null) { + RetryNotifier.clear(sessionId); + } } else { emitter.send(data); } @@ -57,13 +68,26 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener { @SneakyThrows public void onFailure(EventSource eventSource, Throwable t, Response response) { if (Objects.isNull(response)) { + if (sessionId != null) { + SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); + RetryNotifier.notifyFailure(sessionId); + } return; } ResponseBody body = response.body(); if (Objects.nonNull(body)) { - log.error("FastGPT sse连接异常data:{},异常:{}", body.string(), t); + String msg = body.string(); + log.error("FastGPT sse连接异常data:{},异常:{}", msg, t); + if (sessionId != null) { + SSEUtil.sendErrorEvent(emitter, msg); + RetryNotifier.notifyFailure(sessionId); + } } else { log.error("FastGPT sse连接异常data:{},异常:{}", response, t); + if (sessionId != null) { + SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); + RetryNotifier.notifyFailure(sessionId); + } } eventSource.cancel(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java index 7cbb5927..54269b08 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java @@ -20,6 +20,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.ruoyi.chat.support.RetryNotifier; /** * 扣子聊天管理 @@ -53,19 +54,25 @@ public class CozeServiceImpl implements IChatService { Flowable resp = coze.chat().stream(req); ExecutorService executor = Executors.newFixedThreadPool(10); executor.submit(() -> { - resp.blockingForEach( - event -> { - if (ChatEventType.CONVERSATION_MESSAGE_DELTA.equals(event.getEvent())) { - emitter.send(event.getMessage().getContent()); - log.info("coze: {}", event.getMessage().getContent()); + try { + resp.blockingForEach( + event -> { + if (ChatEventType.CONVERSATION_MESSAGE_DELTA.equals(event.getEvent())) { + emitter.send(event.getMessage().getContent()); + log.info("coze: {}", event.getMessage().getContent()); + } + if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) { + emitter.complete(); + log.info("Token usage: {}", event.getChat().getUsage().getTokenCount()); + RetryNotifier.clear(chatRequest.getSessionId()); + } } - if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) { - emitter.complete(); - log.info("Token usage: {}", event.getChat().getUsage().getTokenCount()); - } - } - ); - coze.shutdownExecutor(); + ); + } catch (Exception ex) { + RetryNotifier.notifyFailure(chatRequest.getSessionId()); + } finally { + coze.shutdownExecutor(); + } }); diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/FastGPTServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/FastGPTServiceImpl.java index 15acf6f2..b3f41431 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/FastGPTServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/FastGPTServiceImpl.java @@ -33,7 +33,7 @@ public class FastGPTServiceImpl implements IChatService { ChatModelVo chatModelVo = chatModelService.selectModelByName(chatRequest.getModel()); OpenAiStreamClient openAiStreamClient = ChatConfig.createOpenAiStreamClient(chatModelVo.getApiHost(), chatModelVo.getApiKey()); List messages = chatRequest.getMessages(); - FastGPTSSEEventSourceListener listener = new FastGPTSSEEventSourceListener(emitter); + FastGPTSSEEventSourceListener listener = new FastGPTSSEEventSourceListener(emitter, chatRequest.getSessionId()); FastGPTChatCompletion completion = FastGPTChatCompletion .builder() .messages(messages) @@ -41,7 +41,12 @@ public class FastGPTServiceImpl implements IChatService { .detail(true) .stream(true) .build(); - openAiStreamClient.streamChatCompletion(completion, listener); + try { + openAiStreamClient.streamChatCompletion(completion, listener); + } catch (Exception ex) { + org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); + throw ex; + } return emitter; } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java index 532b052e..669f1c2f 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import org.ruoyi.chat.support.RetryNotifier; /** @@ -66,12 +67,15 @@ public class OllamaServiceImpl implements IChatService { emitter.send(substr); } catch (IOException e) { SSEUtil.sendErrorEvent(emitter, e.getMessage()); + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } }; api.chat(requestModel, streamHandler); emitter.complete(); + RetryNotifier.clear(chatRequest.getSessionId()); } catch (Exception e) { SSEUtil.sendErrorEvent(emitter, e.getMessage()); + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } }); diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java index 850ebf6c..8462529c 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java @@ -51,15 +51,18 @@ public class QianWenAiChatServiceImpl implements IChatService { public void onCompleteResponse(ChatResponse completeResponse) { emitter.complete(); log.info("消息结束,完整消息ID: {}", completeResponse); + org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId()); } @Override public void onError(Throwable error) { error.printStackTrace(); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); } }); } catch (Exception e) { log.error("千问请求失败:{}", e.getMessage()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java index da44d6c0..1fd12406 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java @@ -51,14 +51,16 @@ public class ZhipuAiChatServiceImpl implements IChatService { @SneakyThrows @Override public void onError(Throwable error) { - // System.out.println(error.getMessage()); + // 透传错误并触发重试 emitter.send(error.getMessage()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); } @Override public void onCompleteResponse(ChatResponse response) { emitter.complete(); log.info("消息结束,完整消息ID: {}", response.aiMessage()); + org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId()); } }; @@ -71,6 +73,7 @@ public class ZhipuAiChatServiceImpl implements IChatService { model.chat(chatRequest.getPrompt(), handler); } catch (Exception e) { log.error("智谱清言请求失败:{}", e.getMessage()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); } return emitter;