From 4b37cfe97d0e78a25a3f0cc3228cb0245d956d9d Mon Sep 17 00:00:00 2001 From: likunlong Date: Tue, 19 Aug 2025 17:53:27 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A4=B1=E8=B4=A5=E5=9B=9E=E8=B0=83?= =?UTF-8?q?=E5=99=A8=E4=B8=AD=E4=BD=BF=E7=94=A8emitter=E5=AF=B9=E8=B1=A1?= =?UTF-8?q?=E7=9A=84=E5=94=AF=E4=B8=80hash=E4=BD=9C=E4=B8=BAkey=EF=BC=8C?= =?UTF-8?q?=E4=B8=8D=E5=86=8D=E4=BD=BF=E7=94=A8session=EF=BC=8C=E4=B8=8D?= =?UTF-8?q?=E4=B8=8E=E4=B8=9A=E5=8A=A1=E8=BF=9B=E8=A1=8C=E7=BB=91=E5=AE=9A?= =?UTF-8?q?=EF=BC=8C=E5=90=8C=E6=97=B6=E4=B9=9F=E4=BF=9D=E8=AF=81=E8=B7=A8?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E8=B0=83=E7=94=A8=E7=9A=84=E6=AD=A3=E7=A1=AE?= =?UTF-8?q?=E6=80=A7=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../FastGPTSSEEventSourceListener.java | 22 +++++-------- .../chat/listener/SSEEventSourceListener.java | 12 +++---- .../service/chat/impl/CozeServiceImpl.java | 4 +-- .../service/chat/impl/DeepSeekChatImpl.java | 6 ++-- .../service/chat/impl/OllamaServiceImpl.java | 6 ++-- .../service/chat/impl/OpenAIServiceImpl.java | 4 +-- .../chat/impl/QianWenAiChatServiceImpl.java | 6 ++-- .../service/chat/impl/SseServiceImpl.java | 4 +-- .../chat/impl/ZhipuAiChatServiceImpl.java | 8 ++--- .../org/ruoyi/chat/support/RetryNotifier.java | 31 +++++++++++-------- 10 files changed, 50 insertions(+), 53 deletions(-) diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java index dd8519f2..3895c1a3 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/FastGPTSSEEventSourceListener.java @@ -48,9 +48,7 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener { if ("flowResponses".equals(type)){ emitter.send(data); emitter.complete(); - if (sessionId != null) { - RetryNotifier.clear(sessionId); - } + RetryNotifier.clear(emitter); } else { emitter.send(data); } @@ -68,26 +66,20 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener { @SneakyThrows public void onFailure(EventSource eventSource, Throwable t, Response response) { if (Objects.isNull(response)) { - if (sessionId != null) { - SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); - RetryNotifier.notifyFailure(sessionId); - } + SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); + RetryNotifier.notifyFailure(emitter); return; } ResponseBody body = response.body(); if (Objects.nonNull(body)) { String msg = body.string(); log.error("FastGPT sse连接异常data:{},异常:{}", msg, t); - if (sessionId != null) { - SSEUtil.sendErrorEvent(emitter, msg); - RetryNotifier.notifyFailure(sessionId); - } + SSEUtil.sendErrorEvent(emitter, msg); + RetryNotifier.notifyFailure(emitter); } else { log.error("FastGPT sse连接异常data:{},异常:{}", response, t); - if (sessionId != null) { - SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); - RetryNotifier.notifyFailure(sessionId); - } + SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); + RetryNotifier.notifyFailure(emitter); } eventSource.cancel(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java index b6087f07..d61e8bb4 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/SSEEventSourceListener.java @@ -79,8 +79,8 @@ public class SSEEventSourceListener extends EventSourceListener { if ("[DONE]".equals(data)) { //成功响应 emitter.complete(); - // 清理失败回调 - RetryNotifier.clear(sessionId); + // 清理失败回调(以 emitter 为键) + RetryNotifier.clear(emitter); // 扣除费用 ChatRequest chatRequest = new ChatRequest(); // 设置对话角色 @@ -120,7 +120,7 @@ public class SSEEventSourceListener extends EventSourceListener { public void onClosed(EventSource eventSource) { log.info("OpenAI关闭sse连接..."); // 清理失败回调 - RetryNotifier.clear(sessionId); + RetryNotifier.clear(emitter); } @SneakyThrows @@ -129,8 +129,8 @@ public class SSEEventSourceListener extends EventSourceListener { if (Objects.isNull(response)) { // 透传错误到前端 SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败"); - // 通知重试 - RetryNotifier.notifyFailure(sessionId); + // 通知重试(以 emitter 为键) + RetryNotifier.notifyFailure(emitter); return; } ResponseBody body = response.body(); @@ -143,7 +143,7 @@ public class SSEEventSourceListener extends EventSourceListener { SSEUtil.sendErrorEvent(emitter, String.valueOf(response)); } // 通知重试 - RetryNotifier.notifyFailure(sessionId); + RetryNotifier.notifyFailure(emitter); eventSource.cancel(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java index 54269b08..e9863b6c 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/CozeServiceImpl.java @@ -64,12 +64,12 @@ public class CozeServiceImpl implements IChatService { if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) { emitter.complete(); log.info("Token usage: {}", event.getChat().getUsage().getTokenCount()); - RetryNotifier.clear(chatRequest.getSessionId()); + RetryNotifier.clear(emitter); } } ); } catch (Exception ex) { - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + RetryNotifier.notifyFailure(emitter); } finally { coze.shutdownExecutor(); } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java index f0ddd77b..0a6e6693 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/DeepSeekChatImpl.java @@ -58,15 +58,15 @@ public class DeepSeekChatImpl implements IChatService { @Override public void onError(Throwable error) { System.err.println("错误: " + error.getMessage()); - // 通知上层失败,进入重试/降级 - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + // 通知上层失败,进入重试/降级(以 emitter 为键) + RetryNotifier.notifyFailure(emitter); } }); } catch (Exception e) { log.error("deepseek请求失败:{}", e.getMessage()); // 同步异常直接通知失败 - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + RetryNotifier.notifyFailure(emitter); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java index 669f1c2f..2401b83e 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OllamaServiceImpl.java @@ -67,15 +67,15 @@ public class OllamaServiceImpl implements IChatService { emitter.send(substr); } catch (IOException e) { SSEUtil.sendErrorEvent(emitter, e.getMessage()); - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + RetryNotifier.notifyFailure(emitter); } }; api.chat(requestModel, streamHandler); emitter.complete(); - RetryNotifier.clear(chatRequest.getSessionId()); + RetryNotifier.clear(emitter); } catch (Exception e) { SSEUtil.sendErrorEvent(emitter, e.getMessage()); - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + RetryNotifier.notifyFailure(emitter); } }); diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java index c81cc0ea..cc264803 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/OpenAIServiceImpl.java @@ -69,8 +69,8 @@ public class OpenAIServiceImpl implements IChatService { try { openAiStreamClient.streamChatCompletion(completion, listener); } catch (Exception ex) { - // 同步异常也触发失败回调,按会话维度 - RetryNotifier.notifyFailure(chatRequest.getSessionId()); + // 同步异常也触发失败回调(以 emitter 为键) + RetryNotifier.notifyFailure(emitter); throw ex; } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java index 8462529c..3f5c00b0 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/QianWenAiChatServiceImpl.java @@ -51,18 +51,18 @@ public class QianWenAiChatServiceImpl implements IChatService { public void onCompleteResponse(ChatResponse completeResponse) { emitter.complete(); log.info("消息结束,完整消息ID: {}", completeResponse); - org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.clear(emitter); } @Override public void onError(Throwable error) { error.printStackTrace(); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); } }); } catch (Exception e) { log.error("千问请求失败:{}", e.getMessage()); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java index 7fe65818..c6d3fe06 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/SseServiceImpl.java @@ -137,8 +137,8 @@ public class SseServiceImpl implements ISseService { (modelForTry, onFailure) -> { // 替换请求中的模型名称 chatRequest.setModel(modelForTry.getModelName()); - // 将回调注册到ThreadLocal,供底层SSE失败时触发 - RetryNotifier.setFailureCallback(chatRequest.getSessionId(), onFailure); + // 以 emitter 实例为唯一键注册失败回调 + RetryNotifier.setFailureCallback(sseEmitter, onFailure); try { autoSelectServiceByCategoryAndInvoke(chatRequest, sseEmitter, modelForTry.getCategory()); } finally { diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java index 1fd12406..7405a77b 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ZhipuAiChatServiceImpl.java @@ -51,16 +51,16 @@ public class ZhipuAiChatServiceImpl implements IChatService { @SneakyThrows @Override public void onError(Throwable error) { - // 透传错误并触发重试 + // 透传错误并触发重试(以 emitter 为键) emitter.send(error.getMessage()); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); } @Override public void onCompleteResponse(ChatResponse response) { emitter.complete(); log.info("消息结束,完整消息ID: {}", response.aiMessage()); - org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.clear(emitter); } }; @@ -73,7 +73,7 @@ public class ZhipuAiChatServiceImpl implements IChatService { model.chat(chatRequest.getPrompt(), handler); } catch (Exception e) { log.error("智谱清言请求失败:{}", e.getMessage()); - org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId()); + org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter); } return emitter; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java index 77044081..25f65c44 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/support/RetryNotifier.java @@ -5,31 +5,36 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; /** - * 失败回调通知器:基于 sessionId 绑定回调,底层失败时按 sessionId 通知上层重试调度器。 + * 失败回调通知器:基于发射器实例(SseEmitter 等对象地址)绑定回调, + * 避免与业务标识绑定,且能跨线程正确关联。 */ public class RetryNotifier { - private static final Map FAILURE_CALLBACKS = new ConcurrentHashMap<>(); + private static final Map FAILURE_CALLBACKS = new ConcurrentHashMap<>(); - public static void setFailureCallback(Long sessionId, Runnable callback) { - if (sessionId == null || callback == null) { - return; - } - FAILURE_CALLBACKS.put(sessionId, callback); + private static int keyOf(Object obj) { + return System.identityHashCode(obj); } - public static void clear(Long sessionId) { - if (sessionId == null) { + public static void setFailureCallback(Object emitterLike, Runnable callback) { + if (emitterLike == null || callback == null) { return; } - FAILURE_CALLBACKS.remove(sessionId); + FAILURE_CALLBACKS.put(keyOf(emitterLike), callback); } - public static void notifyFailure(Long sessionId) { - if (sessionId == null) { + public static void clear(Object emitterLike) { + if (emitterLike == null) { return; } - Runnable cb = FAILURE_CALLBACKS.get(sessionId); + FAILURE_CALLBACKS.remove(keyOf(emitterLike)); + } + + public static void notifyFailure(Object emitterLike) { + if (emitterLike == null) { + return; + } + Runnable cb = FAILURE_CALLBACKS.get(keyOf(emitterLike)); if (Objects.nonNull(cb)) { cb.run(); }