diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java index 0d58176d..dd8519f2 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java @@ -14,6 +14,8 @@ import org.springframework.stereotype.Component; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.Objects; +import org.ruoyi.chat.support.RetryNotifier; +import org.ruoyi.chat.util.SSEUtil; @Slf4j @Component @@ -21,12 +23,18 @@ import java.util.Objects; public class FastGPTSSEEventSourceListener extends EventSourceListener { private SseEmitter emitter; + private Long sessionId; @Autowired(required = false) public FastGPTSSEEventSourceListener(SseEmitter emitter) { this.emitter = emitter; } + public FastGPTSSEEventSourceListener(SseEmitter emitter, Long sessionId) { + this.emitter = emitter; + this.sessionId = sessionId; + } + @Override public void onOpen(EventSource eventSource, Response response) { log.info("FastGPT sse连接成功"); @@ -40,6 +48,9 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener { if ("flowResponses".equals(type)){ emitter.send(data); emitter.complete(); + if (sessionId != null) { + RetryNotifier.clear(sessionId); + } } else { emitter.send(data); } @@ -57,13 +68,26 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener { @SneakyThrows public void onFailure(EventSource eventSource, Throwable t, Response response) { if (Objects.isNull(response)) { + if (sessionId != null) { + SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); + RetryNotifier.notifyFailure(sessionId); + } return; } ResponseBody body = response.body(); if (Objects.nonNull(body)) { - log.error("FastGPT sse连接异常data:{},异常:{}", body.string(), t); + String msg = body.string(); + log.error("FastGPT sse连接异常data:{},异常:{}", msg, t); + if (sessionId != null) { + SSEUtil.sendErrorEvent(emitter, msg); + RetryNotifier.notifyFailure(sessionId); + } } else { log.error("FastGPT sse连接异常data:{},异常:{}", response, t); + if (sessionId != null) { + SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); + RetryNotifier.notifyFailure(sessionId); + } } eventSource.cancel(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java index 7cbb5927..54269b08 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java @@ -20,6 +20,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.ruoyi.chat.support.RetryNotifier; /** * 扣子聊天管理 @@ -53,19 +54,25 @@ public class CozeServiceImpl implements IChatService { Flowable resp = coze.chat().stream(req); ExecutorService executor = Executors.newFixedThreadPool(10); executor.submit(() -> { - resp.blockingForEach( - event -> { - if (ChatEventType.CONVERSATION_MESSAGE_DELTA.equals(event.getEvent())) { - emitter.send(event.getMessage().getContent()); - log.info("coze: {}", event.getMessage().getContent()); + try { + resp.blockingForEach( + event -> { + if (ChatEventType.CONVERSATION_MESSAGE_DELTA.equals(event.getEvent())) { + emitter.send(event.getMessage().getContent()); + log.info("coze: {}", event.getMessage().getContent()); + } + if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) { + emitter.complete(); + log.info("Token usage: {}", event.getChat().getUsage().getTokenCount()); + RetryNotifier.clear(chatRequest.getSessionId()); + } } - if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) { - emitter.complete(); - log.info("Token usage: {}", event.getChat().getUsage().getTokenCount()); - } - } - ); - coze.shutdownExecutor(); + ); + } catch (Exception ex) { + RetryNotifier.notifyFailure(chatRequest.getSessionId()); + } finally { + coze.shutdownExecutor(); + } }); diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java index ac3ebab7..748d89c0 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DifyServiceImpl.java @@ -27,6 +27,7 @@ import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.Objects; +import org.ruoyi.chat.support.RetryNotifier; /** * dify 聊天管理 @@ -112,20 +113,24 @@ public class DifyServiceImpl implements IChatService { chatRequestResponse.setSessionId(chatRequest.getSessionId()); chatRequestResponse.setPrompt(respMessage.toString()); chatCostService.deductToken(chatRequestResponse); + RetryNotifier.clear(chatRequest.getSessionId()); } @Override public void onError(ErrorEvent event) { System.err.println("错误: " + event.getMessage()); + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } @Override public void onException(Throwable throwable) { System.err.println("异常: " + throwable.getMessage()); + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } }); } catch (Exception e) { log.error("dify请求失败:{}", e.getMessage()); + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/FastGPTServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/FastGPTServiceImpl.java index 15acf6f2..b3f41431 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/FastGPTServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/FastGPTServiceImpl.java @@ -33,7 +33,7 @@ public class FastGPTServiceImpl implements IChatService { ChatModelVo chatModelVo = chatModelService.selectModelByName(chatRequest.getModel()); OpenAiStreamClient openAiStreamClient = ChatConfig.createOpenAiStreamClient(chatModelVo.getApiHost(), chatModelVo.getApiKey()); List messages = chatRequest.getMessages(); - FastGPTSSEEventSourceListener listener = new FastGPTSSEEventSourceListener(emitter); + FastGPTSSEEventSourceListener listener = new FastGPTSSEEventSourceListener(emitter, chatRequest.getSessionId()); FastGPTChatCompletion completion = FastGPTChatCompletion .builder() .messages(messages) @@ -41,7 +41,12 @@ public class FastGPTServiceImpl implements IChatService { .detail(true) .stream(true) .build(); - openAiStreamClient.streamChatCompletion(completion, listener); + try { + openAiStreamClient.streamChatCompletion(completion, listener); + } catch (Exception ex) { + org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); + throw ex; + } return emitter; } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java index 532b052e..669f1c2f 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import org.ruoyi.chat.support.RetryNotifier; /** @@ -66,12 +67,15 @@ public class OllamaServiceImpl implements IChatService { emitter.send(substr); } catch (IOException e) { SSEUtil.sendErrorEvent(emitter, e.getMessage()); + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } }; api.chat(requestModel, streamHandler); emitter.complete(); + RetryNotifier.clear(chatRequest.getSessionId()); } catch (Exception e) { SSEUtil.sendErrorEvent(emitter, e.getMessage()); + RetryNotifier.notifyFailure(chatRequest.getSessionId()); } }); diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java index 850ebf6c..8462529c 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java @@ -51,15 +51,18 @@ public class QianWenAiChatServiceImpl implements IChatService { public void onCompleteResponse(ChatResponse completeResponse) { emitter.complete(); log.info("消息结束,完整消息ID: {}", completeResponse); + org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId()); } @Override public void onError(Throwable error) { error.printStackTrace(); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); } }); } catch (Exception e) { log.error("千问请求失败:{}", e.getMessage()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java index da44d6c0..1fd12406 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java @@ -51,14 +51,16 @@ public class ZhipuAiChatServiceImpl implements IChatService { @SneakyThrows @Override public void onError(Throwable error) { - // System.out.println(error.getMessage()); + // 透传错误并触发重试 emitter.send(error.getMessage()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); } @Override public void onCompleteResponse(ChatResponse response) { emitter.complete(); log.info("消息结束,完整消息ID: {}", response.aiMessage()); + org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId()); } }; @@ -71,6 +73,7 @@ public class ZhipuAiChatServiceImpl implements IChatService { model.chat(chatRequest.getPrompt(), handler); } catch (Exception e) { log.error("智谱清言请求失败:{}", e.getMessage()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); } return emitter;