feat: 兼容不选自动模型时的原先逻辑;封装通用方法,简化创建有监控的SSE,简化流式错误输出并通知重试;

This commit is contained in:
likunlong
2025-08-19 20:28:53 +08:00
committed by Administrator
parent ccdbb20935
commit 43426054ec
10 changed files with 93 additions and 29 deletions

View File

@@ -46,12 +46,15 @@ public class SSEEventSourceListener extends EventSourceListener {
private String token;
private boolean retryEnabled;
@Autowired(required = false)
public SSEEventSourceListener(SseEmitter emitter,Long userId,Long sessionId, String token) {
public SSEEventSourceListener(SseEmitter emitter,Long userId,Long sessionId, String token, boolean retryEnabled) {
this.emitter = emitter;
this.userId = userId;
this.sessionId = sessionId;
this.token = token;
this.retryEnabled = retryEnabled;
}
@@ -129,8 +132,12 @@ public class SSEEventSourceListener extends EventSourceListener {
if (Objects.isNull(response)) {
// 透传错误到前端
SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败");
// 通知重试(以 emitter 为键)
RetryNotifier.notifyFailure(emitter);
if (retryEnabled) {
// 通知重试(以 emitter 为键)
RetryNotifier.notifyFailure(emitter);
} else {
emitter.complete();
}
return;
}
ResponseBody body = response.body();
@@ -142,8 +149,12 @@ public class SSEEventSourceListener extends EventSourceListener {
log.error("OpenAI sse连接异常data{},异常:{}", response, t);
SSEUtil.sendErrorEvent(emitter, String.valueOf(response));
}
// 通知重试
RetryNotifier.notifyFailure(emitter);
if (retryEnabled) {
// 通知重试
RetryNotifier.notifyFailure(emitter);
} else {
emitter.complete();
}
eventSource.cancel();
}

View File

@@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.ruoyi.chat.support.RetryNotifier;
import org.ruoyi.chat.support.ChatServiceHelper;
/**
* 扣子聊天管理
@@ -69,7 +70,7 @@ public class CozeServiceImpl implements IChatService {
}
);
} catch (Exception ex) {
RetryNotifier.notifyFailure(emitter);
ChatServiceHelper.onStreamError(emitter, ex.getMessage());
} finally {
coze.shutdownExecutor();
}

View File

@@ -9,14 +9,13 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.ruoyi.chat.enums.ChatModeType;
import org.ruoyi.chat.service.chat.IChatService;
import org.ruoyi.chat.support.ChatServiceHelper;
import org.ruoyi.common.chat.request.ChatRequest;
import org.ruoyi.domain.vo.ChatModelVo;
import org.ruoyi.service.IChatModelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.ruoyi.chat.support.RetryNotifier;
/**
* deepseek
*/
@@ -58,15 +57,14 @@ public class DeepSeekChatImpl implements IChatService {
@Override
public void onError(Throwable error) {
System.err.println("错误: " + error.getMessage());
// 通知上层失败,进入重试/降级(以 emitter 为键)
RetryNotifier.notifyFailure(emitter);
ChatServiceHelper.onStreamError(emitter, error.getMessage());
}
});
} catch (Exception e) {
log.error("deepseek请求失败{}", e.getMessage());
// 同步异常直接通知失败
RetryNotifier.notifyFailure(emitter);
ChatServiceHelper.onStreamError(emitter, e.getMessage());
}
return emitter;

View File

@@ -18,6 +18,7 @@ import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.*;
import org.ruoyi.chat.support.ChatServiceHelper;
/**
* 图片识别模型
@@ -131,7 +132,7 @@ public class ImageServiceImpl implements IChatService {
// 获取会话token从入口透传避免非Web线程取值报错
String token = chatRequest.getToken();
// 创建 SSE 事件源监听器
SSEEventSourceListener listener = new SSEEventSourceListener(emitter, chatRequest.getUserId(), chatRequest.getSessionId(), token);
SSEEventSourceListener listener = ChatServiceHelper.createOpenAiListener(emitter, chatRequest);
// 构建聊天完成请求
ChatCompletion completion = ChatCompletion
@@ -142,7 +143,12 @@ public class ImageServiceImpl implements IChatService {
.build();
// 发起流式聊天完成请求
openAiStreamClient.streamChatCompletion(completion, listener);
try {
openAiStreamClient.streamChatCompletion(completion, listener);
} catch (Exception ex) {
ChatServiceHelper.onStreamError(emitter, ex.getMessage());
throw ex;
}
return emitter;
}

View File

@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.ruoyi.chat.support.RetryNotifier;
import org.ruoyi.chat.support.ChatServiceHelper;
/**
@@ -66,16 +67,14 @@ public class OllamaServiceImpl implements IChatService {
try {
emitter.send(substr);
} catch (IOException e) {
SSEUtil.sendErrorEvent(emitter, e.getMessage());
RetryNotifier.notifyFailure(emitter);
ChatServiceHelper.onStreamError(emitter, e.getMessage());
}
};
api.chat(requestModel, streamHandler);
emitter.complete();
RetryNotifier.clear(emitter);
} catch (Exception e) {
SSEUtil.sendErrorEvent(emitter, e.getMessage());
RetryNotifier.notifyFailure(emitter);
ChatServiceHelper.onStreamError(emitter, e.getMessage());
}
});

View File

@@ -1,12 +1,12 @@
package org.ruoyi.chat.service.chat.impl;
import cn.dev33.satoken.stp.StpUtil;
import io.modelcontextprotocol.client.McpSyncClient;
import lombok.extern.slf4j.Slf4j;
import org.ruoyi.chat.config.ChatConfig;
import org.ruoyi.chat.enums.ChatModeType;
import org.ruoyi.chat.listener.SSEEventSourceListener;
import org.ruoyi.chat.service.chat.IChatService;
import org.ruoyi.chat.support.ChatServiceHelper;
import org.ruoyi.common.chat.entity.chat.ChatCompletion;
import org.ruoyi.common.chat.entity.chat.Message;
import org.ruoyi.common.chat.openai.OpenAiStreamClient;
@@ -22,7 +22,6 @@ import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.List;
import org.ruoyi.chat.support.RetryNotifier;
/**
@@ -58,8 +57,7 @@ public class OpenAIServiceImpl implements IChatService {
Message userMessage = Message.builder().content("工具返回信息:"+toolString).role(Message.Role.USER).build();
messages.add(userMessage);
}
String token = chatRequest.getToken();
SSEEventSourceListener listener = new SSEEventSourceListener(emitter,chatRequest.getUserId(),chatRequest.getSessionId(), token);
SSEEventSourceListener listener = ChatServiceHelper.createOpenAiListener(emitter, chatRequest);
ChatCompletion completion = ChatCompletion
.builder()
.messages(messages)
@@ -69,8 +67,7 @@ public class OpenAIServiceImpl implements IChatService {
try {
openAiStreamClient.streamChatCompletion(completion, listener);
} catch (Exception ex) {
// 同步异常也触发失败回调(以 emitter 为键)
RetryNotifier.notifyFailure(emitter);
ChatServiceHelper.onStreamError(emitter, ex.getMessage());
throw ex;
}
return emitter;

View File

@@ -14,6 +14,7 @@ import org.ruoyi.service.IChatModelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.ruoyi.chat.support.ChatServiceHelper;
/**
@@ -57,12 +58,12 @@ public class QianWenAiChatServiceImpl implements IChatService {
@Override
public void onError(Throwable error) {
error.printStackTrace();
org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter);
ChatServiceHelper.onStreamError(emitter, error.getMessage());
}
});
} catch (Exception e) {
log.error("千问请求失败:{}", e.getMessage());
org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter);
ChatServiceHelper.onStreamError(emitter, e.getMessage());
}
return emitter;

View File

@@ -15,6 +15,7 @@ import org.ruoyi.service.IChatModelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.ruoyi.chat.support.ChatServiceHelper;
@@ -51,9 +52,7 @@ public class ZhipuAiChatServiceImpl implements IChatService {
@SneakyThrows
@Override
public void onError(Throwable error) {
// 透传错误并触发重试(以 emitter 为键)
emitter.send(error.getMessage());
org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter);
ChatServiceHelper.onStreamError(emitter, error.getMessage());
}
@Override
@@ -73,7 +72,7 @@ public class ZhipuAiChatServiceImpl implements IChatService {
model.chat(chatRequest.getPrompt(), handler);
} catch (Exception e) {
log.error("智谱清言请求失败:{}", e.getMessage());
org.ruoyi.chat.support.RetryNotifier.notifyFailure(emitter);
ChatServiceHelper.onStreamError(emitter, e.getMessage());
}
return emitter;

View File

@@ -0,0 +1,45 @@
package org.ruoyi.chat.support;
import org.ruoyi.chat.listener.SSEEventSourceListener;
import org.ruoyi.common.chat.request.ChatRequest;
import org.ruoyi.chat.util.SSEUtil;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
* 抽取各聊天实现类的通用逻辑:
* - 创建带开关的 SSE 监听器
* - 统一的流错误处理(根据是否在重试场景决定通知或直接结束)
* - 统一的完成处理(清理回调并 complete
*/
public class ChatServiceHelper {
public static SSEEventSourceListener createOpenAiListener(SseEmitter emitter, ChatRequest chatRequest) {
boolean retryEnabled = Boolean.TRUE.equals(chatRequest.getAutoSelectModel());
return new SSEEventSourceListener(
emitter,
chatRequest.getUserId(),
chatRequest.getSessionId(),
chatRequest.getToken(),
retryEnabled
);
}
public static void onStreamError(SseEmitter emitter, String errorMessage) {
SSEUtil.sendErrorEvent(emitter, errorMessage);
if (RetryNotifier.hasCallback(emitter)) {
RetryNotifier.notifyFailure(emitter);
} else {
emitter.complete();
}
}
public static void onStreamComplete(SseEmitter emitter) {
try {
emitter.complete();
} finally {
RetryNotifier.clear(emitter);
}
}
}

View File

@@ -39,6 +39,13 @@ public class RetryNotifier {
cb.run();
}
}
public static boolean hasCallback(Object emitterLike) {
if (emitterLike == null) {
return false;
}
return FAILURE_CALLBACKS.containsKey(keyOf(emitterLike));
}
}