feat: 失败回调器中使用emitter对象的唯一hash作为key,不再使用session,不与业务进行绑定,同时也保证跨线程调用的正确性;

This commit is contained in:
likunlong
2025-08-19 17:53:27 +08:00
committed by Administrator
parent c43d4784de
commit 4b37cfe97d
10 changed files with 50 additions and 53 deletions

View File

@@ -48,9 +48,7 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener {
if ("flowResponses".equals(type)){
emitter.send(data);
emitter.complete();
if (sessionId != null) {
RetryNotifier.clear(sessionId);
}
RetryNotifier.clear(emitter);
} else {
emitter.send(data);
}
@@ -68,26 +66,20 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener {
@SneakyThrows
public void onFailure(EventSource eventSource, Throwable t, Response response) {
if (Objects.isNull(response)) {
if (sessionId != null) {
SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败");
RetryNotifier.notifyFailure(sessionId);
}
SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败");
RetryNotifier.notifyFailure(emitter);
return;
}
ResponseBody body = response.body();
if (Objects.nonNull(body)) {
String msg = body.string();
log.error("FastGPT sse连接异常data{},异常:{}", msg, t);
if (sessionId != null) {
SSEUtil.sendErrorEvent(emitter, msg);
RetryNotifier.notifyFailure(sessionId);
}
SSEUtil.sendErrorEvent(emitter, msg);
RetryNotifier.notifyFailure(emitter);
} else {
log.error("FastGPT sse连接异常data{},异常:{}", response, t);
if (sessionId != null) {
SSEUtil.sendErrorEvent(emitter, String.valueOf(response));
RetryNotifier.notifyFailure(sessionId);
}
SSEUtil.sendErrorEvent(emitter, String.valueOf(response));
RetryNotifier.notifyFailure(emitter);
}
eventSource.cancel();
}

View File

@@ -79,8 +79,8 @@ public class SSEEventSourceListener extends EventSourceListener {
if ("[DONE]".equals(data)) {
//成功响应
emitter.complete();
// 清理失败回调
RetryNotifier.clear(sessionId);
// 清理失败回调(以 emitter 为键)
RetryNotifier.clear(emitter);
// 扣除费用
ChatRequest chatRequest = new ChatRequest();
// 设置对话角色
@@ -120,7 +120,7 @@ public class SSEEventSourceListener extends EventSourceListener {
public void onClosed(EventSource eventSource) {
log.info("OpenAI关闭sse连接...");
// 清理失败回调
RetryNotifier.clear(sessionId);
RetryNotifier.clear(emitter);
}
@SneakyThrows
@@ -129,8 +129,8 @@ public class SSEEventSourceListener extends EventSourceListener {
if (Objects.isNull(response)) {
// 透传错误到前端
SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败");
// 通知重试
RetryNotifier.notifyFailure(sessionId);
// 通知重试(以 emitter 为键)
RetryNotifier.notifyFailure(emitter);
return;
}
ResponseBody body = response.body();
@@ -143,7 +143,7 @@ public class SSEEventSourceListener extends EventSourceListener {
SSEUtil.sendErrorEvent(emitter, String.valueOf(response));
}
// 通知重试
RetryNotifier.notifyFailure(sessionId);
RetryNotifier.notifyFailure(emitter);
eventSource.cancel();
}

View File

@@ -64,12 +64,12 @@ public class CozeServiceImpl implements IChatService {
if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) {
emitter.complete();
log.info("Token usage: {}", event.getChat().getUsage().getTokenCount());
RetryNotifier.clear(chatRequest.getSessionId());
RetryNotifier.clear(emitter);
}
}
);
} catch (Exception ex) {
RetryNotifier.notifyFailure(chatRequest.getSessionId());
RetryNotifier.notifyFailure(emitter);
} finally {
coze.shutdownExecutor();
}

View File

@@ -58,15 +58,15 @@ public class DeepSeekChatImpl implements IChatService {
@Override
public void onError(Throwable error) {
System.err.println("错误: " + error.getMessage());
// 通知上层失败,进入重试/降级
RetryNotifier.notifyFailure(chatRequest.getSessionId());
// 通知上层失败,进入重试/降级(以 emitter 为键)
RetryNotifier.notifyFailure(emitter);
}
});
} catch (Exception e) {
log.error("deepseek请求失败{}", e.getMessage());
// 同步异常直接通知失败
RetryNotifier.notifyFailure(chatRequest.getSessionId());
RetryNotifier.notifyFailure(emitter);
}
return emitter;

View File

@@ -67,15 +67,15 @@ public class OllamaServiceImpl implements IChatService {
emitter.send(substr);
} catch (IOException e) {
SSEUtil.sendErrorEvent(emitter, e.getMessage());
RetryNotifier.notifyFailure(chatRequest.getSessionId());
RetryNotifier.notifyFailure(emitter);
}
};
api.chat(requestModel, streamHandler);
emitter.complete();
RetryNotifier.clear(chatRequest.getSessionId());
RetryNotifier.clear(emitter);
} catch (Exception e) {
SSEUtil.sendErrorEvent(emitter, e.getMessage());
RetryNotifier.notifyFailure(chatRequest.getSessionId());
RetryNotifier.notifyFailure(emitter);
}
});

View File

@@ -69,8 +69,8 @@ public class OpenAIServiceImpl implements IChatService {
try {
openAiStreamClient.streamChatCompletion(completion, listener);
} catch (Exception ex) {
// 同步异常也触发失败回调,按会话维度
RetryNotifier.notifyFailure(chatRequest.getSessionId());
// 同步异常也触发失败回调(以 emitter 为键)
RetryNotifier.notifyFailure(emitter);
throw ex;
}
return emitter;

View File

@@ -51,18 +51,18 @@ public class QianWenAiChatServiceImpl implements IChatService {
public void onCompleteResponse(ChatResponse completeResponse) {
emitter.complete();
log.info("消息结束完整消息ID: {}", completeResponse);
org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId());
org.ruoyi.chat.support.RetryNotifier.clear(emitter);
}
@Override
public void onError(Throwable error) {
error.printStackTrace();
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter);
}
});
} catch (Exception e) {
log.error("千问请求失败:{}", e.getMessage());
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter);
}
return emitter;

View File

@@ -137,8 +137,8 @@ public class SseServiceImpl implements ISseService {
(modelForTry, onFailure) -> {
// 替换请求中的模型名称
chatRequest.setModel(modelForTry.getModelName());
// 将回调注册到ThreadLocal供底层SSE失败时触发
RetryNotifier.setFailureCallback(chatRequest.getSessionId(), onFailure);
// 以 emitter 实例为唯一键注册失败回调
RetryNotifier.setFailureCallback(sseEmitter, onFailure);
try {
autoSelectServiceByCategoryAndInvoke(chatRequest, sseEmitter, modelForTry.getCategory());
} finally {

View File

@@ -51,16 +51,16 @@ public class ZhipuAiChatServiceImpl implements IChatService {
@SneakyThrows
@Override
public void onError(Throwable error) {
// 透传错误并触发重试
// 透传错误并触发重试(以 emitter 为键)
emitter.send(error.getMessage());
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter);
}
@Override
public void onCompleteResponse(ChatResponse response) {
emitter.complete();
log.info("消息结束完整消息ID: {}", response.aiMessage());
org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId());
org.ruoyi.chat.support.RetryNotifier.clear(emitter);
}
};
@@ -73,7 +73,7 @@ public class ZhipuAiChatServiceImpl implements IChatService {
model.chat(chatRequest.getPrompt(), handler);
} catch (Exception e) {
log.error("智谱清言请求失败:{}", e.getMessage());
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter);
}
return emitter;

View File

@@ -5,31 +5,36 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* 失败回调通知器:基于 sessionId 绑定回调,底层失败时按 sessionId 通知上层重试调度器。
* 失败回调通知器:基于发射器实例SseEmitter 等对象地址)绑定回调,
* 避免与业务标识绑定,且能跨线程正确关联。
*/
public class RetryNotifier {
private static final Map<Long, Runnable> FAILURE_CALLBACKS = new ConcurrentHashMap<>();
private static final Map<Integer, Runnable> FAILURE_CALLBACKS = new ConcurrentHashMap<>();
public static void setFailureCallback(Long sessionId, Runnable callback) {
if (sessionId == null || callback == null) {
return;
}
FAILURE_CALLBACKS.put(sessionId, callback);
private static int keyOf(Object obj) {
return System.identityHashCode(obj);
}
public static void clear(Long sessionId) {
if (sessionId == null) {
public static void setFailureCallback(Object emitterLike, Runnable callback) {
if (emitterLike == null || callback == null) {
return;
}
FAILURE_CALLBACKS.remove(sessionId);
FAILURE_CALLBACKS.put(keyOf(emitterLike), callback);
}
public static void notifyFailure(Long sessionId) {
if (sessionId == null) {
public static void clear(Object emitterLike) {
if (emitterLike == null) {
return;
}
Runnable cb = FAILURE_CALLBACKS.get(sessionId);
FAILURE_CALLBACKS.remove(keyOf(emitterLike));
}
public static void notifyFailure(Object emitterLike) {
if (emitterLike == null) {
return;
}
Runnable cb = FAILURE_CALLBACKS.get(keyOf(emitterLike));
if (Objects.nonNull(cb)) {
cb.run();
}