feat: 问答时添加统一重试和降级逻辑;

This commit is contained in:
likunlong
2025-08-19 16:46:25 +08:00
committed by Administrator
parent a0d029c142
commit aa11c1f233
10 changed files with 240 additions and 4 deletions

View File

@@ -90,6 +90,12 @@ public class ChatModelVo implements Serializable {
@ExcelProperty(value = "密钥")
private String apiKey;
/**
* 优先级(值越大优先级越高)
*/
@ExcelProperty(value = "优先级")
private Integer priority;
/**
* 备注
*/

View File

@@ -63,6 +63,11 @@ public interface IChatModelService {
*/
ChatModelVo selectModelByCategoryWithHighestPriority(String category);
/**
* 在同一分类下,查找优先级小于当前优先级的最高优先级模型(用于降级)。
*/
ChatModelVo selectFallbackModelByCategoryAndLessPriority(String category, Integer currentPriority);
/**
* 获取ppt模型信息
*/

View File

@@ -150,6 +150,20 @@ public class ChatModelServiceImpl implements IChatModelService {
);
}
/**
* 在同一分类下,查找优先级小于当前优先级的最高优先级模型(用于降级)。
*/
@Override
public ChatModelVo selectFallbackModelByCategoryAndLessPriority(String category, Integer currentPriority) {
return baseMapper.selectVoOne(
Wrappers.<ChatModel>lambdaQuery()
.eq(ChatModel::getCategory, category)
.lt(ChatModel::getPriority, currentPriority)
.orderByDesc(ChatModel::getPriority)
.last("LIMIT 1")
);
}
@Override
public ChatModel getPPT() {
return baseMapper.selectOne(Wrappers.<ChatModel>lambdaQuery().eq(ChatModel::getModelName, "ppt"));

View File

@@ -21,6 +21,8 @@ import org.ruoyi.common.core.utils.SpringUtils;
import org.ruoyi.common.core.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.ruoyi.chat.util.SSEUtil;
import org.ruoyi.chat.support.RetryNotifier;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Objects;
@@ -77,6 +79,8 @@ public class SSEEventSourceListener extends EventSourceListener {
if ("[DONE]".equals(data)) {
//成功响应
emitter.complete();
// 清理失败回调
RetryNotifier.clear(sessionId);
// 扣除费用
ChatRequest chatRequest = new ChatRequest();
// 设置对话角色
@@ -115,20 +119,31 @@ public class SSEEventSourceListener extends EventSourceListener {
@Override
public void onClosed(EventSource eventSource) {
log.info("OpenAI关闭sse连接...");
// 清理失败回调
RetryNotifier.clear(sessionId);
}
@SneakyThrows
@Override
public void onFailure(EventSource eventSource, Throwable t, Response response) {
if (Objects.isNull(response)) {
// 透传错误到前端
SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败");
// 通知重试
RetryNotifier.notifyFailure(sessionId);
return;
}
ResponseBody body = response.body();
if (Objects.nonNull(body)) {
log.error("OpenAI sse连接异常data{},异常:{}", body.string(), t);
String msg = body.string();
log.error("OpenAI sse连接异常data{},异常:{}", msg, t);
SSEUtil.sendErrorEvent(emitter, msg);
} else {
log.error("OpenAI sse连接异常data{},异常:{}", response, t);
SSEUtil.sendErrorEvent(emitter, String.valueOf(response));
}
// 通知重试
RetryNotifier.notifyFailure(sessionId);
eventSource.cancel();
}

View File

@@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.ruoyi.chat.support.RetryNotifier;
/**
* deepseek
*/
@@ -57,11 +58,15 @@ public class DeepSeekChatImpl implements IChatService {
@Override
public void onError(Throwable error) {
System.err.println("错误: " + error.getMessage());
// 通知上层失败,进入重试/降级
RetryNotifier.notifyFailure(chatRequest.getSessionId());
}
});
} catch (Exception e) {
log.error("deepseek请求失败{}", e.getMessage());
// 同步异常直接通知失败
RetryNotifier.notifyFailure(chatRequest.getSessionId());
}
return emitter;

View File

@@ -22,6 +22,7 @@ import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.List;
import org.ruoyi.chat.support.RetryNotifier;
/**
@@ -65,7 +66,13 @@ public class OpenAIServiceImpl implements IChatService {
.model(chatRequest.getModel())
.stream(true)
.build();
openAiStreamClient.streamChatCompletion(completion, listener);
try {
openAiStreamClient.streamChatCompletion(completion, listener);
} catch (Exception ex) {
// 同步异常也触发失败回调,按会话维度
RetryNotifier.notifyFailure(chatRequest.getSessionId());
throw ex;
}
return emitter;
}

View File

@@ -9,6 +9,8 @@ import org.ruoyi.chat.factory.ChatServiceFactory;
import org.ruoyi.chat.service.chat.IChatCostService;
import org.ruoyi.chat.service.chat.IChatService;
import org.ruoyi.chat.service.chat.ISseService;
import org.ruoyi.chat.support.ChatRetryHelper;
import org.ruoyi.chat.support.RetryNotifier;
import org.ruoyi.chat.util.SSEUtil;
import org.ruoyi.common.chat.entity.Tts.TextToSpeech;
import org.ruoyi.common.chat.entity.chat.Message;
@@ -116,7 +118,27 @@ public class SseServiceImpl implements ISseService {
}
// 自动选择模型并获取对应的聊天服务
IChatService chatService = autoSelectModelAndGetService(chatRequest);
chatService.chat(chatRequest, sseEmitter);
// 统一重试与降级封装启动逻辑并通过ThreadLocal传递失败回调
ChatModelVo currentModel = this.chatModelVo;
String currentCategory = currentModel.getCategory();
ChatRetryHelper.executeWithRetry(
currentModel,
currentCategory,
chatModelService,
sseEmitter,
(modelForTry, onFailure) -> {
// 替换请求中的模型名称
chatRequest.setModel(modelForTry.getModelName());
// 将回调注册到ThreadLocal供底层SSE失败时触发
RetryNotifier.setFailureCallback(chatRequest.getSessionId(), onFailure);
try {
autoSelectServiceByCategoryAndInvoke(chatRequest, sseEmitter, modelForTry.getCategory());
} finally {
// 不在此处清理,待下游结束/失败时清理
}
}
);
} catch (Exception e) {
log.error(e.getMessage(),e);
SSEUtil.sendErrorEvent(sseEmitter,e.getMessage());
@@ -149,6 +171,14 @@ public class SseServiceImpl implements ISseService {
throw new IllegalStateException("模型选择和服务获取失败: " + e.getMessage());
}
}
/**
* 根据给定分类获取服务并发起调用(避免在降级时重复选择模型)
*/
private void autoSelectServiceByCategoryAndInvoke(ChatRequest chatRequest, SseEmitter sseEmitter, String category) {
IChatService service = chatServiceFactory.getChatService(category);
service.chat(chatRequest, sseEmitter);
}
/**
* 根据分类选择优先级最高的模型

View File

@@ -0,0 +1,115 @@
package org.ruoyi.chat.support;
import lombok.extern.slf4j.Slf4j;
import org.ruoyi.chat.util.SSEUtil;
import org.ruoyi.domain.vo.ChatModelVo;
import org.ruoyi.service.IChatModelService;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
/**
* 统一的聊天重试与降级调度器。
*
* 策略:
* - 当前模型最多重试 3 次;仍失败则降级到同分类内、优先级小于当前的最高优先级模型。
* - 降级模型同样最多重试 3 次;仍失败则向前端返回失败信息并停止。
*
* 注意:实现依赖调用方在底层异步失败时执行 onFailure.run() 通知本调度器。
*/
@Slf4j
public class ChatRetryHelper {
public interface AttemptStarter {
void start(ChatModelVo model, Runnable onFailure) throws Exception;
}
public static void executeWithRetry(
ChatModelVo primaryModel,
String category,
IChatModelService chatModelService,
SseEmitter emitter,
AttemptStarter attemptStarter
) {
Objects.requireNonNull(primaryModel, "primaryModel must not be null");
Objects.requireNonNull(category, "category must not be null");
Objects.requireNonNull(chatModelService, "chatModelService must not be null");
Objects.requireNonNull(emitter, "emitter must not be null");
Objects.requireNonNull(attemptStarter, "attemptStarter must not be null");
AtomicInteger mainAttempts = new AtomicInteger(0);
AtomicInteger fallbackAttempts = new AtomicInteger(0);
AtomicBoolean inFallback = new AtomicBoolean(false);
AtomicBoolean scheduling = new AtomicBoolean(false);
class Scheduler {
volatile ChatModelVo current = primaryModel;
volatile ChatModelVo fallback = null;
void startAttempt() {
try {
if (!inFallback.get()) {
if (mainAttempts.incrementAndGet() > 3) {
// 进入降级
inFallback.set(true);
if (fallback == null) {
Integer curPriority = primaryModel.getPriority();
if (curPriority == null) {
curPriority = Integer.MAX_VALUE;
}
fallback = chatModelService.selectFallbackModelByCategoryAndLessPriority(category, curPriority);
}
if (fallback == null) {
SSEUtil.sendErrorEvent(emitter, "当前模型重试3次均失败且无可用降级模型");
emitter.complete();
return;
}
current = fallback;
mainAttempts.set(3); // 锁定
fallbackAttempts.set(0);
}
} else {
if (fallbackAttempts.incrementAndGet() > 3) {
SSEUtil.sendErrorEvent(emitter, "降级模型重试3次仍失败");
emitter.complete();
return;
}
}
Runnable onFailure = () -> {
// 去抖:避免同一次失败触发多次重试
if (scheduling.compareAndSet(false, true)) {
try {
SSEUtil.sendErrorEvent(emitter, (inFallback.get() ? "降级模型" : "当前模型") + "调用失败,准备重试...");
// 立即发起下一次尝试
startAttempt();
} finally {
scheduling.set(false);
}
}
};
attemptStarter.start(current, onFailure);
} catch (Exception ex) {
log.error("启动聊天尝试失败: {}", ex.getMessage(), ex);
SSEUtil.sendErrorEvent(emitter, "启动聊天尝试失败: " + ex.getMessage());
// 直接按失败处理,继续重试/降级
if (scheduling.compareAndSet(false, true)) {
try {
startAttempt();
} finally {
scheduling.set(false);
}
}
}
}
}
new Scheduler().startAttempt();
}
}

View File

@@ -0,0 +1,39 @@
package org.ruoyi.chat.support;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* 失败回调通知器:基于 sessionId 绑定回调,底层失败时按 sessionId 通知上层重试调度器。
*/
public class RetryNotifier {
private static final Map<Long, Runnable> FAILURE_CALLBACKS = new ConcurrentHashMap<>();
public static void setFailureCallback(Long sessionId, Runnable callback) {
if (sessionId == null || callback == null) {
return;
}
FAILURE_CALLBACKS.put(sessionId, callback);
}
public static void clear(Long sessionId) {
if (sessionId == null) {
return;
}
FAILURE_CALLBACKS.remove(sessionId);
}
public static void notifyFailure(Long sessionId) {
if (sessionId == null) {
return;
}
Runnable cb = FAILURE_CALLBACKS.get(sessionId);
if (Objects.nonNull(cb)) {
cb.run();
}
}
}

View File

@@ -25,6 +25,6 @@ public class SSEUtil {
} catch (IOException e) {
log.error("SSE发送失败: {}", e.getMessage());
}
sseEmitter.complete();
// 不立即关闭,由上层策略决定是否继续重试或降级
}
}