mirror of
https://gitcode.com/ageerle/ruoyi-ai.git
synced 2026-03-13 20:53:42 +08:00
feat: 失败回调器中使用emitter对象的唯一hash作为key,不再使用session,不与业务进行绑定,同时也保证跨线程调用的正确性;
This commit is contained in:
@@ -48,9 +48,7 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener {
|
||||
if ("flowResponses".equals(type)){
|
||||
emitter.send(data);
|
||||
emitter.complete();
|
||||
if (sessionId != null) {
|
||||
RetryNotifier.clear(sessionId);
|
||||
}
|
||||
RetryNotifier.clear(emitter);
|
||||
} else {
|
||||
emitter.send(data);
|
||||
}
|
||||
@@ -68,26 +66,20 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener {
|
||||
@SneakyThrows
|
||||
public void onFailure(EventSource eventSource, Throwable t, Response response) {
|
||||
if (Objects.isNull(response)) {
|
||||
if (sessionId != null) {
|
||||
SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败");
|
||||
RetryNotifier.notifyFailure(sessionId);
|
||||
}
|
||||
SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败");
|
||||
RetryNotifier.notifyFailure(emitter);
|
||||
return;
|
||||
}
|
||||
ResponseBody body = response.body();
|
||||
if (Objects.nonNull(body)) {
|
||||
String msg = body.string();
|
||||
log.error("FastGPT sse连接异常data:{},异常:{}", msg, t);
|
||||
if (sessionId != null) {
|
||||
SSEUtil.sendErrorEvent(emitter, msg);
|
||||
RetryNotifier.notifyFailure(sessionId);
|
||||
}
|
||||
SSEUtil.sendErrorEvent(emitter, msg);
|
||||
RetryNotifier.notifyFailure(emitter);
|
||||
} else {
|
||||
log.error("FastGPT sse连接异常data:{},异常:{}", response, t);
|
||||
if (sessionId != null) {
|
||||
SSEUtil.sendErrorEvent(emitter, String.valueOf(response));
|
||||
RetryNotifier.notifyFailure(sessionId);
|
||||
}
|
||||
SSEUtil.sendErrorEvent(emitter, String.valueOf(response));
|
||||
RetryNotifier.notifyFailure(emitter);
|
||||
}
|
||||
eventSource.cancel();
|
||||
}
|
||||
|
||||
@@ -79,8 +79,8 @@ public class SSEEventSourceListener extends EventSourceListener {
|
||||
if ("[DONE]".equals(data)) {
|
||||
//成功响应
|
||||
emitter.complete();
|
||||
// 清理失败回调
|
||||
RetryNotifier.clear(sessionId);
|
||||
// 清理失败回调(以 emitter 为键)
|
||||
RetryNotifier.clear(emitter);
|
||||
// 扣除费用
|
||||
ChatRequest chatRequest = new ChatRequest();
|
||||
// 设置对话角色
|
||||
@@ -118,7 +118,7 @@ public class SSEEventSourceListener extends EventSourceListener {
|
||||
public void onClosed(EventSource eventSource) {
|
||||
log.info("OpenAI关闭sse连接...");
|
||||
// 清理失败回调
|
||||
RetryNotifier.clear(sessionId);
|
||||
RetryNotifier.clear(emitter);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@@ -127,8 +127,8 @@ public class SSEEventSourceListener extends EventSourceListener {
|
||||
if (Objects.isNull(response)) {
|
||||
// 透传错误到前端
|
||||
SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败");
|
||||
// 通知重试
|
||||
RetryNotifier.notifyFailure(sessionId);
|
||||
// 通知重试(以 emitter 为键)
|
||||
RetryNotifier.notifyFailure(emitter);
|
||||
return;
|
||||
}
|
||||
ResponseBody body = response.body();
|
||||
@@ -141,7 +141,7 @@ public class SSEEventSourceListener extends EventSourceListener {
|
||||
SSEUtil.sendErrorEvent(emitter, String.valueOf(response));
|
||||
}
|
||||
// 通知重试
|
||||
RetryNotifier.notifyFailure(sessionId);
|
||||
RetryNotifier.notifyFailure(emitter);
|
||||
eventSource.cancel();
|
||||
}
|
||||
|
||||
|
||||
@@ -64,12 +64,12 @@ public class CozeServiceImpl implements IChatService {
|
||||
if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) {
|
||||
emitter.complete();
|
||||
log.info("Token usage: {}", event.getChat().getUsage().getTokenCount());
|
||||
RetryNotifier.clear(chatRequest.getSessionId());
|
||||
RetryNotifier.clear(emitter);
|
||||
}
|
||||
}
|
||||
);
|
||||
} catch (Exception ex) {
|
||||
RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
RetryNotifier.notifyFailure(emitter);
|
||||
} finally {
|
||||
coze.shutdownExecutor();
|
||||
}
|
||||
|
||||
@@ -58,15 +58,15 @@ public class DeepSeekChatImpl implements IChatService {
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
System.err.println("错误: " + error.getMessage());
|
||||
// 通知上层失败,进入重试/降级
|
||||
RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
// 通知上层失败,进入重试/降级(以 emitter 为键)
|
||||
RetryNotifier.notifyFailure(emitter);
|
||||
}
|
||||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("deepseek请求失败:{}", e.getMessage());
|
||||
// 同步异常直接通知失败
|
||||
RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
RetryNotifier.notifyFailure(emitter);
|
||||
}
|
||||
|
||||
return emitter;
|
||||
|
||||
@@ -113,24 +113,24 @@ public class DifyServiceImpl implements IChatService {
|
||||
chatRequestResponse.setSessionId(chatRequest.getSessionId());
|
||||
chatRequestResponse.setPrompt(respMessage.toString());
|
||||
chatCostService.deductToken(chatRequestResponse);
|
||||
RetryNotifier.clear(chatRequest.getSessionId());
|
||||
RetryNotifier.clear(emitter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(ErrorEvent event) {
|
||||
System.err.println("错误: " + event.getMessage());
|
||||
RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
RetryNotifier.notifyFailure(emitter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
System.err.println("异常: " + throwable.getMessage());
|
||||
RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
RetryNotifier.notifyFailure(emitter);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("dify请求失败:{}", e.getMessage());
|
||||
RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
RetryNotifier.notifyFailure(emitter);
|
||||
}
|
||||
|
||||
return emitter;
|
||||
|
||||
@@ -67,15 +67,15 @@ public class OllamaServiceImpl implements IChatService {
|
||||
emitter.send(substr);
|
||||
} catch (IOException e) {
|
||||
SSEUtil.sendErrorEvent(emitter, e.getMessage());
|
||||
RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
RetryNotifier.notifyFailure(emitter);
|
||||
}
|
||||
};
|
||||
api.chat(requestModel, streamHandler);
|
||||
emitter.complete();
|
||||
RetryNotifier.clear(chatRequest.getSessionId());
|
||||
RetryNotifier.clear(emitter);
|
||||
} catch (Exception e) {
|
||||
SSEUtil.sendErrorEvent(emitter, e.getMessage());
|
||||
RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
RetryNotifier.notifyFailure(emitter);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -69,8 +69,8 @@ public class OpenAIServiceImpl implements IChatService {
|
||||
try {
|
||||
openAiStreamClient.streamChatCompletion(completion, listener);
|
||||
} catch (Exception ex) {
|
||||
// 同步异常也触发失败回调,按会话维度
|
||||
RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
// 同步异常也触发失败回调(以 emitter 为键)
|
||||
RetryNotifier.notifyFailure(emitter);
|
||||
throw ex;
|
||||
}
|
||||
return emitter;
|
||||
|
||||
@@ -51,18 +51,18 @@ public class QianWenAiChatServiceImpl implements IChatService {
|
||||
public void onCompleteResponse(ChatResponse completeResponse) {
|
||||
emitter.complete();
|
||||
log.info("消息结束,完整消息ID: {}", completeResponse);
|
||||
org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId());
|
||||
org.ruoyi.chat.support.RetryNotifier.clear(emitter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
error.printStackTrace();
|
||||
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("千问请求失败:{}", e.getMessage());
|
||||
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter);
|
||||
}
|
||||
|
||||
return emitter;
|
||||
|
||||
@@ -136,8 +136,8 @@ public class SseServiceImpl implements ISseService {
|
||||
(modelForTry, onFailure) -> {
|
||||
// 替换请求中的模型名称
|
||||
chatRequest.setModel(modelForTry.getModelName());
|
||||
// 将回调注册到ThreadLocal,供底层SSE失败时触发
|
||||
RetryNotifier.setFailureCallback(chatRequest.getSessionId(), onFailure);
|
||||
// 以 emitter 实例为唯一键注册失败回调
|
||||
RetryNotifier.setFailureCallback(sseEmitter, onFailure);
|
||||
try {
|
||||
autoSelectServiceByCategoryAndInvoke(chatRequest, sseEmitter, modelForTry.getCategory());
|
||||
} finally {
|
||||
|
||||
@@ -51,16 +51,16 @@ public class ZhipuAiChatServiceImpl implements IChatService {
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
// 透传错误并触发重试
|
||||
// 透传错误并触发重试(以 emitter 为键)
|
||||
emitter.send(error.getMessage());
|
||||
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleteResponse(ChatResponse response) {
|
||||
emitter.complete();
|
||||
log.info("消息结束,完整消息ID: {}", response.aiMessage());
|
||||
org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId());
|
||||
org.ruoyi.chat.support.RetryNotifier.clear(emitter);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -73,7 +73,7 @@ public class ZhipuAiChatServiceImpl implements IChatService {
|
||||
model.chat(chatRequest.getPrompt(), handler);
|
||||
} catch (Exception e) {
|
||||
log.error("智谱清言请求失败:{}", e.getMessage());
|
||||
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter);
|
||||
}
|
||||
|
||||
return emitter;
|
||||
|
||||
@@ -5,31 +5,36 @@ import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* 失败回调通知器:基于 sessionId 绑定回调,底层失败时按 sessionId 通知上层重试调度器。
|
||||
* 失败回调通知器:基于发射器实例(SseEmitter 等对象地址)绑定回调,
|
||||
* 避免与业务标识绑定,且能跨线程正确关联。
|
||||
*/
|
||||
public class RetryNotifier {
|
||||
|
||||
private static final Map<Long, Runnable> FAILURE_CALLBACKS = new ConcurrentHashMap<>();
|
||||
private static final Map<Integer, Runnable> FAILURE_CALLBACKS = new ConcurrentHashMap<>();
|
||||
|
||||
public static void setFailureCallback(Long sessionId, Runnable callback) {
|
||||
if (sessionId == null || callback == null) {
|
||||
return;
|
||||
}
|
||||
FAILURE_CALLBACKS.put(sessionId, callback);
|
||||
private static int keyOf(Object obj) {
|
||||
return System.identityHashCode(obj);
|
||||
}
|
||||
|
||||
public static void clear(Long sessionId) {
|
||||
if (sessionId == null) {
|
||||
public static void setFailureCallback(Object emitterLike, Runnable callback) {
|
||||
if (emitterLike == null || callback == null) {
|
||||
return;
|
||||
}
|
||||
FAILURE_CALLBACKS.remove(sessionId);
|
||||
FAILURE_CALLBACKS.put(keyOf(emitterLike), callback);
|
||||
}
|
||||
|
||||
public static void notifyFailure(Long sessionId) {
|
||||
if (sessionId == null) {
|
||||
public static void clear(Object emitterLike) {
|
||||
if (emitterLike == null) {
|
||||
return;
|
||||
}
|
||||
Runnable cb = FAILURE_CALLBACKS.get(sessionId);
|
||||
FAILURE_CALLBACKS.remove(keyOf(emitterLike));
|
||||
}
|
||||
|
||||
public static void notifyFailure(Object emitterLike) {
|
||||
if (emitterLike == null) {
|
||||
return;
|
||||
}
|
||||
Runnable cb = FAILURE_CALLBACKS.get(keyOf(emitterLike));
|
||||
if (Objects.nonNull(cb)) {
|
||||
cb.run();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user