diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java index dd8519f2..3895c1a3 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java @@ -48,9 +48,7 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener { if ("flowResponses".equals(type)){ emitter.send(data); emitter.complete(); - if (sessionId != null) { - RetryNotifier.clear(sessionId); - } + RetryNotifier.clear(emitter); } else { emitter.send(data); } @@ -68,26 +66,20 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener { @SneakyThrows public void onFailure(EventSource eventSource, Throwable t, Response response) { if (Objects.isNull(response)) { - if (sessionId != null) { - SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); - RetryNotifier.notifyFailure(sessionId); - } + SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); + RetryNotifier.notifyFailure(emitter); return; } ResponseBody body = response.body(); if (Objects.nonNull(body)) { String msg = body.string(); log.error("FastGPT sse连接异常data:{},异常:{}", msg, t); - if (sessionId != null) { - SSEUtil.sendErrorEvent(emitter, msg); - RetryNotifier.notifyFailure(sessionId); - } + SSEUtil.sendErrorEvent(emitter, msg); + RetryNotifier.notifyFailure(emitter); } else { log.error("FastGPT sse连接异常data:{},异常:{}", response, t); - if (sessionId != null) { - SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); - RetryNotifier.notifyFailure(sessionId); - } + SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); + RetryNotifier.notifyFailure(emitter); } eventSource.cancel(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java index b6087f07..d61e8bb4 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java @@ -79,8 +79,8 @@ public class SSEEventSourceListener extends EventSourceListener { if ("[DONE]".equals(data)) { //成功响应 emitter.complete(); - // 清理失败回调 - RetryNotifier.clear(sessionId); + // 清理失败回调(以 emitter 为键) + RetryNotifier.clear(emitter); // 扣除费用 ChatRequest chatRequest = new ChatRequest(); // 设置对话角色 @@ -120,7 +120,7 @@ public class SSEEventSourceListener extends EventSourceListener { public void onClosed(EventSource eventSource) { log.info("OpenAI关闭sse连接..."); // 清理失败回调 - RetryNotifier.clear(sessionId); + RetryNotifier.clear(emitter); } @SneakyThrows @@ -129,8 +129,8 @@ public class SSEEventSourceListener extends EventSourceListener { if (Objects.isNull(response)) { // 透传错误到前端 SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); - // 通知重试 - RetryNotifier.notifyFailure(sessionId); + // 通知重试(以 emitter 为键) + RetryNotifier.notifyFailure(emitter); return; } ResponseBody body = response.body(); @@ -143,7 +143,7 @@ public class SSEEventSourceListener extends EventSourceListener { SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); } // 通知重试 - RetryNotifier.notifyFailure(sessionId); + RetryNotifier.notifyFailure(emitter); eventSource.cancel(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java index 54269b08..e9863b6c 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java @@ -64,12 +64,12 @@ public class CozeServiceImpl implements IChatService { if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) { emitter.complete(); log.info("Token usage: {}", event.getChat().getUsage().getTokenCount()); - RetryNotifier.clear(chatRequest.getSessionId()); + RetryNotifier.clear(emitter); } } ); } catch (Exception ex) { - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + RetryNotifier.notifyFailure(emitter); } finally { coze.shutdownExecutor(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java index f0ddd77b..0a6e6693 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java @@ -58,15 +58,15 @@ public class DeepSeekChatImpl implements IChatService { @Override public void onError(Throwable error) { System.err.println("错误: " + error.getMessage()); - // 通知上层失败,进入重试/降级 - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + // 通知上层失败,进入重试/降级(以 emitter 为键) + RetryNotifier.notifyFailure(emitter); } }); } catch (Exception e) { log.error("deepseek请求失败:{}", e.getMessage()); // 同步异常直接通知失败 - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + RetryNotifier.notifyFailure(emitter); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java index 669f1c2f..2401b83e 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java @@ -67,15 +67,15 @@ public class OllamaServiceImpl implements IChatService { emitter.send(substr); } catch (IOException e) { SSEUtil.sendErrorEvent(emitter, e.getMessage()); - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + RetryNotifier.notifyFailure(emitter); } }; api.chat(requestModel, streamHandler); emitter.complete(); - RetryNotifier.clear(chatRequest.getSessionId()); + RetryNotifier.clear(emitter); } catch (Exception e) { SSEUtil.sendErrorEvent(emitter, e.getMessage()); - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + RetryNotifier.notifyFailure(emitter); } }); diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java index c81cc0ea..cc264803 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java @@ -69,8 +69,8 @@ public class OpenAIServiceImpl implements IChatService { try { openAiStreamClient.streamChatCompletion(completion, listener); } catch (Exception ex) { - // 同步异常也触发失败回调,按会话维度 - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + // 同步异常也触发失败回调(以 emitter 为键) + RetryNotifier.notifyFailure(emitter); throw ex; } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java index 8462529c..3f5c00b0 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java @@ -51,18 +51,18 @@ public class QianWenAiChatServiceImpl implements IChatService { public void onCompleteResponse(ChatResponse completeResponse) { emitter.complete(); log.info("消息结束,完整消息ID: {}", completeResponse); - org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.clear(emitter); } @Override public void onError(Throwable error) { error.printStackTrace(); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); } }); } catch (Exception e) { log.error("千问请求失败:{}", e.getMessage()); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java index 7fe65818..c6d3fe06 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java @@ -137,8 +137,8 @@ public class SseServiceImpl implements ISseService { (modelForTry, onFailure) -> { // 替换请求中的模型名称 chatRequest.setModel(modelForTry.getModelName()); - // 将回调注册到ThreadLocal,供底层SSE失败时触发 - RetryNotifier.setFailureCallback(chatRequest.getSessionId(), onFailure); + // 以 emitter 实例为唯一键注册失败回调 + RetryNotifier.setFailureCallback(sseEmitter, onFailure); try { autoSelectServiceByCategoryAndInvoke(chatRequest, sseEmitter, modelForTry.getCategory()); } finally { diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java index 1fd12406..7405a77b 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java @@ -51,16 +51,16 @@ public class ZhipuAiChatServiceImpl implements IChatService { @SneakyThrows @Override public void onError(Throwable error) { - // 透传错误并触发重试 + // 透传错误并触发重试(以 emitter 为键) emitter.send(error.getMessage()); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); } @Override public void onCompleteResponse(ChatResponse response) { emitter.complete(); log.info("消息结束,完整消息ID: {}", response.aiMessage()); - org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.clear(emitter); } }; @@ -73,7 +73,7 @@ public class ZhipuAiChatServiceImpl implements IChatService { model.chat(chatRequest.getPrompt(), handler); } catch (Exception e) { log.error("智谱清言请求失败:{}", e.getMessage()); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java index 77044081..25f65c44 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java @@ -5,31 +5,36 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; /** - * 失败回调通知器:基于 sessionId 绑定回调,底层失败时按 sessionId 通知上层重试调度器。 + * 失败回调通知器:基于发射器实例(SseEmitter 等对象地址)绑定回调, + * 避免与业务标识绑定,且能跨线程正确关联。 */ public class RetryNotifier { - private static final Map FAILURE_CALLBACKS = new ConcurrentHashMap<>(); + private static final Map FAILURE_CALLBACKS = new ConcurrentHashMap<>(); - public static void setFailureCallback(Long sessionId, Runnable callback) { - if (sessionId == null || callback == null) { - return; - } - FAILURE_CALLBACKS.put(sessionId, callback); + private static int keyOf(Object obj) { + return System.identityHashCode(obj); } - public static void clear(Long sessionId) { - if (sessionId == null) { + public static void setFailureCallback(Object emitterLike, Runnable callback) { + if (emitterLike == null || callback == null) { return; } - FAILURE_CALLBACKS.remove(sessionId); + FAILURE_CALLBACKS.put(keyOf(emitterLike), callback); } - public static void notifyFailure(Long sessionId) { - if (sessionId == null) { + public static void clear(Object emitterLike) { + if (emitterLike == null) { return; } - Runnable cb = FAILURE_CALLBACKS.get(sessionId); + FAILURE_CALLBACKS.remove(keyOf(emitterLike)); + } + + public static void notifyFailure(Object emitterLike) { + if (emitterLike == null) { + return; + } + Runnable cb = FAILURE_CALLBACKS.get(keyOf(emitterLike)); if (Objects.nonNull(cb)) { cb.run(); }