mirror of
https://gitcode.com/ageerle/ruoyi-ai.git
synced 2026-03-13 20:53:42 +08:00
feat: 修改目前实现类使用统一重试降级逻辑;
This commit is contained in:
@@ -14,6 +14,8 @@ import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
import java.util.Objects;
|
||||
import org.ruoyi.chat.support.RetryNotifier;
|
||||
import org.ruoyi.chat.util.SSEUtil;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@@ -21,12 +23,18 @@ import java.util.Objects;
|
||||
public class FastGPTSSEEventSourceListener extends EventSourceListener {
|
||||
|
||||
private SseEmitter emitter;
|
||||
private Long sessionId;
|
||||
|
||||
@Autowired(required = false)
|
||||
public FastGPTSSEEventSourceListener(SseEmitter emitter) {
|
||||
this.emitter = emitter;
|
||||
}
|
||||
|
||||
public FastGPTSSEEventSourceListener(SseEmitter emitter, Long sessionId) {
|
||||
this.emitter = emitter;
|
||||
this.sessionId = sessionId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen(EventSource eventSource, Response response) {
|
||||
log.info("FastGPT sse连接成功");
|
||||
@@ -40,6 +48,9 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener {
|
||||
if ("flowResponses".equals(type)){
|
||||
emitter.send(data);
|
||||
emitter.complete();
|
||||
if (sessionId != null) {
|
||||
RetryNotifier.clear(sessionId);
|
||||
}
|
||||
} else {
|
||||
emitter.send(data);
|
||||
}
|
||||
@@ -57,13 +68,26 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener {
|
||||
@SneakyThrows
|
||||
public void onFailure(EventSource eventSource, Throwable t, Response response) {
|
||||
if (Objects.isNull(response)) {
|
||||
if (sessionId != null) {
|
||||
SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败");
|
||||
RetryNotifier.notifyFailure(sessionId);
|
||||
}
|
||||
return;
|
||||
}
|
||||
ResponseBody body = response.body();
|
||||
if (Objects.nonNull(body)) {
|
||||
log.error("FastGPT sse连接异常data:{},异常:{}", body.string(), t);
|
||||
String msg = body.string();
|
||||
log.error("FastGPT sse连接异常data:{},异常:{}", msg, t);
|
||||
if (sessionId != null) {
|
||||
SSEUtil.sendErrorEvent(emitter, msg);
|
||||
RetryNotifier.notifyFailure(sessionId);
|
||||
}
|
||||
} else {
|
||||
log.error("FastGPT sse连接异常data:{},异常:{}", response, t);
|
||||
if (sessionId != null) {
|
||||
SSEUtil.sendErrorEvent(emitter, String.valueOf(response));
|
||||
RetryNotifier.notifyFailure(sessionId);
|
||||
}
|
||||
}
|
||||
eventSource.cancel();
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import org.ruoyi.chat.support.RetryNotifier;
|
||||
|
||||
/**
|
||||
* 扣子聊天管理
|
||||
@@ -53,19 +54,25 @@ public class CozeServiceImpl implements IChatService {
|
||||
Flowable<ChatEvent> resp = coze.chat().stream(req);
|
||||
ExecutorService executor = Executors.newFixedThreadPool(10);
|
||||
executor.submit(() -> {
|
||||
resp.blockingForEach(
|
||||
event -> {
|
||||
if (ChatEventType.CONVERSATION_MESSAGE_DELTA.equals(event.getEvent())) {
|
||||
emitter.send(event.getMessage().getContent());
|
||||
log.info("coze: {}", event.getMessage().getContent());
|
||||
try {
|
||||
resp.blockingForEach(
|
||||
event -> {
|
||||
if (ChatEventType.CONVERSATION_MESSAGE_DELTA.equals(event.getEvent())) {
|
||||
emitter.send(event.getMessage().getContent());
|
||||
log.info("coze: {}", event.getMessage().getContent());
|
||||
}
|
||||
if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) {
|
||||
emitter.complete();
|
||||
log.info("Token usage: {}", event.getChat().getUsage().getTokenCount());
|
||||
RetryNotifier.clear(chatRequest.getSessionId());
|
||||
}
|
||||
}
|
||||
if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) {
|
||||
emitter.complete();
|
||||
log.info("Token usage: {}", event.getChat().getUsage().getTokenCount());
|
||||
}
|
||||
}
|
||||
);
|
||||
coze.shutdownExecutor();
|
||||
);
|
||||
} catch (Exception ex) {
|
||||
RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
} finally {
|
||||
coze.shutdownExecutor();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ public class FastGPTServiceImpl implements IChatService {
|
||||
ChatModelVo chatModelVo = chatModelService.selectModelByName(chatRequest.getModel());
|
||||
OpenAiStreamClient openAiStreamClient = ChatConfig.createOpenAiStreamClient(chatModelVo.getApiHost(), chatModelVo.getApiKey());
|
||||
List<Message> messages = chatRequest.getMessages();
|
||||
FastGPTSSEEventSourceListener listener = new FastGPTSSEEventSourceListener(emitter);
|
||||
FastGPTSSEEventSourceListener listener = new FastGPTSSEEventSourceListener(emitter, chatRequest.getSessionId());
|
||||
FastGPTChatCompletion completion = FastGPTChatCompletion
|
||||
.builder()
|
||||
.messages(messages)
|
||||
@@ -41,7 +41,12 @@ public class FastGPTServiceImpl implements IChatService {
|
||||
.detail(true)
|
||||
.stream(true)
|
||||
.build();
|
||||
openAiStreamClient.streamChatCompletion(completion, listener);
|
||||
try {
|
||||
openAiStreamClient.streamChatCompletion(completion, listener);
|
||||
} catch (Exception ex) {
|
||||
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
throw ex;
|
||||
}
|
||||
return emitter;
|
||||
}
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.ruoyi.chat.support.RetryNotifier;
|
||||
|
||||
|
||||
/**
|
||||
@@ -66,12 +67,15 @@ public class OllamaServiceImpl implements IChatService {
|
||||
emitter.send(substr);
|
||||
} catch (IOException e) {
|
||||
SSEUtil.sendErrorEvent(emitter, e.getMessage());
|
||||
RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
}
|
||||
};
|
||||
api.chat(requestModel, streamHandler);
|
||||
emitter.complete();
|
||||
RetryNotifier.clear(chatRequest.getSessionId());
|
||||
} catch (Exception e) {
|
||||
SSEUtil.sendErrorEvent(emitter, e.getMessage());
|
||||
RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -51,15 +51,18 @@ public class QianWenAiChatServiceImpl implements IChatService {
|
||||
public void onCompleteResponse(ChatResponse completeResponse) {
|
||||
emitter.complete();
|
||||
log.info("消息结束,完整消息ID: {}", completeResponse);
|
||||
org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
error.printStackTrace();
|
||||
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("千问请求失败:{}", e.getMessage());
|
||||
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
}
|
||||
|
||||
return emitter;
|
||||
|
||||
@@ -51,14 +51,16 @@ public class ZhipuAiChatServiceImpl implements IChatService {
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
// System.out.println(error.getMessage());
|
||||
// 透传错误并触发重试
|
||||
emitter.send(error.getMessage());
|
||||
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleteResponse(ChatResponse response) {
|
||||
emitter.complete();
|
||||
log.info("消息结束,完整消息ID: {}", response.aiMessage());
|
||||
org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -71,6 +73,7 @@ public class ZhipuAiChatServiceImpl implements IChatService {
|
||||
model.chat(chatRequest.getPrompt(), handler);
|
||||
} catch (Exception e) {
|
||||
log.error("智谱清言请求失败:{}", e.getMessage());
|
||||
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
|
||||
}
|
||||
|
||||
return emitter;
|
||||
|
||||
Reference in New Issue
Block a user